diff --git a/.idea/SearchCompany.iml b/.idea/SearchCompany.iml index 14adaaa..ab316cb 100644 --- a/.idea/SearchCompany.iml +++ b/.idea/SearchCompany.iml @@ -3,8 +3,9 @@ + - + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index 817fc93..75cf854 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,5 +3,5 @@ - + \ No newline at end of file diff --git a/config.py b/config.py index b0fab9e..302c3ba 100644 --- a/config.py +++ b/config.py @@ -1,10 +1,21 @@ # 最好写入三家搜索引擎登录后的cookie +# bingheaders = { +# 'cookie': """""", +# 'referer': 'https://cn.bing.com/', +# 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/82.0.4051.0 Safari/537.36 Edg/82.0.425.0'} + +# 确保 `bingheaders` 包含以下字段 bingheaders = { - 'cookie': """""", - 'referer': 'https://cn.bing.com/', - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/82.0.4051.0 Safari/537.36 Edg/82.0.425.0'} + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', + 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', + 'Accept-Encoding': 'gzip, deflate, br', + 'Referer': 'https://cn.bing.com/', + 'Connection': 'keep-alive', + 'Cache-Control': 'no-cache', +} baiduheaders = { 'Cookie': """PSTM=1755051967; BAIDUID=9623ABA6AF15935E519C6D57EB04D5BD:FG=1; BIDUPSID=BFDEAE9917763352A1CF94FF7A9AD50F; BD_UPN=12314753; delPer=0; BD_CK_SAM=1; PSINO=3; BAIDUID_BFESS=9623ABA6AF15935E519C6D57EB04D5BD:FG=1; ZFY=LX6tLiXJLyE8Spg0Tn3yWYhYWOqUXgNuD45NXzSsgDY:C; baikeVisitId=6e4f6130-a8eb-49b3-8413-1815a6af31a3; BD_HOME=1; ppfuid=FOCoIC3q5fKa8fgJnwzbE67EJ49BGJeplOzf+4l4EOvDuu2RXBRv6R3A1AZMa49I27C0gDDLrJyxcIIeAeEhD8JYsoLTpBiaCXhLqvzbzmvy3SeAW17tKgNq/Xx+RgOdb8TWCFe62MVrDTY6lMf2GrfqL8c87KLF2qFER3obJGlT/s3qQuIlmw0dmIvm22ZTGEimjy3MrXEpSuItnI4KDyGSNvJz3OVxhMd6l0BD7nHci+eNtO+sUfx41sINYk+w3il4JkBUe91yGyLjoc4piSRx4OH9u8PLj7EqnTyQEyOWgTqV0RFcOD/4ANUzZZkGhGlPjfasITJONp0AJTY8kGLSgWjlFVG9Xmh1+20oPSbrzvDjYtVPmZ+9/6evcXmhcO1Y58MgLozKnaQIaLfWRPAn9I0uOqAMff6fuUeWcH0OjH2+RiDANKDxQc+RdNr2uC5D1fu00TizBtFeq9APvs5FjnYxYstXg/9EfB3EVmJIvdK3BvFGk0IgcgSSzt63lV1Uhhp5FAe6gNJIUptp7EMAaXYKm11G+JVPszQFdp9AJLcm4YSsYUXkaPI2Tl66J246cmjWQDTahAOINR5rXR5r/7VVI1RMZ8gb40q7az7vCK56XLooKT5a+rsFrf5Zu0yyCiiagElhrTEOtNdBJJq8eHwEHuFBni9ahSwpC7lbKkUwaKH69tf0DFV7hJROiLETSFloIVkHdy3+I2JUr1LsplAz0hMkWt/tE4tXVUV7QcTDTZWS/2mCoS/GV3N9awQ6iM6hs/BWjlgnEa1+5gbcves5wJ6gbk0b0Avk9wGRtTVVEE/aHCSd+6WFfR1C5FKazXcZ/j40FJv+iLGBn3nkkgHlne61I8I7KhtQgIkmBMJIjPMkS/L051MeqdGScsKYTJuSucgI5c3+79eVH+y2TvbOTuuHv1uGxwXFb2atIU1ZYPbmmXculmizKcKIUiL64VMhr/ZycHJ3jpdZlyprBJR80ygAVuGrjl4whGbgBRkDPTwtXjYtgzmW74m0fDU2MZaxpBZZF8YurfocYcmDdcxFKeoIFQmVqAoAU+3YcXQt2xKThZZyV1v3sCvnzidUZtKM9cRRUfRWBtQSb50APM+gs/408xg7KHCB8AOKpZpfIpPhQ0RJhew8GR0aTqYsJo1IRCwM3UbbrvtJ7eqPMNzJcGcSYcQWm1FubInMonve94c+p8Vi2wc72MfReeFiTzMp1G6pDt2e40gPDGbdQI+jba4UjRlyA+9CbTW6Mt45W/80hW/gFEKh9+Klyky6FPenbJgt/vQK9TAiTA==; BDUSS=o4ZFV6UTVucGp0Rmx6TlNucFQ1Z1FEMnQyZ1ZOdmRUZWg2Nn5FQWxteWdBTVZvSVFBQUFBJCQAAAAAAAAAAAEAAAAXn3lCu8PRqdTGtssAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAKBznWigc51oZW; BDUSS_BFESS=o4ZFV6UTVucGp0Rmx6TlNucFQ1Z1FEMnQyZ1ZOdmRUZWg2Nn5FQWxteWdBTVZvSVFBQUFBJCQAAAAAAAAAAAEAAAAXn3lCu8PRqdTGtssAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAKBznWigc51oZW; 
sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22370464293%22%2C%22first_id%22%3A%22198a7105b40582-0a1b1b0944bf378-4c657b58-1440000-198a7105b4183a%22%2C%22props%22%3A%7B%7D%2C%22%24device_id%22%3A%22198a7105b40582-0a1b1b0944bf378-4c657b58-1440000-198a7105b4183a%22%7D; MCITY=-179%3A; log_first_time=1755482524636; log_last_time=1755482544322; RT="z=1&dm=baidu.com&si=1403e7da-9af8-439d-bdca-61f492a1b52a&ss=mecm9ry0&sl=0&tt=0&bcn=https%3A%2F%2Ffclog.baidu.com%2Flog%2Fweirwood%3Ftype%3Dperf&ld=jhi&ul=3upx87&hd=3upxa3"; H_PS_PSSID=62325_63147_63327_63948_64048_64174_64248_64245_64258_64260_64317_64358_64366_64362_64363_64395_64414_64429_64436_64442_64450_64457_64473_64483_64502_64512_64448_64087_64559_64571; BA_HECTOR=048lak8h81218h8h8020850k80a00g1ka54mp25; H_WISE_SIDS=62325_63147_63327_63948_64048_64174_64248_64245_64258_64260_64317_64358_64366_64362_64363_64395_64414_64429_64436_64442_64450_64457_64473_64483_64502_64512_64448_64087_64559_64571; BDRCVFR[feWj1Vr5u3D]=I67x6TjHwwYf0; BDORZ=B490B5EBF6F3CD402E515D22BCDA1598; COOKIE_SESSION=21_0_8_9_13_23_0_1_8_9_1_6_498875_0_0_0_1754446941_0_1755485313%7C9%231543375_16_1753882701%7C7; H_PS_645EC=1275d4%2BgYNOGPU5%2Fgp6XcloUiDEOGWs8LNx7nISyDCmJSXMYxQLNnwJypIA""", diff --git a/crawler_bing_main.py b/crawler_bing_main.py index 5037a5c..776a1b5 100644 --- a/crawler_bing_main.py +++ b/crawler_bing_main.py @@ -33,70 +33,8 @@ def printascii(): ___) | __/ (_| | | | (__| | | | __/ | |____/ \___|\__,_|_| \___|_| |_|\___|_| ''' + Fore.RESET) -# 天欣安全实验室 -def writeExcel(titles, links,ws): - infos = list(zip(titles, links)) - for row in infos: - ws.append(row) - -def create_sheet_and_write(wb, engine, keywords, num, title): - ws = wb.create_sheet(title=title) - result = engine(keywords, num) - writeExcel(result[0], result[1], ws) -def excel_text2url(link_url): #如果函数内部没有进行异步操作,使用 async 并不会对性能或功能产生实际影响。 - '''把一个网址字符串转换为 Excel公式,使其可以点击直接转跳''' - return f'=HYPERLINK("{link_url}","{link_url}")' -# 遍历所有工作表,并将第二列的所有数据传递给 excel_text2url 函数重新赋值 -def update_hyperlinks(wb): - for sheet in wb.worksheets: # 遍历每一个工作表 - for row in sheet.iter_rows(min_row=1, max_row=sheet.max_row, min_col=2, max_col=2): # 遍历第二列 - for cell in row: - if cell.value: # 检查单元格是否有内容 - cell.value = excel_text2url(cell.value) # 将网址转换为超链接公式 - else: - break - -def commend(): - parser = argparse.ArgumentParser(prog="Searcher", description='此工具用于对百度、必应和谷歌搜索的协程爬取--天欣安全实验室', usage='please read -h') - parser.add_argument("-k", type=str, help="搜索的关键词", nargs='+') - # 添加一个positional arguments,叫a,读取类型为int(默认是字符串) - parser.add_argument("-p", type=str, help="需要搜索页数,默认为5,支持范围搜索,例如搜索从第2页到第五页的参数为 2:5", default='5') - parser.add_argument("-m", type=str, help="使用的搜索引擎:百度:bd,必应:bin,谷歌:goo 不填写默认使用全部", default='all',nargs='+') - # parser.add_argument("-t", '--task', type=int, help="设置的线程,默认为8", default=8) - parser.exit_on_error = False - args = parser.parse_args() - if len(sys.argv) == 1: - printascii() - parser.print_help() - sys.exit() - return args -def search_company_info(company_name_key, addon_args, num): - - search_key = company_name_key.strip() + " " + addon_args - search_key = search_key.strip() - result = Bing.bing_main(search_key, num) - - # for 循环 遍历 result[0] 和 result[1] - - return result - # for i in range(len(result[0])): - # title= result[0][i] - # url = result[1][i] - # print(f"必应搜索爬取结果为,title:{title}, url:{url}") - # if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url): - # data_list.append({"title":title, "url":url}) - # 
return data_list - -def filter_company_sites(urls): - # urls https://www.tianyancha.com/company/5226478758 - # url:https://aiqicha.baidu.com/company_detail_26602790857925 - # url:https://www.qcc.com/firm/05b449eb5cc417d0f97c14104051f5c0.html - # 匹配 前缀https://aiqicha.baidu.com/company_detail_*,https://www.qcc.com/firm/*.html,https://www.tianyancha.com/company/5226478758* - filtered_urls = [url for url in urls if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url)] - return filtered_urls - def filter_aiqicha_qcc(search_result, company_name_, with_not_match = False): datas = [] @@ -125,100 +63,6 @@ def filter_aiqicha_qcc(search_result, company_name_, with_not_match = False): datas.append(data_node) return datas -def search_one_company(company_name_arg, num): - - keywords = company_name_arg - # for key in keyword: - # keywords = keywords + key + " " - keywords = keywords.strip() - print(f"---==您搜索的关键词为:{keywords}") - wb = openpyxl.Workbook() - # 删除默认创建的工作表(现在名为 "数据表1") - wb.remove(wb['Sheet']) - printascii() - pattern = r"[\\/:\*\?\"<>|]" - keyword = re.sub(pattern, "", keywords) - create_sheet_and_write(wb, Bing.bing_main, keywords, num, "必应爬取结果") - create_sheet_and_write(wb, Baidu.baidu_main, keywords, num, "百度爬取结果") - # 将所有url变为超链接,点击即可打开转跳 - update_hyperlinks(wb) - wb.save(f'./{keyword}-{company_name_arg}.xlsx') - print(Fore.GREEN + '总任务结束!' + Fore.RESET) - end = time.time() - print(Fore.RED + f'脚本总时间: {end - start:.2f}') - - -def save_to_csv(filter_list): - - if filter_list is None or len(filter_list) == 0: - print('filter_list is None or len(filter_list) == 0, 没有数据可写入') - return False - """ - 将结果追加写入csv文件中 - - Args: - filter_list: 需要写入的数据列表 - """ - csv_file = 'company_search_result_data.csv' - headers = ['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time'] - - # 判断文件是否存在,不存在则创建并写入列头 - file_exists = os.path.exists(csv_file) - - # 读取现有数据,用于判断重复项 - existing_data = set() - if file_exists: - with open(csv_file, 'r', encoding='utf-8') as f: - reader_ins = csv.reader(f) - header_skipped = False - for row in reader_ins: - if not header_skipped: - header_skipped = True - continue - if len(row) >= 5: # 确保行数据完整 - company_name = row[4] # company_name在第5列(索引4) - web_site_type = row[2] if len(row) > 2 else "" # web_site_type在第3列(索引2) - existing_data.add((company_name, web_site_type)) - - # 写入数据 - with open(csv_file, 'a', encoding='utf-8', newline='') as f: - writer = csv.writer(f) - - # 如果文件不存在,写入列头 - if not file_exists: - writer.writerow(headers) - - # 追加写入数据,去重处理 - for data_node in filter_list: - company_name = data_node.get('company_name', '') - web_site_type = data_node.get('web_site_type', '') - - # 判断是否已存在相同的company_name和web_site_type组合 - if (company_name, web_site_type) not in existing_data: - # 创建时间格式化 - create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - - # 写入数据行 - row_data = [ - data_node.get('title', ''), - data_node.get('url', ''), - web_site_type, - data_node.get('request_url', ''), - company_name, - create_time - ] - writer.writerow(row_data) - - # 添加到已存在数据集合中,避免本次写入中的重复 - existing_data.add((company_name, web_site_type)) - - print(f"写入数据成功,title:{data_node.get('title', '')}, " - f"url:{data_node.get('url', '')}, " - f"web_site_type:{web_site_type}, " - f"request_url:{data_node.get('request_url', '')}, " - f"company_name:{company_name}, " - f"create_time:{create_time}") - def check_company_exists(company_names, type_list): """ @@ -379,38 +223,9 @@ if __name__ == 
'__main__': pass pass - if True: - print("exit") - exit(0) - - - i = 1 - for company_name_ele in check_result: - company_name = company_name_ele["company_name"] - company_exists = company_name_ele["exists"] - - # 如果公司已存在,跳过处理 - if company_exists: - print(f"公司 {company_name} 已存在,跳过处理") - continue - - sleep_time = 5 - sleep_time += random.randint(3, 10) - time.sleep(sleep_time) - - addon_args = " 爱企查|企查查" - data_list = search_company_info(company_name, addon_args, '1') - filter_list = filter_aiqicha_qcc(data_list, company_name) - print("filter_list:",filter_list) - - save_to_csv(filter_list) - if len(filter_list)<= 0: - print("没有数据 filter_list is empty. "+company_name) - continue - - i=i+1 - if i > 100: - break + + + diff --git a/crawler_bing_main.py.bak b/crawler_bing_main.py.bak new file mode 100644 index 0000000..cc1adae --- /dev/null +++ b/crawler_bing_main.py.bak @@ -0,0 +1,197 @@ + +def writeExcel(titles, links,ws): + infos = list(zip(titles, links)) + for row in infos: + ws.append(row) + +def create_sheet_and_write(wb, engine, keywords, num, title): + ws = wb.create_sheet(title=title) + result = engine(keywords, num) + writeExcel(result[0], result[1], ws) +def excel_text2url(link_url): #如果函数内部没有进行异步操作,使用 async 并不会对性能或功能产生实际影响。 + '''把一个网址字符串转换为 Excel公式,使其可以点击直接转跳''' + return f'=HYPERLINK("{link_url}","{link_url}")' + + + +# 遍历所有工作表,并将第二列的所有数据传递给 excel_text2url 函数重新赋值 +def update_hyperlinks(wb): + for sheet in wb.worksheets: # 遍历每一个工作表 + for row in sheet.iter_rows(min_row=1, max_row=sheet.max_row, min_col=2, max_col=2): # 遍历第二列 + for cell in row: + if cell.value: # 检查单元格是否有内容 + cell.value = excel_text2url(cell.value) # 将网址转换为超链接公式 + else: + break + +def commend(): + parser = argparse.ArgumentParser(prog="Searcher", description='此工具用于对百度、必应和谷歌搜索的协程爬取--天欣安全实验室', usage='please read -h') + parser.add_argument("-k", type=str, help="搜索的关键词", nargs='+') + # 添加一个positional arguments,叫a,读取类型为int(默认是字符串) + parser.add_argument("-p", type=str, help="需要搜索页数,默认为5,支持范围搜索,例如搜索从第2页到第五页的参数为 2:5", default='5') + parser.add_argument("-m", type=str, help="使用的搜索引擎:百度:bd,必应:bin,谷歌:goo 不填写默认使用全部", default='all',nargs='+') + # parser.add_argument("-t", '--task', type=int, help="设置的线程,默认为8", default=8) + parser.exit_on_error = False + args = parser.parse_args() + if len(sys.argv) == 1: + printascii() + parser.print_help() + sys.exit() + return args + + # for i in range(len(result[0])): + # title= result[0][i] + # url = result[1][i] + # print(f"必应搜索爬取结果为,title:{title}, url:{url}") + # if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url): + # data_list.append({"title":title, "url":url}) + # return data_list + + + +def filter_company_sites(urls): + # urls https://www.tianyancha.com/company/5226478758 + # url:https://aiqicha.baidu.com/company_detail_26602790857925 + # url:https://www.qcc.com/firm/05b449eb5cc417d0f97c14104051f5c0.html + # 匹配 前缀https://aiqicha.baidu.com/company_detail_*,https://www.qcc.com/firm/*.html,https://www.tianyancha.com/company/5226478758* + filtered_urls = [url for url in urls if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url)] + return filtered_urls + + +def save_to_csv(filter_list): + + if filter_list is None or len(filter_list) == 0: + print('filter_list is None or len(filter_list) == 0, 没有数据可写入') + return False + """ + 将结果追加写入csv文件中 + + Args: + filter_list: 需要写入的数据列表 + """ + csv_file = 'company_search_result_data.csv' + headers = 
['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time'] + + # 判断文件是否存在,不存在则创建并写入列头 + file_exists = os.path.exists(csv_file) + + # 读取现有数据,用于判断重复项 + existing_data = set() + if file_exists: + with open(csv_file, 'r', encoding='utf-8') as f: + reader_ins = csv.reader(f) + header_skipped = False + for row in reader_ins: + if not header_skipped: + header_skipped = True + continue + if len(row) >= 5: # 确保行数据完整 + company_name = row[4] # company_name在第5列(索引4) + web_site_type = row[2] if len(row) > 2 else "" # web_site_type在第3列(索引2) + existing_data.add((company_name, web_site_type)) + + # 写入数据 + with open(csv_file, 'a', encoding='utf-8', newline='') as f: + writer = csv.writer(f) + + # 如果文件不存在,写入列头 + if not file_exists: + writer.writerow(headers) + + # 追加写入数据,去重处理 + for data_node in filter_list: + company_name = data_node.get('company_name', '') + web_site_type = data_node.get('web_site_type', '') + + # 判断是否已存在相同的company_name和web_site_type组合 + if (company_name, web_site_type) not in existing_data: + # 创建时间格式化 + create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + + # 写入数据行 + row_data = [ + data_node.get('title', ''), + data_node.get('url', ''), + web_site_type, + data_node.get('request_url', ''), + company_name, + create_time + ] + writer.writerow(row_data) + + # 添加到已存在数据集合中,避免本次写入中的重复 + existing_data.add((company_name, web_site_type)) + + print(f"写入数据成功,title:{data_node.get('title', '')}, " + f"url:{data_node.get('url', '')}, " + f"web_site_type:{web_site_type}, " + f"request_url:{data_node.get('request_url', '')}, " + f"company_name:{company_name}, " + f"create_time:{create_time}") + + +def search_company_info(company_name_key, addon_args, num): + + search_key = company_name_key.strip() + " " + addon_args + search_key = search_key.strip() + + result = Bing.bing_main(search_key, num) + + # for 循环 遍历 result[0] 和 result[1] + + return result + + def search_one_company(company_name_arg, num): + + keywords = company_name_arg + # for key in keyword: + # keywords = keywords + key + " " + keywords = keywords.strip() + print(f"---==您搜索的关键词为:{keywords}") + wb = openpyxl.Workbook() + # 删除默认创建的工作表(现在名为 "数据表1") + wb.remove(wb['Sheet']) + printascii() + pattern = r"[\\/:\*\?\"<>|]" + keyword = re.sub(pattern, "", keywords) + create_sheet_and_write(wb, Bing.bing_main, keywords, num, "必应爬取结果") + create_sheet_and_write(wb, Baidu.baidu_main, keywords, num, "百度爬取结果") + # 将所有url变为超链接,点击即可打开转跳 + update_hyperlinks(wb) + wb.save(f'./{keyword}-{company_name_arg}.xlsx') + print(Fore.GREEN + '总任务结束!' + Fore.RESET) + end = time.time() + print(Fore.RED + f'脚本总时间: {end - start:.2f}') + + +if __name__ == '__main__': + if True: + print("exit") + exit(0) + i = 1 + for company_name_ele in check_result: + company_name = company_name_ele["company_name"] + company_exists = company_name_ele["exists"] + + # 如果公司已存在,跳过处理 + if company_exists: + print(f"公司 {company_name} 已存在,跳过处理") + continue + + sleep_time = 5 + sleep_time += random.randint(3, 10) + time.sleep(sleep_time) + + addon_args = " 爱企查|企查查" + data_list = search_company_info(company_name, addon_args, '1') + filter_list = filter_aiqicha_qcc(data_list, company_name) + print("filter_list:",filter_list) + + save_to_csv(filter_list) + if len(filter_list)<= 0: + print("没有数据 filter_list is empty. 
"+company_name) + continue + + i=i+1 + if i > 100: + break \ No newline at end of file diff --git a/search/Bing.py b/search/Bing.py index e4f5da4..c48d7d8 100644 --- a/search/Bing.py +++ b/search/Bing.py @@ -18,12 +18,57 @@ timeout = aiohttp.ClientTimeout( sock_connect=5.5, # 连接超时时间5.5 sock_read=5.5 # 读取超时为5.5秒 ) + + +# 新增基于 Playwright 的 Bing 搜索函数 +async def getbing_with_click(keyword, page_num=1): + """使用 Playwright 进行 Bing 搜索并模拟点击""" + from tool.web_browser import WebBrowser + import asyncio + + data_list = [] + browser = WebBrowser() + + try: + # 启动浏览器 + await browser.start_browser_async() + + # 访问 Bing 搜索页面 + search_url = f'https://cn.bing.com/search?q={keyword}&first={(page_num - 1) * 10 + 1}&count=10' + success = browser.visit_page(search_url) + + if success: + # 等待页面加载完成 + await asyncio.sleep(10) # 等待3秒 + + browser.input_and_enter('#sb_form_q', keyword) + # 等待页面加载完成 + # 或者等待特定元素出现 + # browser.page.wait_for_selector('h2 a', timeout=10000) + + # 等待页面加载完成 + await asyncio.sleep(20) # 等待3秒 + # 提取搜索结果 + data_list = browser.extract_links('h2 a') + + except Exception as e: + print(f"Bing页面爬取失败: {e}") + finally: + await browser.close_browser_async() + + return data_list + + + async def getbing(url, session): # url_list = [] # title_list = [] data_list =[] async with session.get(url, headers=bingheaders,timeout=timeout) as resp: # print("正在爬取url:"+url) + + # # 使用通用方法点击搜索按钮 + # browser.interact_with_element('#sb_form_go', 'click') try: a = await resp.text() soup = BeautifulSoup(a, 'lxml') @@ -47,44 +92,132 @@ async def getbing(url, session): #url_list, title_list + +# 在 [getbing](file:///Users/liyaya/gitstudy/Spider/SearchCompany/search/Bing.py#L20-L46) 函数中添加表单提交方式 +async def getbingPost(keyword, session): + data_list = [] + # 先访问首页获取必要的cookies和参数 + async with session.get('https://cn.bing.com', headers=bingheaders, timeout=timeout) as resp: + homepage = await resp.text() + + # 模拟表单提交搜索 + search_data = { + 'q': keyword, + 'go': 'Submit', + 'first': '1', + 'count': '10' + } + + search_url = 'https://cn.bing.com/search' + async with session.post(search_url, data=search_data, headers=bingheaders, timeout=timeout) as resp: + try: + a = await resp.text() + soup = BeautifulSoup(a, 'lxml') + h2a = soup.select('h2 a') + for h in h2a: + htext = h.text.replace('\n', '').replace(',', ' ').strip() + hurl = h.get('href') + if not hurl.startswith(('http://', 'https://')): + domain = 'https://cn.bing.com/' + hurl = urllib.parse.urljoin(domain, hurl) + print(htext, " ", hurl) + data_list.append({'title': htext, 'url': hurl, 'request_url': search_url}) + except: + print(f"必应页面爬取失败,该url无法正常获取数据。") + return [] + + return data_list + + async def bing_spinder(keyword, num): print(f'必应爬取任务进行中,爬取页数为{num}...') print('标题 url') - # urllist = [] - # titlelist = [] - data_list =[] + data_list = [] tasks = [] + if ':' in num: if num.count(':') > 1: raise ValueError("输入中必须且只能包含一个 ':'") else: - # 分割字符串,确保分割后的两部分都是数字 start_page, end_page = num.split(':') - # 判断两边是否都是数字 if not (start_page.isdigit() and end_page.isdigit()): raise ValueError("':' 两侧的值必须是数字") else: - start_page = (int(start_page)-1)*10 - end_page = (int(end_page))*10 + start_page = (int(start_page) - 1) * 10 + end_page = (int(end_page)) * 10 else: - start_page, end_page =0,int(num) * 10 + start_page, end_page = 0, int(num) * 10 + async with aiohttp.ClientSession() as session: - for pn in range(start_page, end_page, 10): - #url = f'https://cn.bing.com/search?q={keyword}&first={pn}&mkt=zh-CN' - # 修复:使用正确的分页参数 - url = 
f'https://cn.bing.com/search?q={keyword}&first={pn + 1}&count=10&FORM=PERE' - print("正在爬取的url为:" + url) - tasks = tasks + [asyncio.create_task(getbing(url, session))] + + tasks = tasks + [asyncio.create_task(getbing_with_click(keyword, session))] + + # for pn in range(start_page, end_page, 10): + # + # tasks = tasks + [asyncio.create_task(getbing_with_click(keyword, session))] + + # 直接传递keyword而不是构建URL + # tasks = tasks + [asyncio.create_task(getbing(keyword, session))] result = await asyncio.gather(*tasks) - for i in range(int((end_page-start_page) / 10)): - # urllist += result[i][0] - # titlelist += result[i][1] - data_list += result[i] - count=len(data_list) + + for res in result: + data_list += res + + count = len(data_list) print(f"必应搜索爬取结果为{count}") print(Fore.GREEN + '必应爬取任务完成\n' + Fore.RESET) return data_list - # await bingwriteCSV(titlelist, urllist, keyword) + + +# async def bing_spinder(keyword, num): +# print(f'必应爬取任务进行中,爬取页数为{num}...') +# print('标题 url') +# # urllist = [] +# # titlelist = [] +# data_list =[] +# tasks = [] +# if ':' in num: +# if num.count(':') > 1: +# raise ValueError("输入中必须且只能包含一个 ':'") +# else: +# # 分割字符串,确保分割后的两部分都是数字 +# start_page, end_page = num.split(':') +# # 判断两边是否都是数字 +# if not (start_page.isdigit() and end_page.isdigit()): +# raise ValueError("':' 两侧的值必须是数字") +# else: +# start_page = (int(start_page)-1)*10 +# end_page = (int(end_page))*10 +# else: +# start_page, end_page =0,int(num) * 10 +# async with aiohttp.ClientSession() as session: +# +# for pn in range(start_page, end_page, 10): +# url = f'https://cn.bing.com/search?q={keyword}&first={pn + 1}&count=10' +# print("正在爬取的url为:" + url) +# tasks = tasks + [asyncio.create_task(getbing(url, session))] +# result = await asyncio.gather(*tasks) +# # +# # url = f'https://cn.bing.com/search?q={keyword}&go=Submit&first={pn + 1}&count=10' +# # # url = f'https://cn.bing.com/search?q={keyword}' # &first={pn + 1}' #&count=10&FORM=PERE' +# # print("正在爬取的url为:" + url) +# # tasks = tasks + [asyncio.create_task(getbing(url, session))] +# # # for pn in range(start_page, end_page, 10): +# # # #url = f'https://cn.bing.com/search?q={keyword}&first={pn}&mkt=zh-CN' +# # # # 修复:使用正确的分页参数 +# # # url = f'https://cn.bing.com/search?q={keyword}' #&first={pn + 1}' #&count=10&FORM=PERE' +# # # print("正在爬取的url为:" + url) +# # # tasks = tasks + [asyncio.create_task(getbing(url, session))] +# # result = await asyncio.gather(*tasks) +# for i in range(int((end_page-start_page) / 10)): +# # urllist += result[i][0] +# # titlelist += result[i][1] +# data_list += result[i] +# count=len(data_list) +# print(f"必应搜索爬取结果为{count}") +# print(Fore.GREEN + '必应爬取任务完成\n' + Fore.RESET) +# return data_list +# # await bingwriteCSV(titlelist, urllist, keyword) def bing_main(keyword,num): diff --git a/tool/web_browser.py b/tool/web_browser.py index 7bf24fb..e045528 100644 --- a/tool/web_browser.py +++ b/tool/web_browser.py @@ -42,6 +42,155 @@ class WebBrowser: except: pass + def click_element(self, selector): + """ + 模拟点击页面元素 + + Args: + selector (str): CSS选择器或XPath表达式 + + Returns: + bool: 点击成功返回True,否则返回False + """ + try: + # 等待元素出现 + self.page.wait_for_selector(selector, timeout=10000) + + # 查找元素 + element = self.page.query_selector(selector) + if not element: + print(f"未找到元素: {selector}") + return False + + # 模拟鼠标移动到元素 + element.hover() + time.sleep(random.uniform(0.5, 1.0)) + + # 点击元素 + element.click() + + # 模拟人类点击后的等待 + time.sleep(random.uniform(1, 2)) + + return True + except Exception as e: + print(f"点击元素失败: {selector}, 错误: {str(e)}") + return 
False + + def input_and_enter(self, selector, text): + """ + 在指定输入框输入文本并按回车键 + + Args: + selector (str): 输入框的CSS选择器 + text (str): 要输入的文本 + + Returns: + bool: 输入成功返回True,否则返回False + """ + try: + # 等待输入框出现 + self.page.wait_for_selector(selector, timeout=10000) + + # 查找输入框元素 + input_element = self.page.query_selector(selector) + if not input_element: + print(f"未找到输入框: {selector}") + return False + + # 点击输入框以获得焦点 + input_element.click() + time.sleep(random.uniform(0.5, 1.0)) + + # 清空现有内容并输入新文本 + input_element.fill(text) + + # 模拟输入间隔 + time.sleep(random.uniform(1, 2)) + + # 按回车键 + input_element.press('Enter') + + # 等待页面响应 + time.sleep(random.uniform(2, 3)) + + return True + except Exception as e: + print(f"输入并回车失败: {selector}, 错误: {str(e)}") + return False + + def interact_with_element(self, selector, action_type="click", text=None, callback=None): + """ + 通用元素交互方法,支持多种操作类型和回调 + + Args: + selector (str): 元素的CSS选择器 + action_type (str): 操作类型 ("click", "input_enter", "hover") + text (str): 输入文本(仅在action_type为"input_enter"时需要) + callback (function): 回调函数,在操作完成后执行 + + Returns: + bool: 操作成功返回True,否则返回False + """ + try: + # 等待元素出现 + self.page.wait_for_selector(selector, timeout=10000) + + element = self.page.query_selector(selector) + if not element: + print(f"未找到元素: {selector}") + return False + + result = False + + if action_type == "click": + # 模拟点击 + element.hover() + time.sleep(random.uniform(0.5, 1.0)) + element.click() + result = True + + elif action_type == "input_enter": + if text is None: + print("输入操作需要提供text参数") + return False + + # 模拟输入并回车 + element.click() + time.sleep(random.uniform(0.5, 1.0)) + element.fill(text) + time.sleep(random.uniform(1, 2)) + element.press('Enter') + result = True + + elif action_type == "hover": + # 模拟悬停 + element.hover() + time.sleep(random.uniform(1, 2)) + result = True + + # 模拟人类行为延迟 + time.sleep(random.uniform(1, 2)) + + # 执行回调函数 + if callback and callable(callback): + try: + callback(result, selector) + except Exception as e: + print(f"回调函数执行失败: {e}") + + return result + except Exception as e: + print(f"元素交互失败: {selector}, 错误: {str(e)}") + if callback and callable(callback): + try: + callback(False, selector) + except Exception as cb_e: + print(f"回调函数执行失败: {cb_e}") + return False + + + def get_random_user_agent(self): """获取随机User-Agent""" user_agents = [ @@ -345,7 +494,7 @@ class WebBrowser: # self.page.wait_for_load_state("networkidle") # 3. 等待页面加载状态而不是特定元素 try: - self.page.wait_for_load_state('networkidle', timeout=5000) + self.page.wait_for_load_state('networkidle', timeout=15000) print("networkidle, timeout=5000页面已加载") except Exception as e: print(f"等待页面加载状态时出错: {e}") @@ -394,3 +543,30 @@ class WebBrowser: print(f"提取链接失败: {e}") return links + +# +# # 模拟点击搜索按钮 +# browser.click_element('#sb_form_go') +# +# # 在搜索框输入并回车 +# browser.input_and_enter('#sb_form_q', '搜索关键词') +# +# # 使用通用方法点击搜索按钮 +# browser.interact_with_element('#sb_form_go', 'click') +# +# # 使用通用方法输入并回车 +# browser.interact_with_element('#sb_form_q', 'input_enter', '搜索关键词') +# +# # 带回调的交互 +# def search_callback(success, selector): +# if success: +# print(f"成功操作元素: {selector}") +# else: +# print(f"操作元素失败: {selector}") +# +# browser.interact_with_element( +# '#sb_form_q', +# 'input_enter', +# '搜索关键词', +# search_callback +# ) \ No newline at end of file
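
For reference, a minimal usage sketch of the new Playwright-based path added in search/Bing.py. This is not part of the diff: the import path is assumed from the a/search/Bing.py location shown above, the keyword is a placeholder, and the call follows the getbing_with_click(keyword, page_num=1) signature defined in the patch.

    import asyncio

    from search import Bing  # assumed import path, matching a/search/Bing.py


    async def demo():
        # getbing_with_click drives WebBrowser through
        # start_browser_async -> visit_page -> input_and_enter -> extract_links,
        # then closes the browser and returns whatever extract_links('h2 a') collected.
        results = await Bing.getbing_with_click("示例关键词 爱企查|企查查", page_num=1)
        for item in results:
            print(item)


    if __name__ == '__main__':
        asyncio.run(demo())

Note that this sketch passes a 1-based page number as the second argument, as in the function's own definition; the return value's shape depends on WebBrowser.extract_links, so it is printed here without further assumptions.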