# SearchCompany/crawler_bing_main.py.bak
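# NOTE: the import section is missing from this excerpt; the imports below are
# reconstructed from the names used in the file. Bing, Baidu, printascii,
# filter_aiqicha_qcc, check_result and the global start timestamp are assumed
# to come from project modules that are not shown here.
import argparse
import csv
import os
import random
import re
import sys
import time

import openpyxl
from colorama import Fore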

def writeExcel(titles, links, ws):
    infos = list(zip(titles, links))
    for row in infos:
        ws.append(row)
def create_sheet_and_write(wb, engine, keywords, num, title):
    ws = wb.create_sheet(title=title)
    result = engine(keywords, num)
    writeExcel(result[0], result[1], ws)
def excel_text2url(link_url):  # If nothing asynchronous happens inside the function, declaring it async would add nothing to performance or functionality.
    '''Convert a URL string into an Excel formula so the cell becomes a clickable link.'''
    return f'=HYPERLINK("{link_url}","{link_url}")'
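# Illustrative example of the formula excel_text2url() produces:
#   excel_text2url("https://example.com")
#   -> '=HYPERLINK("https://example.com","https://example.com")'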
# Walk every worksheet and pass all values in the second column to excel_text2url, writing the formula back into the cell
def update_hyperlinks(wb):
    for sheet in wb.worksheets:  # iterate over every worksheet
        for row in sheet.iter_rows(min_row=1, max_row=sheet.max_row, min_col=2, max_col=2):  # iterate over the second column
            for cell in row:
                if cell.value:  # only touch cells that have content
                    cell.value = excel_text2url(cell.value)  # turn the URL into a hyperlink formula
                else:
                    break
def commend():
    parser = argparse.ArgumentParser(prog="Searcher",
                                     description='Coroutine-based crawling of Baidu, Bing and Google search -- 天欣安全实验室',
                                     usage='please read -h')
    parser.add_argument("-k", type=str, help="keywords to search for", nargs='+')
    # -p takes a page count (default 5) or a range such as 2:5; it is kept as a string and parsed later
    parser.add_argument("-p", type=str, help="number of pages to search, default 5; ranges are supported, e.g. pages 2 through 5 is 2:5", default='5')
    parser.add_argument("-m", type=str, help="search engine to use: Baidu: bd, Bing: bin, Google: goo; defaults to all if omitted", default='all', nargs='+')
    # parser.add_argument("-t", '--task', type=int, help="number of worker tasks, default 8", default=8)
    parser.exit_on_error = False
    args = parser.parse_args()
    if len(sys.argv) == 1:
        printascii()
        parser.print_help()
        sys.exit()
    return args
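# Illustrative invocation of the parser above (script name assumed, since this file is a .bak copy):
#   python crawler_bing_main.py -k 某某科技 -p 2:5 -m bin bd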
# Leftover from an earlier filtering routine:
# for i in range(len(result[0])):
#     title = result[0][i]
#     url = result[1][i]
#     print(f"Bing search result, title:{title}, url:{url}")
#     if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url):
#         data_list.append({"title": title, "url": url})
# return data_list
def filter_company_sites(urls):
    # Example URLs this filter is meant to keep:
    #   https://www.tianyancha.com/company/5226478758
    #   https://aiqicha.baidu.com/company_detail_26602790857925
    #   https://www.qcc.com/firm/05b449eb5cc417d0f97c14104051f5c0.html
    # Match the prefixes https://aiqicha.baidu.com/company_detail_*, https://www.qcc.com/firm/*.html
    # and https://www.tianyancha.com/company/*
    filtered_urls = [url for url in urls if re.match(r"^https://aiqicha.baidu.com/company_detail_.*|https://www.qcc.com/firm/.*|https://www.tianyancha.com/company/.*", url)]
    return filtered_urls
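# Illustrative example: only the company-profile URL survives the filter.
#   filter_company_sites(["https://www.tianyancha.com/company/5226478758",
#                         "https://www.bing.com/search?q=test"])
#   -> ["https://www.tianyancha.com/company/5226478758"]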
def save_to_csv(filter_list):
    """
    Append the results to a CSV file.
    Args:
        filter_list: list of records to write
    """
    if filter_list is None or len(filter_list) == 0:
        print('filter_list is None or empty, nothing to write')
        return False
    csv_file = 'company_search_result_data.csv'
    headers = ['title', 'url', 'web_site_type', 'request_url', 'company_name', 'create_time']
    # Check whether the file exists; if not it is created and the header row written below
    file_exists = os.path.exists(csv_file)
    # Read the existing data so duplicates can be detected
    existing_data = set()
    if file_exists:
        with open(csv_file, 'r', encoding='utf-8') as f:
            reader_ins = csv.reader(f)
            header_skipped = False
            for row in reader_ins:
                if not header_skipped:
                    header_skipped = True
                    continue
                if len(row) >= 5:  # make sure the row is complete
                    company_name = row[4]  # company_name is in column 5 (index 4)
                    web_site_type = row[2] if len(row) > 2 else ""  # web_site_type is in column 3 (index 2)
                    existing_data.add((company_name, web_site_type))
    # Write the data
    with open(csv_file, 'a', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        # Write the header row if the file did not exist yet
        if not file_exists:
            writer.writerow(headers)
        # Append the data, skipping duplicates
        for data_node in filter_list:
            company_name = data_node.get('company_name', '')
            web_site_type = data_node.get('web_site_type', '')
            # Skip records whose (company_name, web_site_type) combination already exists
            if (company_name, web_site_type) not in existing_data:
                # Format the creation time
                create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                # Build the data row
                row_data = [
                    data_node.get('title', ''),
                    data_node.get('url', ''),
                    web_site_type,
                    data_node.get('request_url', ''),
                    company_name,
                    create_time
                ]
                writer.writerow(row_data)
                # Remember the combination so duplicates within this batch are also skipped
                existing_data.add((company_name, web_site_type))
                print(f"Row written, title:{data_node.get('title', '')}, "
                      f"url:{data_node.get('url', '')}, "
                      f"web_site_type:{web_site_type}, "
                      f"request_url:{data_node.get('request_url', '')}, "
                      f"company_name:{company_name}, "
                      f"create_time:{create_time}")
def search_company_info(company_name_key, addon_args, num):
    search_key = company_name_key.strip() + " " + addon_args
    search_key = search_key.strip()
    result = Bing.bing_main(search_key, num)
    # result[0] holds the titles and result[1] the URLs; a for loop over them could post-process the results here
    return result
def search_one_company(company_name_arg, num):
    keywords = company_name_arg
    # for key in keyword:
    #     keywords = keywords + key + " "
    keywords = keywords.strip()
    print(f"---==The search keywords are: {keywords}")
    wb = openpyxl.Workbook()
    # Remove the worksheet that openpyxl creates by default (named "Sheet")
    wb.remove(wb['Sheet'])
    printascii()
    pattern = r"[\\/:\*\?\"<>|]"
    keyword = re.sub(pattern, "", keywords)  # strip characters that are illegal in file names
    create_sheet_and_write(wb, Bing.bing_main, keywords, num, "Bing results")
    create_sheet_and_write(wb, Baidu.baidu_main, keywords, num, "Baidu results")
    # Turn every URL into a hyperlink formula so it can be opened with a click
    update_hyperlinks(wb)
    wb.save(f'./{keyword}-{company_name_arg}.xlsx')
    print(Fore.GREEN + 'All tasks finished!' + Fore.RESET)
    end = time.time()
    print(Fore.RED + f'Total script time: {end - start:.2f}')
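# Illustrative call (company name made up; assumes the project modules and the
# global start timestamp are available):
#   search_one_company("某某科技有限公司", '1')
# saves ./某某科技有限公司-某某科技有限公司.xlsx with one sheet of Bing results and one of Baidu results.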
if __name__ == '__main__':
    if True:  # debug guard left in place: the crawl below is currently disabled
        print("exit")
        exit(0)
    i = 1
    for company_name_ele in check_result:
        company_name = company_name_ele["company_name"]
        company_exists = company_name_ele["exists"]
        # Skip companies that have already been processed
        if company_exists:
            print(f"Company {company_name} already exists, skipping")
            continue
        sleep_time = 5
        sleep_time += random.randint(3, 10)
        time.sleep(sleep_time)
        addon_args = " 爱企查|企查查"  # Aiqicha / Qichacha, appended to the search query
        data_list = search_company_info(company_name, addon_args, '1')
        filter_list = filter_aiqicha_qcc(data_list, company_name)
        print("filter_list:", filter_list)
        save_to_csv(filter_list)
        if len(filter_list) <= 0:
            print("No data, filter_list is empty: " + company_name)
            continue
        i = i + 1
        if i > 100:
            break