Files
SearchCompany/tool/web_browser.py
2025-11-13 07:28:15 +08:00

572 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# file: tool/web_browser.py
import random
import json
import os
import time
import urllib.parse
from playwright.async_api import async_playwright
from playwright.sync_api import sync_playwright
def create_directory(directory_path):
    """Create ``directory_path`` (including missing parents) if it is absent.

    Args:
        directory_path (str): Directory to create. An empty string (what
            ``os.path.dirname`` returns for a bare filename) is ignored,
            because ``os.makedirs("")`` would raise ``FileNotFoundError``.

    Returns:
        None. A message is printed only when the directory was created by
        this call.
    """
    if not directory_path:
        return
    if not os.path.exists(directory_path):
        # exist_ok=True closes the window between the check above and the
        # creation, in case something else creates the directory meanwhile.
        os.makedirs(directory_path, exist_ok=True)
        print(f"已创建目录: {directory_path}")
class WebBrowser:
    """Wrapper around a Playwright Chromium session with anti-detection helpers.

    The synchronous API (``start_browser`` / ``start_browser_with_proxy``)
    populates ``playwright``, ``browser``, ``context`` and ``page``; the
    interaction helpers (``click_element``, ``visit_page``, ...) use those
    synchronous handles.  ``start_browser_async`` stores *async* Playwright
    objects in the same attributes, so the synchronous helpers must not be
    mixed with an asynchronously started session.
    """

    # Milliseconds to wait for an element to appear before giving up.
    _ELEMENT_TIMEOUT_MS = 10000

    def __init__(self, cookie_path="browser_cookies.json"):
        """Remember the cookie file location; no browser is started here.

        Args:
            cookie_path (str): JSON file used by ``save_cookies`` and
                ``load_cookies``.
        """
        self.cookie_path = cookie_path
        self.browser = None
        self.context = None
        self.page = None
        self.playwright = None
        # Page on which visit_page() has already registered its init script.
        self._init_script_page = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _wait_and_query(self, selector):
        """Wait for ``selector`` to appear, then return the first match."""
        self.page.wait_for_selector(selector, timeout=self._ELEMENT_TIMEOUT_MS)
        return self.page.query_selector(selector)

    @staticmethod
    def _run_callback(callback, result, selector):
        """Invoke ``callback(result, selector)`` and report, not raise, its errors."""
        if callback and callable(callback):
            try:
                callback(result, selector)
            except Exception as e:
                print(f"回调函数执行失败: {e}")

    def _reset_handles(self):
        """Forget all Playwright handles so a closed session is not reused."""
        self.page = None
        self.context = None
        self.browser = None
        self.playwright = None
        self._init_script_page = None

    def _open_stealth_page(self):
        """Create the context and page on ``self.browser`` and inject the scripts."""
        self.context = self.browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            viewport={"width": 1366, "height": 768},
            device_scale_factor=1,
            is_mobile=False,
            has_touch=False,
            locale="zh-CN",
            timezone_id="Asia/Shanghai"
        )
        self.page = self.context.new_page()
        self.anti_detection()
        self.enhanced_anti_detection()
        # Init scripts only run on the next navigation, so also patch the
        # document that is currently loaded.
        self.page.evaluate("""
            delete navigator.__proto__.webdriver;
        """)

    # ------------------------------------------------------------------
    # Human-like behaviour
    # ------------------------------------------------------------------
    def human_like_actions(self):
        """Pause briefly, then hover over one of the first few links/buttons.

        Best effort: any failure while hovering is reported and ignored.
        """
        # Idle for a moment like a reader would after the page loads.
        time.sleep(random.uniform(1, 3))
        try:
            hover_elements = self.page.query_selector_all("a, button")
            if hover_elements:
                # Only consider the first five candidates.
                element = random.choice(hover_elements[:5])
                self.page.mouse.move(0, 0)  # move the pointer away first
                time.sleep(0.5)
                element.hover()
                time.sleep(random.uniform(0.5, 1.5))
        except Exception as e:
            # Deliberately non-fatal, but no longer swallows KeyboardInterrupt.
            print(f"模拟悬停失败: {e}")

    def random_behavior(self):
        """Simulate idle time, random mouse movement and an optional scroll."""
        time.sleep(random.uniform(2, 5))
        # Random mouse movements.
        for _ in range(random.randint(3, 7)):
            self.page.mouse.move(
                random.randint(100, 1200),
                random.randint(100, 600)
            )
            time.sleep(random.uniform(0.1, 0.8))
        # Scroll the page about half of the time.
        if random.choice([True, False]):
            scroll_distance = random.randint(200, 800)
            self.page.mouse.wheel(0, scroll_distance)
            time.sleep(random.uniform(1, 2))

    # ------------------------------------------------------------------
    # Element interaction
    # ------------------------------------------------------------------
    def click_element(self, selector):
        """Hover over and click the element matching ``selector``.

        Args:
            selector (str): CSS selector or XPath expression.

        Returns:
            bool: True when the click succeeded, otherwise False.
        """
        try:
            element = self._wait_and_query(selector)
            if not element:
                print(f"未找到元素: {selector}")
                return False
            # Move the pointer onto the element before clicking.
            element.hover()
            time.sleep(random.uniform(0.5, 1.0))
            element.click()
            # Pause the way a person would after clicking.
            time.sleep(random.uniform(1, 2))
            return True
        except Exception as e:
            print(f"点击元素失败: {selector}, 错误: {str(e)}")
            return False

    def input_and_enter(self, selector, text):
        """Fill the input matching ``selector`` with ``text`` and press Enter.

        Args:
            selector (str): CSS selector of the input element.
            text (str): Text to enter; existing content is replaced.

        Returns:
            bool: True when the input succeeded, otherwise False.
        """
        try:
            input_element = self._wait_and_query(selector)
            if not input_element:
                print(f"未找到输入框: {selector}")
                return False
            # Click to focus the input.
            input_element.click()
            time.sleep(random.uniform(0.5, 1.0))
            # fill() clears the current value before typing the new one.
            input_element.fill(text)
            time.sleep(random.uniform(1, 2))
            input_element.press('Enter')
            # Give the page time to react.
            time.sleep(random.uniform(2, 3))
            return True
        except Exception as e:
            print(f"输入并回车失败: {selector}, 错误: {str(e)}")
            return False

    def _perform_action(self, selector, action_type, text):
        """Run one interaction on ``selector``; return True when it was performed."""
        element = self._wait_and_query(selector)
        if not element:
            print(f"未找到元素: {selector}")
            return False
        if action_type == "click":
            element.hover()
            time.sleep(random.uniform(0.5, 1.0))
            element.click()
        elif action_type == "input_enter":
            if text is None:
                print("输入操作需要提供text参数")
                return False
            element.click()
            time.sleep(random.uniform(0.5, 1.0))
            element.fill(text)
            time.sleep(random.uniform(1, 2))
            element.press('Enter')
        elif action_type == "hover":
            element.hover()
            time.sleep(random.uniform(1, 2))
        else:
            print(f"不支持的操作类型: {action_type}")
            return False
        # Human-like delay after the action.
        time.sleep(random.uniform(1, 2))
        return True

    def interact_with_element(self, selector, action_type="click", text=None, callback=None):
        """Generic element interaction with an optional completion callback.

        Args:
            selector (str): CSS selector of the element.
            action_type (str): One of "click", "input_enter" or "hover".
            text (str): Text to enter; required for "input_enter".
            callback (callable): Called as ``callback(result, selector)`` once
                the interaction finished, on success and on every failure
                path.  Errors raised by the callback are printed, not raised.

        Returns:
            bool: True when the action was performed, otherwise False.
        """
        try:
            result = self._perform_action(selector, action_type, text)
        except Exception as e:
            print(f"元素交互失败: {selector}, 错误: {str(e)}")
            result = False
        self._run_callback(callback, result, selector)
        return result

    def get_random_user_agent(self):
        """Return one of the built-in desktop Chrome User-Agent strings at random."""
        user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
        ]
        return random.choice(user_agents)

    # ------------------------------------------------------------------
    # Anti-detection scripts
    # ------------------------------------------------------------------
    def enhanced_anti_detection(self):
        """Register an init script hiding ``webdriver`` and patching permissions.

        ``permissions.query`` is bound to its receiver before being wrapped;
        calling the detached native function throws "Illegal invocation".
        """
        self.page.add_init_script("""
            // 更彻底地隐藏webdriver痕迹
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            // 伪装chrome csi和loadTimes
            if (!window.chrome) {
                window.chrome = {
                    runtime: {}
                };
            }
            // 伪装permissions查询
            if (window.navigator.permissions && window.navigator.permissions.query) {
                const permissions = window.navigator.permissions;
                const originalQuery = permissions.query.bind(permissions);
                permissions.query = (parameters) => {
                    if (parameters && parameters.name === 'notifications') {
                        return Promise.resolve({
                            state: Notification.permission
                        });
                    }
                    return originalQuery(parameters);
                };
            }
        """)

    def anti_detection(self):
        """Register the broad fingerprint-masking init script on the page.

        The script removes ``webdriver``, fakes ``chrome``, plugins, mime
        types, languages, screen and window dimensions, and silences
        ``console.debug`` / ``console.log`` inside the page.
        """
        self.page.add_init_script("""
            // 隐藏webdriver属性
            delete navigator.__proto__.webdriver;
            // 伪装chrome属性
            Object.defineProperty(navigator, 'chrome', {
                value: {
                    runtime: {},
                    loadTimes: function() {}
                },
                writable: false,
                enumerable: true,
                configurable: true
            });
            // 伪装plugins和mimeTypes
            Object.defineProperty(navigator, 'plugins', {
                get: () => [
                    { 0: { type: 'application/pdf' } },
                    { 0: { type: 'application/x-google-chrome-pdf' } }
                ],
            });
            Object.defineProperty(navigator, 'mimeTypes', {
                get: () => [
                    { type: 'application/pdf' },
                    { type: 'application/x-google-chrome-pdf' }
                ],
            });
            // 伪装languages
            Object.defineProperty(navigator, 'languages', {
                get: () => ['zh-CN', 'zh'],
            });
            // 禁用调试功能
            window.console.debug = function() {};
            window.console.log = function() {};
            // 伪装屏幕信息
            Object.defineProperty(screen, 'width', {get: () => 1366});
            Object.defineProperty(screen, 'height', {get: () => 768});
            Object.defineProperty(screen, 'availWidth', {get: () => 1366});
            Object.defineProperty(screen, 'availHeight', {get: () => 768});
            Object.defineProperty(screen, 'colorDepth', {get: () => 24});
            Object.defineProperty(screen, 'pixelDepth', {get: () => 24});
            // 伪装时间戳
            window.chrome = {
                runtime: {}
            };
            // 伪装outerHeight和outerWidth
            Object.defineProperty(window, 'outerHeight', {get: () => 768});
            Object.defineProperty(window, 'outerWidth', {get: () => 1366});
            // 伪装innerHeight和innerWidth
            Object.defineProperty(window, 'innerHeight', {get: () => 768});
            Object.defineProperty(window, 'innerWidth', {get: () => 1366});
        """)

    def bypass_debugger(self):
        """Register an init script that neutralises common debugger traps."""
        self.page.add_init_script("""
            window.Function.prototype.constructor = function() {};
            window.console.debug = function(){};
            Object.defineProperty(navigator, 'webdriver', {get: () => false});
        """)

    # ------------------------------------------------------------------
    # Cookies
    # ------------------------------------------------------------------
    def init_cookie_file(self):
        """Create the cookie file holding an empty JSON list if it is missing."""
        if not os.path.exists(self.cookie_path):
            with open(self.cookie_path, 'w', encoding='utf-8') as f:
                json.dump([], f)

    def save_cookies(self):
        """Write the current context's cookies to ``cookie_path`` as JSON."""
        cookies = self.context.cookies()
        with open(self.cookie_path, 'w', encoding='utf-8') as f:
            json.dump(cookies, f, indent=2)

    def load_cookies(self):
        """Load cookies from ``cookie_path`` into the current context.

        Returns:
            bool: True when at least one cookie was added; False when the
            file is empty, missing, unreadable or rejected by the context.
        """
        try:
            with open(self.cookie_path, 'r', encoding='utf-8') as f:
                cookies = json.load(f)
            if cookies:
                self.context.add_cookies(cookies)
                return True
            return False
        except Exception as e:
            # Still best effort, but the reason is now visible.
            print(f"加载cookies失败: {e}")
            return False

    # ------------------------------------------------------------------
    # Browser lifecycle
    # ------------------------------------------------------------------
    def start_browser_with_proxy(self, proxy_host=None, proxy_port=None):
        """Start Chromium, optionally through an HTTP proxy, and open a page.

        Args:
            proxy_host (str): Proxy host; no proxy is used unless both host
                and port are given.
            proxy_port (int | str): Proxy port.

        After this call ``context`` and ``page`` are set up the same way as
        ``start_browser`` does, including the anti-detection scripts.
        """
        self.init_cookie_file()
        browser_args = [
            "--disable-blink-features=AutomationControlled",
            "--disable-infobars",
            "--no-sandbox",
            "--disable-dev-shm-usage"
        ]
        if proxy_host and proxy_port:
            browser_args.append(f"--proxy-server=http://{proxy_host}:{proxy_port}")
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=False,
            args=browser_args
        )
        # Previously the method stopped here, leaving self.page as None.
        self._open_stealth_page()

    async def start_browser_async(self):
        """Start Chromium through the async Playwright API.

        Returns:
            bool: True on success.  On failure the error is printed, anything
            that was already started is shut down again, the instance
            attributes are left untouched and False is returned.
        """
        started_playwright = None
        started_browser = None
        try:
            started_playwright = await async_playwright().start()
            started_browser = await started_playwright.chromium.launch(
                headless=False,
                args=[
                    "--disable-blink-features=AutomationControlled",
                    "--disable-infobars"
                ]
            )
            context = await started_browser.new_context()
            page = await context.new_page()
        except Exception as e:
            print(f"异步启动浏览器失败: {e}")
            # Do not leak a half-started driver process or browser.
            closers = (
                getattr(started_browser, "close", None),
                getattr(started_playwright, "stop", None),
            )
            for closer in closers:
                if closer is None:
                    continue
                try:
                    await closer()
                except Exception as cleanup_error:
                    print(f"异步清理浏览器资源失败: {cleanup_error}")
            return False
        self.playwright = started_playwright
        self.browser = started_browser
        self.context = context
        self.page = page
        return True

    async def close_browser_async(self):
        """Close an asynchronously started browser and stop Playwright."""
        try:
            if self.browser:
                await self.browser.close()
        finally:
            try:
                if self.playwright:
                    await self.playwright.stop()
            finally:
                self._reset_handles()

    def start_browser(self):
        """Start Chromium (headed), open a page and inject anti-detection scripts."""
        self.init_cookie_file()
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(
            headless=False,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--disable-infobars",
                "--disable-extensions",
                "--disable-plugins",
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-web-security",
                "--disable-features=IsolateOrigins,site-per-process",
                "--disable-background-timer-throttling",
                "--disable-backgrounding-occluded-windows",
                "--disable-renderer-backgrounding",
                "--disable-ipc-flooding-protection"
            ]
        )
        self._open_stealth_page()
        self.human_like_actions()
        self.random_behavior()

    def close_browser(self):
        """Close context and browser, stop Playwright and drop the handles.

        Safe to call repeatedly.  Playwright is stopped even when closing the
        context or browser raises; the original exception still propagates.
        """
        try:
            if self.context:
                self.context.close()
            if self.browser:
                self.browser.close()
        finally:
            try:
                if self.playwright:
                    self.playwright.stop()
            finally:
                self._reset_handles()

    # ------------------------------------------------------------------
    # Page operations
    # ------------------------------------------------------------------
    def save_page_html(self, filepath):
        """Write the current page's HTML to ``filepath`` (UTF-8).

        Missing parent directories are created.  A bare filename (no
        directory part) is written to the current working directory.
        Failures are printed, not raised.
        """
        try:
            html_content = self.page.content()
            directory = os.path.dirname(filepath)
            # dirname() is "" for a bare filename; there is nothing to create.
            if directory:
                create_directory(directory)
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(html_content)
            print(f"页面HTML已保存到: {filepath}")
        except Exception as e:
            print(f"保存页面HTML失败: {e}")

    def visit_page(self, url, load_timeout_ms=15000):
        """Navigate to ``url`` and simulate a short browsing session.

        Args:
            url (str): Address to open.
            load_timeout_ms (int): Maximum time to wait for the
                ``networkidle`` load state; a timeout is reported but does
                not fail the visit.

        Returns:
            bool: True when navigation succeeded, otherwise False.
        """
        try:
            # Init scripts accumulate on a page and cannot be removed, so
            # register this one once per page instead of once per visit.
            if getattr(self, "_init_script_page", None) is not self.page:
                self.page.add_init_script("""
                    // 隐藏webdriver属性
                    delete navigator.__proto__.webdriver;
                    // 伪装chrome属性
                    Object.defineProperty(navigator, 'chrome', {
                        value: {
                            runtime: {},
                            loadTimes: function() {}
                        },
                        writable: false,
                        enumerable: true,
                        configurable: true
                    });
                    // 禁用调试功能
                    window.console.debug = function() {};
                    window.console.log = function() {};
                """)
                self._init_script_page = self.page
            # Send headers resembling a regular browser navigation.
            self.page.set_extra_http_headers({
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Accept-Encoding": "gzip, deflate, br",
                "Upgrade-Insecure-Requests": "1",
                "Sec-Fetch-Dest": "document",
                "Sec-Fetch-Mode": "navigate",
                "Sec-Fetch-Site": "none",
                "Cache-Control": "max-age=0"
            })
            self.page.goto(url)
            # Re-apply on the loaded document as well.
            self.page.evaluate("""
                delete navigator.__proto__.webdriver;
            """)
            # Wait for the load state rather than for a specific element.
            try:
                self.page.wait_for_load_state('networkidle', timeout=load_timeout_ms)
                print(f"networkidle, timeout={load_timeout_ms}页面已加载")
            except Exception as e:
                print(f"等待页面加载状态时出错: {e}")
            self.human_like_actions()
            self.random_behavior()
            return True
        except Exception as e:
            print(f"访问页面失败: {url}, 错误: {str(e)}")
            return False

    def extract_links(self, selector="h2 a"):
        """Collect the links matched by ``selector`` on the current page.

        Args:
            selector (str): CSS selector of the anchor elements.

        Returns:
            list: Dicts with ``title`` (newlines removed, commas replaced by
            spaces), ``url`` (made absolute against the current page URL) and
            ``request_url`` (the current page URL).  Elements without an
            ``href`` attribute or without a title are skipped.
        """
        links = []
        try:
            elements = self.page.query_selector_all(selector)
            current_url = self.page.url
            for element in elements:
                try:
                    title = element.inner_text().replace('\n', '').replace(',', ' ').strip()
                    url = element.get_attribute('href')
                    if url is None:
                        # Anchor without href: nothing to resolve.
                        continue
                    if not url.startswith(('http://', 'https://')):
                        # Resolve relative links against the current page.
                        url = urllib.parse.urljoin(current_url, url)
                    if title and url:
                        links.append({
                            'title': title,
                            'url': url,
                            'request_url': current_url
                        })
                except Exception as e:
                    print(f"提取链接失败: {e}")
                    continue
        except Exception as e:
            print(f"提取链接失败: {e}")
        return links
#
# # Usage examples (assumes `browser` is a WebBrowser whose browser has been started)
# #
# # Click the search button
# browser.click_element('#sb_form_go')
#
# # Type into the search box and press Enter
# browser.input_and_enter('#sb_form_q', '搜索关键词')
#
# # Click the search button via the generic method
# browser.interact_with_element('#sb_form_go', 'click')
#
# # Type and press Enter via the generic method
# browser.interact_with_element('#sb_form_q', 'input_enter', '搜索关键词')
#
# # Interaction with a callback
# def search_callback(success, selector):
#     if success:
#         print(f"成功操作元素: {selector}")
#     else:
#         print(f"操作元素失败: {selector}")
#
# browser.interact_with_element(
#     '#sb_form_q',
#     'input_enter',
#     '搜索关键词',
#     search_callback
# )