# file: tool/web_browser.py
"""Thin wrapper around Playwright's Chromium driver with anti-detection helpers.

The module exposes :func:`create_directory` and the :class:`WebBrowser` class,
which launches Chromium, injects scripts that mask common automation
fingerprints, simulates some human-like mouse activity, and offers helpers to
click, type, persist cookies, save HTML and extract links.
"""
import json
import os
import random
import time
import urllib.parse
from typing import Callable, Dict, List, Optional

from playwright.async_api import async_playwright
from playwright.sync_api import sync_playwright


def create_directory(directory_path):
    """Create ``directory_path`` (including parents) if it does not exist.

    An empty path is treated as "the current directory" and is a no-op; this
    matters for callers that pass ``os.path.dirname("file.html")``, which is
    ``""``.

    Args:
        directory_path (str): Directory to create.
    """
    if not directory_path or os.path.exists(directory_path):
        return
    # exist_ok guards against another process creating it in between.
    os.makedirs(directory_path, exist_ok=True)
    print(f"已创建目录: {directory_path}")


class WebBrowser:
    """Synchronous Playwright Chromium session with anti-detection tweaks.

    Attributes are populated by one of the ``start_browser*`` methods:
    ``playwright`` (driver handle), ``browser``, ``context`` and ``page``.
    """

    # Pool used by get_random_user_agent().
    _USER_AGENTS = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    )

    # How long visit_page() waits for the "networkidle" load state.
    _NETWORK_IDLE_TIMEOUT_MS = 15000

    def __init__(self, cookie_path="browser_cookies.json"):
        """Remember the cookie file location; nothing is launched yet.

        Args:
            cookie_path (str): JSON file used by the cookie helpers.
        """
        self.cookie_path = cookie_path
        self.browser = None
        self.context = None
        self.page = None
        self.playwright = None
        # Page object on which visit_page() already registered its init script.
        self._init_script_page = None

    def human_like_actions(self):
        """Pause briefly, then hover over one of the first few links/buttons.

        Any failure while hovering is ignored: this is cosmetic behaviour and
        must never break the calling flow.
        """
        # Natural pause after the page has loaded.
        time.sleep(random.uniform(1, 3))

        try:
            hover_elements = self.page.query_selector_all("a, button")
            if hover_elements:
                element = random.choice(hover_elements[:5])
                self.page.mouse.move(0, 0)  # move away first
                time.sleep(0.5)
                element.hover()
                time.sleep(random.uniform(0.5, 1.5))
        except Exception:
            # Best effort only (element may be detached or hidden).
            pass

    def click_element(self, selector: str) -> bool:
        """Hover over and click the element matching ``selector``.

        Args:
            selector (str): CSS selector or XPath expression.

        Returns:
            bool: True if the click was performed, False otherwise.
        """
        try:
            # Wait up to 10 s for the element to appear.
            self.page.wait_for_selector(selector, timeout=10000)

            element = self.page.query_selector(selector)
            if not element:
                print(f"未找到元素: {selector}")
                return False

            # Move the mouse onto the element before clicking.
            element.hover()
            time.sleep(random.uniform(0.5, 1.0))

            element.click()

            # Human-like pause after the click.
            time.sleep(random.uniform(1, 2))
            return True
        except Exception as e:
            print(f"点击元素失败: {selector}, 错误: {e}")
            return False

    def input_and_enter(self, selector: str, text: str) -> bool:
        """Fill the input matching ``selector`` with ``text`` and press Enter.

        Args:
            selector (str): CSS selector of the input box.
            text (str): Text to type; replaces any existing content.

        Returns:
            bool: True if the text was submitted, False otherwise.
        """
        try:
            # Wait up to 10 s for the input to appear.
            self.page.wait_for_selector(selector, timeout=10000)

            input_element = self.page.query_selector(selector)
            if not input_element:
                print(f"未找到输入框: {selector}")
                return False

            # Click to focus the input.
            input_element.click()
            time.sleep(random.uniform(0.5, 1.0))

            # fill() clears the current value before typing.
            input_element.fill(text)

            time.sleep(random.uniform(1, 2))

            input_element.press('Enter')

            # Give the page time to react.
            time.sleep(random.uniform(2, 3))
            return True
        except Exception as e:
            print(f"输入并回车失败: {selector}, 错误: {e}")
            return False

    def interact_with_element(
        self,
        selector: str,
        action_type: str = "click",
        text: Optional[str] = None,
        callback: Optional[Callable[[bool, str], None]] = None,
    ) -> bool:
        """Perform a click, input+Enter or hover on an element.

        The callback, when given, is always invoked exactly once with
        ``(result, selector)`` — on success, on failure and on error alike.

        Args:
            selector (str): CSS selector of the element.
            action_type (str): One of "click", "input_enter", "hover".
            text (str): Text to type; required for "input_enter".
            callback (function): Called as ``callback(result, selector)``
                once the operation has finished.

        Returns:
            bool: True if the action was performed, False otherwise.
        """
        try:
            result = self._perform_action(selector, action_type, text)
        except Exception as e:
            print(f"元素交互失败: {selector}, 错误: {e}")
            result = False

        self._notify_callback(callback, result, selector)
        return result

    def _perform_action(self, selector, action_type, text):
        """Run one interaction; return True when it was carried out."""
        self.page.wait_for_selector(selector, timeout=10000)
        element = self.page.query_selector(selector)

        if not element:
            print(f"未找到元素: {selector}")
            return False

        if action_type == "click":
            element.hover()
            time.sleep(random.uniform(0.5, 1.0))
            element.click()
        elif action_type == "input_enter":
            if text is None:
                print("输入操作需要提供text参数")
                return False
            element.click()
            time.sleep(random.uniform(0.5, 1.0))
            element.fill(text)
            time.sleep(random.uniform(1, 2))
            element.press('Enter')
        elif action_type == "hover":
            element.hover()
            time.sleep(random.uniform(1, 2))
        else:
            print(f"不支持的操作类型: {action_type}")
            return False

        # Human-like pause after the action.
        time.sleep(random.uniform(1, 2))
        return True

    @staticmethod
    def _notify_callback(callback, result, selector):
        """Invoke ``callback(result, selector)``, shielding the caller from its errors."""
        if not callable(callback):
            return
        try:
            callback(result, selector)
        except Exception as e:
            print(f"回调函数执行失败: {e}")

    def get_random_user_agent(self) -> str:
        """Return one of the predefined desktop Chrome User-Agent strings."""
        return random.choice(self._USER_AGENTS)

    def enhanced_anti_detection(self):
        """Register an init script hiding ``navigator.webdriver`` and patching permissions.

        The original ``permissions.query`` is bound to ``navigator.permissions``
        before being wrapped; calling it unbound throws "Illegal invocation".
        """
        self.page.add_init_script("""
            // 更彻底地隐藏webdriver痕迹
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });

            // 伪装chrome csi和loadTimes
            if (!window.chrome) {
                window.chrome = {
                    runtime: {}
                };
            }

            // 伪装permissions查询
            const originalQuery = window.navigator.permissions.query.bind(window.navigator.permissions);
            window.navigator.permissions.query = (parameters) => {
                if (parameters.name === 'notifications') {
                    return Promise.resolve({ state: Notification.permission });
                }
                return originalQuery(parameters);
            };
        """)

    def anti_detection(self):
        """Register the broad init script that spoofs navigator/screen/window properties."""
        self.page.add_init_script("""
            // 隐藏webdriver属性
            delete navigator.__proto__.webdriver;

            // 伪装chrome属性
            Object.defineProperty(navigator, 'chrome', {
                value: {
                    runtime: {},
                    loadTimes: function() {}
                },
                writable: false,
                enumerable: true,
                configurable: true
            });

            // 伪装plugins和mimeTypes
            Object.defineProperty(navigator, 'plugins', {
                get: () => [
                    { 0: { type: 'application/pdf' } },
                    { 0: { type: 'application/x-google-chrome-pdf' } }
                ],
            });

            Object.defineProperty(navigator, 'mimeTypes', {
                get: () => [
                    { type: 'application/pdf' },
                    { type: 'application/x-google-chrome-pdf' }
                ],
            });

            // 伪装languages
            Object.defineProperty(navigator, 'languages', {
                get: () => ['zh-CN', 'zh'],
            });

            // 禁用调试功能
            window.console.debug = function() {};
            window.console.log = function() {};

            // 伪装屏幕信息
            Object.defineProperty(screen, 'width', {get: () => 1366});
            Object.defineProperty(screen, 'height', {get: () => 768});
            Object.defineProperty(screen, 'availWidth', {get: () => 1366});
            Object.defineProperty(screen, 'availHeight', {get: () => 768});
            Object.defineProperty(screen, 'colorDepth', {get: () => 24});
            Object.defineProperty(screen, 'pixelDepth', {get: () => 24});

            // 伪装时间戳
            window.chrome = {
                runtime: {}
            };

            // 伪装outerHeight和outerWidth
            Object.defineProperty(window, 'outerHeight', {get: () => 768});
            Object.defineProperty(window, 'outerWidth', {get: () => 1366});

            // 伪装innerHeight和innerWidth
            Object.defineProperty(window, 'innerHeight', {get: () => 768});
            Object.defineProperty(window, 'innerWidth', {get: () => 1366});
        """)

    def random_behavior(self):
        """Sleep, wander the mouse over 3-7 random points and maybe scroll."""
        time.sleep(random.uniform(2, 5))

        # Random mouse movement.
        for _ in range(random.randint(3, 7)):
            self.page.mouse.move(
                random.randint(100, 1200),
                random.randint(100, 600)
            )
            time.sleep(random.uniform(0.1, 0.8))

        # Scroll down on roughly half of the calls.
        if random.choice([True, False]):
            scroll_distance = random.randint(200, 800)
            self.page.mouse.wheel(0, scroll_distance)
            time.sleep(random.uniform(1, 2))

    def init_cookie_file(self):
        """Create the cookie file containing an empty JSON list if it is missing."""
        if not os.path.exists(self.cookie_path):
            with open(self.cookie_path, 'w', encoding='utf-8') as f:
                json.dump([], f)

    def save_cookies(self):
        """Write the current context's cookies to ``cookie_path`` as JSON."""
        cookies = self.context.cookies()
        with open(self.cookie_path, 'w', encoding='utf-8') as f:
            json.dump(cookies, f, indent=2)

    def load_cookies(self) -> bool:
        """Load cookies from ``cookie_path`` into the current context.

        Returns:
            bool: True if at least one cookie was added; False when the file
            is missing, unreadable, empty, or the cookies are rejected.
        """
        try:
            with open(self.cookie_path, 'r', encoding='utf-8') as f:
                cookies = json.load(f)
            if cookies:
                self.context.add_cookies(cookies)
                return True
            return False
        except Exception:
            # Deliberately quiet: a missing or stale cookie file is routine.
            return False

    def start_browser_with_proxy(self, proxy_host=None, proxy_port=None):
        """Launch Chromium, routing traffic through an HTTP proxy when given.

        The proxy is applied only when both ``proxy_host`` and ``proxy_port``
        are truthy.

        Args:
            proxy_host (str): Proxy host name or IP.
            proxy_port (int | str): Proxy port.
        """
        self.init_cookie_file()

        browser_args = [
            "--disable-blink-features=AutomationControlled",
            "--disable-infobars",
            "--no-sandbox",
            "--disable-dev-shm-usage"
        ]

        if proxy_host and proxy_port:
            browser_args.append(f"--proxy-server=http://{proxy_host}:{proxy_port}")

        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=False,
            args=browser_args
        )
        # ... remaining initialisation code
        # NOTE(review): as written, this method creates no context or page, so
        # self.context and self.page keep their previous values.

    def bypass_debugger(self):
        """Register an init script that neuters ``Function`` constructor / ``console.debug``."""
        self.page.add_init_script("""
            window.Function.prototype.constructor = function() {};
            window.console.debug = function(){};
            Object.defineProperty(navigator, 'webdriver', {get: () => false});
        """)

    async def start_browser_async(self) -> bool:
        """Launch Chromium through Playwright's async API.

        NOTE(review): the other methods of this class call page methods
        without awaiting them, so they presumably cannot drive a page created
        here — confirm against callers.

        Returns:
            bool: True on success, False if launching failed.
        """
        try:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(
                headless=False,
                args=[
                    "--disable-blink-features=AutomationControlled",
                    "--disable-infobars"
                ]
            )
            self.context = await self.browser.new_context()
            self.page = await self.context.new_page()
            return True
        except Exception as e:
            print(f"异步启动浏览器失败: {e}")
            return False

    async def close_browser_async(self):
        """Close the async browser and stop Playwright, then drop the handles.

        Playwright is stopped even if closing the browser raises.
        """
        try:
            if self.browser:
                await self.browser.close()
        finally:
            try:
                if self.playwright:
                    await self.playwright.stop()
            finally:
                self._reset_handles()

    def start_browser(self):
        """Launch Chromium with a zh-CN desktop profile and apply anti-detection."""
        self.init_cookie_file()

        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=False,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-infobars",
                "--disable-extensions",
                "--disable-plugins",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-web-security",
                "--disable-features=IsolateOrigins,site-per-process",
                "--disable-background-timer-throttling",
                "--disable-backgrounding-occluded-windows",
                "--disable-renderer-backgrounding",
                "--disable-ipc-flooding-protection"
            ]
        )

        self.context = self.browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            viewport={"width": 1366, "height": 768},
            device_scale_factor=1,
            is_mobile=False,
            has_touch=False,
            locale="zh-CN",
            timezone_id="Asia/Shanghai"
        )

        self.page = self.context.new_page()
        self.anti_detection()
        self.enhanced_anti_detection()

        # Init scripts only run on navigation; patch the current document too.
        self.page.evaluate("""
            delete navigator.__proto__.webdriver;
        """)

        self.human_like_actions()
        self.random_behavior()

    def close_browser(self):
        """Close context and browser, stop Playwright, then drop the handles.

        Playwright is stopped even if closing the context or browser raises,
        so the driver process is not leaked.
        """
        try:
            if self.context:
                self.context.close()
            if self.browser:
                self.browser.close()
        finally:
            try:
                if self.playwright:
                    self.playwright.stop()
            finally:
                self._reset_handles()

    def _reset_handles(self):
        """Forget all Playwright objects so a closed session is not reused."""
        self.page = None
        self.context = None
        self.browser = None
        self.playwright = None
        self._init_script_page = None

    def save_page_html(self, filepath: str) -> None:
        """Write the current page's HTML to ``filepath`` (UTF-8).

        The parent directory is created when needed. Errors are printed, not
        raised.

        Args:
            filepath (str): Destination file path.
        """
        try:
            html_content = self.page.content()

            # dirname may be "" for a bare file name; create_directory copes.
            create_directory(os.path.dirname(filepath))

            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(html_content)

            print(f"页面HTML已保存到: {filepath}")
        except Exception as e:
            print(f"保存页面HTML失败: {e}")

    def visit_page(self, url: str) -> bool:
        """Navigate to ``url`` and simulate some human-like activity.

        Args:
            url (str): Address to open.

        Returns:
            bool: True if navigation succeeded, False on any error.
        """
        try:
            # Register the init script once per page object: init scripts
            # persist across navigations, so re-adding it on every visit only
            # stacks up duplicates.
            if getattr(self, "_init_script_page", None) is not self.page:
                self.page.add_init_script("""
                    // 隐藏webdriver属性
                    delete navigator.__proto__.webdriver;

                    // 伪装chrome属性
                    Object.defineProperty(navigator, 'chrome', {
                        value: {
                            runtime: {},
                            loadTimes: function() {}
                        },
                        writable: false,
                        enumerable: true,
                        configurable: true
                    });

                    // 禁用调试功能
                    window.console.debug = function() {};
                    window.console.log = function() {};
                """)
                self._init_script_page = self.page

            # More realistic request headers.
            self.page.set_extra_http_headers({
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Accept-Encoding": "gzip, deflate, br",
                "Upgrade-Insecure-Requests": "1",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "none",
                "Cache-Control": "max-age=0"
            })

            self.page.goto(url)

            # Re-apply on the loaded document.
            self.page.evaluate("""
                delete navigator.__proto__.webdriver;
            """)

            # Wait for the load state rather than a specific element; a
            # timeout here is reported but does not fail the visit.
            try:
                self.page.wait_for_load_state(
                    'networkidle', timeout=self._NETWORK_IDLE_TIMEOUT_MS
                )
                print(f"networkidle, timeout={self._NETWORK_IDLE_TIMEOUT_MS}页面已加载")
            except Exception as e:
                print(f"等待页面加载状态时出错: {e}")

            self.human_like_actions()
            self.random_behavior()
            return True
        except Exception as e:
            print(f"访问页面失败: {url}, 错误: {e}")
            return False

    def extract_links(self, selector: str = "h2 a") -> List[Dict[str, str]]:
        """Collect titled links from the current page.

        Elements without an ``href`` attribute or without visible text are
        skipped. Relative hrefs are resolved against the current page URL.

        Args:
            selector (str): CSS selector for the anchors, default "h2 a".

        Returns:
            list: Dicts with keys ``title``, ``url`` and ``request_url``
            (the URL of the page the link was found on).
        """
        links = []
        try:
            elements = self.page.query_selector_all(selector)
            current_url = self.page.url
            for element in elements:
                try:
                    title = element.inner_text().replace('\n', '').replace(',', ' ').strip()
                    url = element.get_attribute('href')
                    if url is None:
                        # Anchor without href: nothing to link to.
                        continue

                    if not url.startswith(('http://', 'https://')):
                        # Resolve relative links.
                        url = urllib.parse.urljoin(current_url, url)

                    if title and url:
                        links.append({
                            'title': title,
                            'url': url,
                            'request_url': current_url
                        })
                except Exception as e:
                    print(f"提取链接失败: {e}")
                    continue
        except Exception as e:
            print(f"提取链接失败: {e}")

        return links

# Usage examples:
#
# # Click the search button
# browser.click_element('#sb_form_go')
#
# # Type into the search box and press Enter
# browser.input_and_enter('#sb_form_q', 'search keywords')
#
# # Click the search button via the generic method
# browser.interact_with_element('#sb_form_go', 'click')
#
# # Type and press Enter via the generic method
# browser.interact_with_element('#sb_form_q', 'input_enter', 'search keywords')
#
# # Interaction with a callback
# def search_callback(success, selector):
#     if success:
#         print(f"element operation succeeded: {selector}")
#     else:
#         print(f"element operation failed: {selector}")
#
# browser.interact_with_element(
#     '#sb_form_q',
#     'input_enter',
#     'search keywords',
#     search_callback
# )