104 lines
4.1 KiB
Python
104 lines
4.1 KiB
Python
import time
|
|
import random
|
|
from typing import Tuple, Union
|
|
import requests
|
|
from requests import Response
|
|
from requests.exceptions import SSLError, RequestException
|
|
from bs4 import BeautifulSoup
|
|
|
|
from qwen_agent.tools.base import BaseTool, register_tool
|
|
|
|
# Default cap on the number of characters of page text returned to the caller.
DEFAULT_MAX_CHARS = 10000

# Browser-like request headers so sites such as GitHub do not respond with 429.
COMMON_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive'
}
|
|
|
|
def _normalize_text(text: str) -> str:
|
|
lines = [line.strip() for line in text.splitlines()]
|
|
lines = [line for line in lines if line]
|
|
return '\n'.join(lines)
|
|
|
|
def _fetch_page(url: str, timeout: int = 30, retries: int = 2) -> Tuple[Union[Response, str], bool]:
    """Fetch *url* with browser-like headers, retries, and an SSL fallback.

    Args:
        url: Absolute URL to fetch.
        timeout: Per-request timeout in seconds.
        retries: Number of additional attempts after the first one.

    Returns:
        ``(result, insecure)`` where ``result`` is either the successful
        :class:`requests.Response` or an error-message string, and
        ``insecure`` is True only when the page had to be fetched with
        certificate verification disabled.
    """
    for attempt in range(retries + 1):
        try:
            if attempt > 0:
                # Growing backoff with jitter before each retry to avoid
                # hammering rate-limited hosts.
                time.sleep(2 + random.uniform(1, 2) * attempt)

            response = requests.get(url, headers=COMMON_HEADERS, timeout=timeout, verify=True)

            # Rate-limited: retry if attempts remain, otherwise report.
            if response.status_code == 429:
                if attempt < retries:
                    continue
                return f"错误:目标网站限制了请求频率 (429)。请稍后再试,禁止读取本地无关文件。", False

            response.raise_for_status()
            return response, False

        except SSLError:
            # Certificate problem: deliberately retry once without TLS
            # verification and flag the result as insecure so the caller
            # can warn the user.
            try:
                response = requests.get(url, headers=COMMON_HEADERS, timeout=timeout, verify=False)
                response.raise_for_status()
                return response, True
            except RequestException as e:
                # Narrowed from `except Exception`: programming errors are
                # no longer silently converted into an error string.
                return f"SSL 错误且备选方案失败: {str(e)}", False
        except RequestException as e:
            if attempt < retries:
                continue
            return f"网络抓取失败: {str(e)}", False

    # Defensive: the loop always returns, but keep an explicit fallback.
    return "未知抓取错误", False
|
|
|
|
def _extract_page_text(html: str, max_chars: int) -> Tuple[str, str]:
    """Parse *html* and return ``(title, body_text)`` with body truncated to *max_chars*."""
    soup = BeautifulSoup(html, 'html.parser')

    # Drop non-content elements before collecting visible text.
    for noise_node in soup(['script', 'style', 'noscript', 'header', 'footer', 'nav']):
        noise_node.decompose()

    if soup.title and soup.title.string:
        title = soup.title.string.strip()
    else:
        title = '无标题'

    body = _normalize_text(soup.get_text(separator='\n'))
    return title, body[:max_chars]
|
|
|
|
@register_tool('web_fetch', allow_overwrite=True)
class WebFetchTool(BaseTool):
    """Tool that fetches a web page and returns its readable body text."""

    description = '抓取网页正文并返回可读文本。'
    parameters = {
        'type': 'object',
        'properties': {
            'url': {'type': 'string', 'description': '网页链接'},
            'max_chars': {'type': 'integer', 'description': '返回最大字符数', 'default': DEFAULT_MAX_CHARS}
        },
        'required': ['url'],
    }

    def call(self, params: Union[str, dict], **kwargs) -> str:
        """Fetch the requested URL and return a formatted text summary."""
        args = self._verify_json_format_args(params)
        target_url = args['url'].strip()
        char_limit = int(args.get('max_chars', DEFAULT_MAX_CHARS))

        fetched, used_insecure = _fetch_page(target_url)
        if isinstance(fetched, str):
            # The fetch helper returned an error message instead of a Response.
            return fetched

        title, body = _extract_page_text(fetched.text, char_limit)
        warning = '(注意:使用了非安全连接)\n' if used_insecure else ''
        return f'标题: {title}\n链接: {target_url}\n{warning}\n{body}'
|
|
|
|
@register_tool('web_extractor', allow_overwrite=True)
class WebExtractorTool(BaseTool):
    """Tool that extracts the main text of a single web page.

    Registered under its own name but delegates entirely to WebFetchTool.
    """

    description = '提取单个网页正文。'
    parameters = {
        'type': 'object',
        'properties': {
            'url': {'type': 'string', 'description': '网页链接'},
            'max_chars': {'type': 'integer', 'description': '返回最大字符数', 'default': DEFAULT_MAX_CHARS}
        },
        'required': ['url'],
    }

    def call(self, params: Union[str, dict], **kwargs) -> str:
        # Separate registration, shared implementation: forward to WebFetchTool.
        return WebFetchTool(self.cfg).call(params, **kwargs)