From 102dd78c26bd09758022f29035ca6cc8551536e7 Mon Sep 17 00:00:00 2001 From: manchuwork Date: Thu, 25 Sep 2025 03:19:34 +0800 Subject: [PATCH] aiqicha --- company/aiqicha_crawler.py | 319 +++++++++++++++++++++++++ company/qcc.py | 30 +-- crawler_campany_detail_by_data_csv.py | 114 +++++++++ main.py | 322 ++++++++++++++++++++++++-- parse_img/process_manchu_dict.py | 85 +++++++ requirements.txt | 39 ++++ search/Bing.py | 34 +-- tool/aiqicha_detail.py | 130 +++++++++++ tool/aiqicha_detail_parser.py | 142 ++++++++++++ tool/aiqicha_login.py | 122 ++++++++++ tool/bing_search.py | 152 ++++++++++++ tool/csv_tool.py | 266 +++++++++++++++++++++ tool/web_browser.py | 288 +++++++++++++++++++++++ 13 files changed, 1987 insertions(+), 56 deletions(-) create mode 100644 company/aiqicha_crawler.py create mode 100644 crawler_campany_detail_by_data_csv.py create mode 100644 parse_img/process_manchu_dict.py create mode 100644 tool/aiqicha_detail.py create mode 100644 tool/aiqicha_detail_parser.py create mode 100644 tool/aiqicha_login.py create mode 100644 tool/bing_search.py create mode 100644 tool/csv_tool.py create mode 100644 tool/web_browser.py diff --git a/company/aiqicha_crawler.py b/company/aiqicha_crawler.py new file mode 100644 index 0000000..ee4b334 --- /dev/null +++ b/company/aiqicha_crawler.py @@ -0,0 +1,319 @@ +import random + +from playwright.sync_api import sync_playwright +import json +import os +import time + +COOKIE_PATH = "aiqicha_cookies.json" + + +class AiqichaCrawler: + def __init__(self): + self.browser = None + self.context = None + self.page = None + + def anti_detection(self): + """注入更全面的反检测脚本""" + self.page.add_init_script(""" + // 隐藏webdriver属性 + delete navigator.__proto__.webdriver; + + // 伪装chrome属性 + Object.defineProperty(navigator, 'chrome', { + value: { + runtime: {}, + loadTimes: function() {} + }, + writable: false, + enumerable: true, + configurable: true + }); + + // 伪装plugins和mimeTypes + Object.defineProperty(navigator, 'plugins', { + get: () => [ + { 0: { type: 'application/pdf' } }, + { 0: { type: 'application/x-google-chrome-pdf' } } + ], + }); + + Object.defineProperty(navigator, 'mimeTypes', { + get: () => [ + { type: 'application/pdf' }, + { type: 'application/x-google-chrome-pdf' } + ], + }); + + // 伪装languages + Object.defineProperty(navigator, 'languages', { + get: () => ['zh-CN', 'zh'], + }); + + // 禁用调试功能 + window.console.debug = function() {}; + window.console.log = function() {}; + + // 伪装屏幕信息 + Object.defineProperty(screen, 'width', {get: () => 1366}); + Object.defineProperty(screen, 'height', {get: () => 768}); + Object.defineProperty(screen, 'availWidth', {get: () => 1366}); + Object.defineProperty(screen, 'availHeight', {get: () => 768}); + Object.defineProperty(screen, 'colorDepth', {get: () => 24}); + Object.defineProperty(screen, 'pixelDepth', {get: () => 24}); + + // 伪装时间戳 + window.chrome = { + runtime: {} + }; + + // 伪装outerHeight和outerWidth + Object.defineProperty(window, 'outerHeight', {get: () => 768}); + Object.defineProperty(window, 'outerWidth', {get: () => 1366}); + + // 伪装innerHeight和innerWidth + Object.defineProperty(window, 'innerHeight', {get: () => 768}); + Object.defineProperty(window, 'innerWidth', {get: () => 1366}); + """) + + def random_behavior(self): + """模拟更复杂的人类操作""" + # 随机等待 + time.sleep(random.uniform(2, 5)) + + # 随机鼠标移动 + for _ in range(random.randint(3, 7)): + self.page.mouse.move( + random.randint(100, 1200), + random.randint(100, 600) + ) + time.sleep(random.uniform(0.1, 0.8)) + + # 随机滚动页面 + if random.choice([True, 
False]): + scroll_distance = random.randint(200, 800) + self.page.mouse.wheel(0, scroll_distance) + time.sleep(random.uniform(1, 2)) + def init_cookie_file(self): + if not os.path.exists(COOKIE_PATH): + with open(COOKIE_PATH, 'w') as f: + json.dump([], f) + + def save_cookies(self): + cookies = self.context.cookies() + with open(COOKIE_PATH, 'w') as f: + json.dump(cookies, f, indent=2) + + def load_cookies(self): + try: + with open(COOKIE_PATH, 'r') as f: + cookies = json.load(f) + if cookies: + self.context.add_cookies(cookies) + return True + return False + except: + return False + + def bypass_debugger(self): + self.page.add_init_script(""" + window.Function.prototype.constructor = function() {}; + window.console.debug = function(){}; + Object.defineProperty(navigator, 'webdriver', {get: () => false}); + """) + + def check_login_status(self): + """检测登录状态,返回True表示已登录""" + try: + # 先关闭可能的功能上新弹窗 + self.close_feature_popup() + + # 等待页面加载完成 + self.page.wait_for_load_state("networkidle") + + # 优先检查 .header-user-center-menu 元素判断是否已登录 + logged_in_elements = self.page.query_selector_all('.header-user-center-menu, .user-center') + for element in logged_in_elements: + if element and element.is_visible(): + print("检测到已登录状态") + return True + + # 检测用户中心元素判断已登录 + user_center = self.page.query_selector('.user-center') + if user_center and user_center.is_visible(): + print("检测到已登录状态") + return True + + + # 检测登录相关元素 + #self.page.wait_for_selector('.ivu-tooltip-light', timeout=10000) + #self.page.wait_for_selector('img[src*="app-qrcode.png"]', timeout=20000) + #print("检测到未登录状态") + #return False + + # 检测登录相关元素 + login_element = self.page.query_selector('.login') + if login_element and login_element.is_visible(): + print("检测到未登录状态") + return False + except: + try: + # 检测用户中心元素判断已登录 + self.page.wait_for_selector('.user-center', timeout=3000) + print("检测到已登录状态") + return True + except: + print("登录状态检测异常") + return False + + def close_feature_popup(self): + """关闭功能上新弹窗""" + try: + # 查找并点击关闭按钮 + close_buttons = self.page.query_selector_all('.close-icon.ivu-icon-ios-close') + for close_button in close_buttons: + if close_button.is_visible(): + close_button.click() + print("已关闭功能上新弹窗") + # 等待弹窗消失 + time.sleep(1) + break + except Exception as e: + # 如果没有找到弹窗,继续执行 + pass + def login(self): + """带状态检测的登录流程""" + self.page.goto("https://aiqicha.baidu.com") + # 页面加载后执行反检测 + self.page.evaluate(""" + delete navigator.__proto__.webdriver; + """) + + # 等待页面加载完成 + self.page.wait_for_load_state("networkidle") + + # 关闭可能的功能上新弹窗 + self.close_feature_popup() + + if not self.check_login_status(): + print("开始执行登录流程...") + # 点击登录按钮 + login_btn = self.page.wait_for_selector('.login', timeout=20000) + login_btn.click() + # try: + # 等待二维码容器出现并确保可见 + # self.page.wait_for_selector('.app-qrcode', timeout=20000) + print("请扫描页面二维码登录...") + time.sleep(3) # 给一些时间让二维码完全加载 + + + # 等待登录完成 + # 等待登录完成,先尝试URL检测,失败后再尝试元素检测 + # try: + # self.page.wait_for_url("https://aiqicha.baidu.com/usercenter/**", timeout=5000) + # except: + # # 如果URL检测失败,尝试通过元素检测 + # self.page.wait_for_selector('.header-user-center-menu, .user-center', timeout=10000) + + # 如果URL检测失败,尝试通过元素检测 + self.page.wait_for_selector('.header-user-center-menu, .user-center', timeout=10000) + # self.page.wait_for_url("**/usercenter**", timeout=120000) + self.save_cookies() + print("登录成功!") + + def search_company(self, company_name): + self.page.goto(f"https://aiqicha.baidu.com/s?q={company_name}") + # 页面加载后执行反检测 + self.page.evaluate(""" + delete navigator.__proto__.webdriver; + """) + + # 
关闭可能的功能上新弹窗 + self.close_feature_popup() + + self.page.wait_for_selector(".search-item", timeout=10000) + + # 提取企业基础信息 + company_card = self.page.query_selector(".search-item") + return { + "name": company_card.query_selector(".company-name").inner_text(), + "legal_person": company_card.query_selector(".legal-person").inner_text(), + "reg_capital": company_card.query_selector(".reg-capital").inner_text(), + "status": company_card.query_selector(".company-status").inner_text() + } + + def run(self, companies): + self.init_cookie_file() + + with sync_playwright() as p: + # self.browser = p.chromium.launch(headless=False) + self.browser = p.chromium.launch( + headless=False, + args=[ + "--disable-blink-features=AutomationControlled", + "--disable-infobars", + "--disable-extensions", + "--disable-plugins", + "--no-sandbox", + "--disable-dev-shm-usage", + "--disable-web-security", + "--disable-features=IsolateOrigins,site-per-process", + "--disable-background-timer-throttling", + "--disable-backgrounding-occluded-windows", + "--disable-renderer-backgrounding", + "--disable-ipc-flooding-protection" + ] + ) + # self.context = self.browser.new_context() + self.context = self.browser.new_context( + user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + viewport={"width": 1366, "height": 768}, + device_scale_factor=1, + is_mobile=False, + has_touch=False, + locale="zh-CN", + timezone_id="Asia/Shanghai" + ) + self.page = self.context.new_page() + self.anti_detection() + # 立即执行一次反检测 + self.page.evaluate(""" + delete navigator.__proto__.webdriver; + """) + self.random_behavior() + + if not self.load_cookies(): + print("未找到有效Cookie,开始登录流程...") + self.login() + else: + print("已加载Cookie,验证登录状态...") + # 加载cookie后访问页面验证是否真正登录 + self.page.goto("https://aiqicha.baidu.com") + # 等待页面加载完成 + self.page.wait_for_load_state("networkidle") + + # 验证登录状态 + if not self.check_login_status(): + print("Cookie已过期或无效,重新登录...") + self.login() + else: + print("Cookie有效,已登录") + + for company in companies: + try: + data = self.search_company(company) + print(f"{data['name']} | 法人:{data['legal_person']} | 注册资本:{data['reg_capital']}") + self.save_cookies() # 每次操作后更新cookies + time.sleep(3) # 防止请求过快 + except Exception as e: + print(f"查询 {company} 失败: {str(e)}") + + self.context.close() + self.browser.close() + + +if __name__ == "__main__": + crawler = AiqichaCrawler() + companies = ["阿里巴巴", "腾讯科技", "华为技术"] + crawler.run(companies) diff --git a/company/qcc.py b/company/qcc.py index f7cfa21..5776557 100644 --- a/company/qcc.py +++ b/company/qcc.py @@ -268,16 +268,15 @@ def save_cookies(context, cookie_file): print("已保存cookies到文件") -def wait_for_login(page, cookie_file): +def wait_for_login_and_save_cookies(page, cookie_file): """ - 等待用户扫码登录 + 等待用户扫码登录并保存cookies """ print("检测到需要登录,请使用手机扫码登录...") print("登录成功后将自动跳转到目标页面") - # 等待页面跳转到非登录页面 - page.wait_for_url("**/weblogin", timeout=3000) - page.wait_for_url(lambda url: "weblogin" not in url, timeout=120000) + # 等待页面跳转到非登录页面(即跳转回firm页面) + page.wait_for_url("**/firm/**", timeout=120000) # 保存登录后的cookies save_cookies(page.context, cookie_file) @@ -299,22 +298,19 @@ def main(): page = context.new_page() try: - # 尝试加载本地保存的cookies - if load_cookies(context, args.cookie_file): - print("使用已保存的登录信息") + # 启动应用时自动加载cookies文件 + load_cookies(context, args.cookie_file) # 访问指定URL page.goto(args.url) - # 检查是否跳转到了登录页面 + # 检查是否在登录页面 if "weblogin" in page.url: - wait_for_login(page, args.cookie_file) + # 等待用户扫码登录并自动保存cookies + 
wait_for_login_and_save_cookies(page, args.cookie_file) else: print("已登录或无需登录") - # 重新访问目标URL(确保页面正确加载) - page.goto(args.url) - # 创建解析器并解析信息 parser = QCCParser(page) company_info = parser.parse_company_info() @@ -334,11 +330,3 @@ def main(): if __name__ == "__main__": main() # python qcc.py "https://www.qcc.com/firm/50b0e3189f2eb2b20304b255669ce1a1.html" -# # 首次运行需要扫码登录 -# python qcc.py "https://www.qcc.com/firm/公司URL" -# -# # 后续运行将自动使用已保存的登录信息 -# python qcc.py "https://www.qcc.com/firm/公司URL" -# -# # 指定自定义cookies文件 -# python qcc.py --cookie-file my_cookies.txt "https://www.qcc.com/firm/公司URL" \ No newline at end of file diff --git a/crawler_campany_detail_by_data_csv.py b/crawler_campany_detail_by_data_csv.py new file mode 100644 index 0000000..24b03ae --- /dev/null +++ b/crawler_campany_detail_by_data_csv.py @@ -0,0 +1,114 @@ +import random + +from tool.csv_tool import CSVTool +from tool.aiqicha_detail import AiqichaDetailCrawler +import time + +def query_init_company_data(csv_file_name): + # 创建CSV工具实例 + csv_tool = CSVTool( + csv_file_name=csv_file_name, + headers=['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time'] + ) + # 查询所有数据 + all_data = csv_tool.get_all_data() + print("所有数据:", all_data) + return all_data + +def crawl_and_save_aiqicha_details(input_csv, output_csv): + """ + 从CSV文件中读取爱企查URL,爬取企业详情,并保存到新的CSV文件中 + + Args: + input_csv (str): 包含爱企查URL的输入CSV文件 + output_csv (str): 保存企业详情的输出CSV文件 + """ + # 读取输入数据 + input_data = query_init_company_data(input_csv) + + # 筛选出爱企查数据 + aiqicha_data = [item for item in input_data if item['web_site_type'] == 'aiqicha'] + print(f'找到 {len(aiqicha_data)} 条爱企查数据') + + # 定义输出CSV的表头 + output_headers = [ + 'company_name', # 公司名称 + 'credit_code', # 统一社会信用代码 + 'legal_representative', # 法定代表人 + 'registered_capital', # 注册资本 + 'establishment_date', # 成立日期 + 'business_status', # 经营状态 + 'address', # 公司地址 + 'business_scope', # 经营范围 + 'source_url', # 原始URL + 'create_time' # 创建时间 + ] + + # 创建输出CSV工具实例 + output_csv_tool = CSVTool( + csv_file_name=output_csv, + headers=output_headers + ) + + # 使用爱企查详情爬虫 + with AiqichaDetailCrawler() as crawler: + company_details = [] + success_count = 0 + + for i, item in enumerate(aiqicha_data): + url = item['url'] + refer_url: str = item['request_url'] + print(f"正在处理: {url}") + + # 爬取企业详情 + detail = crawler.crawl_company_detail(url, refer_url) + + if detail: + # 添加来源URL和公司名称 + detail['source_url'] = url + # 转换字段名以匹配CSV表头 + converted_item = { + 'company_name': detail.get('name', ''), + 'credit_code': detail.get('credit_code', ''), + 'legal_representative': detail.get('legal_representative', ''), + 'registered_capital': detail.get('registered_capital', ''), + 'establishment_date': detail.get('establishment_date', ''), + 'business_status': detail.get('business_status', ''), + 'address': detail.get('address', ''), + 'business_scope': detail.get('business_scope', ''), + 'source_url': detail.get('source_url', '') + } + + # 立即保存每条数据,避免数据丢失 + written_count = output_csv_tool.save_data( + [converted_item], + unique_titles=['company_name'], + create_time=True + ) + + if written_count > 0: + success_count += 1 + print(f"成功保存 {detail.get('name', '未知公司')} 的信息") + else: + print(f"保存 {detail.get('name', '未知公司')} 的信息失败(可能已存在)") + else: + print(f"获取 {url} 的信息失败") + + # 添加延迟,避免请求过快 + + time.sleep(2) + next_sleep_interval = random.uniform(5, 15) + time.sleep(next_sleep_interval) + + print(f"总共成功处理并保存了 {success_count} 条企业详情数据到 {output_csv}") + +if __name__ == '__main__': + # 从原始搜索结果CSV中读取爱企查URL,爬取详情并保存到新CSV文件 + 
crawl_and_save_aiqicha_details('company_search_bing_data.csv', 'aiqicha_company_details.csv') + + # 原有代码保留 + # all_data = query_init_company_data('company_search_bing_data.csv') + # filter = [item for item in all_data if item['web_site_type'] == 'aiqicha'] + # print('aiqicha数据:', filter) + # for item in filter: + # pass \ No newline at end of file diff --git a/main.py b/main.py index d2c205a..ec8bac1 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- import asyncio +import csv +import os import random import aiohttp @@ -15,6 +17,8 @@ from search import Bing,Baidu import openpyxl import ssl +from tool.bing_search import BingSearcher +from tool.csv_tool import CSVTool from tool.read_csv import CSVReader start = time.time() @@ -68,23 +72,22 @@ def commend(): parser.print_help() sys.exit() return args -def search_company_info(company_name_arg, num): - keywords = company_name_arg - # for key in keyword: - # keywords = keywords + key + " " - keywords = keywords.strip() - result = Bing.bing_main(keywords, num) +def search_company_info(company_name_key, addon_args, num): + + search_key = company_name_key.strip() + " " + addon_args + search_key = search_key.strip() + result = Bing.bing_main(search_key, num) # for 循环 遍历 result[0] 和 result[1] - data_list =[] - for i in range(len(result[0])): - title= result[0][i] - url = result[1][i] - print(f"必应搜索爬取结果为,title:{title}, url:{url}") - if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url): - data_list.append([title, url]) - return data_list + return result + # for i in range(len(result[0])): + # title= result[0][i] + # url = result[1][i] + # print(f"必应搜索爬取结果为,title:{title}, url:{url}") + # if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url): + # data_list.append({"title":title, "url":url}) + # return data_list def filter_company_sites(urls): # urls https://www.tianyancha.com/company/5226478758 @@ -94,6 +97,33 @@ def filter_company_sites(urls): filtered_urls = [url for url in urls if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url)] return filtered_urls +def filter_aiqicha_qcc(search_result, company_name_, with_not_match = False): + datas = [] + + for i in range(len(search_result)): + data_node = search_result[i] + title = data_node['title'] + url = data_node['url'] + print(f"必应搜索爬取结果为,title:{title}, url:{url}") + + # 判断title是否包含 company_name_ + # if re.match( + # r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", + # url) and title.find(company_name_) != -1: + if title.find(company_name_) != -1 or with_not_match: + web_site_type = None + if re.match(r"^https://aiqicha.baidu.com/company_detail_.*", url): + web_site_type = "aiqicha" + elif re.match(r"^https://www.tianyancha.com/company/.*", url): + web_site_type = "tianyancha" + elif re.match(r"^https://www.qcc.com/firm/.*", url): + web_site_type = "qcc" + + if web_site_type is not None: + data_node['web_site_type'] = web_site_type + data_node['company_name'] = company_name_ + datas.append(data_node) + return datas def search_one_company(company_name_arg, num): @@ -101,7 +131,7 @@ def search_one_company(company_name_arg, num): # for key in keyword: # keywords = keywords + key + " " keywords = keywords.strip() - print(f"您搜索的关键词为:{keywords}") + print(f"---==您搜索的关键词为:{keywords}") wb = 
openpyxl.Workbook() # 删除默认创建的工作表(现在名为 "数据表1") wb.remove(wb['Sheet']) @@ -117,21 +147,269 @@ def search_one_company(company_name_arg, num): end = time.time() print(Fore.RED + f'脚本总时间: {end - start:.2f}') + +def save_to_csv(filter_list): + + if filter_list is None or len(filter_list) == 0: + print('filter_list is None or len(filter_list) == 0, 没有数据可写入') + return False + """ + 将结果追加写入csv文件中 + + Args: + filter_list: 需要写入的数据列表 + """ + csv_file = 'company_search_result_data.csv' + headers = ['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time'] + + # 判断文件是否存在,不存在则创建并写入列头 + file_exists = os.path.exists(csv_file) + + # 读取现有数据,用于判断重复项 + existing_data = set() + if file_exists: + with open(csv_file, 'r', encoding='utf-8') as f: + reader_ins = csv.reader(f) + header_skipped = False + for row in reader_ins: + if not header_skipped: + header_skipped = True + continue + if len(row) >= 5: # 确保行数据完整 + company_name = row[4] # company_name在第5列(索引4) + web_site_type = row[2] if len(row) > 2 else "" # web_site_type在第3列(索引2) + existing_data.add((company_name, web_site_type)) + + # 写入数据 + with open(csv_file, 'a', encoding='utf-8', newline='') as f: + writer = csv.writer(f) + + # 如果文件不存在,写入列头 + if not file_exists: + writer.writerow(headers) + + # 追加写入数据,去重处理 + for data_node in filter_list: + company_name = data_node.get('company_name', '') + web_site_type = data_node.get('web_site_type', '') + + # 判断是否已存在相同的company_name和web_site_type组合 + if (company_name, web_site_type) not in existing_data: + # 创建时间格式化 + create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + + # 写入数据行 + row_data = [ + data_node.get('title', ''), + data_node.get('url', ''), + web_site_type, + data_node.get('request_url', ''), + company_name, + create_time + ] + writer.writerow(row_data) + + # 添加到已存在数据集合中,避免本次写入中的重复 + existing_data.add((company_name, web_site_type)) + + print(f"写入数据成功,title:{data_node.get('title', '')}, " + f"url:{data_node.get('url', '')}, " + f"web_site_type:{web_site_type}, " + f"request_url:{data_node.get('request_url', '')}, " + f"company_name:{company_name}, " + f"create_time:{create_time}") + + +def check_company_exists(company_names, type_list): + """ + 读取 company_search_result_data.csv 数据,检查指定的公司名称和类型是否存在 + + Args: + company_names (list): 公司名称列表 + type_list (list): 类型列表 + + Returns: + list: 包含公司名称和存在状态的字典列表 + 格式: [{"company_name": "公司名", "exists": True/False}, ...] 
+ """ + csv_file = 'company_search_result_data.csv' + result = [] + + # 初始化所有公司为不存在状态 + for company_name_item in company_names: + result.append({ + "company_name": company_name_item, + "exists": False + }) + + # 如果文件不存在,直接返回初始化结果 + if not os.path.exists(csv_file): + return result + + # 读取CSV文件中的现有数据 + existing_combinations = set() # 存储(公司名, 类型)组合 + try: + with open(csv_file, 'r', encoding='utf-8') as f: + reader = csv.reader(f) + header_skipped = False + + for row in reader: + if not header_skipped: + header_skipped = True + continue + + # 确保行数据完整 + if len(row) >= 5: + company_name_item = row[4] # company_name在第5列(索引4) + web_site_type = row[2] if len(row) > 2 else "" # web_site_type在第3列(索引2) + + # 添加到现有组合集合中 + existing_combinations.add((company_name_item, web_site_type)) + except Exception as e: + print(f"读取CSV文件时出错: {e}") + return result + + # 检查每个公司是否存在于指定的类型中 + for item in result: + company_name_item = item["company_name"] + exists = False + + # 如果type_list为空,检查公司是否存在任何类型中 + if not type_list: + for existing_company, _ in existing_combinations: + if existing_company == company_name_item: + exists = True + break + else: + # 检查公司是否存在于指定的类型中 + for web_site_type in type_list: + if (company_name_item, web_site_type) in existing_combinations: + exists = True + break + + item["exists"] = exists + + return result + + + if __name__ == '__main__': reader = CSVReader('data.csv') company_names = reader.read_column(0, has_header=False) print("所有数据:", company_names) - i= 1 - for company_name in company_names: + # 检查已存在的公司 + type_list = ["aiqicha", "qcc", "tianyancha"] + check_result = check_company_exists(company_names, type_list) + print("检查结果:", check_result) + i = 1 + # 方法2: 使用上下文管理器 + with BingSearcher() as searcher: + # 创建CSV工具实例 + csv_tool = CSVTool( + csv_file_name='company_search_bing_data.csv', + headers=['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time'] + ) + # 查询所有数据 + all_data = csv_tool.get_all_data() + print("所有数据:", all_data) + # 查询所有数据 + all_data = csv_tool.get_all_data() + print("所有数据:", all_data) + + # 初始化所有公司为不存在状态 + company_names_saved_set = set() + for company_name_item in all_data: + company_names_saved_set.add(company_name_item["company_name"]) + + + for company_name in company_names: + + # 如果公司已存在,跳过处理 + if company_name in company_names_saved_set: + print(f"公司 {company_name} 已存在,跳过处理") + continue + # if company_exists: + # print(f"公司 {company_name} 已存在,跳过处理") + # continue + + print(f"正在处理第 {i} 个公司: {company_name}") + addon_args = " 爱企查|企查查" + data_list = searcher.search(company_name+" "+addon_args, 1) + filter_list = filter_aiqicha_qcc(data_list, company_name) + print(company_name, "filter_list:", filter_list) + + + if len(filter_list) <= 0: + print("没有数据 filter_list is empty. 
" + company_name) + + filter_list_with_not_match = filter_aiqicha_qcc(data_list, company_name, with_not_match=True) + # 创建CSV工具实例 + csv_tool = CSVTool( + csv_file_name='company_search_filter_is_none_data.csv', + headers=['company_name','title', 'web_site_type','url', 'request_url', 'create_time'] + ) + + # 保存数据,指定去重字段 + csv_tool.save_data(filter_list_with_not_match, unique_titles=['company_name', 'title','url','web_site_type']) + + continue + else: + # 创建CSV工具实例 + csv_tool = CSVTool( + csv_file_name='company_search_bing_data.csv', + headers=['company_name','title', 'web_site_type','url', 'request_url', 'create_time'] + ) + + # 保存数据,指定去重字段 + csv_tool.save_data(filter_list, + unique_titles=['company_name', 'web_site_type']) + # save_to_csv(filter_list) + + # i = i + 1 + # if i > 3: + # print("结束循环") + # break + # results2 = searcher.search("腾讯", 1) + # results3 = searcher.search("百度", 1) + + sleep_time = 5 + sleep_time += random.randint(3, 10) + time.sleep(sleep_time) + pass + pass + + if True: + print("exit") + exit(0) + + + i = 1 + for company_name_ele in check_result: + company_name = company_name_ele["company_name"] + company_exists = company_name_ele["exists"] + + # 如果公司已存在,跳过处理 + if company_exists: + print(f"公司 {company_name} 已存在,跳过处理") + continue + sleep_time = 5 - sleep_time += random.randint(1, 5) + sleep_time += random.randint(3, 10) time.sleep(sleep_time) - company_name += " 爱企查|企查查" - data_list = search_company_info(company_name, '1') - print(data_list) + + addon_args = " 爱企查|企查查" + data_list = search_company_info(company_name, addon_args, '1') + filter_list = filter_aiqicha_qcc(data_list, company_name) + print("filter_list:",filter_list) + + save_to_csv(filter_list) + if len(filter_list)<= 0: + print("没有数据 filter_list is empty. "+company_name) + continue + i=i+1 - if i > 1: + if i > 100: break diff --git a/parse_img/process_manchu_dict.py b/parse_img/process_manchu_dict.py new file mode 100644 index 0000000..2421a86 --- /dev/null +++ b/parse_img/process_manchu_dict.py @@ -0,0 +1,85 @@ +import os +import cv2 +from paddleocr import PaddleOCR +import numpy as np + +def imread_chinese(path): + """支持中文路径的图像读取函数""" + try: + # 使用 numpy 读取文件 + img_array = np.fromfile(path, dtype=np.uint8) + # 使用 imdecode 解码图像 + img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) + return img + except Exception as e: + print(f"读取图像失败 {path}: {e}") + return None + +def split_image_vertically(img_path, split_num=3): + """将图片垂直分割为三部分(上中下栏)""" + #img = cv2.imread(img_path) + img = imread_chinese(img_path) + + if img is None: + print(f"无法读取图像: {img_path}") + return None + + height = img.shape[0] + section_height = height // split_num + return [ + img[i * section_height:(i + 1) * section_height, :] + for i in range(split_num) + ] + + +def detect_vertical_text(ocr, img_section): + """识别竖直英文文本""" + # 将图像旋转90度使竖直文本变为水平 + rotated = cv2.rotate(img_section, cv2.ROTATE_90_CLOCKWISE) + result = ocr.predict(rotated, use_textline_orientation=True) + return [line[1][0] for line in result[0]] if result else [] + + +def process_images(image_dir, start_num=1, end_num=1097): + """批量处理图片序列""" + ocr = PaddleOCR( + lang='en', + use_textline_orientation=True, + text_det_unclip_ratio=2.0, # 调整检测框扩展系数 + #rec_char_dict_path='en_dict.txt' # 英文专用字典 + ) + + for i in range(start_num, end_num + 1): + img_path = os.path.join(image_dir, f"{i}.png") + if not os.path.exists(img_path): + continue + + sections = split_image_vertically(img_path) + page_results = { + "page_number": i, + "sections": [] + } + + for idx, section in 
enumerate(sections): + # 识别页码(假设位于第一栏顶部) + if idx == 0: + page_results["detected_page"] = detect_vertical_text(ocr, section[:50, :]) + + # 识别各栏英文内容 + eng_text = detect_vertical_text(ocr, section) + page_results["sections"].append({ + "section": ["top", "middle", "bottom"][idx], + "english_text": eng_text + }) + + yield page_results + + +if __name__ == "__main__": + IMAGE_DIR = r"D:/gitstudy/pythonwork/manchuspider/data/满洲语字典/满汉大辞典/images" + for result in process_images(IMAGE_DIR): + print(f"Page {result['page_number']}:") + print(f"Detected Page No: {result.get('detected_page', 'N/A')}") + for section in result["sections"]: + print(f"{section['section']} section English: {', '.join(section['english_text'])}") + print("-" * 50) diff --git a/requirements.txt b/requirements.txt index 40311e5..16b2026 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,19 +2,58 @@ aiofiles==24.1.0 aiohappyeyeballs==2.4.0 aiohttp==3.10.5 aiosignal==1.3.1 +annotated-types==0.7.0 async-timeout==4.0.3 attrs==24.2.0 +bce-python-sdk==0.9.46 beautifulsoup4==4.12.3 +certifi==2025.8.3 +chardet==5.2.0 +charset-normalizer==3.4.3 +click==8.3.0 colorama==0.4.6 +colorlog==6.9.0 et-xmlfile==1.1.0 +filelock==3.19.1 frozenlist==1.4.1 +fsspec==2025.9.0 +future==1.0.0 greenlet==3.2.4 idna==3.8 +imagesize==1.4.1 lxml==5.3.0 multidict==6.1.0 +numpy==2.3.3 +opencv-contrib-python==4.10.0.84 openpyxl==3.1.5 +packaging==25.0 +pandas==2.3.2 +pillow==11.3.0 playwright==1.55.0 +prettytable==3.16.0 +psutil==7.1.0 +py-cpuinfo==9.0.0 +pyclipper==1.3.0.post6 +pycryptodome==3.23.0 +pydantic==2.11.9 +pydantic_core==2.33.2 pyee==13.0.0 +pypdfium2==4.30.0 +python-dateutil==2.9.0.post0 +pytz==2025.2 +PyYAML==6.0.2 +requests==2.32.5 +ruamel.yaml==0.18.15 +ruamel.yaml.clib==0.2.12 +setuptools==80.9.0 +shapely==2.1.1 +six==1.17.0 soupsieve==2.6 +tqdm==4.67.1 +typing-inspection==0.4.1 typing_extensions==4.12.2 +tzdata==2025.2 +ujson==5.11.0 +urllib3==2.5.0 +wcwidth==0.2.13 yarl==1.11.1 diff --git a/search/Bing.py b/search/Bing.py index 22a5064..e4f5da4 100644 --- a/search/Bing.py +++ b/search/Bing.py @@ -19,8 +19,9 @@ timeout = aiohttp.ClientTimeout( sock_read=5.5 # 读取超时为5.5秒 ) async def getbing(url, session): - url_list = [] - title_list = [] + # url_list = [] + # title_list = [] + data_list =[] async with session.get(url, headers=bingheaders,timeout=timeout) as resp: # print("正在爬取url:"+url) try: @@ -34,19 +35,24 @@ async def getbing(url, session): domain = 'https://cn.bing.com/' hurl = urllib.parse.urljoin(domain, hurl) print(htext," ",hurl) - title_list.append(htext) - url_list.append(hurl) + + data_list.append({'title': htext, 'url': hurl,'request_url':url}) + # title_list.append(htext) + # url_list.append(hurl) except: print(f"必应页面爬取失败,{url}该url无法正常获取数据。") return [],[] - return url_list, title_list + + return data_list + #url_list, title_list async def bing_spinder(keyword, num): print(f'必应爬取任务进行中,爬取页数为{num}...') print('标题 url') - urllist = [] - titlelist = [] + # urllist = [] + # titlelist = [] + data_list =[] tasks = [] if ':' in num: if num.count(':') > 1: @@ -65,17 +71,19 @@ async def bing_spinder(keyword, num): async with aiohttp.ClientSession() as session: for pn in range(start_page, end_page, 10): #url = f'https://cn.bing.com/search?q={keyword}&first={pn}&mkt=zh-CN' - url = f'https://cn.bing.com/search?q={keyword}&qs=n&form=QBRE&sp=-1&lq=0' - # print("正在爬取的url为:"+url) + # 修复:使用正确的分页参数 + url = f'https://cn.bing.com/search?q={keyword}&first={pn + 1}&count=10&FORM=PERE' + print("正在爬取的url为:" + url) tasks = tasks + 
[asyncio.create_task(getbing(url, session))] result = await asyncio.gather(*tasks) for i in range(int((end_page-start_page) / 10)): - urllist += result[i][0] - titlelist += result[i][1] - count=len(urllist) + # urllist += result[i][0] + # titlelist += result[i][1] + data_list += result[i] + count=len(data_list) print(f"必应搜索爬取结果为{count}") print(Fore.GREEN + '必应爬取任务完成\n' + Fore.RESET) - return titlelist, urllist + return data_list # await bingwriteCSV(titlelist, urllist, keyword) diff --git a/tool/aiqicha_detail.py b/tool/aiqicha_detail.py new file mode 100644 index 0000000..b9862b1 --- /dev/null +++ b/tool/aiqicha_detail.py @@ -0,0 +1,130 @@ +# file: tool/aiqicha_detail.py + +import time +import json +from tool.web_browser import WebBrowser +from tool.aiqicha_login import AiqichaLoginManager # 导入登录管理器 +from tool.aiqicha_detail_parser import AiqichaDetailParser # 导入解析器 + +class AiqichaDetailCrawler: + def __init__(self, cookie_path="aiqicha_cookies.json"): + self.browser = WebBrowser(cookie_path) + self.browser_started = False + self.login_manager = None # 添加登录管理器实例 + + + def start_browser(self): + """启动浏览器""" + if not self.browser_started: + try: + self.browser.start_browser() + # 初始化登录管理器 + self.login_manager = AiqichaLoginManager(self.browser) + + # 加载cookies + if not self.browser.load_cookies(): + print("未找到有效Cookie") + else: + print("已加载Cookie") + + # 使用登录管理器检测登录状态 + logined = self.login_manager.check_and_login() + if logined: + print("登录成功") + else: + print("登录失败") + self.browser_started = True + except Exception as e: + print(f"启动浏览器失败: {e}") + self.browser_started = False + + def close_browser(self): + """关闭浏览器""" + if self.browser_started: + try: + # 保存cookies + self.browser.save_cookies() + self.browser.close_browser() + except Exception as e: + print(f"关闭浏览器时出错: {e}") + finally: + self.browser_started = False + + def crawl_company_detail(self, url: str, refer_url: str = None): + """ + 爬取爱企查企业详情页数据 + + Args: + url (str): 企业详情页URL,例如 https://aiqicha.baidu.com/company_detail_45719927199916 + + Returns: + dict: 包含企业详细信息的字典 + """ + if not self.browser_started: + self.start_browser() + + if not self.browser_started: + print("浏览器未启动,无法执行爬取") + return {} + + print(f'正在爬取企业详情: {url}') + + try: + # 设置 Referer 头部模拟搜索引擎点击 + if refer_url: + self.browser.page.set_extra_http_headers({"Referer": refer_url}) + + # 访问页面 + if self.browser.visit_page(url): + # 增强页面加载检查 + print("等待页面关键元素加载...") + try: + # 等待关键元素加载,增加超时时间 + self.browser.page.wait_for_selector('.addr-enter-bg-ele', timeout=15000) + print("关键元素已加载") + + # 额外等待一段时间确保页面完全加载 + import time + time.sleep(2) + print("额外等待完成,页面应该已完全加载") + except Exception as e: + print(f"等待页面元素时出错: {e}") + print("继续尝试解析页面内容...") + + # 提取基本信息 + print("开始解析页面信息...") + parser = AiqichaDetailParser(self.browser.page) + company_info = parser.parse_company_info() + + print(f"成功爬取企业信息: {company_info['name']}") + return company_info + else: + print("访问页面失败") + return {} + + except Exception as e: + print(f"爬取过程中出现错误: {e}") + return {} + + + def __enter__(self): + """上下文管理器入口""" + self.start_browser() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """上下文管理器出口""" + self.close_browser() + + +# 使用示例: +# 方法1: 手动管理浏览器生命周期 +# crawler = QiqichaDetailCrawler() +# crawler.start_browser() +# detail = crawler.crawl_company_detail("https://aiqicha.baidu.com/company_detail_45719927199916") +# crawler.close_browser() + +# 方法2: 使用上下文管理器 +# with QiqichaDetailCrawler() as crawler: +# detail = 
crawler.crawl_company_detail("https://aiqicha.baidu.com/company_detail_45719927199916") +# print(detail) \ No newline at end of file diff --git a/tool/aiqicha_detail_parser.py b/tool/aiqicha_detail_parser.py new file mode 100644 index 0000000..bf41c6f --- /dev/null +++ b/tool/aiqicha_detail_parser.py @@ -0,0 +1,142 @@ +# file: tool/aiqicha_detail_parser.py + +import re + + +class AiqichaDetailParser: + """爱企查企业详情页解析器""" + + def __init__(self, page): + """ + 初始化解析器 + + Args: + page: 浏览器页面对象 + """ + self.page = page + + def parse_company_info(self): + """ + 解析页面中的企业基本信息,参考 AiQiChaParser 实现 + + Returns: + dict: 包含企业基本信息的字典 + """ + company_info = {} + + # 定义要提取的信息字段和对应的CSS选择器 + fields = { + 'name': ['.company-name', '.enterprise-name'], + 'credit_code': ['.credit-code', '.unified-social-credit-code'], + 'legal_representative': ['.legal-person', '.legal-representative'], + 'registered_capital': ['.reg-capital', '.registered-capital'], + 'establishment_date': ['.establishment-date', '.setup-date'], + 'business_status': ['.business-status', '.operating-state'], + 'address': ['.address', '.registered-address'], + 'business_scope': ['.business-scope', '.business-scope-content'], + 'company_type': ['.company-type', '.enterprise-type'], + 'industry': ['.industry', '.industry-category'], + 'registration_authority': ['.registration-authority', '.register-authority'], + 'operating_period': ['.operating-period', '.business-period'], + 'actual_capital': ['.actual-capital', '.paid-capital'], + 'taxpayer_id': ['.taxpayer-id', '.tax-id-number'], + 'organization_code': ['.organization-code'], + 'english_name': ['.english-name'], + 'approved_date': ['.approved-date', '.approval-date'], + 'staff_size': ['.staff-size', '.insured-persons'], + 'former_name': ['.former-name', '.previous-name'] + } + + # 批量提取信息 + for field, selectors in fields.items(): + company_info[field] = self._extract_field_value(selectors) + + # 特殊处理电话号码 + company_info['phone'] = self._extract_phone_number() + + return company_info + + def _extract_field_value(self, selectors): + """ + 根据多个选择器提取字段值 + + Args: + selectors (list): CSS选择器列表 + + Returns: + str: 提取到的值或"未知" + """ + for selector in selectors: + try: + # 添加日志:显示当前尝试的选择器 + print(f"尝试选择器: {selector}") + + # 尝试查找带有 enter-bg-ele 类的元素 + element = self.page.query_selector(f"{selector} .enter-bg-ele") + if element: + print(f"找到 enter-bg-ele 元素,选择器: {selector} .enter-bg-ele") + else: + # 尝试查找带有 addr-enter-bg-ele 类的元素 + element = self.page.query_selector(f"{selector} .addr-enter-bg-ele") + if element: + print(f"找到 addr-enter-bg-ele 元素,选择器: {selector} .addr-enter-bg-ele") + else: + # 直接查找元素 + element = self.page.query_selector(selector) + if element: + print(f"找到直接元素,选择器: {selector}") + + if element: + text = element.inner_text().strip() + print(f"提取到原始文本: '{text}'") + # 清理文本内容 + text = self._clean_text(text) + print(f"清理后文本: '{text}'") + if text: + print(f"返回文本: '{text}'") + return text + else: + print("文本为空或仅包含空白字符") + else: + print(f"未找到元素,选择器: {selector}") + except Exception as e: + print(f"提取字段时出错,选择器: {selector}, 错误: {e}") + continue + + print("所有选择器都未找到有效元素,返回默认值") + return "未知" + + def _clean_text(self, text): + """ + 清理文本内容 + + Args: + text (str): 原始文本 + + Returns: + str: 清理后的文本 + """ + # 移除多余的空白字符 + text = re.sub(r'\s+', ' ', text) + # 移除换行符和制表符 + text = re.sub(r'[\r\n\t]', '', text) + return text.strip() + + def _extract_phone_number(self): + """ + 提取电话号码信息 + + Returns: + str: 电话号码或"未知" + """ + try: + # 查找电话信息容器 + phone_container = self.page.query_selector("div.business-info 
div.telphone-lists-wrap") + if phone_container: + # 查找包含电话号码的元素 + phone_element = phone_container.query_selector("span.copy-box span") + if phone_element: + return self._clean_text(phone_element.inner_text()) + except Exception: + pass + return "未知" diff --git a/tool/aiqicha_login.py b/tool/aiqicha_login.py new file mode 100644 index 0000000..6f82a68 --- /dev/null +++ b/tool/aiqicha_login.py @@ -0,0 +1,122 @@ +# file: tool/aiqicha_login.py + +from tool.web_browser import WebBrowser +import time + + +class AiqichaLoginManager: + def __init__(self, browser: WebBrowser): + self.browser = browser + + def check_and_login(self): + """启动后检测登录状态并自动登录""" + if not self.browser.load_cookies(): + print("未找到有效Cookie,开始登录流程...") + return self.login() + else: + print("已加载Cookie,验证登录状态...") + # 加载cookie后访问页面验证是否真正登录 + self.browser.page.goto("https://aiqicha.baidu.com") + # 等待页面加载完成 + self.browser.page.wait_for_load_state("networkidle") + + # 验证登录状态 + if not self.check_login_status(): + print("Cookie已过期或无效,重新登录...") + return self.login() + else: + print("Cookie有效,已登录") + return True + + def check_login_status(self): + """检测登录状态,返回True表示已登录""" + try: + # 先关闭可能的功能上新弹窗 + self.close_feature_popup() + + # 等待页面加载完成 + self.browser.page.wait_for_load_state("networkidle") + + # 优先检查 .header-user-center-menu 元素判断是否已登录 + logged_in_elements = self.browser.page.query_selector_all('.header-user-center-menu, .user-center') + for element in logged_in_elements: + if element and element.is_visible(): + print("检测到已登录状态") + return True + + # 检测用户中心元素判断已登录 + user_center = self.browser.page.query_selector('.user-center') + if user_center and user_center.is_visible(): + print("检测到已登录状态") + return True + + # 检测登录相关元素 + login_element = self.browser.page.query_selector('.login') + if login_element and login_element.is_visible(): + print("检测到未登录状态") + return False + except: + try: + # 检测用户中心元素判断已登录 + self.browser.page.wait_for_selector('.user-center', timeout=3000) + print("检测到已登录状态") + return True + except: + print("登录状态检测异常") + return False + + def close_feature_popup(self): + """关闭功能上新弹窗""" + try: + # 查找并点击关闭按钮 + close_buttons = self.browser.page.query_selector_all('.close-icon.ivu-icon-ios-close') + for close_button in close_buttons: + if close_button.is_visible(): + close_button.click() + print("已关闭功能上新弹窗") + # 等待弹窗消失 + time.sleep(1) + break + except Exception as e: + # 如果没有找到弹窗,继续执行 + pass + + def login(self): + """带状态检测的登录流程""" + self.browser.page.goto("https://aiqicha.baidu.com") + # 页面加载后执行反检测 + self.browser.page.evaluate(""" + delete navigator.__proto__.webdriver; + """) + + # 等待页面加载完成 + self.browser.page.wait_for_load_state("networkidle") + + # 关闭可能的功能上新弹窗 + self.close_feature_popup() + + if not self.check_login_status(): + print("开始执行登录流程...") + # 点击登录按钮 + login_btn = self.browser.page.wait_for_selector('.login', timeout=20000) + login_btn.click() + print("请扫描页面二维码登录...") + time.sleep(3) # 给一些时间让二维码完全加载 + + # 等待登录完成,通过元素检测 + try: + # 检测用户中心元素判断已登录 + self.page.wait_for_selector('.header-user-center-menu', timeout=30000) + self.browser.save_cookies() + print("检测到已登录状态") + return True + except: + self.browser.save_cookies() + print("登录状态检测异常") + return False + ## self.browser.page.wait_for_selector('.header-user-center-menu, .user-center', timeout=10000) + + return True + +if __name__ == '__main__': + print("登录成功!") diff --git a/tool/bing_search.py b/tool/bing_search.py new file mode 100644 index 0000000..5f6b01f --- /dev/null +++ b/tool/bing_search.py @@ -0,0 +1,152 @@ +# file: tool/bing_search.py + +import time +import 
urllib.parse +from tool.web_browser import WebBrowser + + +class BingSearcher: + def __init__(self, cookie_path="bing_cookies.json"): + self.browser = WebBrowser(cookie_path) + self.browser_started = False + + def start_browser(self): + """启动浏览器""" + if not self.browser_started: + try: + self.browser.start_browser() + # 加载cookies + if not self.browser.load_cookies(): + print("未找到有效Cookie") + else: + print("已加载Cookie") + self.browser_started = True + except Exception as e: + print(f"启动浏览器失败: {e}") + self.browser_started = False + + def close_browser(self): + """关闭浏览器""" + if self.browser_started: + try: + # 保存cookies + self.browser.save_cookies() + self.browser.close_browser() + except Exception as e: + print(f"关闭浏览器时出错: {e}") + finally: + self.browser_started = False + + def search(self, keyword, num_pages=1): + """ + 在Bing上搜索关键词并返回结果 + + Args: + keyword (str): 搜索关键词 + num_pages (int): 搜索页数,默认为1 + + Returns: + list: 搜索结果列表,每个元素包含title、url和request_url + """ + if not self.browser_started: + self.start_browser() + + if not self.browser_started: + print("浏览器未启动,无法执行搜索") + return [] + + print(f'必应爬取任务进行中,爬取页数为{num_pages}...') + + + all_results = [] + + try: + # 执行搜索 + for page in range(num_pages): + first = page * 10 + 1 + url = f"https://cn.bing.com/search?q={urllib.parse.quote(keyword)}&first={first}&count=10&FORM=PERE" + + print("正在爬取的url为:" + url) + print('标题 url') + # 访问页面 + if self.browser.visit_page(url): + # 提取搜索结果 + results = self.browser.extract_links("h2 a") + all_results.extend(results) + + # 打印结果 + for result in results: + print(result['title'], " ", result['url']) + + # 随机延迟,避免请求过快 + time.sleep(2) + + except Exception as e: + print(f"搜索过程中出现错误: {e}") + + count = len(all_results) + print(f"必应搜索爬取结果为{count}") + return all_results + + def __enter__(self): + """上下文管理器入口""" + self.start_browser() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """上下文管理器出口""" + self.close_browser() + + +# 兼容旧接口的函数 +def bing_main(keyword, num='1'): + """ + 兼容原有接口的函数 + + Args: + keyword (str): 搜索关键词 + num (str): 搜索页数,支持范围格式如'1:5' + + Returns: + tuple: (titles列表, urls列表) + """ + searcher = BingSearcher() + + # 解析页数参数 + if ':' in num: + if num.count(':') > 1: + raise ValueError("输入中必须且只能包含一个 ':'") + else: + start_page, end_page = num.split(':') + if not (start_page.isdigit() and end_page.isdigit()): + raise ValueError("':' 两侧的值必须是数字") + else: + num_pages = int(end_page) - int(start_page) + 1 + else: + num_pages = int(num) + + try: + searcher.start_browser() + results = searcher.search(keyword, num_pages) + + # 分离titles和urls + titles = [result['title'] for result in results] + urls = [result['url'] for result in results] + + return (titles, urls) + + finally: + searcher.close_browser() + +# 使用示例: +# 方法1: 手动管理浏览器生命周期 +# searcher = BingSearcher() +# searcher.start_browser() +# results1 = searcher.search("阿里巴巴", 1) +# results2 = searcher.search("腾讯", 1) +# searcher.close_browser() + +# 方法2: 使用上下文管理器 +# with BingSearcher() as searcher: +# results1 = searcher.search("阿里巴巴", 1) +# results2 = searcher.search("腾讯", 1) diff --git a/tool/csv_tool.py b/tool/csv_tool.py new file mode 100644 index 0000000..ac8bc80 --- /dev/null +++ b/tool/csv_tool.py @@ -0,0 +1,266 @@ +import csv +import os +import time +from typing import List, Dict, Any, Optional + + +class CSVTool: + def __init__(self, csv_file_name: str, headers: List[str]): + """ + 初始化CSV工具 + + Args: + csv_file_name (str): CSV文件名 + headers (List[str]): 表头列表 + """ + self.csv_file_name = csv_file_name + self.headers = headers + + def 
init_csv_file(self): + """ + 初始化CSV文件,如果文件不存在则创建并写入表头 + """ + if not os.path.exists(self.csv_file_name): + with open(self.csv_file_name, 'w', encoding='utf-8', newline='') as f: + writer = csv.writer(f) + writer.writerow(self.headers) + + def get_existing_data(self, unique_titles: List[str]) -> set: + """ + 读取现有数据,用于去重检查 + + Args: + unique_titles (List[str]): 用于唯一性检查的列名列表 + + Returns: + set: 包含唯一标识符元组的集合 + """ + existing_data = set() + + if not os.path.exists(self.csv_file_name): + return existing_data + + try: + with open(self.csv_file_name, 'r', encoding='utf-8') as f: + reader = csv.reader(f) + header_row = next(reader, None) # 读取表头 + + if header_row is None: + return existing_data + + # 获取唯一列的索引 + unique_indices = [] + for title in unique_titles: + try: + index = header_row.index(title) + unique_indices.append(index) + except ValueError: + print(f"警告: 表头中未找到列 '{title}'") + continue + + # 读取数据行 + for row in reader: + if len(row) >= len(header_row): # 确保行数据完整 + # 提取唯一标识符 + unique_values = tuple(row[i] if i < len(row) else "" for i in unique_indices) + existing_data.add(unique_values) + + except Exception as e: + print(f"读取CSV文件时出错: {e}") + + return existing_data + + def save_data(self, data_list: List[Dict[str, Any]], unique_titles: List[str], create_time: bool = True) -> int: + """ + 将数据保存到CSV文件中,自动去重 + + Args: + data_list (List[Dict[str, Any]]): 要保存的数据列表 + unique_titles (List[str]): 用于唯一性检查的列名列表 + create_time (bool): 是否自动添加创建时间,默认为True + + Returns: + int: 实际写入的行数 + """ + if not data_list: + print('数据列表为空,没有数据可写入') + return 0 + + # 初始化文件 + self.init_csv_file() + + # 获取现有数据用于去重 + existing_data = self.get_existing_data(unique_titles) + + # 准备写入的数据 + rows_to_write = [] + written_count = 0 + + for data_node in data_list: + # 构建唯一标识符元组 + unique_values = tuple(data_node.get(title, "") for title in unique_titles) + + # 检查是否已存在 + if unique_values in existing_data: + continue # 跳过已存在的数据 + + # 构建行数据 + row_data = [] + for header in self.headers: + if header == 'create_time' and create_time: + # 自动添加创建时间 + row_data.append(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + else: + # 从数据节点获取对应值 + row_data.append(data_node.get(header, "")) + + rows_to_write.append(row_data) + existing_data.add(unique_values) # 添加到已存在数据集合中,避免本次写入中的重复 + written_count += 1 + + # 写入数据 + if rows_to_write: + with open(self.csv_file_name, 'a', encoding='utf-8', newline='') as f: + writer = csv.writer(f) + writer.writerows(rows_to_write) + + print(f"成功写入 {written_count} 行数据到 {self.csv_file_name}") + else: + print("没有新数据需要写入") + + return written_count + + def query_data(self, filter_func=None) -> List[Dict[str, str]]: + """ + 查询CSV文件中的数据 + + Args: + filter_func (callable, optional): 过滤函数,接受一行数据字典作为参数,返回True/False + + Returns: + List[Dict[str, str]]: 查询结果列表 + """ + if not os.path.exists(self.csv_file_name): + return [] + + result = [] + + try: + with open(self.csv_file_name, 'r', encoding='utf-8') as f: + reader = csv.reader(f) + header_row = next(reader, None) # 读取表头 + + if header_row is None: + return result + + for row in reader: + if len(row) >= len(header_row): # 确保行数据完整 + # 将行数据转换为字典 + row_dict = dict(zip(header_row, row)) + + # 应用过滤条件 + if filter_func is None or filter_func(row_dict): + result.append(row_dict) + + except Exception as e: + print(f"查询CSV文件时出错: {e}") + + return result + + def query_by_conditions(self, **kwargs) -> List[Dict[str, str]]: + """ + 根据条件查询数据 + + Args: + **kwargs: 查询条件,键值对形式 + + Returns: + List[Dict[str, str]]: 查询结果列表 + """ + + def filter_func(row_dict): + for key, value in kwargs.items(): + 
if key in row_dict and row_dict[key] != value: + return False + return True + + return self.query_data(filter_func) + + def get_all_data(self) -> List[Dict[str, str]]: + """ + 获取所有数据 + + Returns: + List[Dict[str, str]]: 所有数据列表 + """ + return self.query_data() + + +# 保持向后兼容的函数 +def save_to_csv(filter_list: List[Dict[str, Any]], + csv_file_name: str = 'company_search_result_data.csv', + headers: List[str] = None, + unique_titles: List[str] = None) -> bool: + """ + 将结果追加写入csv文件中(向后兼容函数) + + Args: + filter_list: 需要写入的数据列表 + csv_file_name: CSV文件名 + headers: 表头列表 + unique_titles: 用于唯一性检查的列名列表 + + Returns: + bool: 是否成功写入 + """ + if headers is None: + headers = ['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time'] + + if unique_titles is None: + unique_titles = ['company_name', 'web_site_type'] + + try: + csv_tool = CSVTool(csv_file_name, headers) + written_count = csv_tool.save_data(filter_list, unique_titles) + return written_count > 0 + except Exception as e: + print(f"保存CSV时出错: {e}") + return False + + +# 使用示例: +if __name__ == "__main__": + # 示例数据 + sample_data = [ + { + 'title': '测试公司1', + 'url': 'https://example.com/1', + 'web_site_type': 'aiqicha', + 'request_url': 'https://bing.com/search?q=测试公司1', + 'company_name': '测试公司1' + }, + { + 'title': '测试公司2', + 'url': 'https://example.com/2', + 'web_site_type': 'qcc', + 'request_url': 'https://bing.com/search?q=测试公司2', + 'company_name': '测试公司2' + } + ] + + # 创建CSV工具实例 + csv_tool = CSVTool( + csv_file_name='test_data.csv', + headers=['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time'] + ) + + # 保存数据 + csv_tool.save_data(sample_data, unique_titles=['company_name', 'web_site_type']) + + # 查询所有数据 + all_data = csv_tool.get_all_data() + print("所有数据:", all_data) + + # 根据条件查询 + filtered_data = csv_tool.query_by_conditions(web_site_type='aiqicha') + print("查询结果:", filtered_data) diff --git a/tool/web_browser.py b/tool/web_browser.py new file mode 100644 index 0000000..bb26b67 --- /dev/null +++ b/tool/web_browser.py @@ -0,0 +1,288 @@ +# file: tool/web_browser.py + +import random +import json +import os +import time +import urllib.parse +from playwright.sync_api import sync_playwright + + +class WebBrowser: + def __init__(self, cookie_path="browser_cookies.json"): + self.cookie_path = cookie_path + self.browser = None + self.context = None + self.page = None + self.playwright = None + + def anti_detection(self): + """注入更全面的反检测脚本""" + self.page.add_init_script(""" + // 隐藏webdriver属性 + delete navigator.__proto__.webdriver; + + // 伪装chrome属性 + Object.defineProperty(navigator, 'chrome', { + value: { + runtime: {}, + loadTimes: function() {} + }, + writable: false, + enumerable: true, + configurable: true + }); + + // 伪装plugins和mimeTypes + Object.defineProperty(navigator, 'plugins', { + get: () => [ + { 0: { type: 'application/pdf' } }, + { 0: { type: 'application/x-google-chrome-pdf' } } + ], + }); + + Object.defineProperty(navigator, 'mimeTypes', { + get: () => [ + { type: 'application/pdf' }, + { type: 'application/x-google-chrome-pdf' } + ], + }); + + // 伪装languages + Object.defineProperty(navigator, 'languages', { + get: () => ['zh-CN', 'zh'], + }); + + // 禁用调试功能 + window.console.debug = function() {}; + window.console.log = function() {}; + + // 伪装屏幕信息 + Object.defineProperty(screen, 'width', {get: () => 1366}); + Object.defineProperty(screen, 'height', {get: () => 768}); + Object.defineProperty(screen, 'availWidth', {get: () => 1366}); + Object.defineProperty(screen, 'availHeight', {get: () => 768}); + 
Object.defineProperty(screen, 'colorDepth', {get: () => 24}); + Object.defineProperty(screen, 'pixelDepth', {get: () => 24}); + + // 伪装时间戳 + window.chrome = { + runtime: {} + }; + + // 伪装outerHeight和outerWidth + Object.defineProperty(window, 'outerHeight', {get: () => 768}); + Object.defineProperty(window, 'outerWidth', {get: () => 1366}); + + // 伪装innerHeight和innerWidth + Object.defineProperty(window, 'innerHeight', {get: () => 768}); + Object.defineProperty(window, 'innerWidth', {get: () => 1366}); + """) + + def random_behavior(self): + """模拟更复杂的人类操作""" + # 随机等待 + time.sleep(random.uniform(2, 5)) + + # 随机鼠标移动 + for _ in range(random.randint(3, 7)): + self.page.mouse.move( + random.randint(100, 1200), + random.randint(100, 600) + ) + time.sleep(random.uniform(0.1, 0.8)) + + # 随机滚动页面 + if random.choice([True, False]): + scroll_distance = random.randint(200, 800) + self.page.mouse.wheel(0, scroll_distance) + time.sleep(random.uniform(1, 2)) + + def init_cookie_file(self): + """初始化cookie文件""" + if not os.path.exists(self.cookie_path): + with open(self.cookie_path, 'w') as f: + json.dump([], f) + + def save_cookies(self): + """保存cookies到文件""" + cookies = self.context.cookies() + with open(self.cookie_path, 'w') as f: + json.dump(cookies, f, indent=2) + + def load_cookies(self): + """从文件加载cookies""" + try: + with open(self.cookie_path, 'r') as f: + cookies = json.load(f) + if cookies: + self.context.add_cookies(cookies) + return True + return False + except: + return False + + def bypass_debugger(self): + """绕过调试器检测""" + self.page.add_init_script(""" + window.Function.prototype.constructor = function() {}; + window.console.debug = function(){}; + Object.defineProperty(navigator, 'webdriver', {get: () => false}); + """) + + async def start_browser_async(self): + """异步启动浏览器""" + try: + self.playwright = await async_playwright().start() + self.browser = await self.playwright.chromium.launch( + headless=False, + args=[ + "--disable-blink-features=AutomationControlled", + "--disable-infobars" + ] + ) + self.context = await self.browser.new_context() + self.page = await self.context.new_page() + return True + except Exception as e: + print(f"异步启动浏览器失败: {e}") + return False + + async def close_browser_async(self): + """异步关闭浏览器""" + if self.browser: + await self.browser.close() + if self.playwright: + await self.playwright.stop() + + def start_browser(self): + """启动浏览器""" + self.init_cookie_file() + + self.playwright = sync_playwright().start() + self.browser = self.playwright.chromium.launch( + headless=False, + args=[ + "--disable-blink-features=AutomationControlled", + "--disable-infobars", + "--disable-extensions", + "--disable-plugins", + "--no-sandbox", + "--disable-dev-shm-usage", + "--disable-web-security", + "--disable-features=IsolateOrigins,site-per-process", + "--disable-background-timer-throttling", + "--disable-backgrounding-occluded-windows", + "--disable-renderer-backgrounding", + "--disable-ipc-flooding-protection" + ] + ) + self.context = self.browser.new_context( + user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + viewport={"width": 1366, "height": 768}, + device_scale_factor=1, + is_mobile=False, + has_touch=False, + locale="zh-CN", + timezone_id="Asia/Shanghai" + ) + self.page = self.context.new_page() + self.anti_detection() + # 立即执行一次反检测 + self.page.evaluate(""" + delete navigator.__proto__.webdriver; + """) + self.random_behavior() + + def close_browser(self): + """关闭浏览器""" + if self.context: + 
self.context.close() + if self.browser: + self.browser.close() + if self.playwright: + self.playwright.stop() + + def visit_page(self, url): + """访问指定页面""" + try: + # 在 `visit_page` 之前执行更全面的反检测 + self.page.add_init_script(""" + // 隐藏webdriver属性 + delete navigator.__proto__.webdriver; + + // 伪装chrome属性 + Object.defineProperty(navigator, 'chrome', { + value: { + runtime: {}, + loadTimes: function() {} + }, + writable: false, + enumerable: true, + configurable: true + }); + + // 禁用调试功能 + window.console.debug = function() {}; + window.console.log = function() {}; + """) + + # 设置更真实的请求头 + self.page.set_extra_http_headers({ + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + "Accept-Encoding": "gzip, deflate, br", + "Upgrade-Insecure-Requests": "1", + "Sec-Fetch-Dest": "document", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "none", + "Cache-Control": "max-age=0" + }) + self.page.goto(url) + # 页面加载后执行反检测 + self.page.evaluate(""" + delete navigator.__proto__.webdriver; + """) + self.page.wait_for_load_state("networkidle") + self.random_behavior() + return True + except Exception as e: + print(f"访问页面失败: {url}, 错误: {str(e)}") + return False + + def extract_links(self, selector="h2 a"): + """ + 提取页面中的链接 + + Args: + selector (str): CSS选择器,默认为"h2 a" + + Returns: + list: 包含title、url和request_url的字典列表 + """ + links = [] + try: + elements = self.page.query_selector_all(selector) + for element in elements: + try: + title = element.inner_text().replace('\n', '').replace(',', ' ').strip() + url = element.get_attribute('href') + + current_url = self.page.url + if not url.startswith(('http://', 'https://')): + # 处理相对链接 + url = urllib.parse.urljoin(current_url, url) + + if title and url: + links.append({ + 'title': title, + 'url': url, + 'request_url': current_url + }) + except Exception as e: + print(f"提取链接失败: {e}") + continue + except Exception as e: + print(f"提取链接失败: {e}") + + return links
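
A minimal end-to-end sketch of how the modules added in this patch are intended to chain together (Bing search, filter, CSV persistence, aiqicha detail crawl). The CSV file names and sample company names below are illustrative, and importing filter_aiqicha_qcc from main.py is assumed to be safe because its batch loop sits under the __main__ guard.

# sketch_pipeline.py (illustrative file name, not part of the patch)
import random
import time

from main import filter_aiqicha_qcc
from tool.aiqicha_detail import AiqichaDetailCrawler
from tool.bing_search import BingSearcher
from tool.csv_tool import CSVTool

# CSV that stores filtered search hits (title / url / web_site_type / ...)
search_csv = CSVTool(
    csv_file_name='company_search_bing_data.csv',
    headers=['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time'],
)

with BingSearcher() as searcher:
    for company_name in ['阿里巴巴', '腾讯科技']:
        # search with the "爱企查|企查查" suffix, then keep only aiqicha/qcc/tianyancha hits
        results = searcher.search(company_name + ' 爱企查|企查查', 1)
        matched = filter_aiqicha_qcc(results, company_name)
        search_csv.save_data(matched, unique_titles=['company_name', 'web_site_type'])
        time.sleep(random.uniform(5, 15))  # throttle between companies

# CSV that stores the parsed company details
detail_csv = CSVTool(
    csv_file_name='aiqicha_company_details.csv',
    headers=['company_name', 'credit_code', 'legal_representative', 'registered_capital',
             'establishment_date', 'business_status', 'address', 'business_scope',
             'source_url', 'create_time'],
)

with AiqichaDetailCrawler() as crawler:
    for row in search_csv.query_by_conditions(web_site_type='aiqicha'):
        # pass the Bing result page as Referer to mimic a search-engine click
        detail = crawler.crawl_company_detail(row['url'], row['request_url'])
        if detail:
            detail['company_name'] = detail.get('name', '')
            detail['source_url'] = row['url']
            detail_csv.save_data([detail], unique_titles=['company_name'])
        time.sleep(random.uniform(5, 15))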