Files
SearchCompany/tool/web_browser.py
manchuwork de3c97e828 aiqicha
2025-11-03 18:57:58 +08:00

397 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# file: tool/web_browser.py
import random
import json
import os
import time
import urllib.parse
from playwright.async_api import async_playwright
from playwright.sync_api import sync_playwright
def create_directory(directory_path):
    """Create ``directory_path`` (and any missing parents) if it is absent.

    An empty path is ignored: ``os.path.dirname("page.html")`` yields ``""``
    and ``os.makedirs("")`` would raise ``FileNotFoundError``.

    Args:
        directory_path: Directory to create. Nothing happens when it is
            empty or when the path already exists.
    """
    if not directory_path or os.path.exists(directory_path):
        return
    # exist_ok covers the window between the existence check above and the
    # actual creation (another process may create the directory meanwhile).
    os.makedirs(directory_path, exist_ok=True)
    print(f"已创建目录: {directory_path}")
class WebBrowser:
def __init__(self, cookie_path="browser_cookies.json"):
self.cookie_path = cookie_path
self.browser = None
self.context = None
self.page = None
self.playwright = None
def human_like_actions(self):
    """Pause briefly, then hover over one of the first links/buttons.

    The hover is best-effort: any ordinary error raised while querying or
    hovering is ignored so that a flaky element never aborts the caller.
    ``KeyboardInterrupt`` and ``SystemExit`` are deliberately NOT swallowed.
    """
    # Idle for 1-3 seconds after the page has loaded.
    time.sleep(random.uniform(1, 3))
    try:
        hover_elements = self.page.query_selector_all("a, button")
        if hover_elements:
            # Only the first five matches are candidates; slicing already
            # copes with shorter lists.
            element = random.choice(hover_elements[:5])
            self.page.mouse.move(0, 0)  # move the pointer away first
            time.sleep(0.5)
            element.hover()
            time.sleep(random.uniform(0.5, 1.5))
    except Exception:
        # Best-effort simulation only; failures here are not actionable.
        pass
def get_random_user_agent(self):
    """Pick one of three hard-coded desktop Chrome User-Agent strings.

    Returns:
        str: A User-Agent header value chosen with ``random.choice``.
    """
    candidates = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    )
    chosen = random.choice(candidates)
    return chosen
def enhanced_anti_detection(self):
    """Register an init script that masks common automation fingerprints.

    The script, passed to ``self.page.add_init_script``:
      * makes ``navigator.webdriver`` read as ``undefined``;
      * defines a minimal ``window.chrome`` object when it is missing;
      * wraps ``navigator.permissions.query`` so a ``notifications`` query
        reports ``Notification.permission``.
    """
    # The original native query() must be invoked with the Permissions
    # object as ``this``; calling the detached function directly fails with
    # "Illegal invocation" for every non-notification permission query.
    self.page.add_init_script("""
// 更彻底地隐藏webdriver痕迹
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// 伪装chrome csi和loadTimes
if (!window.chrome) {
window.chrome = {
runtime: {}
};
}
// 伪装permissions查询
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => {
if (parameters.name === 'notifications') {
return Promise.resolve({
state: Notification.permission
});
}
return originalQuery.call(window.navigator.permissions, parameters);
};
""")
def anti_detection(self):
    """Register the base stealth init script on ``self.page``.

    The JavaScript removes ``navigator.webdriver`` from the prototype,
    defines fake ``chrome``, ``plugins``, ``mimeTypes`` and ``languages``
    values on ``navigator``, silences ``console.debug``/``console.log``,
    and pins screen and window dimensions to 1366x768.
    """
    stealth_script = """
// 隐藏webdriver属性
delete navigator.__proto__.webdriver;
// 伪装chrome属性
Object.defineProperty(navigator, 'chrome', {
value: {
runtime: {},
loadTimes: function() {}
},
writable: false,
enumerable: true,
configurable: true
});
// 伪装plugins和mimeTypes
Object.defineProperty(navigator, 'plugins', {
get: () => [
{ 0: { type: 'application/pdf' } },
{ 0: { type: 'application/x-google-chrome-pdf' } }
],
});
Object.defineProperty(navigator, 'mimeTypes', {
get: () => [
{ type: 'application/pdf' },
{ type: 'application/x-google-chrome-pdf' }
],
});
// 伪装languages
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh'],
});
// 禁用调试功能
window.console.debug = function() {};
window.console.log = function() {};
// 伪装屏幕信息
Object.defineProperty(screen, 'width', {get: () => 1366});
Object.defineProperty(screen, 'height', {get: () => 768});
Object.defineProperty(screen, 'availWidth', {get: () => 1366});
Object.defineProperty(screen, 'availHeight', {get: () => 768});
Object.defineProperty(screen, 'colorDepth', {get: () => 24});
Object.defineProperty(screen, 'pixelDepth', {get: () => 24});
// 伪装时间戳
window.chrome = {
runtime: {}
};
// 伪装outerHeight和outerWidth
Object.defineProperty(window, 'outerHeight', {get: () => 768});
Object.defineProperty(window, 'outerWidth', {get: () => 1366});
// 伪装innerHeight和innerWidth
Object.defineProperty(window, 'innerHeight', {get: () => 768});
Object.defineProperty(window, 'innerWidth', {get: () => 1366});
"""
    # The script is registered once here; the page object decides when it runs.
    self.page.add_init_script(stealth_script)
def random_behavior(self):
    """Idle, wander the mouse to random points, and sometimes scroll.

    Sequence: sleep 2-5 s, perform 3-7 mouse moves (each followed by a
    0.1-0.8 s pause), then with 50% probability scroll down 200-800 px and
    wait another 1-2 s.
    """
    time.sleep(random.uniform(2, 5))

    move_count = random.randint(3, 7)
    for _ in range(move_count):
        target_x = random.randint(100, 1200)
        target_y = random.randint(100, 600)
        self.page.mouse.move(target_x, target_y)
        time.sleep(random.uniform(0.1, 0.8))

    # Coin flip: half of the calls end without scrolling.
    if not random.choice([True, False]):
        return
    wheel_delta = random.randint(200, 800)
    self.page.mouse.wheel(0, wheel_delta)
    time.sleep(random.uniform(1, 2))
def init_cookie_file(self):
    """Create the cookie file containing an empty JSON list if it is missing.

    An existing file at ``self.cookie_path`` is left untouched.
    """
    if os.path.exists(self.cookie_path):
        return
    with open(self.cookie_path, 'w') as cookie_file:
        json.dump([], cookie_file)
def save_cookies(self):
    """Write the cookies reported by ``self.context`` to the cookie file.

    The file at ``self.cookie_path`` is overwritten with the cookie list
    serialized as JSON using a two-space indent.
    """
    current_cookies = self.context.cookies()
    with open(self.cookie_path, 'w') as cookie_file:
        json.dump(obj=current_cookies, fp=cookie_file, indent=2)
def load_cookies(self):
    """Load cookies from the cookie file into ``self.context``.

    Returns:
        bool: ``True`` when a non-empty cookie list was read and handed to
        ``self.context.add_cookies``; ``False`` when the file is missing,
        unreadable, not valid JSON, empty, or the context rejects the
        cookies.

    ``KeyboardInterrupt`` and ``SystemExit`` are not intercepted.
    """
    try:
        with open(self.cookie_path, 'r') as f:
            cookies = json.load(f)
    except FileNotFoundError:
        # No saved session yet: an expected situation, nothing to report.
        return False
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and decoding problems.
        print(f"加载cookies失败: {e}")
        return False

    if not cookies:
        return False

    try:
        self.context.add_cookies(cookies)
    except Exception as e:
        # The context may be unset or may refuse malformed cookie entries.
        print(f"加载cookies失败: {e}")
        return False
    return True
def start_browser_with_proxy(self, proxy_host=None, proxy_port=None):
    """Launch a visible Chromium instance, optionally through an HTTP proxy.

    Args:
        proxy_host: Proxy host name or address; used only together with
            ``proxy_port``.
        proxy_port: Proxy port; used only together with ``proxy_host``.

    NOTE: only ``self.playwright`` and ``self.browser`` are assigned here;
    no context or page is created by this method.
    """
    self.init_cookie_file()

    proxy_flags = []
    if proxy_host and proxy_port:
        proxy_flags.append(f"--proxy-server=http://{proxy_host}:{proxy_port}")

    chromium_flags = [
        "--disable-blink-features=AutomationControlled",
        "--disable-infobars",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        *proxy_flags,
    ]

    self.playwright = sync_playwright().start()
    chromium = self.playwright.chromium
    self.browser = chromium.launch(headless=False, args=chromium_flags)
def bypass_debugger(self):
    """Register an init script aimed at debugger-based bot detection.

    The script replaces ``Function.prototype.constructor`` with a no-op,
    silences ``console.debug`` and makes ``navigator.webdriver`` read as
    ``false``.
    """
    debugger_patch = """
window.Function.prototype.constructor = function() {};
window.console.debug = function(){};
Object.defineProperty(navigator, 'webdriver', {get: () => false});
"""
    self.page.add_init_script(debugger_patch)
async def start_browser_async(self):
    """Start Playwright's async API and open a visible Chromium page.

    Assigns ``self.playwright``, ``self.browser``, ``self.context`` and
    ``self.page`` in that order.

    Returns:
        bool: ``True`` on success; ``False`` if any step raised (the error
        is printed and handles assigned before the failure stay set).
    """
    launch_flags = [
        "--disable-blink-features=AutomationControlled",
        "--disable-infobars",
    ]
    try:
        driver = await async_playwright().start()
        self.playwright = driver
        self.browser = await driver.chromium.launch(
            headless=False,
            args=launch_flags,
        )
        self.context = await self.browser.new_context()
        self.page = await self.context.new_page()
    except Exception as e:
        print(f"异步启动浏览器失败: {e}")
        return False
    else:
        return True
async def close_browser_async(self):
    """Close the async browser and stop Playwright, even on errors.

    Playwright is stopped even when closing the browser raises, so the
    driver is not left running. All handles on the instance are cleared,
    which makes a repeated call a no-op.

    Raises:
        Exception: Whatever ``browser.close()`` or ``playwright.stop()``
        raised, after the remaining cleanup has been attempted.
    """
    try:
        if self.browser:
            await self.browser.close()
    finally:
        # Detach the handles before stopping so a failure in stop() cannot
        # leave stale references behind.
        driver, self.playwright = self.playwright, None
        self.page = self.context = self.browser = None
        if driver:
            await driver.stop()
def start_browser(self):
    """Launch a visible Chromium browser with the stealth setup applied.

    Steps: ensure the cookie file exists, start Playwright's sync API,
    launch Chromium, create a context with a fixed desktop profile
    (1366x768, zh-CN, Asia/Shanghai), open a page, register both
    anti-detection scripts, then run the human-behaviour simulations.
    """
    self.init_cookie_file()

    chromium_flags = [
        "--disable-blink-features=AutomationControlled",
        "--disable-infobars",
        "--disable-extensions",
        "--disable-plugins",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-web-security",
        "--disable-features=IsolateOrigins,site-per-process",
        "--disable-background-timer-throttling",
        "--disable-backgrounding-occluded-windows",
        "--disable-renderer-backgrounding",
        "--disable-ipc-flooding-protection",
    ]
    context_options = dict(
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        viewport={"width": 1366, "height": 768},
        device_scale_factor=1,
        is_mobile=False,
        has_touch=False,
        locale="zh-CN",
        timezone_id="Asia/Shanghai",
    )

    self.playwright = sync_playwright().start()
    self.browser = self.playwright.chromium.launch(headless=False, args=chromium_flags)
    self.context = self.browser.new_context(**context_options)
    self.page = self.context.new_page()

    # Register the init scripts for future documents ...
    self.anti_detection()
    self.enhanced_anti_detection()
    # ... and patch the document that is already open right away.
    self.page.evaluate("""
delete navigator.__proto__.webdriver;
""")

    self.human_like_actions()
    self.random_behavior()
def close_browser(self):
    """Close the context and browser and stop Playwright, even on errors.

    Each later cleanup step still runs when an earlier one raises, so a
    failing ``context.close()`` no longer leaves the browser process and
    the Playwright driver alive. All handles on the instance are cleared,
    which makes a repeated call a no-op.

    Raises:
        Exception: Whatever a ``close()``/``stop()`` call raised, after the
        remaining cleanup has been attempted.
    """
    try:
        if self.context:
            self.context.close()
    finally:
        try:
            if self.browser:
                self.browser.close()
        finally:
            # Detach the handles before stopping so a failure in stop()
            # cannot leave stale references behind.
            driver, self.playwright = self.playwright, None
            self.page = self.context = self.browser = None
            if driver:
                driver.stop()
def save_page_html(self, filepath):
    """Write the current page's HTML to ``filepath`` as UTF-8.

    The parent directory is created first when ``filepath`` has one. A
    bare file name (no directory part) is written to the current working
    directory instead of failing on an empty directory path.

    Args:
        filepath: Destination file path.

    Failures are printed, not raised.
    """
    try:
        html_content = self.page.content()
        # os.path.dirname("page.html") is "", which cannot be created.
        target_dir = os.path.dirname(filepath)
        if target_dir:
            create_directory(target_dir)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(html_content)
        print(f"页面HTML已保存到: {filepath}")
    except Exception as e:
        print(f"保存页面HTML失败: {e}")
def visit_page(self, url):
    """Navigate ``self.page`` to ``url`` with stealth tweaks applied.

    Before navigating, an init script and a set of browser-like request
    headers are installed. After navigation, the page is given up to 5 s
    to reach ``networkidle`` (a failure there is only reported), and then
    the human-behaviour simulations run.

    Args:
        url: Address to open.

    Returns:
        bool: ``True`` if every step up to the simulations completed;
        ``False`` if any of them raised (the error is printed).
    """
    stealth_script = """
// 隐藏webdriver属性
delete navigator.__proto__.webdriver;
// 伪装chrome属性
Object.defineProperty(navigator, 'chrome', {
value: {
runtime: {},
loadTimes: function() {}
},
writable: false,
enumerable: true,
configurable: true
});
// 禁用调试功能
window.console.debug = function() {};
window.console.log = function() {};
"""
    navigation_headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Cache-Control": "max-age=0",
    }
    try:
        self.page.add_init_script(stealth_script)
        self.page.set_extra_http_headers(navigation_headers)
        self.page.goto(url)

        # Patch the freshly loaded document as well.
        self.page.evaluate("""
delete navigator.__proto__.webdriver;
""")

        # A page that never goes idle is tolerated: report and carry on.
        try:
            self.page.wait_for_load_state('networkidle', timeout=5000)
            print("networkidle, timeout=5000页面已加载")
        except Exception as wait_error:
            print(f"等待页面加载状态时出错: {wait_error}")

        self.human_like_actions()
        self.random_behavior()
    except Exception as e:
        print(f"访问页面失败: {url}, 错误: {e!s}")
        return False
    else:
        return True
def extract_links(self, selector="h2 a"):
    """Collect the links matched by ``selector`` on the current page.

    Titles have newlines removed, commas replaced by spaces and are
    stripped. Hrefs that do not start with ``http://``/``https://`` are
    resolved against the current page URL. Elements with no ``href``
    attribute, or with an empty title, are skipped.

    Args:
        selector (str): CSS selector for the anchor elements; defaults to
            ``"h2 a"``.

    Returns:
        list: Dicts with the keys ``title``, ``url`` and ``request_url``
        (the page URL the link was found on). Empty if the query fails.
    """
    links = []
    try:
        # The page URL does not change while iterating; read it once.
        current_url = self.page.url
        elements = self.page.query_selector_all(selector)
        for element in elements:
            try:
                url = element.get_attribute('href')
                if url is None:
                    # Anchor without href: nothing to follow, and calling
                    # str methods on None would only produce a bogus error.
                    continue
                title = element.inner_text().replace('\n', '').replace(',', ' ').strip()
                if not url.startswith(('http://', 'https://')):
                    # Resolve relative links against the current page.
                    url = urllib.parse.urljoin(current_url, url)
                if title and url:
                    links.append({
                        'title': title,
                        'url': url,
                        'request_url': current_url
                    })
            except Exception as e:
                print(f"提取链接失败: {e}")
                continue
    except Exception as e:
        print(f"提取链接失败: {e}")
    return links