manchuwork
2025-09-25 03:19:34 +08:00
parent 9d0f18a121
commit 102dd78c26
13 changed files with 1987 additions and 56 deletions

company/aiqicha_crawler.py (new file, 319 lines)

@@ -0,0 +1,319 @@
import random
from playwright.sync_api import sync_playwright
import json
import os
import time
COOKIE_PATH = "aiqicha_cookies.json"
class AiqichaCrawler:
def __init__(self):
self.browser = None
self.context = None
self.page = None
def anti_detection(self):
"""注入更全面的反检测脚本"""
self.page.add_init_script("""
// 隐藏webdriver属性
delete navigator.__proto__.webdriver;
// 伪装chrome属性
Object.defineProperty(navigator, 'chrome', {
value: {
runtime: {},
loadTimes: function() {}
},
writable: false,
enumerable: true,
configurable: true
});
// 伪装plugins和mimeTypes
Object.defineProperty(navigator, 'plugins', {
get: () => [
{ 0: { type: 'application/pdf' } },
{ 0: { type: 'application/x-google-chrome-pdf' } }
],
});
Object.defineProperty(navigator, 'mimeTypes', {
get: () => [
{ type: 'application/pdf' },
{ type: 'application/x-google-chrome-pdf' }
],
});
// 伪装languages
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh'],
});
// 禁用调试功能
window.console.debug = function() {};
window.console.log = function() {};
// 伪装屏幕信息
Object.defineProperty(screen, 'width', {get: () => 1366});
Object.defineProperty(screen, 'height', {get: () => 768});
Object.defineProperty(screen, 'availWidth', {get: () => 1366});
Object.defineProperty(screen, 'availHeight', {get: () => 768});
Object.defineProperty(screen, 'colorDepth', {get: () => 24});
Object.defineProperty(screen, 'pixelDepth', {get: () => 24});
// 伪装时间戳
window.chrome = {
runtime: {}
};
// 伪装outerHeight和outerWidth
Object.defineProperty(window, 'outerHeight', {get: () => 768});
Object.defineProperty(window, 'outerWidth', {get: () => 1366});
// 伪装innerHeight和innerWidth
Object.defineProperty(window, 'innerHeight', {get: () => 768});
Object.defineProperty(window, 'innerWidth', {get: () => 1366});
""")
def random_behavior(self):
"""模拟更复杂的人类操作"""
# 随机等待
time.sleep(random.uniform(2, 5))
# 随机鼠标移动
for _ in range(random.randint(3, 7)):
self.page.mouse.move(
random.randint(100, 1200),
random.randint(100, 600)
)
time.sleep(random.uniform(0.1, 0.8))
# 随机滚动页面
if random.choice([True, False]):
scroll_distance = random.randint(200, 800)
self.page.mouse.wheel(0, scroll_distance)
time.sleep(random.uniform(1, 2))
def init_cookie_file(self):
if not os.path.exists(COOKIE_PATH):
with open(COOKIE_PATH, 'w') as f:
json.dump([], f)
def save_cookies(self):
cookies = self.context.cookies()
with open(COOKIE_PATH, 'w') as f:
json.dump(cookies, f, indent=2)
def load_cookies(self):
try:
with open(COOKIE_PATH, 'r') as f:
cookies = json.load(f)
if cookies:
self.context.add_cookies(cookies)
return True
return False
except:
return False
def bypass_debugger(self):
self.page.add_init_script("""
window.Function.prototype.constructor = function() {};
window.console.debug = function(){};
Object.defineProperty(navigator, 'webdriver', {get: () => false});
""")
def check_login_status(self):
"""检测登录状态返回True表示已登录"""
try:
# 先关闭可能的功能上新弹窗
self.close_feature_popup()
# 等待页面加载完成
self.page.wait_for_load_state("networkidle")
# 优先检查 .header-user-center-menu 元素判断是否已登录
logged_in_elements = self.page.query_selector_all('.header-user-center-menu, .user-center')
for element in logged_in_elements:
if element and element.is_visible():
print("检测到已登录状态")
return True
# 检测用户中心元素判断已登录
user_center = self.page.query_selector('.user-center')
if user_center and user_center.is_visible():
print("检测到已登录状态")
return True
# 检测登录相关元素
#self.page.wait_for_selector('.ivu-tooltip-light', timeout=10000)
#self.page.wait_for_selector('img[src*="app-qrcode.png"]', timeout=20000)
#print("检测到未登录状态")
#return False
# 检测登录相关元素
login_element = self.page.query_selector('.login')
if login_element and login_element.is_visible():
print("检测到未登录状态")
return False
except:
try:
# 检测用户中心元素判断已登录
self.page.wait_for_selector('.user-center', timeout=3000)
print("检测到已登录状态")
return True
except:
print("登录状态检测异常")
return False
def close_feature_popup(self):
"""关闭功能上新弹窗"""
try:
# 查找并点击关闭按钮
close_buttons = self.page.query_selector_all('.close-icon.ivu-icon-ios-close')
for close_button in close_buttons:
if close_button.is_visible():
close_button.click()
print("已关闭功能上新弹窗")
# 等待弹窗消失
time.sleep(1)
break
except Exception as e:
# 如果没有找到弹窗,继续执行
pass
def login(self):
"""带状态检测的登录流程"""
self.page.goto("https://aiqicha.baidu.com")
# 页面加载后执行反检测
self.page.evaluate("""
delete navigator.__proto__.webdriver;
""")
# 等待页面加载完成
self.page.wait_for_load_state("networkidle")
# 关闭可能的功能上新弹窗
self.close_feature_popup()
if not self.check_login_status():
print("开始执行登录流程...")
# 点击登录按钮
login_btn = self.page.wait_for_selector('.login', timeout=20000)
login_btn.click()
# try:
# 等待二维码容器出现并确保可见
# self.page.wait_for_selector('.app-qrcode', timeout=20000)
print("请扫描页面二维码登录...")
time.sleep(3) # 给一些时间让二维码完全加载
# 等待登录完成
# 等待登录完成先尝试URL检测失败后再尝试元素检测
# try:
# self.page.wait_for_url("https://aiqicha.baidu.com/usercenter/**", timeout=5000)
# except:
# # 如果URL检测失败尝试通过元素检测
# self.page.wait_for_selector('.header-user-center-menu, .user-center', timeout=10000)
# 如果URL检测失败尝试通过元素检测
self.page.wait_for_selector('.header-user-center-menu, .user-center', timeout=10000)
# self.page.wait_for_url("**/usercenter**", timeout=120000)
self.save_cookies()
print("登录成功!")
def search_company(self, company_name):
self.page.goto(f"https://aiqicha.baidu.com/s?q={company_name}")
# 页面加载后执行反检测
self.page.evaluate("""
delete navigator.__proto__.webdriver;
""")
# 关闭可能的功能上新弹窗
self.close_feature_popup()
self.page.wait_for_selector(".search-item", timeout=10000)
# 提取企业基础信息
company_card = self.page.query_selector(".search-item")
return {
"name": company_card.query_selector(".company-name").inner_text(),
"legal_person": company_card.query_selector(".legal-person").inner_text(),
"reg_capital": company_card.query_selector(".reg-capital").inner_text(),
"status": company_card.query_selector(".company-status").inner_text()
}
def run(self, companies):
self.init_cookie_file()
with sync_playwright() as p:
# self.browser = p.chromium.launch(headless=False)
self.browser = p.chromium.launch(
headless=False,
args=[
"--disable-blink-features=AutomationControlled",
"--disable-infobars",
"--disable-extensions",
"--disable-plugins",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-web-security",
"--disable-features=IsolateOrigins,site-per-process",
"--disable-background-timer-throttling",
"--disable-backgrounding-occluded-windows",
"--disable-renderer-backgrounding",
"--disable-ipc-flooding-protection"
]
)
# self.context = self.browser.new_context()
self.context = self.browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
viewport={"width": 1366, "height": 768},
device_scale_factor=1,
is_mobile=False,
has_touch=False,
locale="zh-CN",
timezone_id="Asia/Shanghai"
)
self.page = self.context.new_page()
self.anti_detection()
# 立即执行一次反检测
self.page.evaluate("""
delete navigator.__proto__.webdriver;
""")
self.random_behavior()
if not self.load_cookies():
print("未找到有效Cookie开始登录流程...")
self.login()
else:
print("已加载Cookie验证登录状态...")
# 加载cookie后访问页面验证是否真正登录
self.page.goto("https://aiqicha.baidu.com")
# 等待页面加载完成
self.page.wait_for_load_state("networkidle")
# 验证登录状态
if not self.check_login_status():
print("Cookie已过期或无效重新登录...")
self.login()
else:
print("Cookie有效已登录")
for company in companies:
try:
data = self.search_company(company)
print(f"{data['name']} | 法人:{data['legal_person']} | 注册资本:{data['reg_capital']}")
self.save_cookies() # 每次操作后更新cookies
time.sleep(3) # 防止请求过快
except Exception as e:
print(f"查询 {company} 失败: {str(e)}")
self.context.close()
self.browser.close()
if __name__ == "__main__":
crawler = AiqichaCrawler()
companies = ["阿里巴巴", "腾讯科技", "华为技术"]
crawler.run(companies)
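# Note: Playwright needs its browser binaries installed once before the first run,
# e.g. `python -m playwright install chromium`.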

qcc.py

@@ -268,16 +268,15 @@ def save_cookies(context, cookie_file):
print("已保存cookies到文件")
def wait_for_login(page, cookie_file):
def wait_for_login_and_save_cookies(page, cookie_file):
"""
等待用户扫码登录
等待用户扫码登录并保存cookies
"""
print("检测到需要登录,请使用手机扫码登录...")
print("登录成功后将自动跳转到目标页面")
# 等待页面跳转到非登录页面
page.wait_for_url("**/weblogin", timeout=3000)
page.wait_for_url(lambda url: "weblogin" not in url, timeout=120000)
# 等待页面跳转到非登录页面即跳转回firm页面
page.wait_for_url("**/firm/**", timeout=120000)
# 保存登录后的cookies
save_cookies(page.context, cookie_file)
@@ -299,22 +298,19 @@ def main():
page = context.new_page()
try:
# 尝试加载本地保存的cookies
if load_cookies(context, args.cookie_file):
print("使用已保存的登录信息")
# 启动应用时自动加载cookies文件
load_cookies(context, args.cookie_file)
# 访问指定URL
page.goto(args.url)
# 检查是否跳转到了登录页面
# 检查是否登录页面
if "weblogin" in page.url:
wait_for_login(page, args.cookie_file)
# 等待用户扫码登录并自动保存cookies
wait_for_login_and_save_cookies(page, args.cookie_file)
else:
print("已登录或无需登录")
# 重新访问目标URL确保页面正确加载
page.goto(args.url)
# 创建解析器并解析信息
parser = QCCParser(page)
company_info = parser.parse_company_info()
@@ -334,11 +330,3 @@ def main():
if __name__ == "__main__":
main()
# python qcc.py "https://www.qcc.com/firm/50b0e3189f2eb2b20304b255669ce1a1.html"
# # First run: scan the QR code to log in
# python qcc.py "https://www.qcc.com/firm/<company URL>"
#
# # Later runs reuse the saved login information automatically
# python qcc.py "https://www.qcc.com/firm/<company URL>"
#
# # Use a custom cookies file
# python qcc.py --cookie-file my_cookies.txt "https://www.qcc.com/firm/<company URL>"


@@ -0,0 +1,114 @@
import random
from tool.csv_tool import CSVTool
from tool.aiqicha_detail import AiqichaDetailCrawler
import time
def query_init_company_data(csv_file_name):
# 创建CSV工具实例
csv_tool = CSVTool(
csv_file_name=csv_file_name,
headers=['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time']
)
# 查询所有数据
all_data = csv_tool.get_all_data()
print("所有数据:", all_data)
return all_data
def crawl_and_save_aiqicha_details(input_csv, output_csv):
"""
从CSV文件中读取爱企查URL爬取企业详情并保存到新的CSV文件中
Args:
input_csv (str): 包含爱企查URL的输入CSV文件
output_csv (str): 保存企业详情的输出CSV文件
"""
# 读取输入数据
input_data = query_init_company_data(input_csv)
# 筛选出爱企查数据
aiqicha_data = [item for item in input_data if item['web_site_type'] == 'aiqicha']
print(f'找到 {len(aiqicha_data)} 条爱企查数据')
# 定义输出CSV的表头
output_headers = [
'company_name', # 公司名称
'credit_code', # 统一社会信用代码
'legal_representative', # 法定代表人
'registered_capital', # 注册资本
'establishment_date', # 成立日期
'business_status', # 经营状态
'address', # 公司地址
'business_scope', # 经营范围
'source_url', # 原始URL
'create_time' # 创建时间
]
# 创建输出CSV工具实例
output_csv_tool = CSVTool(
csv_file_name=output_csv,
headers=output_headers
)
# 使用爱企查详情爬虫
with AiqichaDetailCrawler() as crawler:
company_details = []
success_count = 0
for i, item in enumerate(aiqicha_data):
url = item['url']
refer_url: str = item['request_url']
print(f"正在处理: {url}")
# 爬取企业详情
detail = crawler.crawl_company_detail(url, refer_url)
if detail:
# 添加来源URL和公司名称
detail['source_url'] = url
# 转换字段名以匹配CSV表头
converted_item = {
'company_name': detail.get('name', ''),
'credit_code': detail.get('credit_code', ''),
'legal_representative': detail.get('legal_representative', ''),
'registered_capital': detail.get('registered_capital', ''),
'establishment_date': detail.get('establishment_date', ''),
'business_status': detail.get('business_status', ''),
'address': detail.get('address', ''),
'business_scope': detail.get('business_scope', ''),
'source_url': detail.get('source_url', '')
}
# 立即保存每条数据,避免数据丢失
written_count = output_csv_tool.save_data(
[converted_item],
unique_titles=['company_name'],
create_time=True
)
if written_count > 0:
success_count += 1
print(f"成功保存 {detail.get('name', '未知公司')} 的信息")
else:
print(f"保存 {detail.get('name', '未知公司')} 的信息失败(可能已存在)")
else:
print(f"获取 {url} 的信息失败")
# 添加延迟,避免请求过快
time.sleep(2)
next_sleep_interval = random.uniform(5, 15)
time.sleep(next_sleep_interval)
print(f"总共成功处理并保存了 {success_count} 条企业详情数据到 {output_csv}")
if __name__ == '__main__':
# 从原始搜索结果CSV中读取爱企查URL爬取详情并保存到新CSV文件
crawl_and_save_aiqicha_details('company_search_bing_data.csv', 'aiqicha_company_details.csv')
# 原有代码保留
# all_data = query_init_company_data('company_search_bing_data.csv')
# filter = [item for item in all_data if item['web_site_type'] == 'aiqicha']
# print('aiqicha数据:', filter)
# for item in filter:
# pass
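# Illustrative sketch (hypothetical values): the input CSV is expected to carry the
# columns listed in query_init_company_data(); a minimal seed row could be written
# with the same CSVTool used above.
# seed = CSVTool('company_search_bing_data.csv',
#     headers=['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time'])
# seed.save_data([{
#     'title': 'Example Co - 爱企查',
#     'url': 'https://aiqicha.baidu.com/company_detail_00000000000000',
#     'web_site_type': 'aiqicha',
#     'request_url': 'https://cn.bing.com/search?q=Example+Co',
#     'company_name': 'Example Co',
# }], unique_titles=['company_name', 'web_site_type'])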

main.py (322 changed lines)

@@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
import asyncio
import csv
import os
import random
import aiohttp
@@ -15,6 +17,8 @@ from search import Bing,Baidu
import openpyxl
import ssl
from tool.bing_search import BingSearcher
from tool.csv_tool import CSVTool
from tool.read_csv import CSVReader
start = time.time()
@@ -68,23 +72,22 @@ def commend():
parser.print_help()
sys.exit()
return args
def search_company_info(company_name_arg, num):
keywords = company_name_arg
# for key in keyword:
# keywords = keywords + key + " "
keywords = keywords.strip()
result = Bing.bing_main(keywords, num)
def search_company_info(company_name_key, addon_args, num):
search_key = company_name_key.strip() + " " + addon_args
search_key = search_key.strip()
result = Bing.bing_main(search_key, num)
# for 循环 遍历 result[0] 和 result[1]
data_list =[]
for i in range(len(result[0])):
title= result[0][i]
url = result[1][i]
print(f"必应搜索爬取结果为,title:{title}, url:{url}")
if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url):
data_list.append([title, url])
return data_list
return result
# for i in range(len(result[0])):
# title= result[0][i]
# url = result[1][i]
# print(f"必应搜索爬取结果为,title:{title}, url:{url}")
# if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url):
# data_list.append({"title":title, "url":url})
# return data_list
def filter_company_sites(urls):
# urls https://www.tianyancha.com/company/5226478758
@@ -94,6 +97,33 @@ def filter_company_sites(urls):
filtered_urls = [url for url in urls if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url)]
return filtered_urls
def filter_aiqicha_qcc(search_result, company_name_, with_not_match = False):
datas = []
for i in range(len(search_result)):
data_node = search_result[i]
title = data_node['title']
url = data_node['url']
print(f"必应搜索爬取结果为,title:{title}, url:{url}")
# 判断title是否包含 company_name_
# if re.match(
# r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*",
# url) and title.find(company_name_) != -1:
if title.find(company_name_) != -1 or with_not_match:
web_site_type = None
if re.match(r"^https://aiqicha.baidu.com/company_detail_.*", url):
web_site_type = "aiqicha"
elif re.match(r"^https://www.tianyancha.com/company/.*", url):
web_site_type = "tianyancha"
elif re.match(r"^https://www.qcc.com/firm/.*", url):
web_site_type = "qcc"
if web_site_type is not None:
data_node['web_site_type'] = web_site_type
data_node['company_name'] = company_name_
datas.append(data_node)
return datas
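# Illustrative example (hypothetical data): a result whose title contains the company
# name is kept and tagged according to the URL pattern it matches.
# hits = filter_aiqicha_qcc(
#     [{'title': 'Example Co - 爱企查',
#       'url': 'https://aiqicha.baidu.com/company_detail_00000000000000',
#       'request_url': 'https://cn.bing.com/search?q=Example+Co'}],
#     'Example Co')
# # hits[0]['web_site_type'] == 'aiqicha', hits[0]['company_name'] == 'Example Co'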
def search_one_company(company_name_arg, num):
@@ -101,7 +131,7 @@ def search_one_company(company_name_arg, num):
# for key in keyword:
# keywords = keywords + key + " "
keywords = keywords.strip()
print(f"您搜索的关键词为:{keywords}")
print(f"---==您搜索的关键词为:{keywords}")
wb = openpyxl.Workbook()
# 删除默认创建的工作表(现在名为 "数据表1"
wb.remove(wb['Sheet'])
@@ -117,21 +147,269 @@ def search_one_company(company_name_arg, num):
end = time.time()
print(Fore.RED + f'脚本总时间: {end - start:.2f}')
def save_to_csv(filter_list):
"""
Append the results to the CSV file (deduplicated by company_name + web_site_type).
Args:
filter_list: list of result dicts to write
"""
if filter_list is None or len(filter_list) == 0:
print('filter_list is None or empty, no data to write')
return False
csv_file = 'company_search_result_data.csv'
headers = ['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time']
# 判断文件是否存在,不存在则创建并写入列头
file_exists = os.path.exists(csv_file)
# 读取现有数据,用于判断重复项
existing_data = set()
if file_exists:
with open(csv_file, 'r', encoding='utf-8') as f:
reader_ins = csv.reader(f)
header_skipped = False
for row in reader_ins:
if not header_skipped:
header_skipped = True
continue
if len(row) >= 5: # 确保行数据完整
company_name = row[4] # company_name在第5列(索引4)
web_site_type = row[2] if len(row) > 2 else "" # web_site_type在第3列(索引2)
existing_data.add((company_name, web_site_type))
# 写入数据
with open(csv_file, 'a', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
# 如果文件不存在,写入列头
if not file_exists:
writer.writerow(headers)
# 追加写入数据,去重处理
for data_node in filter_list:
company_name = data_node.get('company_name', '')
web_site_type = data_node.get('web_site_type', '')
# 判断是否已存在相同的company_name和web_site_type组合
if (company_name, web_site_type) not in existing_data:
# 创建时间格式化
create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 写入数据行
row_data = [
data_node.get('title', ''),
data_node.get('url', ''),
web_site_type,
data_node.get('request_url', ''),
company_name,
create_time
]
writer.writerow(row_data)
# 添加到已存在数据集合中,避免本次写入中的重复
existing_data.add((company_name, web_site_type))
print(f"写入数据成功,title:{data_node.get('title', '')}, "
f"url:{data_node.get('url', '')}, "
f"web_site_type:{web_site_type}, "
f"request_url:{data_node.get('request_url', '')}, "
f"company_name:{company_name}, "
f"create_time:{create_time}")
def check_company_exists(company_names, type_list):
"""
读取 company_search_result_data.csv 数据,检查指定的公司名称和类型是否存在
Args:
company_names (list): 公司名称列表
type_list (list): 类型列表
Returns:
list: 包含公司名称和存在状态的字典列表
格式: [{"company_name": "公司名", "exists": True/False}, ...]
"""
csv_file = 'company_search_result_data.csv'
result = []
# 初始化所有公司为不存在状态
for company_name_item in company_names:
result.append({
"company_name": company_name_item,
"exists": False
})
# 如果文件不存在,直接返回初始化结果
if not os.path.exists(csv_file):
return result
# 读取CSV文件中的现有数据
existing_combinations = set() # 存储(公司名, 类型)组合
try:
with open(csv_file, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
header_skipped = False
for row in reader:
if not header_skipped:
header_skipped = True
continue
# 确保行数据完整
if len(row) >= 5:
company_name_item = row[4] # company_name在第5列(索引4)
web_site_type = row[2] if len(row) > 2 else "" # web_site_type在第3列(索引2)
# 添加到现有组合集合中
existing_combinations.add((company_name_item, web_site_type))
except Exception as e:
print(f"读取CSV文件时出错: {e}")
return result
# 检查每个公司是否存在于指定的类型中
for item in result:
company_name_item = item["company_name"]
exists = False
# 如果type_list为空检查公司是否存在任何类型中
if not type_list:
for existing_company, _ in existing_combinations:
if existing_company == company_name_item:
exists = True
break
else:
# 检查公司是否存在于指定的类型中
for web_site_type in type_list:
if (company_name_item, web_site_type) in existing_combinations:
exists = True
break
item["exists"] = exists
return result
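# Illustrative example (hypothetical data): with an empty or missing CSV every company
# comes back as not yet crawled.
# check_company_exists(['Example Co'], ['aiqicha', 'qcc'])
# # -> [{'company_name': 'Example Co', 'exists': False}]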
if __name__ == '__main__':
reader = CSVReader('data.csv')
company_names = reader.read_column(0, has_header=False)
print("所有数据:", company_names)
i= 1
for company_name in company_names:
# 检查已存在的公司
type_list = ["aiqicha", "qcc", "tianyancha"]
check_result = check_company_exists(company_names, type_list)
print("检查结果:", check_result)
i = 1
# 方法2: 使用上下文管理器
with BingSearcher() as searcher:
# 创建CSV工具实例
csv_tool = CSVTool(
csv_file_name='company_search_bing_data.csv',
headers=['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time']
)
# 查询所有数据
all_data = csv_tool.get_all_data()
print("所有数据:", all_data)
# 初始化所有公司为不存在状态
company_names_saved_set = set()
for company_name_item in all_data:
company_names_saved_set.add(company_name_item["company_name"])
for company_name in company_names:
# 如果公司已存在,跳过处理
if company_name in company_names_saved_set:
print(f"公司 {company_name} 已存在,跳过处理")
continue
# if company_exists:
# print(f"公司 {company_name} 已存在,跳过处理")
# continue
print(f"正在处理第 {i} 个公司: {company_name}")
addon_args = " 爱企查|企查查"
data_list = searcher.search(company_name+" "+addon_args, 1)
filter_list = filter_aiqicha_qcc(data_list, company_name)
print(company_name, "filter_list:", filter_list)
if len(filter_list) <= 0:
print("没有数据 filter_list is empty. " + company_name)
filter_list_with_not_match = filter_aiqicha_qcc(data_list, company_name, with_not_match=True)
# 创建CSV工具实例
csv_tool = CSVTool(
csv_file_name='company_search_filter_is_none_data.csv',
headers=['company_name','title', 'web_site_type','url', 'request_url', 'create_time']
)
# 保存数据,指定去重字段
csv_tool.save_data(filter_list_with_not_match, unique_titles=['company_name', 'title','url','web_site_type'])
continue
else:
# 创建CSV工具实例
csv_tool = CSVTool(
csv_file_name='company_search_bing_data.csv',
headers=['company_name','title', 'web_site_type','url', 'request_url', 'create_time']
)
# 保存数据,指定去重字段
csv_tool.save_data(filter_list,
unique_titles=['company_name', 'web_site_type'])
# save_to_csv(filter_list)
# i = i + 1
# if i > 3:
# print("结束循环")
# break
# results2 = searcher.search("腾讯", 1)
# results3 = searcher.search("百度", 1)
sleep_time = 5
sleep_time += random.randint(3, 10)
time.sleep(sleep_time)
pass
pass
if True:
print("exit")
exit(0)
i = 1
for company_name_ele in check_result:
company_name = company_name_ele["company_name"]
company_exists = company_name_ele["exists"]
# 如果公司已存在,跳过处理
if company_exists:
print(f"公司 {company_name} 已存在,跳过处理")
continue
sleep_time = 5
sleep_time += random.randint(1, 5)
sleep_time += random.randint(3, 10)
time.sleep(sleep_time)
company_name += " 爱企查|企查查"
data_list = search_company_info(company_name, '1')
print(data_list)
addon_args = " 爱企查|企查查"
data_list = search_company_info(company_name, addon_args, '1')
filter_list = filter_aiqicha_qcc(data_list, company_name)
print("filter_list:",filter_list)
save_to_csv(filter_list)
if len(filter_list)<= 0:
print("没有数据 filter_list is empty. "+company_name)
continue
i=i+1
if i > 1:
if i > 100:
break


@@ -0,0 +1,85 @@
import os
import cv2
from paddleocr import PaddleOCR
import numpy as np
def imread_chinese(path):
"""支持中文路径的图像读取函数"""
try:
# 使用 numpy 读取文件
img_array = np.fromfile(path, dtype=np.uint8)
# 使用 imdecode 解码图像
img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
return img
except Exception as e:
print(f"读取图像失败 {path}: {e}")
return None
def split_image_vertically(img_path, split_num=3):
"""将图片垂直分割为三部分(上中下栏)"""
#img = cv2.imread(img_path)
img = imread_chinese(img_path)
if img is None:
print(f"无法读取图像: {img_path}")
return None
height = img.shape[0]
section_height = height // split_num
return [
img[i * section_height:(i + 1) * section_height, :]
for i in range(split_num)
]
def detect_vertical_text(ocr, img_section):
"""识别竖直英文文本"""
# 将图像旋转90度使竖直文本变为水平
rotated = cv2.rotate(img_section, cv2.ROTATE_90_CLOCKWISE)
result = ocr.predict(rotated, use_textline_orientation=True)
return [line[1][0] for line in result[0]] if result else []
def process_images(image_dir, start_num=1, end_num=1097):
"""批量处理图片序列"""
ocr = PaddleOCR(
lang='en',
use_textline_orientation=True,
text_det_unclip_ratio=2.0, # 调整检测框扩展系数
#rec_char_dict_path='en_dict.txt' # 英文专用字典
)
for i in range(start_num, end_num + 1):
img_path = os.path.join(image_dir, f"{i}.png")
if not os.path.exists(img_path):
continue
sections = split_image_vertically(img_path)
if sections is None:
continue
page_results = {
"page_number": i,
"sections": []
}
for idx, section in enumerate(sections):
# 识别页码(假设位于第一栏顶部)
if idx == 0:
page_results["detected_page"] = detect_vertical_text(ocr, section[:50, :])
# 识别各栏英文内容
eng_text = detect_vertical_text(ocr, section)
page_results["sections"].append({
"section": ["top", "middle", "bottom"][idx],
"english_text": eng_text
})
yield page_results
if __name__ == "__main__":
IMAGE_DIR = r"D:/gitstudy/pythonwork/manchuspider/data/满洲语字典/满汉大辞典/images"
for result in process_images(IMAGE_DIR):
print(f"Page {result['page_number']}:")
print(f"Detected Page No: {result.get('detected_page', 'N/A')}")
for section in result["sections"]:
print(f"{section['section']} section English: {', '.join(section['english_text'])}")
print("-" * 50)

requirements.txt

@@ -2,19 +2,58 @@ aiofiles==24.1.0
aiohappyeyeballs==2.4.0
aiohttp==3.10.5
aiosignal==1.3.1
annotated-types==0.7.0
async-timeout==4.0.3
attrs==24.2.0
bce-python-sdk==0.9.46
beautifulsoup4==4.12.3
certifi==2025.8.3
chardet==5.2.0
charset-normalizer==3.4.3
click==8.3.0
colorama==0.4.6
colorlog==6.9.0
et-xmlfile==1.1.0
filelock==3.19.1
frozenlist==1.4.1
fsspec==2025.9.0
future==1.0.0
greenlet==3.2.4
idna==3.8
imagesize==1.4.1
lxml==5.3.0
multidict==6.1.0
numpy==2.3.3
opencv-contrib-python==4.10.0.84
openpyxl==3.1.5
packaging==25.0
pandas==2.3.2
pillow==11.3.0
playwright==1.55.0
prettytable==3.16.0
psutil==7.1.0
py-cpuinfo==9.0.0
pyclipper==1.3.0.post6
pycryptodome==3.23.0
pydantic==2.11.9
pydantic_core==2.33.2
pyee==13.0.0
pypdfium2==4.30.0
python-dateutil==2.9.0.post0
pytz==2025.2
PyYAML==6.0.2
requests==2.32.5
ruamel.yaml==0.18.15
ruamel.yaml.clib==0.2.12
setuptools==80.9.0
shapely==2.1.1
six==1.17.0
soupsieve==2.6
tqdm==4.67.1
typing-inspection==0.4.1
typing_extensions==4.12.2
tzdata==2025.2
ujson==5.11.0
urllib3==2.5.0
wcwidth==0.2.13
yarl==1.11.1


@@ -19,8 +19,9 @@ timeout = aiohttp.ClientTimeout(
sock_read=5.5 # 读取超时为5.5秒
)
async def getbing(url, session):
url_list = []
title_list = []
# url_list = []
# title_list = []
data_list =[]
async with session.get(url, headers=bingheaders,timeout=timeout) as resp:
# print("正在爬取url:"+url)
try:
@@ -34,19 +35,24 @@ async def getbing(url, session):
domain = 'https://cn.bing.com/'
hurl = urllib.parse.urljoin(domain, hurl)
print(htext," ",hurl)
title_list.append(htext)
url_list.append(hurl)
data_list.append({'title': htext, 'url': hurl,'request_url':url})
# title_list.append(htext)
# url_list.append(hurl)
except:
print(f"必应页面爬取失败,{url}该url无法正常获取数据。")
return [],[]
return url_list, title_list
return data_list
#url_list, title_list
async def bing_spinder(keyword, num):
print(f'必应爬取任务进行中,爬取页数为{num}...')
print('标题 url')
urllist = []
titlelist = []
# urllist = []
# titlelist = []
data_list =[]
tasks = []
if ':' in num:
if num.count(':') > 1:
@@ -65,17 +71,19 @@ async def bing_spinder(keyword, num):
async with aiohttp.ClientSession() as session:
for pn in range(start_page, end_page, 10):
#url = f'https://cn.bing.com/search?q={keyword}&first={pn}&mkt=zh-CN'
url = f'https://cn.bing.com/search?q={keyword}&qs=n&form=QBRE&sp=-1&lq=0'
# print("正在爬取的url为:"+url)
# 修复:使用正确的分页参数
url = f'https://cn.bing.com/search?q={keyword}&first={pn + 1}&count=10&FORM=PERE'
print("正在爬取的url为:" + url)
tasks = tasks + [asyncio.create_task(getbing(url, session))]
result = await asyncio.gather(*tasks)
for i in range(int((end_page-start_page) / 10)):
urllist += result[i][0]
titlelist += result[i][1]
count=len(urllist)
# urllist += result[i][0]
# titlelist += result[i][1]
data_list += result[i]
count=len(data_list)
print(f"必应搜索爬取结果为{count}")
print(Fore.GREEN + '必应爬取任务完成\n' + Fore.RESET)
return titlelist, urllist
return data_list
# await bingwriteCSV(titlelist, urllist, keyword)

tool/aiqicha_detail.py (new file, 130 lines)

@@ -0,0 +1,130 @@
# file: tool/aiqicha_detail.py
import time
import json
from tool.web_browser import WebBrowser
from tool.aiqicha_login import AiqichaLoginManager # 导入登录管理器
from tool.aiqicha_detail_parser import AiqichaDetailParser # 导入解析器
class AiqichaDetailCrawler:
def __init__(self, cookie_path="aiqicha_cookies.json"):
self.browser = WebBrowser(cookie_path)
self.browser_started = False
self.login_manager = None # 添加登录管理器实例
def start_browser(self):
"""启动浏览器"""
if not self.browser_started:
try:
self.browser.start_browser()
# 初始化登录管理器
self.login_manager = AiqichaLoginManager(self.browser)
# 加载cookies
if not self.browser.load_cookies():
print("未找到有效Cookie")
else:
print("已加载Cookie")
# 使用登录管理器检测登录状态
logined = self.login_manager.check_and_login()
if logined:
print("登录成功")
else:
print("登录失败")
self.browser_started = True
except Exception as e:
print(f"启动浏览器失败: {e}")
self.browser_started = False
def close_browser(self):
"""关闭浏览器"""
if self.browser_started:
try:
# 保存cookies
self.browser.save_cookies()
self.browser.close_browser()
except Exception as e:
print(f"关闭浏览器时出错: {e}")
finally:
self.browser_started = False
def crawl_company_detail(self, url: str, refer_url: str = None):
"""
爬取爱企查企业详情页数据
Args:
url (str): 企业详情页URL例如 https://aiqicha.baidu.com/company_detail_45719927199916
Returns:
dict: 包含企业详细信息的字典
"""
if not self.browser_started:
self.start_browser()
if not self.browser_started:
print("浏览器未启动,无法执行爬取")
return {}
print(f'正在爬取企业详情: {url}')
try:
# 设置 Referer 头部模拟搜索引擎点击
if refer_url:
self.browser.page.set_extra_http_headers({"Referer": refer_url})
# 访问页面
if self.browser.visit_page(url):
# 增强页面加载检查
print("等待页面关键元素加载...")
try:
# 等待关键元素加载,增加超时时间
self.browser.page.wait_for_selector('.addr-enter-bg-ele', timeout=15000)
print("关键元素已加载")
# 额外等待一段时间确保页面完全加载
import time
time.sleep(2)
print("额外等待完成,页面应该已完全加载")
except Exception as e:
print(f"等待页面元素时出错: {e}")
print("继续尝试解析页面内容...")
# 提取基本信息
print("开始解析页面信息...")
parser = AiqichaDetailParser(self.browser.page)
company_info = parser.parse_company_info()
print(f"成功爬取企业信息: {company_info['name']}")
return company_info
else:
print("访问页面失败")
return {}
except Exception as e:
print(f"爬取过程中出现错误: {e}")
return {}
def __enter__(self):
"""上下文管理器入口"""
self.start_browser()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口"""
self.close_browser()
# Usage examples:
# Option 1: manage the browser lifecycle manually
# crawler = AiqichaDetailCrawler()
# crawler.start_browser()
# detail = crawler.crawl_company_detail("https://aiqicha.baidu.com/company_detail_45719927199916")
# crawler.close_browser()
# Option 2: use the context manager
# with AiqichaDetailCrawler() as crawler:
#     detail = crawler.crawl_company_detail("https://aiqicha.baidu.com/company_detail_45719927199916")
#     print(detail)

tool/aiqicha_detail_parser.py (new file, 142 lines)

@@ -0,0 +1,142 @@
# file: tool/aiqicha_detail_parser.py
import re
class AiqichaDetailParser:
"""爱企查企业详情页解析器"""
def __init__(self, page):
"""
初始化解析器
Args:
page: 浏览器页面对象
"""
self.page = page
def parse_company_info(self):
"""
解析页面中的企业基本信息,参考 AiQiChaParser 实现
Returns:
dict: 包含企业基本信息的字典
"""
company_info = {}
# 定义要提取的信息字段和对应的CSS选择器
fields = {
'name': ['.company-name', '.enterprise-name'],
'credit_code': ['.credit-code', '.unified-social-credit-code'],
'legal_representative': ['.legal-person', '.legal-representative'],
'registered_capital': ['.reg-capital', '.registered-capital'],
'establishment_date': ['.establishment-date', '.setup-date'],
'business_status': ['.business-status', '.operating-state'],
'address': ['.address', '.registered-address'],
'business_scope': ['.business-scope', '.business-scope-content'],
'company_type': ['.company-type', '.enterprise-type'],
'industry': ['.industry', '.industry-category'],
'registration_authority': ['.registration-authority', '.register-authority'],
'operating_period': ['.operating-period', '.business-period'],
'actual_capital': ['.actual-capital', '.paid-capital'],
'taxpayer_id': ['.taxpayer-id', '.tax-id-number'],
'organization_code': ['.organization-code'],
'english_name': ['.english-name'],
'approved_date': ['.approved-date', '.approval-date'],
'staff_size': ['.staff-size', '.insured-persons'],
'former_name': ['.former-name', '.previous-name']
}
# 批量提取信息
for field, selectors in fields.items():
company_info[field] = self._extract_field_value(selectors)
# 特殊处理电话号码
company_info['phone'] = self._extract_phone_number()
return company_info
def _extract_field_value(self, selectors):
"""
根据多个选择器提取字段值
Args:
selectors (list): CSS选择器列表
Returns:
str: 提取到的值或"未知"
"""
for selector in selectors:
try:
# 添加日志:显示当前尝试的选择器
print(f"尝试选择器: {selector}")
# 尝试查找带有 enter-bg-ele 类的元素
element = self.page.query_selector(f"{selector} .enter-bg-ele")
if element:
print(f"找到 enter-bg-ele 元素,选择器: {selector} .enter-bg-ele")
else:
# 尝试查找带有 addr-enter-bg-ele 类的元素
element = self.page.query_selector(f"{selector} .addr-enter-bg-ele")
if element:
print(f"找到 addr-enter-bg-ele 元素,选择器: {selector} .addr-enter-bg-ele")
else:
# 直接查找元素
element = self.page.query_selector(selector)
if element:
print(f"找到直接元素,选择器: {selector}")
if element:
text = element.inner_text().strip()
print(f"提取到原始文本: '{text}'")
# 清理文本内容
text = self._clean_text(text)
print(f"清理后文本: '{text}'")
if text:
print(f"返回文本: '{text}'")
return text
else:
print("文本为空或仅包含空白字符")
else:
print(f"未找到元素,选择器: {selector}")
except Exception as e:
print(f"提取字段时出错,选择器: {selector}, 错误: {e}")
continue
print("所有选择器都未找到有效元素,返回默认值")
return "未知"
def _clean_text(self, text):
"""
清理文本内容
Args:
text (str): 原始文本
Returns:
str: 清理后的文本
"""
# 移除多余的空白字符
text = re.sub(r'\s+', ' ', text)
# 移除换行符和制表符
text = re.sub(r'[\r\n\t]', '', text)
return text.strip()
def _extract_phone_number(self):
"""
提取电话号码信息
Returns:
str: 电话号码或"未知"
"""
try:
# 查找电话信息容器
phone_container = self.page.query_selector("div.business-info div.telphone-lists-wrap")
if phone_container:
# 查找包含电话号码的元素
phone_element = phone_container.query_selector("span.copy-box span")
if phone_element:
return self._clean_text(phone_element.inner_text())
except Exception:
pass
return "未知"

tool/aiqicha_login.py (new file, 122 lines)

@@ -0,0 +1,122 @@
# file: tool/aiqicha_login.py
from tool.web_browser import WebBrowser
import time
class AiqichaLoginManager:
def __init__(self, browser: WebBrowser):
self.browser = browser
def check_and_login(self):
"""启动后检测登录状态并自动登录"""
if not self.browser.load_cookies():
print("未找到有效Cookie开始登录流程...")
return self.login()
else:
print("已加载Cookie验证登录状态...")
# 加载cookie后访问页面验证是否真正登录
self.browser.page.goto("https://aiqicha.baidu.com")
# 等待页面加载完成
self.browser.page.wait_for_load_state("networkidle")
# 验证登录状态
if not self.check_login_status():
print("Cookie已过期或无效重新登录...")
return self.login()
else:
print("Cookie有效已登录")
return True
def check_login_status(self):
"""检测登录状态返回True表示已登录"""
try:
# 先关闭可能的功能上新弹窗
self.close_feature_popup()
# 等待页面加载完成
self.browser.page.wait_for_load_state("networkidle")
# 优先检查 .header-user-center-menu 元素判断是否已登录
logged_in_elements = self.browser.page.query_selector_all('.header-user-center-menu, .user-center')
for element in logged_in_elements:
if element and element.is_visible():
print("检测到已登录状态")
return True
# 检测用户中心元素判断已登录
user_center = self.browser.page.query_selector('.user-center')
if user_center and user_center.is_visible():
print("检测到已登录状态")
return True
# 检测登录相关元素
login_element = self.browser.page.query_selector('.login')
if login_element and login_element.is_visible():
print("检测到未登录状态")
return False
except:
try:
# 检测用户中心元素判断已登录
self.browser.page.wait_for_selector('.user-center', timeout=3000)
print("检测到已登录状态")
return True
except:
print("登录状态检测异常")
return False
def close_feature_popup(self):
"""关闭功能上新弹窗"""
try:
# 查找并点击关闭按钮
close_buttons = self.browser.page.query_selector_all('.close-icon.ivu-icon-ios-close')
for close_button in close_buttons:
if close_button.is_visible():
close_button.click()
print("已关闭功能上新弹窗")
# 等待弹窗消失
time.sleep(1)
break
except Exception as e:
# 如果没有找到弹窗,继续执行
pass
def login(self):
"""带状态检测的登录流程"""
self.browser.page.goto("https://aiqicha.baidu.com")
# 页面加载后执行反检测
self.browser.page.evaluate("""
delete navigator.__proto__.webdriver;
""")
# 等待页面加载完成
self.browser.page.wait_for_load_state("networkidle")
# 关闭可能的功能上新弹窗
self.close_feature_popup()
if not self.check_login_status():
print("开始执行登录流程...")
# 点击登录按钮
login_btn = self.browser.page.wait_for_selector('.login', timeout=20000)
login_btn.click()
print("请扫描页面二维码登录...")
time.sleep(3) # 给一些时间让二维码完全加载
# 等待登录完成,通过元素检测
try:
# 检测用户中心元素判断已登录
self.browser.page.wait_for_selector('.header-user-center-menu', timeout=30000)
self.browser.save_cookies()
print("检测到已登录状态")
return True
except:
self.browser.save_cookies()
print("登录状态检测异常")
return False
## self.browser.page.wait_for_selector('.header-user-center-menu, .user-center', timeout=10000)
return True
if __name__ == '__main__':
print("登录成功!")

tool/bing_search.py (new file, 152 lines)

@@ -0,0 +1,152 @@
# file: tool/bing_search.py
import time
import urllib.parse
from tool.web_browser import WebBrowser
class BingSearcher:
def __init__(self, cookie_path="bing_cookies.json"):
self.browser = WebBrowser(cookie_path)
self.browser_started = False
def start_browser(self):
"""启动浏览器"""
if not self.browser_started:
try:
self.browser.start_browser()
# 加载cookies
if not self.browser.load_cookies():
print("未找到有效Cookie")
else:
print("已加载Cookie")
self.browser_started = True
except Exception as e:
print(f"启动浏览器失败: {e}")
self.browser_started = False
def close_browser(self):
"""关闭浏览器"""
if self.browser_started:
try:
# 保存cookies
self.browser.save_cookies()
self.browser.close_browser()
except Exception as e:
print(f"关闭浏览器时出错: {e}")
finally:
self.browser_started = False
def search(self, keyword, num_pages=1):
"""
在Bing上搜索关键词并返回结果
Args:
keyword (str): 搜索关键词
num_pages (int): 搜索页数默认为1
Returns:
list: 搜索结果列表每个元素包含title、url和request_url
"""
if not self.browser_started:
self.start_browser()
if not self.browser_started:
print("浏览器未启动,无法执行搜索")
return []
print(f'必应爬取任务进行中,爬取页数为{num_pages}...')
all_results = []
try:
# 执行搜索
for page in range(num_pages):
first = page * 10 + 1
url = f"https://cn.bing.com/search?q={urllib.parse.quote(keyword)}&first={first}&count=10&FORM=PERE"
print("正在爬取的url为:" + url)
print('标题 url')
# 访问页面
if self.browser.visit_page(url):
# 提取搜索结果
results = self.browser.extract_links("h2 a")
all_results.extend(results)
# 打印结果
for result in results:
print(result['title'], " ", result['url'])
# 随机延迟,避免请求过快
time.sleep(2)
except Exception as e:
print(f"搜索过程中出现错误: {e}")
count = len(all_results)
print(f"必应搜索爬取结果为{count}")
return all_results
def __enter__(self):
"""上下文管理器入口"""
self.start_browser()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口"""
self.close_browser()
# 兼容旧接口的函数
def bing_main(keyword, num='1'):
"""
兼容原有接口的函数
Args:
keyword (str): 搜索关键词
num (str): 搜索页数,支持范围格式如'1:5'
Returns:
tuple: (titles列表, urls列表)
"""
searcher = BingSearcher()
# 解析页数参数
if ':' in num:
if num.count(':') > 1:
raise ValueError("输入中必须且只能包含一个 ':'")
else:
start_page, end_page = num.split(':')
if not (start_page.isdigit() and end_page.isdigit()):
raise ValueError("':' 两侧的值必须是数字")
else:
num_pages = int(end_page) - int(start_page) + 1
else:
num_pages = int(num)
try:
searcher.start_browser()
results = searcher.search(keyword, num_pages)
# 分离titles和urls
titles = [result['title'] for result in results]
urls = [result['url'] for result in results]
return (titles, urls)
finally:
searcher.close_browser()
# Usage examples:
# Option 1: manage the browser lifecycle manually
# searcher = BingSearcher()
# searcher.start_browser()
# results1 = searcher.search("阿里巴巴", 1)
# results2 = searcher.search("腾讯", 1)
# searcher.close_browser()
# Option 2: use the context manager
# with BingSearcher() as searcher:
# results1 = searcher.search("阿里巴巴", 1)
# results2 = searcher.search("腾讯", 1)

tool/csv_tool.py (new file, 266 lines)

@@ -0,0 +1,266 @@
import csv
import os
import time
from typing import List, Dict, Any, Optional
class CSVTool:
def __init__(self, csv_file_name: str, headers: List[str]):
"""
初始化CSV工具
Args:
csv_file_name (str): CSV文件名
headers (List[str]): 表头列表
"""
self.csv_file_name = csv_file_name
self.headers = headers
def init_csv_file(self):
"""
初始化CSV文件如果文件不存在则创建并写入表头
"""
if not os.path.exists(self.csv_file_name):
with open(self.csv_file_name, 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(self.headers)
def get_existing_data(self, unique_titles: List[str]) -> set:
"""
读取现有数据,用于去重检查
Args:
unique_titles (List[str]): 用于唯一性检查的列名列表
Returns:
set: 包含唯一标识符元组的集合
"""
existing_data = set()
if not os.path.exists(self.csv_file_name):
return existing_data
try:
with open(self.csv_file_name, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
header_row = next(reader, None) # 读取表头
if header_row is None:
return existing_data
# 获取唯一列的索引
unique_indices = []
for title in unique_titles:
try:
index = header_row.index(title)
unique_indices.append(index)
except ValueError:
print(f"警告: 表头中未找到列 '{title}'")
continue
# 读取数据行
for row in reader:
if len(row) >= len(header_row): # 确保行数据完整
# 提取唯一标识符
unique_values = tuple(row[i] if i < len(row) else "" for i in unique_indices)
existing_data.add(unique_values)
except Exception as e:
print(f"读取CSV文件时出错: {e}")
return existing_data
def save_data(self, data_list: List[Dict[str, Any]], unique_titles: List[str], create_time: bool = True) -> int:
"""
将数据保存到CSV文件中自动去重
Args:
data_list (List[Dict[str, Any]]): 要保存的数据列表
unique_titles (List[str]): 用于唯一性检查的列名列表
create_time (bool): 是否自动添加创建时间默认为True
Returns:
int: 实际写入的行数
"""
if not data_list:
print('数据列表为空,没有数据可写入')
return 0
# 初始化文件
self.init_csv_file()
# 获取现有数据用于去重
existing_data = self.get_existing_data(unique_titles)
# 准备写入的数据
rows_to_write = []
written_count = 0
for data_node in data_list:
# 构建唯一标识符元组
unique_values = tuple(data_node.get(title, "") for title in unique_titles)
# 检查是否已存在
if unique_values in existing_data:
continue # 跳过已存在的数据
# 构建行数据
row_data = []
for header in self.headers:
if header == 'create_time' and create_time:
# 自动添加创建时间
row_data.append(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
else:
# 从数据节点获取对应值
row_data.append(data_node.get(header, ""))
rows_to_write.append(row_data)
existing_data.add(unique_values) # 添加到已存在数据集合中,避免本次写入中的重复
written_count += 1
# 写入数据
if rows_to_write:
with open(self.csv_file_name, 'a', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerows(rows_to_write)
print(f"成功写入 {written_count} 行数据到 {self.csv_file_name}")
else:
print("没有新数据需要写入")
return written_count
def query_data(self, filter_func=None) -> List[Dict[str, str]]:
"""
查询CSV文件中的数据
Args:
filter_func (callable, optional): 过滤函数接受一行数据字典作为参数返回True/False
Returns:
List[Dict[str, str]]: 查询结果列表
"""
if not os.path.exists(self.csv_file_name):
return []
result = []
try:
with open(self.csv_file_name, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
header_row = next(reader, None) # 读取表头
if header_row is None:
return result
for row in reader:
if len(row) >= len(header_row): # 确保行数据完整
# 将行数据转换为字典
row_dict = dict(zip(header_row, row))
# 应用过滤条件
if filter_func is None or filter_func(row_dict):
result.append(row_dict)
except Exception as e:
print(f"查询CSV文件时出错: {e}")
return result
def query_by_conditions(self, **kwargs) -> List[Dict[str, str]]:
"""
根据条件查询数据
Args:
**kwargs: 查询条件,键值对形式
Returns:
List[Dict[str, str]]: 查询结果列表
"""
def filter_func(row_dict):
for key, value in kwargs.items():
if key in row_dict and row_dict[key] != value:
return False
return True
return self.query_data(filter_func)
def get_all_data(self) -> List[Dict[str, str]]:
"""
获取所有数据
Returns:
List[Dict[str, str]]: 所有数据列表
"""
return self.query_data()
# 保持向后兼容的函数
def save_to_csv(filter_list: List[Dict[str, Any]],
csv_file_name: str = 'company_search_result_data.csv',
headers: List[str] = None,
unique_titles: List[str] = None) -> bool:
"""
将结果追加写入csv文件中向后兼容函数
Args:
filter_list: 需要写入的数据列表
csv_file_name: CSV文件名
headers: 表头列表
unique_titles: 用于唯一性检查的列名列表
Returns:
bool: 是否成功写入
"""
if headers is None:
headers = ['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time']
if unique_titles is None:
unique_titles = ['company_name', 'web_site_type']
try:
csv_tool = CSVTool(csv_file_name, headers)
written_count = csv_tool.save_data(filter_list, unique_titles)
return written_count > 0
except Exception as e:
print(f"保存CSV时出错: {e}")
return False
# 使用示例:
if __name__ == "__main__":
# 示例数据
sample_data = [
{
'title': '测试公司1',
'url': 'https://example.com/1',
'web_site_type': 'aiqicha',
'request_url': 'https://bing.com/search?q=测试公司1',
'company_name': '测试公司1'
},
{
'title': '测试公司2',
'url': 'https://example.com/2',
'web_site_type': 'qcc',
'request_url': 'https://bing.com/search?q=测试公司2',
'company_name': '测试公司2'
}
]
# 创建CSV工具实例
csv_tool = CSVTool(
csv_file_name='test_data.csv',
headers=['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time']
)
# 保存数据
csv_tool.save_data(sample_data, unique_titles=['company_name', 'web_site_type'])
# 查询所有数据
all_data = csv_tool.get_all_data()
print("所有数据:", all_data)
# 根据条件查询
filtered_data = csv_tool.query_by_conditions(web_site_type='aiqicha')
print("查询结果:", filtered_data)

tool/web_browser.py (new file, 288 lines)

@@ -0,0 +1,288 @@
# file: tool/web_browser.py
import random
import json
import os
import time
import urllib.parse
from playwright.sync_api import sync_playwright
from playwright.async_api import async_playwright  # required by start_browser_async below
class WebBrowser:
def __init__(self, cookie_path="browser_cookies.json"):
self.cookie_path = cookie_path
self.browser = None
self.context = None
self.page = None
self.playwright = None
def anti_detection(self):
"""注入更全面的反检测脚本"""
self.page.add_init_script("""
// 隐藏webdriver属性
delete navigator.__proto__.webdriver;
// 伪装chrome属性
Object.defineProperty(navigator, 'chrome', {
value: {
runtime: {},
loadTimes: function() {}
},
writable: false,
enumerable: true,
configurable: true
});
// 伪装plugins和mimeTypes
Object.defineProperty(navigator, 'plugins', {
get: () => [
{ 0: { type: 'application/pdf' } },
{ 0: { type: 'application/x-google-chrome-pdf' } }
],
});
Object.defineProperty(navigator, 'mimeTypes', {
get: () => [
{ type: 'application/pdf' },
{ type: 'application/x-google-chrome-pdf' }
],
});
// 伪装languages
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh'],
});
// 禁用调试功能
window.console.debug = function() {};
window.console.log = function() {};
// 伪装屏幕信息
Object.defineProperty(screen, 'width', {get: () => 1366});
Object.defineProperty(screen, 'height', {get: () => 768});
Object.defineProperty(screen, 'availWidth', {get: () => 1366});
Object.defineProperty(screen, 'availHeight', {get: () => 768});
Object.defineProperty(screen, 'colorDepth', {get: () => 24});
Object.defineProperty(screen, 'pixelDepth', {get: () => 24});
// 伪装时间戳
window.chrome = {
runtime: {}
};
// 伪装outerHeight和outerWidth
Object.defineProperty(window, 'outerHeight', {get: () => 768});
Object.defineProperty(window, 'outerWidth', {get: () => 1366});
// 伪装innerHeight和innerWidth
Object.defineProperty(window, 'innerHeight', {get: () => 768});
Object.defineProperty(window, 'innerWidth', {get: () => 1366});
""")
def random_behavior(self):
"""模拟更复杂的人类操作"""
# 随机等待
time.sleep(random.uniform(2, 5))
# 随机鼠标移动
for _ in range(random.randint(3, 7)):
self.page.mouse.move(
random.randint(100, 1200),
random.randint(100, 600)
)
time.sleep(random.uniform(0.1, 0.8))
# 随机滚动页面
if random.choice([True, False]):
scroll_distance = random.randint(200, 800)
self.page.mouse.wheel(0, scroll_distance)
time.sleep(random.uniform(1, 2))
def init_cookie_file(self):
"""初始化cookie文件"""
if not os.path.exists(self.cookie_path):
with open(self.cookie_path, 'w') as f:
json.dump([], f)
def save_cookies(self):
"""保存cookies到文件"""
cookies = self.context.cookies()
with open(self.cookie_path, 'w') as f:
json.dump(cookies, f, indent=2)
def load_cookies(self):
"""从文件加载cookies"""
try:
with open(self.cookie_path, 'r') as f:
cookies = json.load(f)
if cookies:
self.context.add_cookies(cookies)
return True
return False
except:
return False
def bypass_debugger(self):
"""绕过调试器检测"""
self.page.add_init_script("""
window.Function.prototype.constructor = function() {};
window.console.debug = function(){};
Object.defineProperty(navigator, 'webdriver', {get: () => false});
""")
async def start_browser_async(self):
"""异步启动浏览器"""
try:
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch(
headless=False,
args=[
"--disable-blink-features=AutomationControlled",
"--disable-infobars"
]
)
self.context = await self.browser.new_context()
self.page = await self.context.new_page()
return True
except Exception as e:
print(f"异步启动浏览器失败: {e}")
return False
async def close_browser_async(self):
"""异步关闭浏览器"""
if self.browser:
await self.browser.close()
if self.playwright:
await self.playwright.stop()
def start_browser(self):
"""启动浏览器"""
self.init_cookie_file()
self.playwright = sync_playwright().start()
self.browser = self.playwright.chromium.launch(
headless=False,
args=[
"--disable-blink-features=AutomationControlled",
"--disable-infobars",
"--disable-extensions",
"--disable-plugins",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-web-security",
"--disable-features=IsolateOrigins,site-per-process",
"--disable-background-timer-throttling",
"--disable-backgrounding-occluded-windows",
"--disable-renderer-backgrounding",
"--disable-ipc-flooding-protection"
]
)
self.context = self.browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
viewport={"width": 1366, "height": 768},
device_scale_factor=1,
is_mobile=False,
has_touch=False,
locale="zh-CN",
timezone_id="Asia/Shanghai"
)
self.page = self.context.new_page()
self.anti_detection()
# 立即执行一次反检测
self.page.evaluate("""
delete navigator.__proto__.webdriver;
""")
self.random_behavior()
def close_browser(self):
"""关闭浏览器"""
if self.context:
self.context.close()
if self.browser:
self.browser.close()
if self.playwright:
self.playwright.stop()
def visit_page(self, url):
"""访问指定页面"""
try:
# 在 `visit_page` 之前执行更全面的反检测
self.page.add_init_script("""
// 隐藏webdriver属性
delete navigator.__proto__.webdriver;
// 伪装chrome属性
Object.defineProperty(navigator, 'chrome', {
value: {
runtime: {},
loadTimes: function() {}
},
writable: false,
enumerable: true,
configurable: true
});
// 禁用调试功能
window.console.debug = function() {};
window.console.log = function() {};
""")
# 设置更真实的请求头
self.page.set_extra_http_headers({
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Cache-Control": "max-age=0"
})
self.page.goto(url)
# 页面加载后执行反检测
self.page.evaluate("""
delete navigator.__proto__.webdriver;
""")
self.page.wait_for_load_state("networkidle")
self.random_behavior()
return True
except Exception as e:
print(f"访问页面失败: {url}, 错误: {str(e)}")
return False
def extract_links(self, selector="h2 a"):
"""
提取页面中的链接
Args:
selector (str): CSS选择器默认为"h2 a"
Returns:
list: 包含title、url和request_url的字典列表
"""
links = []
try:
elements = self.page.query_selector_all(selector)
for element in elements:
try:
title = element.inner_text().replace('\n', '').replace(',', ' ').strip()
url = element.get_attribute('href')
current_url = self.page.url
if url and not url.startswith(('http://', 'https://')):
# resolve relative links against the current page URL
url = urllib.parse.urljoin(current_url, url)
if title and url:
links.append({
'title': title,
'url': url,
'request_url': current_url
})
except Exception as e:
print(f"提取链接失败: {e}")
continue
except Exception as e:
print(f"提取链接失败: {e}")
return links
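# Usage sketch (mirrors how BingSearcher and AiqichaDetailCrawler drive this class):
# browser = WebBrowser("bing_cookies.json")
# browser.start_browser()
# if browser.visit_page("https://cn.bing.com/search?q=Example"):
#     for link in browser.extract_links("h2 a"):
#         print(link['title'], link['url'])
# browser.save_cookies()
# browser.close_browser()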