# -*- coding: utf-8 -*- import asyncio import csv import os import random import aiohttp import time import sys from bs4 import BeautifulSoup import re import aiofiles import urllib.parse import argparse from colorama import init, Fore from search import Bing,Baidu import openpyxl import ssl from tool.bing_search import BingSearcher from tool.csv_tool import CSVTool from tool.read_csv import CSVReader start = time.time() def printascii(): # 初始化 init() # 设置颜色 print(Fore.GREEN + r''' ____ _ / ___| ___ __ _ _ __ ___| |__ ___ _ __ \___ \ / _ \/ _` | '__/ __| '_ \ / _ \ '__| ___) | __/ (_| | | | (__| | | | __/ | |____/ \___|\__,_|_| \___|_| |_|\___|_| ''' + Fore.RESET) # 天欣安全实验室 def writeExcel(titles, links,ws): infos = list(zip(titles, links)) for row in infos: ws.append(row) def create_sheet_and_write(wb, engine, keywords, num, title): ws = wb.create_sheet(title=title) result = engine(keywords, num) writeExcel(result[0], result[1], ws) def excel_text2url(link_url): #如果函数内部没有进行异步操作,使用 async 并不会对性能或功能产生实际影响。 '''把一个网址字符串转换为 Excel公式,使其可以点击直接转跳''' return f'=HYPERLINK("{link_url}","{link_url}")' # 遍历所有工作表,并将第二列的所有数据传递给 excel_text2url 函数重新赋值 def update_hyperlinks(wb): for sheet in wb.worksheets: # 遍历每一个工作表 for row in sheet.iter_rows(min_row=1, max_row=sheet.max_row, min_col=2, max_col=2): # 遍历第二列 for cell in row: if cell.value: # 检查单元格是否有内容 cell.value = excel_text2url(cell.value) # 将网址转换为超链接公式 else: break def commend(): parser = argparse.ArgumentParser(prog="Searcher", description='此工具用于对百度、必应和谷歌搜索的协程爬取--天欣安全实验室', usage='please read -h') parser.add_argument("-k", type=str, help="搜索的关键词", nargs='+') # 添加一个positional arguments,叫a,读取类型为int(默认是字符串) parser.add_argument("-p", type=str, help="需要搜索页数,默认为5,支持范围搜索,例如搜索从第2页到第五页的参数为 2:5", default='5') parser.add_argument("-m", type=str, help="使用的搜索引擎:百度:bd,必应:bin,谷歌:goo 不填写默认使用全部", default='all',nargs='+') # parser.add_argument("-t", '--task', type=int, help="设置的线程,默认为8", default=8) parser.exit_on_error = False args = parser.parse_args() if len(sys.argv) == 1: printascii() parser.print_help() sys.exit() return args def search_company_info(company_name_key, addon_args, num): search_key = company_name_key.strip() + " " + addon_args search_key = search_key.strip() result = Bing.bing_main(search_key, num) # for 循环 遍历 result[0] 和 result[1] return result # for i in range(len(result[0])): # title= result[0][i] # url = result[1][i] # print(f"必应搜索爬取结果为,title:{title}, url:{url}") # if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url): # data_list.append({"title":title, "url":url}) # return data_list def filter_company_sites(urls): # urls https://www.tianyancha.com/company/5226478758 # url:https://aiqicha.baidu.com/company_detail_26602790857925 # url:https://www.qcc.com/firm/05b449eb5cc417d0f97c14104051f5c0.html # 匹配 前缀https://aiqicha.baidu.com/company_detail_*,https://www.qcc.com/firm/*.html,https://www.tianyancha.com/company/5226478758* filtered_urls = [url for url in urls if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url)] return filtered_urls def filter_aiqicha_qcc(search_result, company_name_, with_not_match = False): datas = [] for i in range(len(search_result)): data_node = search_result[i] title = data_node['title'] url = data_node['url'] print(f"必应搜索爬取结果为,title:{title}, url:{url}") # 判断title是否包含 company_name_ # if re.match( # r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", # url) and title.find(company_name_) != -1: if title.find(company_name_) != -1 or with_not_match: web_site_type = None if re.match(r"^https://aiqicha.baidu.com/company_detail_.*", url): web_site_type = "aiqicha" elif re.match(r"^https://www.tianyancha.com/company/.*", url): web_site_type = "tianyancha" elif re.match(r"^https://www.qcc.com/firm/.*", url): web_site_type = "qcc" if web_site_type is not None: data_node['web_site_type'] = web_site_type data_node['company_name'] = company_name_ datas.append(data_node) return datas def search_one_company(company_name_arg, num): keywords = company_name_arg # for key in keyword: # keywords = keywords + key + " " keywords = keywords.strip() print(f"---==您搜索的关键词为:{keywords}") wb = openpyxl.Workbook() # 删除默认创建的工作表(现在名为 "数据表1") wb.remove(wb['Sheet']) printascii() pattern = r"[\\/:\*\?\"<>|]" keyword = re.sub(pattern, "", keywords) create_sheet_and_write(wb, Bing.bing_main, keywords, num, "必应爬取结果") create_sheet_and_write(wb, Baidu.baidu_main, keywords, num, "百度爬取结果") # 将所有url变为超链接,点击即可打开转跳 update_hyperlinks(wb) wb.save(f'./{keyword}-{company_name_arg}.xlsx') print(Fore.GREEN + '总任务结束!' + Fore.RESET) end = time.time() print(Fore.RED + f'脚本总时间: {end - start:.2f}') def save_to_csv(filter_list): if filter_list is None or len(filter_list) == 0: print('filter_list is None or len(filter_list) == 0, 没有数据可写入') return False """ 将结果追加写入csv文件中 Args: filter_list: 需要写入的数据列表 """ csv_file = 'company_search_result_data.csv' headers = ['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time'] # 判断文件是否存在,不存在则创建并写入列头 file_exists = os.path.exists(csv_file) # 读取现有数据,用于判断重复项 existing_data = set() if file_exists: with open(csv_file, 'r', encoding='utf-8') as f: reader_ins = csv.reader(f) header_skipped = False for row in reader_ins: if not header_skipped: header_skipped = True continue if len(row) >= 5: # 确保行数据完整 company_name = row[4] # company_name在第5列(索引4) web_site_type = row[2] if len(row) > 2 else "" # web_site_type在第3列(索引2) existing_data.add((company_name, web_site_type)) # 写入数据 with open(csv_file, 'a', encoding='utf-8', newline='') as f: writer = csv.writer(f) # 如果文件不存在,写入列头 if not file_exists: writer.writerow(headers) # 追加写入数据,去重处理 for data_node in filter_list: company_name = data_node.get('company_name', '') web_site_type = data_node.get('web_site_type', '') # 判断是否已存在相同的company_name和web_site_type组合 if (company_name, web_site_type) not in existing_data: # 创建时间格式化 create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 写入数据行 row_data = [ data_node.get('title', ''), data_node.get('url', ''), web_site_type, data_node.get('request_url', ''), company_name, create_time ] writer.writerow(row_data) # 添加到已存在数据集合中,避免本次写入中的重复 existing_data.add((company_name, web_site_type)) print(f"写入数据成功,title:{data_node.get('title', '')}, " f"url:{data_node.get('url', '')}, " f"web_site_type:{web_site_type}, " f"request_url:{data_node.get('request_url', '')}, " f"company_name:{company_name}, " f"create_time:{create_time}") def check_company_exists(company_names, type_list): """ 读取 company_search_result_data.csv 数据,检查指定的公司名称和类型是否存在 Args: company_names (list): 公司名称列表 type_list (list): 类型列表 Returns: list: 包含公司名称和存在状态的字典列表 格式: [{"company_name": "公司名", "exists": True/False}, ...] """ csv_file = 'company_search_result_data.csv' result = [] # 初始化所有公司为不存在状态 for company_name_item in company_names: result.append({ "company_name": company_name_item, "exists": False }) # 如果文件不存在,直接返回初始化结果 if not os.path.exists(csv_file): return result # 读取CSV文件中的现有数据 existing_combinations = set() # 存储(公司名, 类型)组合 try: with open(csv_file, 'r', encoding='utf-8') as f: reader = csv.reader(f) header_skipped = False for row in reader: if not header_skipped: header_skipped = True continue # 确保行数据完整 if len(row) >= 5: company_name_item = row[4] # company_name在第5列(索引4) web_site_type = row[2] if len(row) > 2 else "" # web_site_type在第3列(索引2) # 添加到现有组合集合中 existing_combinations.add((company_name_item, web_site_type)) except Exception as e: print(f"读取CSV文件时出错: {e}") return result # 检查每个公司是否存在于指定的类型中 for item in result: company_name_item = item["company_name"] exists = False # 如果type_list为空,检查公司是否存在任何类型中 if not type_list: for existing_company, _ in existing_combinations: if existing_company == company_name_item: exists = True break else: # 检查公司是否存在于指定的类型中 for web_site_type in type_list: if (company_name_item, web_site_type) in existing_combinations: exists = True break item["exists"] = exists return result if __name__ == '__main__': reader = CSVReader('data/data.csv') company_names = reader.read_column(0, has_header=False) print("所有数据:", company_names) # 检查已存在的公司 type_list = ["aiqicha", "qcc", "tianyancha"] check_result = check_company_exists(company_names, type_list) print("检查结果:", check_result) i = 1 # 方法2: 使用上下文管理器 with BingSearcher() as searcher: # 创建CSV工具实例 csv_tool = CSVTool( csv_file_name='data/company_search_bing_data.csv', headers=['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time'] ) # 查询所有数据 all_data = csv_tool.get_all_data() print("所有数据:", all_data) # 查询所有数据 all_data = csv_tool.get_all_data() print("所有数据:", all_data) # 初始化所有公司为不存在状态 company_names_saved_set = set() for company_name_item in all_data: company_names_saved_set.add(company_name_item["company_name"]) for company_name in company_names: # 如果公司已存在,跳过处理 if company_name in company_names_saved_set: print(f"公司 {company_name} 已存在,跳过处理") continue # if company_exists: # print(f"公司 {company_name} 已存在,跳过处理") # continue print(f"正在处理第 {i} 个公司: {company_name}") addon_args = " 爱企查|企查查" data_list = searcher.search(company_name+" "+addon_args, 1) filter_list = filter_aiqicha_qcc(data_list, company_name) print(company_name, "filter_list:", filter_list) if len(filter_list) <= 0: print("没有数据 filter_list is empty. " + company_name) filter_list_with_not_match = filter_aiqicha_qcc(data_list, company_name, with_not_match=True) # 创建CSV工具实例 csv_tool = CSVTool( csv_file_name='company_search_filter_is_none_data.csv', headers=['company_name','title', 'web_site_type','url', 'request_url', 'create_time'] ) # 保存数据,指定去重字段 csv_tool.save_data(filter_list_with_not_match, unique_titles=['company_name', 'title','url','web_site_type']) continue else: # 创建CSV工具实例 csv_tool = CSVTool( csv_file_name='data/company_search_bing_data.csv', headers=['company_name','title', 'web_site_type','url', 'request_url', 'create_time'] ) # 保存数据,指定去重字段 csv_tool.save_data(filter_list, unique_titles=['company_name', 'web_site_type']) # save_to_csv(filter_list) # i = i + 1 # if i > 3: # print("结束循环") # break # results2 = searcher.search("腾讯", 1) # results3 = searcher.search("百度", 1) sleep_time = 5 sleep_time += random.randint(3, 10) time.sleep(sleep_time) pass pass if True: print("exit") exit(0) i = 1 for company_name_ele in check_result: company_name = company_name_ele["company_name"] company_exists = company_name_ele["exists"] # 如果公司已存在,跳过处理 if company_exists: print(f"公司 {company_name} 已存在,跳过处理") continue sleep_time = 5 sleep_time += random.randint(3, 10) time.sleep(sleep_time) addon_args = " 爱企查|企查查" data_list = search_company_info(company_name, addon_args, '1') filter_list = filter_aiqicha_qcc(data_list, company_name) print("filter_list:",filter_list) save_to_csv(filter_list) if len(filter_list)<= 0: print("没有数据 filter_list is empty. "+company_name) continue i=i+1 if i > 100: break