import base64
import datetime
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
import telebot
import urllib3
from bs4 import BeautifulSoup
from ddgs import DDGS
from dotenv import load_dotenv
from telebot import types

# Page fetching in this file is done with verify=False, so the resulting
# InsecureRequestWarning is silenced here on purpose.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# ================= 1. Core configuration and environment variables =================
load_dotenv()

TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
NVIDIA_API_KEY = os.getenv('NVIDIA_API_KEY')
NVIDIA_API_URL = "https://integrate.api.nvidia.com/v1/chat/completions"

# Parse the comma-separated allow-list of Telegram user IDs.
# A malformed entry empties the list, which makes check_auth deny everyone.
try:
    ALLOWED_USERS = [
        int(u.strip())
        for u in os.getenv('ALLOWED_USERS', '').split(',')
        if u.strip()
    ]
except ValueError:
    ALLOWED_USERS = []

# Menu label -> model identifier sent to the API.
MODEL_MAP = {
    "gpt-oss-120b": "openai/gpt-oss-120b",
    "qwen3-next-80b-a3b-thinking": "qwen/qwen3-next-80b-a3b-thinking",
    "qwen3.5-122b-a10b": "qwen/qwen3.5-122b-a10b",
    "qwen3.5-397b-a17b": "qwen/qwen3.5-397b-a17b",
    "qwen3-coder-480b-a35b-instruct": "qwen/qwen3-coder-480b-a35b-instruct",
    "kimi-k2.5": "moonshotai/kimi-k2.5",
    "llama-3.1-70b-instruct": "meta/llama-3.1-70b-instruct",
    "llama-3.1-405b-instruct": "meta/llama-3.1-405b-instruct",
    "llama-3.3-70b-instruct": "meta/llama-3.3-70b-instruct",
    "deepseek-v3.2": "deepseek-ai/deepseek-v3.2",
    "deepseek-v3.1": "deepseek-ai/deepseek-v3.1",
    "deepseek-v3.1-terminus": "deepseek-ai/deepseek-v3.1-terminus",
    "minimax-m2.5": "minimaxai/minimax-m2.5",
    "mistral-large-3-675b-instruct-2512": "mistralai/mistral-large-3-675b-instruct-2512",
}
DEFAULT_MODEL = "openai/gpt-oss-120b"
CTX_SIZE = 16000

bot = telebot.TeleBot(TELEGRAM_BOT_TOKEN)

# chat_id -> list of chat-completion messages (index 0 is the system prompt).
chat_memory = {}

# ================= Persistent memory module =================
MEMORY_FILE = "user_memory.json"


def load_memory():
    """Load the long-term memory store from MEMORY_FILE.

    Returns:
        dict: Mapping of chat-id strings to lists of remembered strings.
        An empty dict is returned when the file is missing, unreadable,
        or does not contain a JSON object.
    """
    try:
        with open(MEMORY_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except ValueError as e:
        # Covers json.JSONDecodeError and UnicodeDecodeError. Keep a copy of
        # the damaged file so the next save does not silently destroy it.
        print(f"⚠️ 记忆文件已损坏,已忽略: {e}")
        try:
            os.replace(MEMORY_FILE, f"{MEMORY_FILE}.corrupt")
        except OSError:
            pass
        return {}
    except OSError as e:
        print(f"⚠️ 记忆文件读取失败: {e}")
        return {}
    return data if isinstance(data, dict) else {}


def save_memory(data):
    """Write the long-term memory store to MEMORY_FILE.

    The data is written to a temporary sibling file and then moved into
    place, so an interrupted write cannot truncate the existing store.

    Args:
        data: JSON-serialisable mapping to persist.
    """
    tmp_path = f"{MEMORY_FILE}.tmp"
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, MEMORY_FILE)


def execute_manage_memory(chat_id, action, content=None):
    """Add, delete, or list a user's long-term memories.

    Args:
        chat_id: Chat identifier; stored under its string form.
        action: One of "add", "delete", or "list".
        content: Text to add, or substring to match for deletion.
            Ignored for "list".

    Returns:
        str: A human-readable result message handed back to the model.
    """
    memories = load_memory()
    key = str(chat_id)
    user_mem = memories.get(key, [])

    if action == "add" and content:
        if content not in user_mem:
            user_mem.append(content)
            memories[key] = user_mem
            save_memory(memories)
        return f"已成功将 '{content}' 添加到长期记忆。"

    if action == "delete" and content:
        # Fuzzy delete: drop every memory containing the given substring.
        memories[key] = [m for m in user_mem if content not in m]
        save_memory(memories)
        return f"已删除包含 '{content}' 的记忆。"

    if action == "list":
        if not user_mem:
            return "当前没有任何长期记忆。"
        return "【用户当前记忆列表】:\n" + "\n".join(f"- {m}" for m in user_mem)

    return "操作无效或缺少 content 参数。"


# chat_id -> model identifier chosen through the /model menu.
user_selected_model = {}


# ================= 2. Register the Telegram command menu =================
def setup_bot_commands():
    """Register the bot's command shortcuts; failures are logged, not raised."""
    try:
        commands = [
            types.BotCommand("start", "🚀 启动并查看帮助"),
            types.BotCommand("model", "🧠 切换 AI 模型"),
            types.BotCommand("reset", "🧹 清空对话记忆"),
        ]
        bot.set_my_commands(commands)
        print("✅ Telegram 快捷菜单注册成功!")
    except Exception as e:
        print(f"❌ 快捷菜单注册失败: {e}")


# ================= 3. Tool library (Tools Definition) =================
# This is the model's "instruction manual".
# Tool schemas advertised to the model (function-calling format).
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "web_search",
            "description": "搜索互联网并返回最新的标题、链接和摘要。",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "搜索关键词"}
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "web_fetch",
            "description": "抓取特定网页的正文。遇到需要深度阅读的内容时调用。",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "网页URL"}
                },
                "required": ["url"]
            }
        }
    },
    # Image search
    {
        "type": "function",
        "function": {
            "name": "image_search",
            "description": "按关键词搜索图片并返回图片的直链。当用户需要找图、看图时调用。",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "图片搜索关键词"}
                },
                "required": ["query"]
            }
        }
    },
    # Long-term memory management
    {
        "type": "function",
        "function": {
            "name": "manage_memory",
            "description": "读取、添加或删除关于用户的长期记忆(偏好、习惯、身份等)。当用户说'记住...'或'你记得我什么'时调用。",
            "parameters": {
                "type": "object",
                "properties": {
                    "action": {"type": "string", "enum": ["add", "list", "delete"], "description": "操作类型"},
                    "content": {"type": "string", "description": "要添加或删除的具体记忆内容(list操作时可留空)"}
                },
                "required": ["action"]
            }
        }
    }
]

# Maximum number of messages (system prompt included) kept per chat between turns.
MAX_HISTORY_MESSAGES = 20

# Telegram rejects messages longer than 4096 characters; stay a little below
# that so multi-unit characters (emoji) do not push a chunk over the limit.
TELEGRAM_MAX_MESSAGE_LEN = 4000


def execute_image_search(query):
    """Search images for *query* and return up to three Markdown image links.

    Returns:
        str: One ``![title](url)`` line per hit, or a message describing
        the empty result / failure.
    """
    print(f"🖼️ [Agent] 正在搜图: {query}")
    try:
        with DDGS(timeout=15) as ddgs:
            results = list(ddgs.images(query, max_results=3))
        if not results:
            return "未找到相关图片。"
        # Markdown is returned as-is so the model can relay it to the user.
        return "\n".join(f"![{r['title']}]({r['image']})" for r in results)
    except Exception as e:
        return f"图片搜索失败: {e}"


def execute_web_search(query):
    """Search the web for *query* and return up to five numbered results.

    Returns:
        str: Title, URL and snippet for each hit, or a message describing
        the empty result / failure.
    """
    print(f"🔍 [Agent] 正在搜索: {query}")
    try:
        with DDGS(timeout=15) as ddgs:
            results = list(ddgs.text(query, max_results=5))
        if not results:
            return "未找到相关结果。"
        return "".join(
            f"[{i}] {r['title']}\nURL: {r['href']}\n摘要: {r['body']}\n\n"
            for i, r in enumerate(results, start=1)
        )
    except Exception as e:
        return f"搜索失败: {e}"


def execute_web_fetch(url):
    """Download *url* and return at most 3500 characters of its visible text.

    PDF documents are rejected with a hint for the model, both when the URL
    path ends in ``.pdf`` and when the server reports a PDF content type.

    Returns:
        str: Extracted page text, or a message describing the failure.
    """
    print(f"📄 [Agent] 正在阅读网页: {url}")
    pdf_notice = "【工具报错】:该链接是 PDF 文件,当前工具无法解析二进制内容。如果是论文,请尝试读取该论文的 HTML 摘要页(如将 /pdf/ 替换为 /abs/),或重新搜索相关的科技新闻报道。"

    # Ignore the query string and fragment so "paper.pdf?download=1" is caught too.
    path_part = url.lower().split('#', 1)[0].split('?', 1)[0]
    if path_part.endswith('.pdf'):
        print("⚠️ [Agent] 拦截 PDF 读取,提示模型切换策略。")
        return pdf_notice

    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
    try:
        # NOTE(review): verify=False disables TLS certificate checks and the URL
        # is chosen by the model, so this can reach arbitrary hosts. Kept as-is
        # because it looks deliberate; confirm this is acceptable for the deployment.
        res = requests.get(url, headers=headers, timeout=10, verify=False)
        res.raise_for_status()
        if 'application/pdf' in res.headers.get('Content-Type', '').lower():
            print("⚠️ [Agent] 拦截 PDF 读取,提示模型切换策略。")
            return pdf_notice
        soup = BeautifulSoup(res.text, 'html.parser')
        # Drop page chrome and non-content elements before extracting text.
        for tag in soup(['script', 'style', 'nav', 'footer', 'header']):
            tag.decompose()
        lines = (line.strip() for line in soup.get_text().splitlines())
        text = '\n'.join(line for line in lines if line)
        return text[:3500]
    except Exception as e:
        return f"网页抓取失败: {e}"


# ================= 4. Core logic and API calls =================
def get_chat_history(chat_id):
    """Return the mutable message history for *chat_id*.

    The system prompt at index 0 is created or refreshed on every call so
    the current date and rules are always up to date. The history is then
    trimmed to MAX_HISTORY_MESSAGES, and trimming continues until the first
    message after the system prompt is a user message, so that no tool
    reply is left without the assistant tool call that produced it.

    Returns:
        list: The list stored in ``chat_memory`` (not a copy).
    """
    today = datetime.date.today().strftime("%Y年%m月%d日")
    # System prompt with negative constraints and a positive formatting example.
    ADVANCED_SYSTEM_PROMPT = f"""当前时间是 {today}。你是一个强大的多模态 AI 助手。

【核心行动手册】
1. **知识调用**:你的自身知识库极其渊博!对于通用知识、编程基础、历史常识、常规聊天等,【必须】直接回答,**绝对禁止**调用工具!只有实时资讯或极冷门知识才允许使用 `web_search`。
2. **工具节制(极度重要)**:你的上下文记忆极其有限!
   - 搜新闻或列表时,**仅依靠 web_search 的摘要即可作答**,绝对禁止为了完美而去 web_fetch 每一篇文章!
   - 一旦收集到能基本回答用户问题的信息,必须**立即停止调用工具**并输出最终结果。
2. **排版极端禁令**:**绝对严禁**使用 Markdown 表格语法(即包含 | 和 - 的结构)。Telegram 无法正常渲染表格,会导致用户体验极差。
3. **强制替代格式**:如果需要对比或列举数据,请严格按照以下格式:
   ### [类别名称/标题]
   - **项目 A**:描述内容
   - **项目 B**:描述内容
   ---
   *(重复上述结构直到完成对比)*
4. **手机端优化**:必须使用【加粗标题】和【- 分点列表】。

【执行反馈】
- 哪怕模型内部逻辑认为表格更清晰,也必须为了 Telegram 的兼容性将其转化为上述列表格式。"""

    if chat_id not in chat_memory:
        chat_memory[chat_id] = [{"role": "system", "content": ADVANCED_SYSTEM_PROMPT}]
    else:
        # Keep the system message at index 0 in sync with the latest prompt.
        chat_memory[chat_id][0]["content"] = ADVANCED_SYSTEM_PROMPT

    # === Automatic context trimming ===
    current_history = chat_memory[chat_id]
    excess = len(current_history) - MAX_HISTORY_MESSAGES
    if excess > 0:
        del current_history[1:1 + excess]
        # A cut in the middle of an exchange would leave orphaned tool or
        # assistant messages at the front; drop them up to the next user turn.
        while len(current_history) > 1 and current_history[1].get("role") != "user":
            del current_history[1]
    return current_history


def _update_status(chat_id, status_msg, text, parse_mode=None):
    """Best-effort edit of the status placeholder message.

    Telegram rejects edits whose text is unchanged and Markdown it cannot
    parse; a cosmetic status update must never abort the agent loop, so
    any failure is logged and ignored.
    """
    try:
        bot.edit_message_text(text, chat_id=chat_id,
                              message_id=status_msg.message_id,
                              parse_mode=parse_mode)
    except Exception as e:
        print(f"⚠️ 状态消息更新失败(已忽略): {e}")


def _parse_tool_args(tool_call):
    """Decode a tool call's JSON arguments; return {} when they are not a JSON object."""
    try:
        args = json.loads(tool_call["function"]["arguments"])
    except (TypeError, ValueError) as e:
        print(f"⚠️ 工具参数解析失败: {e}")
        return {}
    return args if isinstance(args, dict) else {}


def _run_tool(chat_id, func_name, args, status_msg, visited_urls):
    """Execute one tool call, updating the status message, and return its text result."""
    if func_name == "web_search":
        print(f"执行工具: web_search, 参数: {args}")
        _update_status(chat_id, status_msg, f"🔍 正在检索: `{args.get('query', '')}`", parse_mode="Markdown")
        return execute_web_search(args.get("query", ""))

    if func_name == "web_fetch":
        url = str(args.get("url") or "").strip()
        # Reject an empty URL instead of wasting a request on it.
        if not url:
            print("🛑 拦截空网址抓取")
            return "【系统警告】:你提供了一个空的 URL!请检查搜索结果,提取正确的 href 链接后再调用 web_fetch 工具。"
        # Token protection: never fetch the same URL twice within one turn.
        if url in visited_urls:
            print(f"🛑 拦截重复抓取,保护 Token: {url}")
            _update_status(chat_id, status_msg, "🛑 拦截重复阅读,节省 Token...")
            return "【系统警告】:您已经抓取并阅读过该 URL 的内容,内容已在您的上下文记忆中,请勿重复调用 web_fetch 浪费 Token!请基于已有信息回答,或搜索新的线索。"
        visited_urls.add(url)
        print(f"执行工具: web_fetch, 参数: {args}")
        _update_status(chat_id, status_msg, "📄 正在深度阅读网页...", parse_mode="Markdown")
        return execute_web_fetch(url)

    if func_name == "image_search":
        print(f"执行工具: image_search, 参数: {args}")
        _update_status(chat_id, status_msg, f"🖼️ 正在搜图: `{args.get('query', '')}`", parse_mode="Markdown")
        return execute_image_search(args.get("query", ""))

    if func_name == "manage_memory":
        print(f"执行工具: manage_memory, 参数: {args}")
        _update_status(chat_id, status_msg, "🧠 正在整理记忆...")
        return execute_manage_memory(chat_id, args.get("action", ""), args.get("content", ""))

    print(f"⚠️ 未知工具: {func_name}")
    # Tell the model explicitly rather than handing back an empty result.
    return f"【系统警告】:未知工具 {func_name},请只使用已提供的工具。"


def chat_with_agent(chat_id, user_message, status_msg, max_loops=15):
    """Run the tool-calling agent loop for one user turn.

    Args:
        chat_id: Telegram chat identifier; selects the history and model.
        user_message: Chat-completion message dict for the user's turn.
        status_msg: The placeholder Telegram message that is edited to
            show progress while tools run.
        max_loops: Maximum number of model round-trips before giving up.

    Returns:
        str: The final assistant text, or an error / limit message.
    """
    history = get_chat_history(chat_id)
    history.append(user_message)

    headers = {'Authorization': f'Bearer {NVIDIA_API_KEY}', 'Content-Type': 'application/json'}
    model = user_selected_model.get(chat_id, DEFAULT_MODEL)
    print(f"🧠 当前使用模型: {model}")

    # URLs already fetched during this turn (token protection).
    visited_urls = set()

    for loop in range(max_loops):
        print(f"🔄 思考循环: 第 {loop + 1} 次")
        payload = {"model": model, "messages": history, "tools": TOOLS, "temperature": 0.6, "max_tokens": 4096}

        try:
            print("🌐 正在请求 NVIDIA API...")
            response = requests.post(NVIDIA_API_URL, headers=headers, json=payload, timeout=120)
            response.raise_for_status()
            print("🌐 API 请求成功")
        except requests.exceptions.ReadTimeout as e:
            print(f"❌ API 超时: {e}")
            return "❌ API 接口超时,请稍后再试或切换模型。"
        except Exception as e:
            print(f"❌ API 请求异常: {e}")
            return f"❌ API 请求异常: {e}"

        try:
            message = response.json()['choices'][0]['message']
        except (ValueError, KeyError, IndexError, TypeError) as e:
            print(f"❌ API 响应格式异常: {e}")
            return f"❌ API 响应格式异常: {e}"

        tool_calls = message.get("tool_calls")
        if not tool_calls:
            print("🏁 模型决定直接回复,无工具调用")
            final_content = message.get("content")
            # Treat None or whitespace-only content as an empty reply.
            if not final_content or not str(final_content).strip():
                print("⚠️ 警告:模型返回了空文本,触发保护机制。")
                final_content = "【系统提示】:由于检索的信息量过大,模型未能成功生成文字回复。这通常是因为查阅了太多网页导致上下文超载。\n\n💡 **建议**:请发送 `/reset` 清空记忆,然后要求我“只搜索不深入阅读”,或者缩小搜索范围。"
            # Store the text actually returned, not the raw (possibly empty) content.
            history.append({"role": "assistant", "content": final_content})
            return final_content

        history.append(message)
        print(f"🛠️ 模型决定调用 {len(tool_calls)} 个工具")

        for tool_call in tool_calls:
            func_name = tool_call["function"]["name"]
            args = _parse_tool_args(tool_call)
            try:
                tool_result = _run_tool(chat_id, func_name, args, status_msg, visited_urls)
            except Exception as e:
                # Every tool call must get a reply message, otherwise the
                # history becomes invalid for all following API requests.
                print(f"❌ 工具执行异常: {e}")
                tool_result = f"工具执行失败: {e}"

            history.append({
                "role": "tool",
                "tool_call_id": tool_call["id"],
                "name": func_name,
                "content": tool_result
            })

        # Tools are done for this round; show that the answer is being composed.
        _update_status(chat_id, status_msg, "✍️ 正在整理最终回答...")
        print("✍️ 工具执行完毕,准备进行下一轮循环")

    print("❌ 思考循环达到上限")
    return "思考深度超过限制,未能得出最终结论。"


def check_model_status(model_id):
    """Probe a single model with a one-token request; return True if it answers HTTP 200."""
    headers = {
        "Authorization": f"Bearer {NVIDIA_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model_id,
        "messages": [{"role": "user", "content": "hi"}],
        "max_tokens": 1  # one token is enough to test reachability
    }
    try:
        # 8 s timeout: this runs while the user waits for the menu, so a model
        # that does not answer in time is reported as unavailable.
        res = requests.post(NVIDIA_API_URL, headers=headers, json=payload, timeout=8)
        return res.status_code == 200
    except Exception:
        return False


# ================= 5. Message handlers and entry point =================
def check_auth(message):
    """Return True when the sender of *message* is in ALLOWED_USERS.

    Only ``message.from_user.id`` is read, so callback queries can be passed
    as well. An empty allow-list or a missing sender denies access.
    """
    user = getattr(message, "from_user", None)
    user_id = user.id if user is not None else None
    if not ALLOWED_USERS or user_id not in ALLOWED_USERS:
        print(f"⚠️ 拦截非法访问: {user_id}")
        return False
    return True


@bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
    """Reply to /start and /help with a short feature overview."""
    if not check_auth(message):
        return
    bot.reply_to(message, "👋 **全功能 Agent 已上线**\n\n• 我能自主决定何时搜索、何时阅读网页\n• `/model` 切换引擎\n• `/reset` 清空记忆", parse_mode="Markdown")


@bot.message_handler(commands=['model'])
def show_model_menu(message):
    """Probe every model concurrently and show a selection keyboard with ✅/❌ status."""
    if not check_auth(message):
        return

    # 1. Send a placeholder first; probing takes a few seconds.
    status_msg = bot.reply_to(message, "⏳ 正在并发检测各节点的 API 连通性,请稍候...")

    # 2. Probe all models in parallel.
    model_status = {}
    with ThreadPoolExecutor(max_workers=max(1, len(MODEL_MAP))) as executor:
        future_to_model = {
            executor.submit(check_model_status, model_id): name
            for name, model_id in MODEL_MAP.items()
        }
        for future in as_completed(future_to_model):
            model_name = future_to_model[future]
            try:
                is_ok = future.result()
            except Exception:
                is_ok = False
            model_status[model_name] = is_ok

    # 3. Build the keyboard; the icon is only in the label, callback_data
    #    carries the plain name used by handle_model_selection.
    markup = types.InlineKeyboardMarkup(row_width=1)
    for name in MODEL_MAP:
        status_icon = "✅" if model_status.get(name) else "❌"
        btn_text = f"{status_icon} {name}"
        markup.add(types.InlineKeyboardButton(text=btn_text, callback_data=f"set_model_{name}"))

    # 4. Turn the placeholder into the actual menu.
    bot.edit_message_text(
        text="请选择思考引擎(✅ 可用 / ❌ 异常或超时):",
        chat_id=message.chat.id,
        message_id=status_msg.message_id,
        reply_markup=markup
    )


@bot.callback_query_handler(func=lambda call: call.data.startswith('set_model_'))
def handle_model_selection(call):
    """Store the model picked from the /model menu for the calling chat."""
    if not check_auth(call):
        # Still acknowledge the query so the client stops its loading spinner.
        bot.answer_callback_query(call.id)
        return

    name = call.data[len('set_model_'):]
    model_id = MODEL_MAP.get(name)
    if model_id is None:
        # Stale keyboard from an older model list.
        bot.answer_callback_query(call.id, text=f"❌ 未知模型: {name}")
        return

    user_selected_model[call.message.chat.id] = model_id
    bot.answer_callback_query(call.id)
    bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, text=f"✅ 已切换至: {name}")


@bot.message_handler(commands=['reset'])
def reset_memory(message):
    """Drop the in-memory conversation history of the calling chat."""
    if not check_auth(message):
        return
    chat_memory.pop(message.chat.id, None)
    bot.reply_to(message, "🧹 对话上下文及工具执行记录已清空。")


def _split_for_telegram(text, limit=TELEGRAM_MAX_MESSAGE_LEN):
    """Split *text* into chunks of at most *limit* characters, preferring line breaks."""
    chunks = []
    remaining = text
    while len(remaining) > limit:
        cut = remaining.rfind('\n', 0, limit)
        if cut <= 0:
            cut = limit
        chunks.append(remaining[:cut])
        remaining = remaining[cut:].lstrip('\n')
    if remaining or not chunks:
        chunks.append(remaining)
    return chunks


def _deliver_reply(chat_id, status_msg, reply):
    """Show *reply* to the user: first chunk replaces the status message, the rest follow.

    Each chunk is tried as Markdown first and re-sent as plain text when
    Telegram cannot parse it.
    """
    chunks = _split_for_telegram(reply)
    try:
        bot.edit_message_text(chunks[0], chat_id=chat_id, message_id=status_msg.message_id, parse_mode="Markdown")
    except Exception:
        bot.edit_message_text(chunks[0], chat_id=chat_id, message_id=status_msg.message_id)

    for chunk in chunks[1:]:
        try:
            bot.send_message(chat_id, chunk, parse_mode="Markdown")
        except Exception:
            bot.send_message(chat_id, chunk)


@bot.message_handler(content_types=['text', 'photo'])
def handle_message(message):
    """Handle a text or photo message: run the agent and deliver its answer."""
    if not check_auth(message):
        return
    if message.text and message.text.startswith('/'):
        return

    chat_id = message.chat.id
    user_text = message.text or message.caption or "请描述图片"

    print(f"\n[{datetime.datetime.now().strftime('%H:%M:%S')}] 收到用户 {chat_id} 的消息: {user_text[:20]}...")

    current_content = [{"type": "text", "text": user_text}]
    if message.photo:
        try:
            # The last entry of message.photo is used, as in the original code.
            file_info = bot.get_file(message.photo[-1].file_id)
            img_b64 = base64.b64encode(bot.download_file(file_info.file_path)).decode('utf-8')
            current_content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}})
            print("📸 已接收并处理图片附件")
        except Exception as e:
            print(f"❌ 图片处理失败: {e}")

    user_message = {"role": "user", "content": current_content}

    # Send a placeholder first so the user sees immediate feedback.
    status_msg = bot.reply_to(message, "⏳ 思考中...")
    print("⏳ 已发送状态占位消息")

    try:
        reply = chat_with_agent(chat_id, user_message, status_msg)
        # Replace the placeholder with the final answer (split if too long).
        _deliver_reply(chat_id, status_msg, reply)
        print("✅ 最终回复已发送")
    except Exception as e:
        print(f"❌ 系统异常: {e}")
        # Reporting the error must not raise out of the handler itself.
        _update_status(chat_id, status_msg, f"❌ 系统异常: {e}")


if __name__ == "__main__":
    setup_bot_commands()
    print("🚀 Bot 开始运行...")
    bot.infinity_polling()