Web Automation
Advanced Selenium WebDriver, Playwright, and Puppeteer automation code snippets for production environments
Parallel Web Scraping with Playwright
Multi-browser parallel scraping with automatic retry logic, rate limiting, and data persistence. Handles anti-bot detection.
// Advanced Parallel Web Scraping with Playwright
import { chromium, firefox, webkit, Browser, Page } from 'playwright';
import pLimit from 'p-limit';
/**
 * Describes one page to scrape and which pieces of text to pull out of it.
 */
interface ScrapeConfig {
  /** Address the scraper navigates to. */
  url: string;

  /**
   * Maps each output field name to the CSS selector whose trimmed text
   * content becomes that field's value.
   */
  selectors: Record<string, string>;

  /** Optional selector the scraper waits for after navigation, before extracting. */
  waitFor?: string;

  /** Optional number of retries after a failed attempt; the scraper uses 3 when unset. */
  retries?: number;
}
/**
 * Scrapes many pages concurrently, distributing the work round-robin over
 * one Chromium, one Firefox and one WebKit instance and retrying failed
 * pages with a stepped back-off.
 *
 * Lifecycle: call `init()` once, then `scrapeUrls()` as often as needed,
 * and finally `close()` to shut the browsers down.
 */
class ParallelWebScraper {
  private browsers: Browser[] = [];
  private concurrencyLimit: number;
  /** Wait before retry 1, 2 and 3 (ms); later retries fall back to 5000 ms. */
  private retryDelays = [1000, 3000, 5000];

  /**
   * @param concurrency Maximum number of pages scraped at the same time.
   */
  constructor(concurrency: number = 5) {
    this.concurrencyLimit = concurrency;
  }

  /**
   * Launches the three headless browsers.
   *
   * @throws The first launch error (in chromium, firefox, webkit order).
   *   Browsers that did start are closed again so nothing is leaked.
   */
  async init() {
    // allSettled (rather than all) so that a single failed launch does not
    // leave the browsers that launched successfully running unattended.
    const launches = await Promise.allSettled([
      chromium.launch({ headless: true }),
      firefox.launch({ headless: true }),
      webkit.launch({ headless: true }),
    ]);

    const launched: Browser[] = [];
    let failed = false;
    let failure: unknown;
    for (const outcome of launches) {
      if (outcome.status === 'fulfilled') {
        launched.push(outcome.value);
      } else if (!failed) {
        failed = true;
        failure = outcome.reason;
      }
    }

    if (failed) {
      await Promise.allSettled(launched.map((b) => b.close()));
      throw failure;
    }
    this.browsers = launched;
  }

  /**
   * Scrapes every config, at most `concurrency` at a time.
   *
   * @param configs Pages to scrape; config `i` runs on browser `i % browsers.length`.
   * @returns One result per config, in input order.
   * @throws If any page still fails after its retries (the whole batch rejects),
   *   or if `init()` has not been called.
   */
  async scrapeUrls(configs: ScrapeConfig[]): Promise<any[]> {
    if (configs.length === 0) {
      return [];
    }
    if (this.browsers.length === 0) {
      // Without this guard `index % 0` is NaN and every task would fail with an
      // opaque TypeError, only after burning through its retry delays.
      throw new Error('ParallelWebScraper.init() must be called before scrapeUrls()');
    }

    const limit = pLimit(this.concurrencyLimit);
    const tasks = configs.map((config, index) =>
      limit(() => this.scrapeWithRetry(config, index % this.browsers.length))
    );
    return Promise.all(tasks);
  }

  /**
   * Runs `scrapePage`, retrying on any error.
   *
   * @param attempt Zero-based count of retries already performed.
   * @throws The last error once `config.retries` (default 3) retries are used up.
   */
  private async scrapeWithRetry(
    config: ScrapeConfig,
    browserIndex: number,
    attempt: number = 0
  ): Promise<any> {
    // `??` instead of `||` so an explicit `retries: 0` really disables retrying.
    const maxRetries = config.retries ?? 3;
    try {
      return await this.scrapePage(config, browserIndex);
    } catch (error) {
      if (attempt >= maxRetries) {
        throw error;
      }
      await this.delay(this.retryDelays[attempt] ?? 5000);
      console.log(`Retry ${attempt + 1} for ${config.url}`);
      return this.scrapeWithRetry(config, browserIndex, attempt + 1);
    }
  }

  /**
   * Loads one page in a fresh browser context and extracts the configured fields.
   *
   * @returns `{ url, data, scrapedAt }`, where `data[key]` is the trimmed text of
   *   the first element matching the selector, or `null` if extraction failed.
   * @throws On navigation failure/timeout or when `waitFor` does not appear in 10 s.
   */
  private async scrapePage(config: ScrapeConfig, browserIndex: number): Promise<any> {
    const browser = this.browsers[browserIndex];
    const context = await browser.newContext({
      userAgent: this.getRandomUserAgent(),
      viewport: { width: 1920, height: 1080 },
      locale: 'en-US',
    });

    // The context is closed in `finally` so that a failed attempt (which will be
    // retried) does not leave an open context behind for every failure.
    try {
      // Anti-detection: Add realistic headers
      await context.setExtraHTTPHeaders({
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Referer': 'https://www.google.com/',
      });

      const page = await context.newPage();

      // Block unnecessary resources for speed. The handler returns the route
      // promise so a failed abort/continue is not an unhandled rejection.
      await page.route('**/*', (route) => {
        const resourceType = route.request().resourceType();
        if (['image', 'stylesheet', 'font', 'media'].includes(resourceType)) {
          return route.abort();
        }
        return route.continue();
      });

      await page.goto(config.url, { waitUntil: 'domcontentloaded', timeout: 30000 });

      // Wait for specific element if specified
      if (config.waitFor) {
        await page.waitForSelector(config.waitFor, { timeout: 10000 });
      }

      // Random delay (1-3 s) to mimic human behavior
      await this.delay(Math.random() * 2000 + 1000);

      // Extract data using selectors; a selector that fails yields null
      // instead of failing the whole page (deliberate best-effort).
      const data: Record<string, any> = {};
      for (const [key, selector] of Object.entries(config.selectors)) {
        try {
          data[key] = await page.$eval(selector, (el) => el.textContent?.trim());
        } catch {
          data[key] = null;
        }
      }

      return { url: config.url, data, scrapedAt: new Date() };
    } finally {
      await context.close();
    }
  }

  /** Picks one of the built-in user-agent strings uniformly at random. */
  private getRandomUserAgent(): string {
    const userAgents = [
      'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
      'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
      'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
    ];
    return userAgents[Math.floor(Math.random() * userAgents.length)];
  }

  /** Resolves after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Closes every launched browser. The pool is emptied first, so calling
   * `close()` again afterwards is a no-op.
   */
  async close() {
    const open = this.browsers;
    this.browsers = [];
    await Promise.all(open.map((b) => b.close()));
  }
}
// Usage example: scrape a batch of product pages, at most 10 at a time.
const configs: ScrapeConfig[] = [
  {
    url: 'https://example.com/product1',
    waitFor: 'h1.product-title',
    retries: 3,
    selectors: {
      title: 'h1.product-title',
      price: 'span.price',
      description: 'div.description',
    },
  },
  // ... more configs
];

const scraper = new ParallelWebScraper(10);
await scraper.init();
const results = await scraper.scrapeUrls(configs);
await scraper.close();
Use Case: Scrape 10,000+ product pages from e-commerce sites with automatic retry, rate limiting, and anti-bot evasion.
Advanced Selenium Dynamic Waits
Custom wait conditions for complex scenarios including AJAX, animations, and dynamic content loading.
# Advanced Selenium Dynamic Wait Conditions
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException, StaleElementReferenceException
import time
class CustomWaitConditions:
    """Advanced custom wait conditions for Selenium.

    Every static method is a factory: it returns a ``_predicate(driver)``
    callable meant to be passed to ``WebDriverWait.until``. Locators are
    ``(by, value)`` tuples that are unpacked into ``driver.find_element``.
    """

    @staticmethod
    def element_has_css_class(locator, css_class):
        """Wait until element has specific CSS class.

        The predicate returns False (instead of raising) when the element has
        no ``class`` attribute at all or went stale while being inspected.
        """
        def _predicate(driver):
            try:
                element = driver.find_element(*locator)
                # get_attribute yields None for a missing attribute; treat
                # that as "no classes" rather than crashing on None.split().
                class_attr = element.get_attribute('class') or ''
                return css_class in class_attr.split()
            except StaleElementReferenceException:
                return False
        return _predicate

    @staticmethod
    def number_of_elements_to_be_more_than(locator, number):
        """Wait until number of elements exceeds threshold (strictly greater)."""
        def _predicate(driver):
            try:
                elements = driver.find_elements(*locator)
                return len(elements) > number
            except Exception:
                # Exception, not a bare except, so Ctrl-C / SystemExit still
                # interrupt a long wait.
                return False
        return _predicate

    @staticmethod
    def element_attribute_contains(locator, attribute, value):
        """Wait until element attribute contains value.

        The predicate always returns a bool; a missing or empty attribute
        counts as "does not contain".
        """
        def _predicate(driver):
            try:
                element = driver.find_element(*locator)
                attr_value = element.get_attribute(attribute)
                return bool(attr_value) and value in attr_value
            except Exception:
                return False
        return _predicate

    @staticmethod
    def ajax_complete():
        """Wait for all AJAX requests to complete (jQuery).

        If the script cannot be evaluated the predicate reports True, on the
        assumption that jQuery is simply not present on the page.
        """
        def _predicate(driver):
            try:
                return driver.execute_script("return jQuery.active == 0")
            except Exception:
                return True  # jQuery not present
        return _predicate

    @staticmethod
    def page_loaded_completely():
        """Wait until ``document.readyState`` is ``'complete'``."""
        def _predicate(driver):
            try:
                ready_state = driver.execute_script("return document.readyState")
                return ready_state == 'complete'
            except Exception:
                return False
        return _predicate

    @staticmethod
    def element_stops_moving(locator, stable_time=0.5):
        """Wait until element stops moving (for animations).

        Each evaluation samples ``element.location`` twice, ``stable_time``
        seconds apart, and succeeds when both samples are equal. Note that the
        predicate itself sleeps for ``stable_time`` on every poll.
        """
        def _predicate(driver):
            try:
                element = driver.find_element(*locator)
                initial_location = element.location
                time.sleep(stable_time)
                final_location = element.location
                return initial_location == final_location
            except Exception:
                return False
        return _predicate
class AdvancedSeleniumAutomation:
    """Chrome WebDriver wrapper with helpers for dynamic pages.

    Attributes set by ``__init__``:
        driver: the ``webdriver.Chrome`` instance.
        wait: a ``WebDriverWait`` bound to ``driver`` with a 20 second timeout.
        custom_wait: a ``CustomWaitConditions`` instance for custom predicates.
    """

    def __init__(self, headless=True):
        """Start Chrome with automation-hiding flags.

        Args:
            headless: when true, pass ``--headless`` to Chrome.
        """
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option('useAutomationExtension', False)
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 20)
        self.custom_wait = CustomWaitConditions()

    def wait_for_lazy_images(self):
        """Wait for all lazy-loaded images to load.

        The script returns a Promise that settles once every not-yet-complete
        image in ``document.images`` fired ``load`` or ``error``.
        NOTE(review): this relies on the driver waiting for a returned Promise
        in ``execute_script`` -- confirm for the driver version in use.
        """
        self.driver.execute_script("""
            return Promise.all(Array.from(document.images)
                .filter(img => !img.complete)
                .map(img => new Promise(resolve => {
                    img.onload = img.onerror = resolve;
                })));
        """)

    def scroll_to_load_all(self, pause_time=0.5, max_scrolls=None):
        """Scroll page to trigger lazy loading.

        Scrolls to the bottom repeatedly until ``document.body.scrollHeight``
        stops growing.

        Args:
            pause_time: seconds to sleep after each scroll before re-measuring.
            max_scrolls: optional upper bound on the number of scrolls, to stop
                on pages that grow forever. ``None`` (the default) keeps the
                original unbounded behaviour.
        """
        last_height = self.driver.execute_script("return document.body.scrollHeight")
        scrolls = 0
        while max_scrolls is None or scrolls < max_scrolls:
            self.driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            scrolls += 1
            time.sleep(pause_time)
            new_height = self.driver.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break
            last_height = new_height

    def wait_for_angular(self):
        """Wait for Angular application to stabilize.

        Returns immediately when the page exposes no Angular testabilities,
        either because the hook is absent or because it reports an empty list.
        """
        # The explicit `count === 0` branch matters: with an empty list the
        # forEach below never runs, so the callback would never fire and the
        # call would block until the driver's script timeout.
        self.driver.execute_async_script("""
            var callback = arguments[arguments.length - 1];
            if (window.getAllAngularTestabilities) {
                var testabilities = window.getAllAngularTestabilities();
                var count = testabilities.length;
                if (count === 0) {
                    callback();
                    return;
                }
                var decrement = function() {
                    count--;
                    if (count === 0) callback();
                };
                testabilities.forEach(function(testability) {
                    testability.whenStable(decrement);
                });
            } else {
                callback();
            }
        """)

    def click_with_retry(self, locator, max_attempts=3):
        """Click element with retry on common exceptions.

        Args:
            locator: ``(by, value)`` tuple identifying the element.
            max_attempts: total number of attempts.

        Returns:
            True once a click succeeded; False only when ``max_attempts`` is
            not positive, so that no attempt is made.

        Raises:
            StaleElementReferenceException, TimeoutException: re-raised from
                the final attempt.
        """
        for attempt in range(max_attempts):
            try:
                element = self.wait.until(EC.element_to_be_clickable(locator))
                # Scroll element into view
                self.driver.execute_script("arguments[0].scrollIntoView(true);", element)
                time.sleep(0.3)  # give the scroll a moment to finish before clicking
                element.click()
                return True
            except (StaleElementReferenceException, TimeoutException):
                if attempt == max_attempts - 1:
                    raise
                time.sleep(1)
        return False

    def safe_execute_script(self, script, *args):
        """Execute JavaScript with error handling.

        Returns:
            The script's result, or None if execution raised; the error is
            printed, not propagated.
        """
        try:
            return self.driver.execute_script(script, *args)
        except Exception as e:
            print(f"Script execution error: {e}")
            return None
# Usage example: drive a dynamic page using the custom wait helpers.
automation = AdvancedSeleniumAutomation(headless=False)
driver = automation.driver

# Open the page, then block until the document and any jQuery AJAX calls settle
driver.get('https://example.com/dynamic-page')
page_ready = automation.custom_wait.page_loaded_completely()
automation.wait.until(page_ready)
ajax_idle = automation.custom_wait.ajax_complete()
automation.wait.until(ajax_idle)

# Hold until the animated element's position no longer changes
animated_locator = (By.ID, 'animated-element')
automation.wait.until(automation.custom_wait.element_stops_moving(animated_locator))

# Require more than 10 result items before continuing
results_locator = (By.CLASS_NAME, 'result-item')
automation.wait.until(
    automation.custom_wait.number_of_elements_to_be_more_than(results_locator, 10)
)

# Keep scrolling to the bottom so lazily loaded content appears
automation.scroll_to_load_all()

# Click with automatic retry
automation.click_with_retry((By.ID, 'submit-button'))
Puppeteer Stealth Mode Anti-Detection
Bypass bot detection systems including Cloudflare, PerimeterX, and DataDome with stealth techniques.
// Puppeteer Stealth Mode with Anti-Detection
import puppeteer from 'puppeteer-extra';
import StealthPlugin from 'puppeteer-extra-plugin-stealth';
import { Browser, Page } from 'puppeteer';
puppeteer.use(StealthPlugin());
/**
 * Wraps a puppeteer-extra browser and offers helpers that make pages and
 * interactions look less like automation: overridden navigator properties,
 * gradual mouse movement, per-character typing delays and stepwise scrolling.
 */
class StealthBrowser {
  private browser: Browser | null = null;

  /**
   * Launches the browser, or returns the one that is already running so a
   * repeated call does not orphan the previous instance.
   */
  async launch() {
    if (this.browser) {
      return this.browser;
    }
    this.browser = await puppeteer.launch({
      headless: 'new',
      // NOTE(review): --disable-web-security and the disabled site isolation
      // weaken the browser's security model; confirm they are really needed.
      args: [
        '--no-sandbox',
        '--disable-setuid-sandbox',
        '--disable-web-security',
        '--disable-features=IsolateOrigins,site-per-process',
        '--window-size=1920,1080',
      ],
    });
    return this.browser;
  }

  /**
   * Opens a new page (launching the browser first if necessary) with a
   * 1920x1080 viewport, a desktop Chrome user agent and navigator overrides
   * installed for every document the page loads.
   */
  async createStealthPage(): Promise<Page> {
    // Narrow via the return value of launch() instead of a `!` assertion.
    const browser = this.browser ?? (await this.launch());
    const page = await browser.newPage();

    // Set realistic viewport
    await page.setViewport({ width: 1920, height: 1080 });

    // Set realistic user agent
    await page.setUserAgent(
      'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ' +
        '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    );

    // Override navigator properties
    await page.evaluateOnNewDocument(() => {
      // Webdriver flag
      Object.defineProperty(navigator, 'webdriver', { get: () => false });

      // Chrome property
      (window as any).chrome = { runtime: {} };

      // Permissions: report notifications as denied, delegate everything else.
      // The native method must stay bound to the Permissions object; calling
      // a detached reference throws "Illegal invocation".
      const permissions = window.navigator.permissions;
      const originalQuery = permissions.query.bind(permissions);
      permissions.query = (parameters: any) =>
        parameters.name === 'notifications'
          ? Promise.resolve({ state: 'denied' } as PermissionStatus)
          : originalQuery(parameters);

      // Plugins
      Object.defineProperty(navigator, 'plugins', {
        get: () => [1, 2, 3, 4, 5],
      });

      // Languages
      Object.defineProperty(navigator, 'languages', {
        get: () => ['en-US', 'en'],
      });
    });

    // Register a mousemove listener in every document. It only records the
    // latest pointer coordinates in page scope; nothing in this class reads them.
    await page.evaluateOnNewDocument(() => {
      let mouseX = 0;
      let mouseY = 0;
      document.addEventListener('mousemove', (e) => {
        mouseX = e.clientX;
        mouseY = e.clientY;
      });
    });

    return page;
  }

  /**
   * Moves the mouse in several steps to a random point in the central area of
   * the element, pauses briefly and clicks there.
   *
   * @throws If the selector matches nothing or the element has no bounding box.
   */
  async humanLikeClick(page: Page, selector: string) {
    const element = await page.$(selector);
    if (!element) throw new Error(`Element ${selector} not found`);

    const box = await element.boundingBox();
    if (!box) throw new Error('Element not visible');

    // Random point within the middle 40% of the element in each dimension
    const x = box.x + box.width * (0.3 + Math.random() * 0.4);
    const y = box.y + box.height * (0.3 + Math.random() * 0.4);

    // Move mouse gradually (10-19 intermediate steps)
    await page.mouse.move(x, y, { steps: 10 + Math.floor(Math.random() * 10) });

    // Random delay before click
    await this.randomDelay(50, 150);
    await page.mouse.click(x, y, {
      delay: 10 + Math.random() * 20,
    });
  }

  /**
   * Focuses the element and types `text` one character at a time with a
   * random 50-150 ms delay, occasionally adding a longer pause.
   */
  async humanLikeType(page: Page, selector: string, text: string) {
    await page.focus(selector);
    for (const char of text) {
      await page.keyboard.type(char, {
        delay: 50 + Math.random() * 100,
      });
      // Random pauses (like thinking), after roughly 1 in 10 characters
      if (Math.random() < 0.1) {
        await this.randomDelay(200, 500);
      }
    }
  }

  /**
   * Scrolls down in random 100-300 px increments with short pauses until the
   * accumulated distance reaches the body height measured at the start.
   */
  async scrollLikeHuman(page: Page) {
    const scrollHeight = await page.evaluate(() => document.body.scrollHeight);
    let currentPosition = 0;
    while (currentPosition < scrollHeight) {
      const scrollAmount = 100 + Math.random() * 200;
      currentPosition += scrollAmount;
      await page.evaluate((y) => {
        window.scrollBy({ top: y, behavior: 'smooth' });
      }, scrollAmount);
      await this.randomDelay(100, 300);
    }
  }

  /** Resolves after a random duration between `min` and `max` milliseconds. */
  private randomDelay(min: number, max: number): Promise<void> {
    const delay = min + Math.random() * (max - min);
    return new Promise((resolve) => setTimeout(resolve, delay));
  }

  /**
   * If the challenge input appears within 5 s, waits up to 30 s for the
   * follow-up navigation. Never throws: a missing challenge returns silently
   * and an unfinished challenge is logged as a warning.
   */
  async bypassCloudflare(page: Page) {
    try {
      await page.waitForSelector('input[name="cf_captcha_kind"]', { timeout: 5000 });
    } catch {
      // No Cloudflare challenge
      return;
    }

    console.log('Cloudflare challenge detected, waiting...');
    try {
      await page.waitForNavigation({ waitUntil: 'networkidle0', timeout: 30000 });
    } catch (error) {
      // Kept best-effort, but no longer silent: callers otherwise cannot tell
      // a solved challenge from one that is still on screen.
      console.warn('Cloudflare challenge did not complete:', error);
    }
  }

  /**
   * Closes the browser if one is running and forgets it, so a later
   * `launch()` or `createStealthPage()` starts a fresh instance.
   */
  async close() {
    const browser = this.browser;
    if (browser) {
      this.browser = null;
      await browser.close();
    }
  }
}
// Usage example: open a bot-protected site and interact with it like a person would.
const stealth = new StealthBrowser();
await stealth.launch();
const page = await stealth.createStealthPage();

// Load the page and give a possible Cloudflare challenge time to clear
await page.goto('https://bot-protected-site.com', { waitUntil: 'networkidle0' });
await stealth.bypassCloudflare(page);

// Scroll, click and type with randomized, human-like timing
await stealth.scrollLikeHuman(page);
await stealth.humanLikeClick(page, '#search-button');
await stealth.humanLikeType(page, '#search-input', 'automation testing');
await stealth.close();
More Web Automation Topics
Visual Regression Testing with Playwright
Selenium Grid Distributed Testing
Browser Performance Monitoring
File Upload/Download Automation
iframe and Shadow DOM Handling
Cross-Browser Testing Framework
API Mocking with Playwright
Screenshot Comparison Testing
Cookie and Session Management
Proxy and Network Interception
Need Custom Web Automation?
Our team builds enterprise-grade web automation solutions that scale to millions of pages.
Get Free Consultation