Files
TeleIntelliChat/tg_bot.py
2026-03-14 12:07:05 +08:00

490 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import json
import base64
import datetime
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import telebot
from telebot import types
from concurrent.futures import ThreadPoolExecutor, as_completed
from dotenv import load_dotenv
from ddgs import DDGS
from bs4 import BeautifulSoup
# ================= 1. Core configuration and environment variables =================
# Pull variables from a local .env file (if any) into the process environment.
load_dotenv()
# Credentials are read from the environment; each is None when unset.
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN')
NVIDIA_API_KEY = os.environ.get('NVIDIA_API_KEY')
# Chat-completions endpoint used for every model request in this file.
NVIDIA_API_URL = "https://integrate.api.nvidia.com/v1/chat/completions"
def _parse_allowed_users(raw):
    """Parse a comma-separated list of Telegram user IDs.

    Args:
        raw: The raw configuration string, e.g. ``"123, 456"``. Blank entries
            (including a completely empty string) are skipped.

    Returns:
        A list of integer user IDs. If any entry is not an integer the whole
        list is rejected and ``[]`` is returned, which makes the bot deny
        every user (fail closed) instead of running with a partial list.
    """
    try:
        return [int(entry.strip()) for entry in raw.split(',') if entry.strip()]
    except ValueError as exc:
        # Previously this was swallowed silently, leaving the operator with a
        # bot that ignores everyone and no hint why.
        print(f"⚠️ ALLOWED_USERS 配置无效,已拒绝所有用户访问: {exc}")
        return []


# Whitelist of Telegram user IDs allowed to talk to the bot.
ALLOWED_USERS = _parse_allowed_users(os.getenv('ALLOWED_USERS', ''))
# Provider-qualified model identifiers, in the order they appear in the menu.
_MODEL_IDS = (
    "openai/gpt-oss-120b",
    "qwen/qwen3-next-80b-a3b-thinking",
    "qwen/qwen3.5-122b-a10b",
    "qwen/qwen3.5-397b-a17b",
    "qwen/qwen3-coder-480b-a35b-instruct",
    "moonshotai/kimi-k2.5",
    "meta/llama-3.1-70b-instruct",
    "meta/llama-3.1-405b-instruct",
    "meta/llama-3.3-70b-instruct",
    "deepseek-ai/deepseek-v3.2",
    "deepseek-ai/deepseek-v3.1",
    "deepseek-ai/deepseek-v3.1-terminus",
    "minimaxai/minimax-m2.5",
    "mistralai/mistral-large-3-675b-instruct-2512",
)
# Short display name (the part after the provider prefix) -> full identifier.
MODEL_MAP = {model_id.split("/", 1)[1]: model_id for model_id in _MODEL_IDS}
# Model used for chats that have not picked one explicitly.
DEFAULT_MODEL = MODEL_MAP["gpt-oss-120b"]
CTX_SIZE = 16000
# Shared TeleBot client used by every handler and status update in this file.
bot = telebot.TeleBot(TELEGRAM_BOT_TOKEN)
# In-process conversation history: chat_id -> list of chat-completion messages
# (index 0 is always the system prompt).
chat_memory = {}
# ================= 记忆持久化模块 =================
MEMORY_FILE = "user_memory.json"


def load_memory():
    """Load the long-term memory store from ``MEMORY_FILE``.

    Returns:
        The decoded JSON object (a dict keyed by chat ID string). An empty
        dict is returned when the file does not exist, cannot be decoded, or
        does not contain a JSON object, so callers can always treat the
        result as a dict.
    """
    try:
        # EAFP: open directly instead of exists()+open(), which races with
        # concurrent writers/deleters.
        with open(MEMORY_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; a damaged file must not crash the memory tool.
        print(f"⚠️ 记忆文件损坏,已忽略: {exc}")
        return {}
    return data if isinstance(data, dict) else {}
def save_memory(data):
    """Persist the long-term memory store to ``MEMORY_FILE``.

    The JSON is written to a sibling temporary file first and then moved over
    the real file with ``os.replace``, so an interruption or a serialization
    error mid-write cannot leave a truncated store behind.

    Args:
        data: JSON-serializable mapping of chat ID string -> list of memories.
    """
    tmp_path = f"{MEMORY_FILE}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, MEMORY_FILE)
def execute_manage_memory(chat_id, action, content=None):
    """Execute the ``manage_memory`` tool for one chat.

    Args:
        chat_id: Chat whose memories are managed (stored under ``str(chat_id)``).
        action: ``"add"``, ``"delete"`` or ``"list"``.
        content: Text to add, or substring to match for deletion. Ignored for
            ``"list"``.

    Returns:
        A human-readable result string that is fed back to the model.
    """
    key = str(chat_id)
    memories = load_memory()
    user_mem = memories.get(key, [])

    if action == "add" and content:
        # Exact duplicates are not stored twice; the reply is the same either way.
        if content not in user_mem:
            user_mem.append(content)
            memories[key] = user_mem
            save_memory(memories)
        return f"已成功将 '{content}' 添加到长期记忆。"

    if action == "delete" and content:
        # Substring match: every memory containing `content` is removed.
        remaining = [m for m in user_mem if content not in m]
        if len(remaining) == len(user_mem):
            # Nothing matched: say so instead of claiming a deletion, and
            # skip the pointless file rewrite.
            return f"未找到包含 '{content}' 的记忆。"
        memories[key] = remaining
        save_memory(memories)
        return f"已删除包含 '{content}' 的记忆。"

    if action == "list":
        if not user_mem:
            return "当前没有任何长期记忆。"
        return "【用户当前记忆列表】:\n" + "\n".join(f"- {m}" for m in user_mem)

    return "操作无效或缺少 content 参数。"
# Per-chat model override: chat_id -> full model identifier. Chats without an
# entry fall back to DEFAULT_MODEL.
user_selected_model = {}
# ================= 2. 注册 TG 快捷菜单 =================
def setup_bot_commands():
    """Register the bot's command menu with Telegram and log the outcome.

    Any failure is caught and printed rather than raised.
    """
    menu = (
        ("start", "🚀 启动并查看帮助"),
        ("model", "🧠 切换 AI 模型"),
        ("reset", "🧹 清空对话记忆"),
    )
    try:
        bot.set_my_commands([types.BotCommand(cmd, desc) for cmd, desc in menu])
        print("✅ Telegram 快捷菜单注册成功!")
    except Exception as e:
        print(f"❌ 快捷菜单注册失败: {e}")
# ================= 3. Tool library (Tools Definition) =================
# The schemas below are the model's "instruction manual": they describe which
# tools it may call and with which arguments.
def _function_tool(name, description, properties, required):
    """Build one function-tool schema entry in the chat-completions format."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


# Tool schemas sent with every chat request.
TOOLS = [
    _function_tool(
        "web_search",
        "搜索互联网并返回最新的标题、链接和摘要。",
        {"query": {"type": "string", "description": "搜索关键词"}},
        ["query"],
    ),
    _function_tool(
        "web_fetch",
        "抓取特定网页的正文。遇到需要深度阅读的内容时调用。",
        {"url": {"type": "string", "description": "网页URL"}},
        ["url"],
    ),
    # Image search.
    _function_tool(
        "image_search",
        "按关键词搜索图片并返回图片的直链。当用户需要找图、看图时调用。",
        {"query": {"type": "string", "description": "图片搜索关键词"}},
        ["query"],
    ),
    # Long-term memory management.
    _function_tool(
        "manage_memory",
        "读取、添加或删除关于用户的长期记忆(偏好、习惯、身份等)。当用户说'记住...''你记得我什么'时调用。",
        {
            "action": {"type": "string", "enum": ["add", "list", "delete"], "description": "操作类型"},
            "content": {"type": "string", "description": "要添加或删除的具体记忆内容list操作时可留空"},
        },
        ["action"],
    ),
]
# Executor for the image_search tool.
def execute_image_search(query):
    """Search for images and return up to three hits as Markdown image links.

    Args:
        query: Image search keywords.

    Returns:
        One ``![title](url)`` line per hit, a "nothing found" notice, or an
        error description if the search raised.
    """
    print(f"🖼️ [Agent] 正在搜图: {query}")
    try:
        with DDGS(timeout=15) as ddgs:
            hits = list(ddgs.images(query, max_results=3))
        if not hits:
            return "未找到相关图片。"
        # Markdown image syntax is returned so the model can relay it as-is.
        lines = []
        for hit in hits:
            lines.append(f"![{hit['title']}]({hit['image']})")
        return "\n".join(lines)
    except Exception as e:
        return f"图片搜索失败: {e}"
# Executor for the web_search tool.
def execute_web_search(query):
    """Run a text web search and format up to five hits for the model.

    Args:
        query: Search keywords.

    Returns:
        A numbered list of title / URL / snippet entries, a "nothing found"
        notice, or an error description if the search raised.
    """
    print(f"🔍 [Agent] 正在搜索: {query}")
    try:
        with DDGS(timeout=15) as ddgs:
            hits = list(ddgs.text(query, max_results=5))
        if not hits:
            return "未找到相关结果。"
        return "".join(
            f"[{rank}] {hit['title']}\nURL: {hit['href']}\n摘要: {hit['body']}\n\n"
            for rank, hit in enumerate(hits, start=1)
        )
    except Exception as e:
        return f"搜索失败: {e}"
def execute_web_fetch(url):
    """Fetch a web page and return its visible text, truncated to 3500 chars.

    PDF links are rejected up front (by URL, and again by response
    Content-Type) with a hint telling the model to switch strategy, because
    this tool cannot parse binary content.

    Args:
        url: Page URL supplied by the model.

    Returns:
        The extracted page text, the PDF notice, or an error description if
        the request or parsing failed.
    """
    # Local import keeps the block self-contained (stdlib only).
    from urllib.parse import urlparse

    print(f"📄 [Agent] 正在阅读网页: {url}")
    pdf_notice = "【工具报错】:该链接是 PDF 文件,当前工具无法解析二进制内容。如果是论文,请尝试读取该论文的 HTML 摘要页(如将 /pdf/ 替换为 /abs/),或重新搜索相关的科技新闻报道。"

    # Look at the URL *path* as well as the raw string so that
    # "paper.pdf?download=1" and "paper.pdf#page=2" are caught too.
    try:
        path = urlparse(url).path
    except ValueError:
        path = ""
    if url.lower().endswith('.pdf') or path.lower().endswith('.pdf'):
        print("⚠️ [Agent] 拦截 PDF 读取,提示模型切换策略。")
        return pdf_notice

    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
    try:
        # NOTE(security): verify=False disables TLS certificate validation for
        # a model-chosen URL. Kept for compatibility with sites that have
        # broken certificates; reconsider before exposing this more widely.
        res = requests.get(url, headers=headers, timeout=10, verify=False)
        res.raise_for_status()
        # Some servers deliver PDFs from URLs without a .pdf suffix.
        if 'application/pdf' in res.headers.get('Content-Type', '').lower():
            print("⚠️ [Agent] 拦截 PDF 读取,提示模型切换策略。")
            return pdf_notice
        # Pass raw bytes so BeautifulSoup can detect the encoding from the
        # document itself; res.text falls back to ISO-8859-1 when the server
        # omits a charset, which garbles non-Latin pages.
        soup = BeautifulSoup(res.content, 'html.parser')
        for tag in soup(['script', 'style', 'nav', 'footer', 'header']):
            tag.decompose()
        text = '\n'.join(line.strip() for line in soup.get_text().splitlines() if line.strip())
        return text[:3500]
    except Exception as e:
        return f"网页抓取失败: {e}"
# ================= 4. 核心逻辑与 API 调用 =================
def get_chat_history(chat_id, max_messages=20):
    """Return the mutable message history for a chat, creating it if needed.

    The system prompt at index 0 is regenerated on every call so the current
    date and the latest instructions always apply. When the history exceeds
    ``max_messages`` the oldest non-system messages are dropped, and trimming
    continues until the first remaining message is a ``user`` message, so the
    history never starts with an orphaned ``tool`` result or a dangling
    assistant ``tool_calls`` message.

    Args:
        chat_id: Telegram chat identifier used as the key in ``chat_memory``.
        max_messages: Maximum number of messages (system prompt included)
            kept before older turns are discarded.

    Returns:
        The list stored in ``chat_memory[chat_id]`` (callers append to it).
    """
    today = datetime.date.today().strftime("%Y年%m月%d日")
    # System prompt with hard constraints and a positive formatting example.
    system_prompt = f"""当前时间是 {today}。你是一个强大的多模态 AI 助手。
【核心行动手册】
1. **知识调用**:你的自身知识库极其渊博!对于通用知识、编程基础、历史常识、常规聊天等,【必须】直接回答,**绝对禁止**调用工具!只有实时资讯或极冷门知识才允许使用 `web_search`。
2. **工具节制(极度重要)**:你的上下文记忆极其有限!
- 搜新闻或列表时,**仅依靠 web_search 的摘要即可作答**,绝对禁止为了完美而去 web_fetch 每一篇文章!
- 一旦收集到能基本回答用户问题的信息,必须**立即停止调用工具**并输出最终结果。
3. **排版极端禁令**:**绝对严禁**使用 Markdown 表格语法(即包含 | 和 - 的结构)。Telegram 无法正常渲染表格,会导致用户体验极差。
4. **强制替代格式**:如果需要对比或列举数据,请严格按照以下格式:
### [类别名称/标题]
- **项目 A**:描述内容
- **项目 B**:描述内容
---
*(重复上述结构直到完成对比)*
5. **手机端优化**:必须使用【加粗标题】和【- 分点列表】。
【执行反馈】
- 哪怕模型内部逻辑认为表格更清晰,也必须为了 Telegram 的兼容性将其转化为上述列表格式。"""

    history = chat_memory.get(chat_id)
    if history is None:
        history = chat_memory[chat_id] = [{"role": "system", "content": system_prompt}]
    else:
        # Keep the system message at index 0 in sync with the latest prompt.
        history[0]["content"] = system_prompt

    # === Automatic context trimming ===
    overflow = len(history) - max_messages
    if overflow > 0:
        del history[1:1 + overflow]
    # Re-align on a user turn: a leading tool/assistant fragment left over
    # from a half-trimmed tool exchange would not be a valid conversation.
    while len(history) > 1 and history[1].get("role") != "user":
        history.pop(1)
    return history
def _update_status(chat_id, status_msg, text, parse_mode=None):
    """Best-effort edit of the progress placeholder message.

    Status updates are cosmetic. Telegram rejects edits whose text is
    unchanged ("message is not modified") or whose Markdown does not parse,
    and such failures must not abort the agent run, so they are only logged.
    """
    try:
        bot.edit_message_text(text, chat_id=chat_id, message_id=status_msg.message_id, parse_mode=parse_mode)
    except Exception as e:
        print(f"⚠️ 状态消息更新失败(已忽略): {e}")


def _run_agent_tool(chat_id, status_msg, func_name, args, visited_urls):
    """Execute one tool call requested by the model and return its text result."""
    if func_name == "web_search":
        print(f"执行工具: web_search, 参数: {args}")
        _update_status(chat_id, status_msg, f"🔍 正在检索: `{args.get('query', '')}`", parse_mode="Markdown")
        return execute_web_search(args.get("query", ""))

    if func_name == "web_fetch":
        url = str(args.get("url") or "").strip()
        # Refuse empty URLs instead of wasting a request on them.
        if not url:
            print("🛑 拦截空网址抓取")
            return "【系统警告】:你提供了一个空的 URL,请检查搜索结果,提取正确的 href 链接后再调用 web_fetch 工具。"
        # Per-turn de-duplication: the page text is already in the context.
        if url in visited_urls:
            print(f"🛑 拦截重复抓取,保护 Token: {url}")
            _update_status(chat_id, status_msg, "🛑 拦截重复阅读,节省 Token...")
            return "【系统警告】:您已经抓取并阅读过该 URL 的内容,内容已在您的上下文记忆中,请勿重复调用 web_fetch 浪费 Token,请基于已有信息回答或搜索新的线索。"
        visited_urls.add(url)
        print(f"执行工具: web_fetch, 参数: {args}")
        _update_status(chat_id, status_msg, "📄 正在深度阅读网页...", parse_mode="Markdown")
        return execute_web_fetch(url)

    if func_name == "image_search":
        print(f"执行工具: image_search, 参数: {args}")
        _update_status(chat_id, status_msg, f"🖼️ 正在搜图: `{args.get('query', '')}`", parse_mode="Markdown")
        return execute_image_search(args.get("query", ""))

    if func_name == "manage_memory":
        print(f"执行工具: manage_memory, 参数: {args}")
        _update_status(chat_id, status_msg, "🧠 正在整理记忆...")
        return execute_manage_memory(chat_id, args.get("action", ""), args.get("content", ""))

    print(f"⚠️ 未知工具: {func_name}")
    # Tell the model what went wrong rather than returning an empty result.
    return f"【系统警告】:未知工具 {func_name},请只调用已提供的工具。"


def chat_with_agent(chat_id, user_message, status_msg, max_loops=15):
    """Run the tool-calling agent loop for one user message.

    The user message is appended to the chat history, then the model is
    queried repeatedly. Each requested tool call is executed and its result
    appended, until the model answers without tool calls or ``max_loops``
    rounds have been used.

    Args:
        chat_id: Telegram chat identifier (history and model-selection key).
        user_message: Chat-completions ``user`` message dict to append.
        status_msg: Placeholder Telegram message that is edited to show
            progress; only its ``message_id`` is used.
        max_loops: Maximum number of model round-trips.

    Returns:
        The final reply text, or a user-facing error/limit notice.
    """
    history = get_chat_history(chat_id)
    history.append(user_message)
    headers = {'Authorization': f'Bearer {NVIDIA_API_KEY}', 'Content-Type': 'application/json'}
    model = user_selected_model.get(chat_id, DEFAULT_MODEL)
    print(f"🧠 当前使用模型: {model}")
    # URLs already fetched during this turn (token protection).
    visited_urls = set()

    for loop in range(max_loops):
        print(f"🔄 思考循环: 第 {loop + 1} 轮")
        payload = {"model": model, "messages": history, "tools": TOOLS, "temperature": 0.6, "max_tokens": 4096}
        try:
            print("🌐 正在请求 NVIDIA API...")
            response = requests.post(NVIDIA_API_URL, headers=headers, json=payload, timeout=120)
            response.raise_for_status()
            print("🌐 API 请求成功")
        except requests.exceptions.ReadTimeout as e:
            print(f"❌ API 超时: {e}")
            return "❌ API 接口超时,请稍后再试或切换模型。"
        except Exception as e:
            print(f"❌ API 请求异常: {e}")
            return f"❌ API 请求异常: {e}"

        # A 200 response with an unexpected body must not surface as a bare
        # KeyError/IndexError to the user.
        try:
            message = response.json()['choices'][0]['message']
        except (ValueError, KeyError, IndexError, TypeError) as e:
            print(f"❌ API 返回格式异常: {e!r}")
            return f"❌ API 返回格式异常: {e!r}"

        if not message.get("tool_calls"):
            print("🏁 模型决定直接回复,无工具调用")
            final_content = message.get("content")
            # Treat None or whitespace-only content as an empty answer.
            if not final_content or not str(final_content).strip():
                print("⚠️ 警告:模型返回了空文本,触发保护机制。")
                final_content = "【系统提示】:由于检索的信息量过大,模型未能成功生成文字回复。这通常是因为查阅了太多网页导致上下文超载。\n\n💡 **建议**:请发送 `/reset` 清空记忆,然后要求我“只搜索不深入阅读”,或者缩小搜索范围。"
            # Store the sanitized text, not the raw message content.
            history.append({"role": "assistant", "content": final_content})
            return final_content

        history.append(message)
        print(f"🛠️ 模型决定调用 {len(message['tool_calls'])} 个工具")
        for tool_call in message["tool_calls"]:
            func_name = tool_call["function"]["name"]
            try:
                args = json.loads(tool_call["function"]["arguments"])
            except (json.JSONDecodeError, TypeError) as e:
                print(f"⚠️ 工具参数解析失败: {e}")
                args = {}
            # Valid JSON that is not an object (e.g. "null") has no .get().
            if not isinstance(args, dict):
                args = {}
            tool_result = _run_agent_tool(chat_id, status_msg, func_name, args, visited_urls)
            # Every tool call must be answered, or the next request is invalid.
            history.append({
                "role": "tool",
                "tool_call_id": tool_call["id"],
                "name": func_name,
                "content": tool_result
            })
        # All tools for this round are done; show that the answer is being composed.
        _update_status(chat_id, status_msg, "✍️ 正在整理最终回答...")
        print("✍️ 工具执行完毕,准备进行下一轮循环")

    print("❌ 思考循环达到上限")
    return "思考深度超过限制,未能得出最终结论。"
def check_model_status(model_id):
    """Probe a single model and report whether it is reachable.

    Sends a minimal one-token chat request with a short timeout.

    Args:
        model_id: Full model identifier to probe.

    Returns:
        True if the endpoint answered with HTTP 200, False on any other
        status code or on any exception (including timeouts).
    """
    # One output token is enough to test connectivity.
    probe = {
        "model": model_id,
        "messages": [{"role": "user", "content": "hi"}],
        "max_tokens": 1
    }
    auth_headers = {
        "Authorization": f"Bearer {NVIDIA_API_KEY}",
        "Content-Type": "application/json"
    }
    try:
        # 8 s cap: this runs while the user waits for the model menu, so a
        # slow endpoint is simply treated as unavailable.
        res = requests.post(NVIDIA_API_URL, headers=auth_headers, json=probe, timeout=8)
    except Exception:
        return False
    return res.status_code == 200
# ================= 5. 消息处理与入口 =================
def check_auth(message):
    """Return True when the sender of an update is on the whitelist.

    Access is denied when ``ALLOWED_USERS`` is empty (fail closed), when the
    sender is not listed, or when the update carries no sender at all.

    Args:
        message: Any update object exposing ``from_user.id`` (messages and
            callback queries both do).

    Returns:
        True if the sender may use the bot, otherwise False.
    """
    # from_user is optional in the Bot API; tolerate it being absent/None
    # instead of raising AttributeError inside a handler.
    sender = getattr(message, "from_user", None)
    user_id = getattr(sender, "id", None)
    if not ALLOWED_USERS or user_id not in ALLOWED_USERS:
        print(f"⚠️ 拦截非法访问: {user_id}")
        return False
    return True
@bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
    """Reply to /start and /help with a short feature overview (authorized users only)."""
    if check_auth(message):
        welcome = "👋 **全功能 Agent 已上线**\n\n• 我能自主决定何时搜索、何时阅读网页\n• `/model` 切换引擎\n• `/reset` 清空记忆"
        bot.reply_to(message, welcome, parse_mode="Markdown")
@bot.message_handler(commands=['model'])
def show_model_menu(message):
    """Handle /model: probe every model concurrently and show a selection menu.

    A placeholder reply is sent first because probing takes a few seconds; it
    is then replaced by an inline keyboard with one button per model, each
    prefixed with ✅ (reachable) or ❌ (error or timeout).
    """
    if not check_auth(message):
        return
    # 1. Placeholder while the probes run.
    status_msg = bot.reply_to(message, "⏳ 正在并发检测各节点的 API 连通性,请稍候...")

    # 2. Probe all models in parallel. max(1, ...) because ThreadPoolExecutor
    #    rejects max_workers=0 if the model map were ever empty.
    model_status = {}
    with ThreadPoolExecutor(max_workers=max(1, len(MODEL_MAP))) as executor:
        future_to_model = {executor.submit(check_model_status, v): k for k, v in MODEL_MAP.items()}
        for future in as_completed(future_to_model):
            model_name = future_to_model[future]
            try:
                is_ok = future.result()
            except Exception:
                is_ok = False
            model_status[model_name] = is_ok

    # 3. Build the keyboard in MODEL_MAP order with a status icon per model.
    markup = types.InlineKeyboardMarkup(row_width=1)
    for name in MODEL_MAP:
        # The icons were empty strings before, so the menu showed no status
        # even though the caption below explains ✅ / ❌.
        status_icon = "✅" if model_status.get(name) else "❌"
        # Only the label carries the icon; callback_data stays the plain name
        # so the selection handler is unaffected.
        btn_text = f"{status_icon} {name}"
        markup.add(types.InlineKeyboardButton(text=btn_text, callback_data=f"set_model_{name}"))

    # 4. Turn the placeholder into the real menu.
    bot.edit_message_text(
        text="请选择思考引擎(✅ 可用 / ❌ 异常或超时):",
        chat_id=message.chat.id,
        message_id=status_msg.message_id,
        reply_markup=markup
    )
@bot.callback_query_handler(func=lambda call: (call.data or '').startswith('set_model_'))
def handle_model_selection(call):
    """Handle a press on a model-menu button and switch the chat's model.

    The pressing user is authorized first (in a group anyone can press a
    button on the bot's message). Unknown model names, e.g. from an outdated
    menu, are rejected instead of raising ``KeyError``. The callback query is
    always answered so the client stops showing its loading indicator.
    """
    # call.from_user is the person who pressed the button; call.message is
    # the bot's own message, so authorization must use `call`.
    if not check_auth(call):
        bot.answer_callback_query(call.id)
        return

    # Strip only the leading prefix (str.replace would remove every occurrence).
    name = call.data[len('set_model_'):]
    model_id = MODEL_MAP.get(name)
    if model_id is None:
        bot.answer_callback_query(call.id, text=f"❌ 未知模型: {name}")
        return

    user_selected_model[call.message.chat.id] = model_id
    bot.answer_callback_query(call.id)
    bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, text=f"✅ 已切换至: {name}")
@bot.message_handler(commands=['reset'])
def reset_memory(message):
    """Handle /reset: drop this chat's in-process conversation history and confirm."""
    if not check_auth(message):
        return
    # pop() with a default is a no-op when the chat has no history yet.
    chat_memory.pop(message.chat.id, None)
    bot.reply_to(message, "🧹 对话上下文及工具执行记录已清空。")
def _deliver_reply(chat_id, status_msg, reply, limit=4000):
    """Deliver the final answer, splitting it to respect Telegram's size limit.

    The first chunk replaces the status placeholder; any further chunks are
    sent as follow-up messages. Each chunk is tried as Markdown first and
    re-sent as plain text if Telegram rejects the formatting.

    ``limit`` stays a little under Telegram's 4096-character cap as headroom.
    """
    chunks = [reply[i:i + limit] for i in range(0, len(reply), limit)] or [reply]
    try:
        bot.edit_message_text(chunks[0], chat_id=chat_id, message_id=status_msg.message_id, parse_mode="Markdown")
    except Exception:
        bot.edit_message_text(chunks[0], chat_id=chat_id, message_id=status_msg.message_id)
    for chunk in chunks[1:]:
        try:
            bot.send_message(chat_id, chunk, parse_mode="Markdown")
        except Exception:
            bot.send_message(chat_id, chunk)


@bot.message_handler(content_types=['text', 'photo'])
def handle_message(message):
    """Handle a text or photo message: run the agent and deliver its reply.

    Commands (text starting with ``/``) are left to the dedicated handlers.
    A photo is downloaded and attached as a base64 data URL; if that fails
    the message is processed as text only. A placeholder reply is sent first
    and later edited with progress updates and the final answer.
    """
    if not check_auth(message):
        return
    if message.text and message.text.startswith('/'):
        return

    chat_id = message.chat.id
    user_text = message.text or message.caption or "请描述图片"
    print(f"\n[{datetime.datetime.now().strftime('%H:%M:%S')}] 收到用户 {chat_id} 的消息: {user_text[:20]}...")

    current_content = [{"type": "text", "text": user_text}]
    if message.photo:
        try:
            # The last entry of message.photo is used as the image to send.
            file_info = bot.get_file(message.photo[-1].file_id)
            img_b64 = base64.b64encode(bot.download_file(file_info.file_path)).decode('utf-8')
            current_content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}})
            print("📸 已接收并处理图片附件")
        except Exception as e:
            print(f"❌ 图片处理失败: {e}")
    user_message = {"role": "user", "content": current_content}

    # Send a placeholder first so the user sees immediate feedback.
    status_msg = bot.reply_to(message, "⏳ 思考中...")
    print("⏳ 已发送状态占位消息")
    try:
        reply = chat_with_agent(chat_id, user_message, status_msg)
        # Long answers used to fail both edit attempts (message too long)
        # and were lost; they are now split across several messages.
        _deliver_reply(chat_id, status_msg, reply)
        print("✅ 最终回复已发送")
    except Exception as e:
        print(f"❌ 系统异常: {e}")
        # Truncate so an oversized exception text cannot itself be rejected.
        bot.edit_message_text(f"❌ 系统异常: {e}"[:4000], chat_id=chat_id, message_id=status_msg.message_id)
def main():
    """Register the command menu, then poll Telegram for updates indefinitely."""
    setup_bot_commands()
    print("🚀 Bot 开始运行...")
    bot.infinity_polling()


if __name__ == "__main__":
    main()