# -*- coding: utf-8 -*-
"""Asynchronous Bing (cn.bing.com) search scraper.

Provides three fetch strategies — Playwright click simulation
(`getbing_with_click`), plain GET (`getbing`), and form POST
(`getbingPost`) — plus the paging driver `bing_spinder` and its
sync/async entry points `bing_main` / `Bing_main`.
"""
import asyncio
import os
import sys
import urllib.parse
from urllib.parse import quote

import aiohttp
from bs4 import BeautifulSoup
from colorama import Fore

# Make the project root importable so `config` resolves regardless of CWD.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config

bingheaders = config.bingheaders
proxy = config.proxy

# No overall deadline; 5.5 s each for connecting and for reading a chunk.
timeout = aiohttp.ClientTimeout(
    total=None,
    sock_connect=5.5,
    sock_read=5.5,
)


def _parse_result_anchors(html, request_url):
    """Parse a Bing SERP and collect its `h2 a` result links.

    :param html: raw HTML of the results page.
    :param request_url: URL recorded in each result's ``request_url`` field.
    :return: list of ``{'title', 'url', 'request_url'}`` dicts.
    """
    data_list = []
    soup = BeautifulSoup(html, 'lxml')
    for anchor in soup.select('h2 a'):
        title = anchor.text.replace('\n', '').replace(',', ' ').strip()
        href = anchor.get('href')
        # Bing sometimes emits relative hrefs; resolve them against the site root.
        if not href.startswith(('http://', 'https://')):
            href = urllib.parse.urljoin('https://cn.bing.com/', href)
        print(title, " ", href)
        data_list.append({'title': title, 'url': href, 'request_url': request_url})
    return data_list


async def getbing_with_click(keyword, page_num=1):
    """Search Bing with Playwright, simulating a real browser session.

    Loads the SERP for *keyword*/*page_num*, re-types the query into the
    search box, waits for the page to settle, then scrapes the `h2 a` links.

    :return: whatever ``WebBrowser.extract_links`` yields (list of link
        records); empty list on any failure.
    """
    from tool.web_browser import WebBrowser

    data_list = []
    browser = WebBrowser()
    try:
        await browser.start_browser_async()
        search_url = (
            f'https://cn.bing.com/search?q={keyword}'
            f'&first={(page_num - 1) * 10 + 1}&count=10'
        )
        if browser.visit_page(search_url):
            # Generous waits: Bing may lazily render results or run JS
            # anti-bot checks before the anchors exist.
            await asyncio.sleep(10)
            browser.input_and_enter('#sb_form_q', keyword)
            await asyncio.sleep(20)
            data_list = browser.extract_links('h2 a')
    except Exception as e:
        print(f"Bing页面爬取失败: {e}")
    finally:
        # Always release the browser, even after a failure.
        await browser.close_browser_async()
    return data_list


async def getbing(url, session):
    """GET one Bing SERP *url* and parse its results.

    :param url: fully-built search URL (query + paging parameters).
    :param session: shared ``aiohttp.ClientSession``.
    :return: list of result dicts; empty list on failure.
        (Previously the failure path returned a 2-tuple ``[], []``, which the
        caller's ``data_list += res`` would flatten into two bogus entries.)
    """
    async with session.get(url, headers=bingheaders, timeout=timeout) as resp:
        try:
            html = await resp.text()
            return _parse_result_anchors(html, url)
        except Exception:
            print(f"必应页面爬取失败,{url}该url无法正常获取数据。")
            return []


async def getbingPost(keyword, session):
    """Search Bing via a form POST after priming cookies from the homepage.

    :param keyword: raw search keyword.
    :param session: shared ``aiohttp.ClientSession``.
    :return: list of result dicts; empty list on failure.
    """
    # Visit the homepage first so Bing sets the cookies the search expects.
    async with session.get('https://cn.bing.com', headers=bingheaders,
                           timeout=timeout) as resp:
        await resp.text()

    search_data = {
        'q': keyword,
        'go': 'Submit',
        'first': '1',
        'count': '10',
    }
    search_url = 'https://cn.bing.com/search'
    async with session.post(search_url, data=search_data, headers=bingheaders,
                            timeout=timeout) as resp:
        try:
            html = await resp.text()
            return _parse_result_anchors(html, search_url)
        except Exception:
            print(f"必应页面爬取失败,该url无法正常获取数据。")
            return []


def _parse_page_range(num):
    """Convert *num* ('3' or '2:5') into a (start, end) result-offset pair.

    :raises ValueError: when *num* holds more than one ':' or non-digits.
    """
    if ':' in num:
        if num.count(':') > 1:
            raise ValueError("输入中必须且只能包含一个 ':'")
        start_page, end_page = num.split(':')
        if not (start_page.isdigit() and end_page.isdigit()):
            raise ValueError("':' 两侧的值必须是数字")
        return (int(start_page) - 1) * 10, int(end_page) * 10
    return 0, int(num) * 10


async def bing_spinder(keyword, num):
    """Fetch all requested Bing result pages concurrently.

    :param keyword: URL-encoded search keyword.
    :param num: page count ('3') or inclusive page range ('2:5').
    :return: accumulated list of result dicts from every page.

    Fix: the previous version passed the aiohttp session as
    ``getbing_with_click``'s ``page_num`` argument and ignored the computed
    page range entirely; it now spawns one ``getbing`` task per page, as the
    range computation intends.
    """
    print(f'必应爬取任务进行中,爬取页数为{num}...')
    print('标题 url')
    start_page, end_page = _parse_page_range(num)
    data_list = []
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.create_task(getbing(
                f'https://cn.bing.com/search?q={keyword}&first={pn + 1}&count=10',
                session))
            for pn in range(start_page, end_page, 10)
        ]
        for res in await asyncio.gather(*tasks):
            data_list += res
    count = len(data_list)
    print(f"必应搜索爬取结果为{count}")
    print(Fore.GREEN + '必应爬取任务完成\n' + Fore.RESET)
    return data_list


def bing_main(keyword, num):
    """Synchronous entry point: URL-encode the keyword and run the spider."""
    keyword = quote(keyword)
    if sys.platform.startswith('win'):
        # aiohttp does not work with the default Proactor loop on Windows.
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    # asyncio.run replaces the deprecated get_event_loop/run_until_complete pair.
    return asyncio.run(bing_spinder(keyword, num))


async def Bing_main(keywords, num):
    """Async entry point for callers that already run an event loop."""
    return await bing_spinder(keywords, num)