mirror of
https://github.com/codeflash-ai/codeflash-agent.git
synced 2026-05-04 18:25:19 +00:00
612 lines
No EOL
18 KiB
Markdown
612 lines
No EOL
18 KiB
Markdown
# Web and Testing Containers
|
|
|
|
Containers for web services, browser automation, and testing infrastructure including Nginx, Selenium WebDriver, and specialized testing utilities for comprehensive web application testing.
|
|
|
|
## Capabilities
|
|
|
|
### Browser WebDriver Container
|
|
|
|
Selenium browser container for Chrome and Firefox automation with VNC support and video recording capabilities.
|
|
|
|
```python { .api }
|
|
class BrowserWebDriverContainer:
    """Selenium browser container for Chrome and Firefox automation.

    API outline only: method bodies are omitted in this reference.
    """

    def __init__(
        self,
        capabilities: dict,
        options: Optional[Any] = None,
        image: Optional[str] = None,
        port: int = 4444,
        vnc_port: int = 5900,
        **kwargs: Any
    ):
        """
        Initialize browser WebDriver container.

        Args:
            capabilities: Selenium capabilities dictionary
            options: Browser-specific options
            image: Docker image (auto-selected if None)
            port: Selenium Grid port (default 4444)
            vnc_port: VNC port for remote viewing (default 5900)
            **kwargs: Additional container options
        """

    def get_driver(self):
        """
        Get configured WebDriver instance.

        Returns:
            Selenium WebDriver instance
        """

    def get_connection_url(self) -> str:
        """
        Get Selenium Grid connection URL.

        Returns:
            Selenium Grid URL string
        """

    def with_options(self, options: Any) -> "BrowserWebDriverContainer":
        """
        Set browser-specific options.

        Args:
            options: Chrome/Firefox options object

        Returns:
            Self for method chaining
        """

    def with_video(
        self,
        image: Optional[str] = None,
        video_path: Optional[str] = None,
    ) -> "BrowserWebDriverContainer":
        """
        Enable video recording of browser session.

        Args:
            image: Video recorder image
            video_path: Host path to save videos

        Returns:
            Self for method chaining
        """
|
|
```
|
|
|
|
### Nginx Container
|
|
|
|
Nginx web server container for serving static content, reverse proxy testing, and web server functionality.
|
|
|
|
```python { .api }
|
|
class NginxContainer:
    """Nginx web server container.

    API outline only: method bodies are omitted in this reference.
    """

    def __init__(
        self,
        image: str = "nginx:alpine",
        port: int = 80,
        **kwargs: Any
    ):
        """
        Initialize Nginx container.

        Args:
            image: Nginx Docker image
            port: HTTP port (default 80)
            **kwargs: Additional container options
        """

    def get_url(self) -> str:
        """
        Get Nginx server URL.

        Returns:
            Nginx server URL string
        """
|
|
```
|
|
|
|
### Testing Utility Containers
|
|
|
|
Specialized containers for common test scenarios: a Mailpit SMTP server for capturing outgoing email, and an SFTP server for file-transfer tests.
|
|
|
|
```python { .api }
|
|
class MailpitContainer:
    """Mailpit email testing container.

    API outline only: method bodies are omitted in this reference.
    """

    def __init__(
        self,
        image: str = "axllent/mailpit:latest",
        smtp_port: int = 1025,
        web_port: int = 8025,
        **kwargs: Any
    ):
        """
        Initialize Mailpit email testing container.

        Args:
            image: Mailpit Docker image
            smtp_port: SMTP server port (default 1025)
            web_port: Web interface port (default 8025)
            **kwargs: Additional container options
        """

    def get_smtp_connection_url(self) -> str:
        """
        Get SMTP connection URL.

        Returns:
            SMTP connection URL string
        """

    def get_web_url(self) -> str:
        """
        Get web interface URL.

        Returns:
            Web interface URL string
        """
|
|
|
|
class SftpContainer:
    """SFTP server container.

    API outline only: method bodies are omitted in this reference.
    """

    def __init__(
        self,
        image: str = "atmoz/sftp:latest",
        port: int = 22,
        username: str = "testuser",
        password: str = "testpass",
        **kwargs: Any
    ):
        """
        Initialize SFTP server container.

        Args:
            image: SFTP Docker image
            port: SFTP port (default 22)
            username: SFTP username
            password: SFTP password
            **kwargs: Additional container options
        """

    def get_connection_url(self) -> str:
        """
        Get SFTP connection URL.

        Returns:
            SFTP connection URL string
        """
|
|
```
|
|
|
|
## Usage Examples
|
|
|
|
### Selenium Browser Automation
|
|
|
|
```python
|
|
from testcontainers.selenium import BrowserWebDriverContainer
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
# Chrome browser automation
chrome_capabilities = {
    "browserName": "chrome",
    "browserVersion": "latest",
}

with BrowserWebDriverContainer(chrome_capabilities) as chrome:
    # Get WebDriver instance
    driver = chrome.get_driver()

    try:
        # Navigate to a website
        driver.get("https://example.com")

        # Wait (up to 10 s) for the heading to be present in the DOM
        wait = WebDriverWait(driver, 10)
        title_element = wait.until(
            EC.presence_of_element_located((By.TAG_NAME, "h1"))
        )

        # Read data from the page
        print(f"Page title: {driver.title}")
        print(f"H1 text: {title_element.text}")

        # Take screenshot
        driver.save_screenshot("example_page.png")

        # Find elements (this example only counts them; nothing is clicked)
        links = driver.find_elements(By.TAG_NAME, "a")
        print(f"Found {len(links)} links on the page")

    finally:
        # Always end the browser session, even if a step above failed
        driver.quit()
|
|
```
|
|
|
|
### Firefox with Custom Options
|
|
|
|
```python
|
|
from testcontainers.selenium import BrowserWebDriverContainer
|
|
from selenium.webdriver.firefox.options import Options
|
|
|
|
# Configure Firefox options
firefox_options = Options()
firefox_options.add_argument("--headless")  # Run in background
firefox_options.set_preference("network.http.pipelining", True)

firefox_capabilities = {
    "browserName": "firefox",
    "browserVersion": "latest",
}

# Apply the options before the container starts; with_options() returns the
# container itself, so the call can be chained straight into the `with`.
firefox_container = BrowserWebDriverContainer(firefox_capabilities).with_options(
    firefox_options
)

with firefox_container as firefox:
    driver = firefox.get_driver()

    try:
        # Test JavaScript execution
        driver.get("data:text/html,<html><body><h1 id='test'>Hello World</h1></body></html>")

        # Execute JavaScript
        result = driver.execute_script("return document.getElementById('test').textContent;")
        print(f"JavaScript result: {result}")

        # Test page performance
        navigation_start = driver.execute_script("return window.performance.timing.navigationStart")
        load_complete = driver.execute_script("return window.performance.timing.loadEventEnd")
        page_load_time = load_complete - navigation_start

        print(f"Page load time: {page_load_time}ms")

    finally:
        # Always end the browser session, even if a step above failed
        driver.quit()
|
|
```
|
|
|
|
### Web Application Testing with Nginx
|
|
|
|
```python
|
|
from testcontainers.nginx import NginxContainer
|
|
import requests
|
|
import tempfile
|
|
import os
|
|
|
|
# Create test HTML content
test_html = """
<!DOCTYPE html>
<html>
<head>
    <title>Test Page</title>
</head>
<body>
    <h1>Welcome to Test Site</h1>
    <div id="content">
        <p>This is a test page served by Nginx.</p>
        <form action="/submit" method="post">
            <input type="text" name="data" placeholder="Enter data">
            <button type="submit">Submit</button>
        </form>
    </div>
</body>
</html>
"""

# Create temporary directory with test content
with tempfile.TemporaryDirectory() as temp_dir:
    # Write test HTML file
    html_file = os.path.join(temp_dir, "index.html")
    with open(html_file, "w", encoding="utf-8") as f:
        f.write(test_html)

    # Start Nginx container with custom content (mounted read-only)
    nginx = (
        NginxContainer("nginx:alpine")
        .with_volume_mapping(temp_dir, "/usr/share/nginx/html", "ro")
        .with_exposed_ports(80)
    )

    # The container must stop before the temporary directory is removed,
    # so this block stays nested inside the TemporaryDirectory block.
    with nginx:
        # Get server URL
        server_url = nginx.get_url()

        # Test static content serving
        response = requests.get(server_url, timeout=10)
        assert response.status_code == 200
        assert "Welcome to Test Site" in response.text

        # Test different HTTP methods
        head_response = requests.head(server_url, timeout=10)
        assert head_response.status_code == 200

        # Test non-existent page
        not_found = requests.get(f"{server_url}/nonexistent", timeout=10)
        assert not_found.status_code == 404

        print(f"Nginx serving content at: {server_url}")
        # response.content is the raw body, so its length really is in bytes
        print(f"Content length: {len(response.content)} bytes")
|
|
```
|
|
|
|
### Email Testing with Mailpit
|
|
|
|
```python
|
|
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from urllib.parse import urlsplit

import requests
from testcontainers.mailpit import MailpitContainer

with MailpitContainer() as mailpit:
    # Get connection details
    smtp_url = mailpit.get_smtp_connection_url()
    web_url = mailpit.get_web_url()

    # Parse SMTP connection; urlsplit is sturdier than counting ":" separators
    smtp_parts = urlsplit(smtp_url)
    smtp_host = smtp_parts.hostname
    smtp_port = smtp_parts.port

    # Send test emails
    with smtplib.SMTP(smtp_host, smtp_port) as server:
        # Send plain text email
        plain_msg = MIMEText("This is a plain text test email.")
        plain_msg["Subject"] = "Plain Text Test"
        plain_msg["From"] = "sender@example.com"
        plain_msg["To"] = "recipient@example.com"

        server.send_message(plain_msg)

        # Send HTML email
        html_msg = MIMEMultipart("alternative")
        html_msg["Subject"] = "HTML Test Email"
        html_msg["From"] = "sender@example.com"
        html_msg["To"] = "recipient@example.com"

        html_content = """
        <html>
        <body>
            <h1>Test Email</h1>
            <p>This is an <b>HTML</b> test email.</p>
            <a href="https://example.com">Click here</a>
        </body>
        </html>
        """

        html_part = MIMEText(html_content, "html")
        html_msg.attach(html_part)

        server.send_message(html_msg)

    # Check emails via web API
    time.sleep(1)  # Wait for emails to be processed

    # Get emails via Mailpit API
    api_response = requests.get(f"{web_url}/api/v1/messages", timeout=10)
    api_response.raise_for_status()  # fail loudly instead of parsing an error page
    emails = api_response.json()

    print(f"Received {len(emails['messages'])} emails")
    for email in emails["messages"]:
        print(f"- Subject: {email['Subject']}")
        print(f"  From: {email['From']['Address']}")
        print(f"  To: {email['To'][0]['Address']}")
|
|
```
|
|
|
|
### SFTP File Transfer Testing
|
|
|
|
```python
|
|
from testcontainers.sftp import SftpContainer
|
|
import paramiko
|
|
import io
|
|
|
|
from urllib.parse import urlsplit

with SftpContainer() as sftp:
    connection_url = sftp.get_connection_url()

    # Parse connection details; urlsplit is sturdier than counting ":" separators
    url_parts = urlsplit(connection_url)
    host = url_parts.hostname
    port = url_parts.port
    # These match the SftpContainer() constructor defaults
    username = "testuser"
    password = "testpass"

    # Create SSH client
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    # Defined before the try so the finally block cannot hit a NameError
    # (and hide the real error) when connecting fails.
    sftp_client = None

    try:
        # Connect to SFTP server
        ssh.connect(hostname=host, port=port, username=username, password=password)
        sftp_client = ssh.open_sftp()

        # Upload file
        test_content = "Hello, SFTP!\nThis is a test file."

        with sftp_client.open("test_upload.txt", "w") as remote_file:
            remote_file.write(test_content)

        # List files
        files = sftp_client.listdir(".")
        print(f"Files on SFTP server: {files}")

        # Download file (paramiko returns bytes, so decode for display)
        with sftp_client.open("test_upload.txt", "r") as remote_file:
            downloaded_content = remote_file.read().decode("utf-8")
            print(f"Downloaded content: {downloaded_content}")

        # Create directory and upload multiple files
        sftp_client.mkdir("test_directory")

        for i in range(3):
            filename = f"test_directory/file_{i}.txt"
            content = f"Content of file {i}"
            with sftp_client.open(filename, "w") as remote_file:
                remote_file.write(content)

        # List directory contents
        dir_files = sftp_client.listdir("test_directory")
        print(f"Files in test_directory: {dir_files}")

    finally:
        if sftp_client is not None:
            sftp_client.close()
        ssh.close()
|
|
```
|
|
|
|
### Complete Web Application Testing Stack
|
|
|
|
```python
|
|
import os
import tempfile

from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from testcontainers.core.network import Network
from testcontainers.mailpit import MailpitContainer
from testcontainers.nginx import NginxContainer
from testcontainers.postgres import PostgresContainer
from testcontainers.selenium import BrowserWebDriverContainer

# Create test web application
app_html = """
<!DOCTYPE html>
<html>
<head>
    <title>Test App</title>
    <script>
        function submitForm() {
            // Simulate form submission
            document.getElementById('result').innerText = 'Form submitted successfully!';
        }
    </script>
</head>
<body>
    <h1>Test Application</h1>
    <form onsubmit="submitForm(); return false;">
        <input type="email" id="email" placeholder="Enter email" required>
        <button type="submit">Submit</button>
    </form>
    <div id="result"></div>
</body>
</html>
"""

with tempfile.TemporaryDirectory() as temp_dir:
    # Create test HTML
    html_file = os.path.join(temp_dir, "index.html")
    with open(html_file, "w", encoding="utf-8") as f:
        f.write(app_html)

    # Create network for services
    with Network() as network:
        # Configure every container BEFORE it is started. Volume mappings,
        # networks and aliases set on an already-running container have no
        # effect, so none of this may happen inside the `with` below.
        web_server = (
            NginxContainer()
            .with_volume_mapping(temp_dir, "/usr/share/nginx/html", "ro")
            .with_network(network)
            .with_network_aliases("web")
        )
        email_server = (
            MailpitContainer()
            .with_network(network)
            .with_network_aliases("mail")
        )
        database = (
            PostgresContainer("postgres:13")
            .with_network(network)
            .with_network_aliases("db")
        )
        browser = BrowserWebDriverContainer({"browserName": "chrome"}).with_network(network)

        # Start all services
        with web_server, email_server, database, browser:
            # Get service URLs (host-side addresses, for use from this process)
            web_url = web_server.get_url()
            mail_web_url = email_server.get_web_url()
            db_url = database.get_connection_url()

            print(f"Web server: {web_url}")
            print(f"Mail server: {mail_web_url}")
            print(f"Database: {db_url}")

            # Automated testing
            driver = browser.get_driver()

            try:
                # Test web application. The browser runs inside its own
                # container, so it reaches Nginx through the shared network
                # alias rather than the host-mapped URL.
                driver.get("http://web/")

                # Fill form
                email_input = driver.find_element(By.ID, "email")
                email_input.send_keys("test@example.com")

                # Submit form
                submit_button = driver.find_element(By.CSS_SELECTOR, "button[type='submit']")
                submit_button.click()

                # Verify result (raises TimeoutException if the text never appears)
                wait = WebDriverWait(driver, 10)
                wait.until(
                    EC.text_to_be_present_in_element((By.ID, "result"), "Form submitted successfully!")
                )

                print("✓ Web application test passed")

                # Take screenshot of success
                driver.save_screenshot("test_success.png")

            finally:
                driver.quit()

            print("✓ Complete web application testing stack verified")
|
|
```
|
|
|
|
### Performance Testing Setup
|
|
|
|
```python
|
|
from testcontainers.nginx import NginxContainer
|
|
import requests
|
|
import time
|
|
import concurrent.futures
|
|
import statistics
|
|
|
|
def performance_test(url, num_requests=100, concurrent_users=10, timeout=10):
    """Run performance test against web server.

    Issues ``num_requests`` GET requests to ``url`` from a pool of
    ``concurrent_users`` worker threads and summarises the latency of the
    requests that returned HTTP 200.

    Args:
        url: Address to request.
        num_requests: Total number of GET requests to issue.
        concurrent_users: Number of worker threads issuing requests.
        timeout: Per-request timeout in seconds, passed to ``requests.get``.

    Returns:
        A dict with ``total_requests``, ``successful_requests``,
        ``success_rate`` (percent) and the avg/min/max/median response
        times in seconds, or ``{"error": "No successful requests"}`` when
        no request returned HTTP 200.
    """

    def make_request():
        # perf_counter is monotonic, so the interval cannot be skewed by
        # wall-clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        try:
            response = requests.get(url, timeout=timeout)
            elapsed = time.perf_counter() - start_time
            return {
                "status_code": response.status_code,
                "response_time": elapsed,
                "success": response.status_code == 200,
            }
        except Exception as e:
            # Deliberately best-effort: one failed request is recorded as a
            # failure and must not abort the whole run.
            return {
                "status_code": 0,
                "response_time": 0,
                "success": False,
                "error": str(e),
            }

    # Run concurrent requests
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
        futures = [executor.submit(make_request) for _ in range(num_requests)]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]

    # Calculate statistics over successful requests only
    successful_requests = [r for r in results if r["success"]]
    response_times = [r["response_time"] for r in successful_requests]

    if not response_times:
        return {"error": "No successful requests"}

    return {
        "total_requests": num_requests,
        "successful_requests": len(successful_requests),
        "success_rate": len(successful_requests) / num_requests * 100,
        "avg_response_time": statistics.mean(response_times),
        "min_response_time": min(response_times),
        "max_response_time": max(response_times),
        "median_response_time": statistics.median(response_times),
    }
|
|
|
|
# Run performance test
with NginxContainer() as nginx:
    server_url = nginx.get_url()

    print(f"Running performance test against: {server_url}")
    results = performance_test(server_url, num_requests=50, concurrent_users=5)

    print("\nPerformance Test Results:")
    for key, value in results.items():
        # Timings and rates are floats; counts and messages print unchanged
        if isinstance(value, float):
            print(f"{key}: {value:.4f}")
        else:
            print(f"{key}: {value}")
|
|
``` |