SearchCompany/search/Baidu.py

# -*- coding: utf-8 -*-
import os
import asyncio
import aiohttp
import time
import sys
from bs4 import BeautifulSoup
import re
import aiofiles
import urllib.parse
import argparse
from colorama import init, Fore
import ssl
from urllib.parse import quote
# Add the project root directory to sys.path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config
baiduheaders = config.baiduheaders
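# Note (assumption, not defined in this file): config.baiduheaders is expected to be a
# plain dict of HTTP request headers. A minimal sketch of the assumed shape, with
# placeholder values -- the real User-Agent and Cookie strings live in config.py:
# baiduheaders = {
#     'User-Agent': 'Mozilla/5.0 ...',
#     'Cookie': '<logged-in Baidu cookies>',
# }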
timeout = aiohttp.ClientTimeout(
    total=None,        # no overall timeout
    sock_connect=5.5,  # connection timeout: 5.5 seconds
    sock_read=5.5      # read timeout: 5.5 seconds
)
# --天欣安全实验室 (Tianxin Security Lab)--#
# First request: fetch the titles and Baidu's obfuscated redirect URLs for one result page
async def getfirstinfo(keyword, pn, session):
    sslcontext = ssl.create_default_context()
    sslcontext.check_hostname = False
    sslcontext.verify_mode = ssl.CERT_NONE
    titlelist = []
    fakeurl = []
    url = f'https://www.baidu.com/s?wd={keyword}&pn={pn}'
    # print("Crawling url: " + url)
    j = 0
    while j < 3:
        try:
            async with session.get(url, headers=baiduheaders, ssl=sslcontext, timeout=timeout) as resp:
                html = await resp.text()
                soup = BeautifulSoup(html, 'lxml')
                h3s = soup.select('h3.t')
                for h3 in h3s:
                    h3text = h3.text.replace('\n', '').replace(',', ' ').replace('\ue636', '').strip()
                    titlelist.append(h3text)          # save the text inside the h3 tag
                    fakeurl.append(h3.a.get('href'))  # href of the a tag under h3; this is a Baidu redirect link that still needs resolving
                return titlelist, fakeurl
        except Exception as e:
            # print(e)
            print("Baidu request failed, retrying...")
            j = j + 1
    print(f"Baidu task error: {url} - no data could be fetched from this url.")
    return [], []
# Second request: resolve the real target URL behind the Baidu redirect
async def gettrueurl(url, printtitle, session):
    try:
        domain = 'https://www.baidu.com/'
        # async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=baiduheaders, allow_redirects=False) as resp:
            await resp.text()
            location = resp.headers.get('Location')
            if location is not None and location != '':
                trueurl = str(location)
                print(printtitle, " ", trueurl)
                return trueurl
            else:
                print(url + ' - this url does not redirect')
                url = urllib.parse.urljoin(domain, url)
                print(printtitle, " ", url)
                return url
    except Exception:
        return url
async def baidu_spinder(keyword, num):
    print(f'Baidu crawl task in progress, pages to crawl: {num}...')
    urllist = []
    titlelist = []
    tasks1 = []
    tasks2 = []
    Source = []
    if ':' in num:
        if num.count(':') > 1:
            raise ValueError("The input must contain exactly one ':'")
        else:
            # Split the string and make sure both parts are numeric
            start_page, end_page = num.split(':')
            # Check that both sides are digits
            if not (start_page.isdigit() and end_page.isdigit()):
                raise ValueError("The values on both sides of ':' must be numbers")
            else:
                start_page = (int(start_page) - 1) * 10
                end_page = int(end_page) * 10
    else:
        start_page, end_page = 0, int(num) * 10
    async with aiohttp.ClientSession() as session:
        for i, pn in enumerate(range(start_page, end_page, 10)):
            tasks1 = tasks1 + [asyncio.create_task(getfirstinfo(keyword, pn, session))]
        result = await asyncio.gather(*tasks1)
    async with aiohttp.ClientSession() as session:
        for i in range(int((end_page - start_page) / 10)):
            titlelist += result[i][0]
            for j, url in enumerate(result[i][1]):
                printtitle = result[i][0][j]
                if not url.startswith(('http://', 'https://')):
                    domain = 'http://www.baidu.com/'
                    url = urllib.parse.urljoin(domain, url)
                tasks2 = tasks2 + [asyncio.create_task(gettrueurl(url, printtitle, session))]
        print('Title\t URL\t')
        urllist += await asyncio.gather(*tasks2)
    count = len(urllist)
    print(f"Number of Baidu search results crawled: {count}")
    print(Fore.GREEN + 'Baidu crawl task complete!\n' + Fore.RESET)
    return titlelist, urllist
    # await baiduwriteCSV(titlelist, urllist, keyword)
def baidu_main(keyword, num):
    keyword = quote(keyword)
    if sys.platform.startswith('win'):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(baidu_spinder(keyword, num))

async def Baidu_main(keywords, num):
    return await baidu_spinder(keywords, num)
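
# Minimal usage sketch (assumption: the flag names below are illustrative, not an existing
# CLI for this repo; argparse is already imported above but unused in the original file).
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Standalone Baidu search crawl')
    parser.add_argument('-k', '--keyword', required=True, help='search keyword')
    parser.add_argument('-n', '--num', default='1',
                        help="pages to crawl, e.g. '3' or a range like '2:5'")
    args = parser.parse_args()
    titles, urls = baidu_main(args.keyword, args.num)
    print(f'{len(urls)} results collected')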