# file: tool/web_browser.py
import json
import os
import random
import time
import urllib.parse

from playwright.async_api import async_playwright
from playwright.sync_api import sync_playwright


def create_directory(directory_path):
    """Create ``directory_path`` (with parents) if it does not exist yet.

    An empty path (what ``os.path.dirname`` returns for a bare filename)
    means "current directory" and is treated as a no-op instead of letting
    ``os.makedirs("")`` raise.

    Args:
        directory_path: Directory to create; may be empty.
    """
    if not directory_path or os.path.exists(directory_path):
        return
    # exist_ok guards against another process creating it between the check
    # above and this call.
    os.makedirs(directory_path, exist_ok=True)
    print(f"已创建目录: {directory_path}")


class WebBrowser:
    """Wrapper around a Playwright Chromium session.

    Bundles browser start/stop (sync and async variants), cookie
    persistence to a JSON file, injected init scripts, simulated
    mouse/scroll activity, page saving and link extraction.
    """

    def __init__(self, cookie_path="browser_cookies.json"):
        """Store the cookie file path; no browser is started here.

        Args:
            cookie_path: JSON file used by the cookie load/save helpers.
        """
        self.cookie_path = cookie_path
        self.browser = None
        self.context = None
        self.page = None
        self.playwright = None

    def human_like_actions(self):
        """Pause 1-3 s, then hover one random link/button on the page.

        The hover target is chosen among the first five ``a, button``
        matches. The hover step is best-effort: any error raised by it is
        ignored so a missing or detached element never aborts the caller.
        """
        # Idle briefly, as a person would after the page appears.
        time.sleep(random.uniform(1, 3))

        try:
            hover_elements = self.page.query_selector_all("a, button")
            if hover_elements:
                element = random.choice(hover_elements[:5])
                self.page.mouse.move(0, 0)  # move the pointer away first
                time.sleep(0.5)
                element.hover()
                time.sleep(random.uniform(0.5, 1.5))
        except Exception:
            # Deliberately best-effort; unlike a bare ``except`` this still
            # lets KeyboardInterrupt / SystemExit propagate.
            pass

    def get_random_user_agent(self):
        """Return one of the built-in desktop Chrome User-Agent strings."""
        user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        ]
        return random.choice(user_agents)

    def enhanced_anti_detection(self):
        """Register an init script on ``self.page``.

        The script overrides ``navigator.webdriver``, ensures
        ``window.chrome`` exists and wraps ``permissions.query`` for the
        ``notifications`` permission.
        """
        self.page.add_init_script("""
            // 更彻底地隐藏webdriver痕迹
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });

            // 伪装chrome csi和loadTimes
            if (!window.chrome) {
                window.chrome = { runtime: {} };
            }

            // 伪装permissions查询
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => {
                if (parameters.name === 'notifications') {
                    return Promise.resolve({ state: Notification.permission });
                }
                return originalQuery(parameters);
            };
        """)

    def anti_detection(self):
        """Register an init script on ``self.page``.

        The script overrides several ``navigator``, ``screen`` and
        ``window`` properties (webdriver, chrome, plugins, mimeTypes,
        languages, screen and window sizes) and replaces ``console.debug``
        and ``console.log`` with no-ops.
        """
        self.page.add_init_script("""
            // 隐藏webdriver属性
            delete navigator.__proto__.webdriver;

            // 伪装chrome属性
            Object.defineProperty(navigator, 'chrome', {
                value: { runtime: {}, loadTimes: function() {} },
                writable: false,
                enumerable: true,
                configurable: true
            });

            // 伪装plugins和mimeTypes
            Object.defineProperty(navigator, 'plugins', {
                get: () => [
                    { 0: { type: 'application/pdf' } },
                    { 0: { type: 'application/x-google-chrome-pdf' } }
                ],
            });
            Object.defineProperty(navigator, 'mimeTypes', {
                get: () => [
                    { type: 'application/pdf' },
                    { type: 'application/x-google-chrome-pdf' }
                ],
            });

            // 伪装languages
            Object.defineProperty(navigator, 'languages', {
                get: () => ['zh-CN', 'zh'],
            });

            // 禁用调试功能
            window.console.debug = function() {};
            window.console.log = function() {};

            // 伪装屏幕信息
            Object.defineProperty(screen, 'width', {get: () => 1366});
            Object.defineProperty(screen, 'height', {get: () => 768});
            Object.defineProperty(screen, 'availWidth', {get: () => 1366});
            Object.defineProperty(screen, 'availHeight', {get: () => 768});
            Object.defineProperty(screen, 'colorDepth', {get: () => 24});
            Object.defineProperty(screen, 'pixelDepth', {get: () => 24});

            // 伪装时间戳
            window.chrome = { runtime: {} };

            // 伪装outerHeight和outerWidth
            Object.defineProperty(window, 'outerHeight', {get: () => 768});
            Object.defineProperty(window, 'outerWidth', {get: () => 1366});

            // 伪装innerHeight和innerWidth
            Object.defineProperty(window, 'innerHeight', {get: () => 768});
            Object.defineProperty(window, 'innerWidth', {get: () => 1366});
        """)

    def random_behavior(self):
        """Sleep, move the mouse a few times and maybe scroll.

        Sleeps 2-5 s, performs 3-7 mouse moves to random coordinates with
        short pauses, then with 50% probability scrolls down 200-800 px.
        """
        time.sleep(random.uniform(2, 5))

        # Random mouse moves.
        for _ in range(random.randint(3, 7)):
            self.page.mouse.move(
                random.randint(100, 1200),
                random.randint(100, 600),
            )
            time.sleep(random.uniform(0.1, 0.8))

        # Random page scroll.
        if random.choice([True, False]):
            scroll_distance = random.randint(200, 800)
            self.page.mouse.wheel(0, scroll_distance)
            time.sleep(random.uniform(1, 2))

    def init_cookie_file(self):
        """Create the cookie file holding an empty JSON list if it is missing."""
        if not os.path.exists(self.cookie_path):
            with open(self.cookie_path, 'w', encoding='utf-8') as f:
                json.dump([], f)

    def save_cookies(self):
        """Write the current context's cookies to ``self.cookie_path`` as JSON."""
        cookies = self.context.cookies()
        with open(self.cookie_path, 'w', encoding='utf-8') as f:
            json.dump(cookies, f, indent=2)

    def load_cookies(self):
        """Load cookies from ``self.cookie_path`` into the current context.

        Returns:
            True if a non-empty cookie list was read and added to the
            context; False if the list was empty or any error occurred
            (missing file, invalid JSON, cookies rejected by the context).
        """
        try:
            with open(self.cookie_path, 'r', encoding='utf-8') as f:
                cookies = json.load(f)
            if cookies:
                self.context.add_cookies(cookies)
                return True
            return False
        except Exception:
            # Best-effort load; a bare ``except`` would also have swallowed
            # KeyboardInterrupt / SystemExit.
            return False

    def start_browser_with_proxy(self, proxy_host=None, proxy_port=None):
        """Launch headed Chromium, optionally behind an HTTP proxy.

        Args:
            proxy_host: Proxy host; the proxy is used only when both host
                and port are truthy.
            proxy_port: Proxy port.
        """
        self.init_cookie_file()

        browser_args = [
            "--disable-blink-features=AutomationControlled",
            "--disable-infobars",
            "--no-sandbox",
            "--disable-dev-shm-usage",
        ]

        if proxy_host and proxy_port:
            browser_args.append(f"--proxy-server=http://{proxy_host}:{proxy_port}")

        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=False,
            args=browser_args,
        )
        # ... remaining initialization code
        # NOTE(review): as written, this method creates no context or page,
        # so ``self.context`` / ``self.page`` keep their previous values.

    def bypass_debugger(self):
        """Register an init script on ``self.page``.

        The script replaces ``Function.prototype.constructor`` and
        ``console.debug`` with no-ops and makes ``navigator.webdriver``
        report ``false``.
        """
        self.page.add_init_script("""
            window.Function.prototype.constructor = function() {};
            window.console.debug = function(){};
            Object.defineProperty(navigator, 'webdriver', {get: () => false});
        """)

    async def start_browser_async(self):
        """Start headed Chromium through the async Playwright API.

        Returns:
            True when playwright, browser, context and page were all
            created; False (after printing the error) otherwise.
        """
        try:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(
                headless=False,
                args=[
                    "--disable-blink-features=AutomationControlled",
                    "--disable-infobars",
                ],
            )
            self.context = await self.browser.new_context()
            self.page = await self.context.new_page()
            return True
        except Exception as e:
            print(f"异步启动浏览器失败: {e}")
            return False

    async def close_browser_async(self):
        """Close the async browser and stop Playwright.

        Playwright is stopped even if closing the browser raises, and all
        handles are reset to ``None`` so a second call is a no-op.
        """
        try:
            if self.browser:
                await self.browser.close()
        finally:
            try:
                if self.playwright:
                    await self.playwright.stop()
            finally:
                self.page = None
                self.context = None
                self.browser = None
                self.playwright = None

    def start_browser(self):
        """Launch headed Chromium with a fixed desktop profile.

        Creates the cookie file if needed, launches the browser, opens a
        context (1366x768 viewport, zh-CN locale, Asia/Shanghai timezone)
        and a page, registers the init scripts, then runs the simulated
        user activity helpers.
        """
        self.init_cookie_file()

        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=False,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-infobars",
                "--disable-extensions",
                "--disable-plugins",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-web-security",
                "--disable-features=IsolateOrigins,site-per-process",
                "--disable-background-timer-throttling",
                "--disable-backgrounding-occluded-windows",
                "--disable-renderer-backgrounding",
                "--disable-ipc-flooding-protection",
            ],
        )

        self.context = self.browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            viewport={"width": 1366, "height": 768},
            device_scale_factor=1,
            is_mobile=False,
            has_touch=False,
            locale="zh-CN",
            timezone_id="Asia/Shanghai",
        )

        self.page = self.context.new_page()
        self.anti_detection()
        self.enhanced_anti_detection()

        # Init scripts only run on future navigations, so apply the
        # webdriver removal once to the page that is already open.
        self.page.evaluate("""
            delete navigator.__proto__.webdriver;
        """)

        self.human_like_actions()
        self.random_behavior()

    def close_browser(self):
        """Close context, browser and Playwright, in that order.

        Each later step runs even if an earlier one raises (the exception
        still propagates afterwards), and all handles are reset to ``None``
        so a second call is a no-op.
        """
        try:
            if self.context:
                self.context.close()
        finally:
            try:
                if self.browser:
                    self.browser.close()
            finally:
                try:
                    if self.playwright:
                        self.playwright.stop()
                finally:
                    self.page = None
                    self.context = None
                    self.browser = None
                    self.playwright = None

    def save_page_html(self, filepath):
        """Write the current page's HTML to ``filepath`` (UTF-8).

        The parent directory is created when ``filepath`` has one. Errors
        are printed, not raised.

        Args:
            filepath: Destination file; may be a bare filename.
        """
        try:
            html_content = self.page.content()

            # A bare filename has no directory part; os.makedirs("") would
            # raise and the page would never be saved.
            directory = os.path.dirname(filepath)
            if directory:
                create_directory(directory)

            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(html_content)

            print(f"页面HTML已保存到: {filepath}")
        except Exception as e:
            print(f"保存页面HTML失败: {e}")

    def visit_page(self, url):
        """Navigate ``self.page`` to ``url`` and simulate user activity.

        Args:
            url: Address to open.

        Returns:
            True if navigation and the follow-up steps completed, False
            (after printing the error) if any of them raised. A timeout
            while waiting for ``networkidle`` is printed but not treated as
            a failure.
        """
        try:
            # NOTE(review): this registers one more init script on every
            # call, so scripts accumulate on a long-lived page.
            self.page.add_init_script("""
                // 隐藏webdriver属性
                delete navigator.__proto__.webdriver;

                // 伪装chrome属性
                Object.defineProperty(navigator, 'chrome', {
                    value: { runtime: {}, loadTimes: function() {} },
                    writable: false,
                    enumerable: true,
                    configurable: true
                });

                // 禁用调试功能
                window.console.debug = function() {};
                window.console.log = function() {};
            """)

            # Request headers sent with every request made by this page.
            self.page.set_extra_http_headers({
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Accept-Encoding": "gzip, deflate, br",
                "Upgrade-Insecure-Requests": "1",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "none",
                "Cache-Control": "max-age=0",
            })

            self.page.goto(url)

            # Re-apply the webdriver removal on the loaded document.
            self.page.evaluate("""
                delete navigator.__proto__.webdriver;
            """)

            # Wait on the load state rather than on a specific element; a
            # timeout here is tolerated.
            try:
                self.page.wait_for_load_state('networkidle', timeout=5000)
                print("networkidle, timeout=5000页面已加载")
            except Exception as e:
                print(f"等待页面加载状态时出错: {e}")

            self.human_like_actions()
            self.random_behavior()
            return True
        except Exception as e:
            print(f"访问页面失败: {url}, 错误: {str(e)}")
            return False

    def extract_links(self, selector="h2 a"):
        """Collect links from the elements matching ``selector``.

        Titles have newlines removed and commas replaced by spaces.
        Relative hrefs are resolved against the current page URL. Elements
        with an empty title or without an ``href`` attribute are skipped.

        Args:
            selector (str): CSS selector, defaults to ``"h2 a"``.

        Returns:
            list: Dicts with the keys ``title``, ``url`` and
            ``request_url`` (the page URL the link was found on). Empty
            when nothing matches or the query itself fails.
        """
        links = []
        try:
            elements = self.page.query_selector_all(selector)
            current_url = self.page.url
            for element in elements:
                try:
                    title = element.inner_text().replace('\n', '').replace(',', ' ').strip()
                    url = element.get_attribute('href')

                    # get_attribute returns None when the attribute is
                    # absent; skip instead of failing on ``None.startswith``.
                    if not title or url is None:
                        continue

                    if not url.startswith(('http://', 'https://')):
                        # Resolve relative links against the current page.
                        url = urllib.parse.urljoin(current_url, url)

                    if url:
                        links.append({
                            'title': title,
                            'url': url,
                            'request_url': current_url,
                        })
                except Exception as e:
                    print(f"提取链接失败: {e}")
                    continue
        except Exception as e:
            print(f"提取链接失败: {e}")

        return links