"""OpenAI-compatible bridge around a qwen-agent Assistant.

Provides request translation (OpenAI message parts -> qwen ContentItem),
local/cloud routing via /cloud and /local chat directives, long-term memory
injection into the system prompt, and both non-streaming and SSE streaming
chat-completion implementations.
"""

import datetime
import json
import os
import time
import uuid
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union

import requests
from qwen_agent.agents import Assistant
from qwen_agent.llm.schema import ContentItem, Message

import agent_runtime  # noqa: F401
from agent_runtime import readonly_tools  # noqa: F401

# Force-register the write_file tool so build_agent() does not fail later
# when ENABLE_FILE_WRITE is on but the optional module is absent.
try:
    import agent_runtime.write_tools  # noqa: F401
except ImportError:
    pass

DEFAULT_SYSTEM_PROMPT = (
    '你是 Qwen3.5,本地部署的多模态中文助手。\n'
    '默认中文回答。\n'
    '当用户只是打招呼或闲聊时,自然回应即可,不要主动枚举全部工具。\n'
    '你的目标是先使用可用工具获得可验证信息,再给出结论。\n'
    '规则:\n'
    '1. 对最新信息先用 web_search,再按需用 web_fetch 或 web_extractor 抓取正文。\n'
    '2. 对人名、作品名、小众概念等不确定知识先 web_search,若结果歧义则改写关键词再检索一次。\n'
    '3. 核心规则:你已具备 filesystem 的读写能力。你可以读取文件,如果用户有需求,你也可以调用 write_file 工具进行写入。\n'
    '4. 图片问题先看整图,细节再用 image_zoom_in_tool,使用相对坐标。\n'
    '5. 工具失败时必须明确说明原因,不得伪造结果。\n'
    '6. 联网任务要控制上下文预算,优先少量高质量来源。\n'
    '7. 严禁在未获授权的情况下使用 filesystem 工具查看助手自身的源代码或运行目录。\n'
    '8. 联网任务要控制上下文预算,优先少量高质量来源,避免搬运大段无关正文。\n'
    '9. 长期记忆(主动意识):你拥有 manage_memory 工具。当你从对话中识别出以下内容时,必须【主动】调用 add 操作:\n - 用户的明确偏好(如:喜欢 MD 格式、不喜欢繁琐说明)。\n - 重要的个人事实(如:职业、项目代号、系统配置路径)。\n - 约定的工作习惯(如:每段代码都要加注释)。\n 执行后,在回复中自然地告知用户“我已记下此习惯/信息”。当用户问“你了解我什么”或要求修改时,配合 list 和 delete 操作。\n'
)

DEFAULT_FUNCTION_LIST = [
    'web_search',
    'web_fetch',
    'web_extractor',
    'image_search',
    'image_zoom_in_tool',
    'filesystem',
    'manage_memory',
]

# Minimum wall-clock gap between two live "timings" SSE chunks.
TIMINGS_EMIT_INTERVAL_SEC = 0.8
# Cap on text copied into the fallback summary of an unknown content part.
MAX_FALLBACK_PART_TEXT_CHARS = 512


# ==========================================
# Memory system: tolerant, self-healing loader
# ==========================================
def get_injected_memory() -> str:
    """Load persisted long-term memory and format it as a prompt section.

    The file path comes from MEMORY_FILE_PATH (default
    './.tmp/super_agent_data/memory.json').  Both a bare JSON list and a
    {"items": [...]} wrapper are accepted; entries may be plain strings or
    dicts carrying a "content" key.  Returns '' when nothing usable exists.
    """
    raw_path = os.getenv('MEMORY_FILE_PATH', './.tmp/super_agent_data/memory.json')
    # Strip stray quotes/newlines that sometimes leak in from .env files.
    clean_path = raw_path.strip().strip('"').strip("'")
    memory_path = Path(clean_path).resolve()
    if not memory_path.exists():
        return ""
    try:
        with open(memory_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # Accept either a bare list [...] or a wrapper {"items": [...]}.
        if isinstance(data, dict) and "items" in data:
            raw_list = data["items"]
        elif isinstance(data, list):
            raw_list = data
        else:
            return ""
        processed: List[str] = []
        for item in raw_list:
            # Each entry may be a plain string or a dict with "content".
            val = item.get("content") if isinstance(item, dict) else item
            if val:
                processed.append(str(val))
        if not processed:
            return ""
        memory_str = "\n".join(f"- {m}" for m in processed)
        return f"\n【长期记忆库(已自动加载)】:\n{memory_str}\n"
    except Exception as e:
        # Best-effort: a malformed memory file must never break the agent.
        print(f"[Memory] 记忆注入提示: 暂未读取到有效记忆或格式不匹配 ({e})")
        return ""


def fetch_model_id(model_server: str, timeout_sec: int) -> str:
    """Ask the local OpenAI-compatible server for its first model id.

    Returns 'local-model' when the server is unreachable or the payload is
    unexpected.  Bug fix: the caller-supplied timeout_sec was previously
    ignored in favor of a hard-coded 2-second timeout.
    """
    try:
        response = requests.get(f'{model_server}/models', timeout=timeout_sec)
        response.raise_for_status()
        return response.json()['data'][0]['id']
    except Exception:
        return "local-model"


# ==========================================
# Routing system: directive interception
# ==========================================
def extract_routing_command(messages: list) -> Optional[str]:
    """Scan the conversation for the most recent /cloud or /local directive.

    The newest user message has top priority; when it carries a directive,
    the prefix is stripped in place so the model never sees the raw command.
    Otherwise older user messages are scanned newest-first, which makes a
    previous switch "sticky" across turns.  Returns 'FORCE_CLOUD',
    'FORCE_LOCAL', or None.
    """
    if not messages:
        return None
    # 1. The last message wins (immediate directive has top priority).
    last_msg = messages[-1]
    if last_msg.get('role') == 'user':
        content = last_msg.get('content', '')
        if isinstance(content, str):
            text = content.strip()
            if text.startswith('/cloud'):
                last_msg['content'] = text[len('/cloud'):].strip()
                return 'FORCE_CLOUD'
            elif text.startswith('/local'):
                last_msg['content'] = text[len('/local'):].strip()
                return 'FORCE_LOCAL'
    # 2. No directive on the last message: walk history backwards so a
    #    previously issued switch keeps applying ("remembered" state).
    for i in range(len(messages) - 2, -1, -1):
        msg = messages[i]
        if msg.get('role') == 'user':
            content = msg.get('content', '')
            if isinstance(content, str):
                if content.strip().startswith('/cloud'):
                    return 'FORCE_CLOUD'
                if content.strip().startswith('/local'):
                    return 'FORCE_LOCAL'
    return None


def build_agent(
    model_server: str,
    timeout_sec: int,
    generate_cfg: Dict[str, Any],
    model_id: Optional[str] = None,
    system_prompt: str = DEFAULT_SYSTEM_PROMPT,
) -> Assistant:
    """Create an Assistant wired to either the local or the cloud backend.

    Routing precedence: explicit model_id directive ('FORCE_CLOUD' /
    'FORCE_LOCAL') beats the MODEL_MODE environment variable (default
    'local').  The system prompt is assembled from a live clock banner, any
    persisted long-term memory, and the base prompt.
    """
    env_mode = os.getenv('MODEL_MODE', 'local').strip().lower()
    # Decision: explicit directive > environment default.
    if model_id == 'FORCE_CLOUD':
        is_cloud = True
    elif model_id == 'FORCE_LOCAL':
        is_cloud = False
    else:
        is_cloud = (env_mode == 'cloud')
    if is_cloud:
        target_model = os.getenv('CLOUD_MODEL_ID', 'gpt-4o').strip()
        target_server = os.getenv('CLOUD_BASE_URL', '').strip()
        target_key = os.getenv('CLOUD_API_KEY', '').strip()
        display_name = f"Cloud-Agent ({target_model})"
    else:
        target_model = fetch_model_id(model_server, timeout_sec)
        target_server = model_server
        target_key = os.getenv('OPENAI_API_KEY', 'EMPTY')
        display_name = f"Local-Agent ({target_model})"
    # Plain-ASCII log line: emoji can crash Windows CMD code pages.
    print(f"\n[Route] Current Brain: {display_name}", flush=True)
    llm_cfg = {
        'model': target_model,
        'model_server': target_server,
        'api_key': target_key,
        'model_type': 'qwenvl_oai',
        'generate_cfg': generate_cfg,
    }
    # Assemble the tool list; file writing is opt-in via ENABLE_FILE_WRITE.
    actual_function_list = list(DEFAULT_FUNCTION_LIST)
    if os.getenv('ENABLE_FILE_WRITE', 'False').lower() == 'true':
        actual_function_list.append('write_file')
    # Assemble the system prompt: live clock + persisted memory + base rules.
    persistent_memory = get_injected_memory()
    now = datetime.datetime.now()
    weekdays = ["一", "二", "三", "四", "五", "六", "日"]
    dynamic_context = f"【系统实时状态】\n当前时间:{now.strftime('%Y年%m月%d日 %H:%M:%S')},星期{weekdays[now.weekday()]}。\n"
    actual_system_prompt = dynamic_context + persistent_memory + system_prompt
    return Assistant(
        name=display_name,
        llm=llm_cfg,
        function_list=actual_function_list,
        system_message=actual_system_prompt,
    )


# ==========================================
# Basic conversion helpers
# ==========================================
def _extract_image_uri(part: Dict[str, Any]) -> Optional[str]:
    """Pull an image URI out of a content part, tolerating several key names.

    Handles both flat string values and the OpenAI nested form
    {"image_url": {"url": "..."}}.  Returns None when no URI is found.
    """
    keys = ('image_url', 'image', 'url', 'input_image', 'image_uri')
    for key in keys:
        value = part.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip()
        if isinstance(value, dict):
            nested = value.get('url') or value.get('image_url') or value.get('image')
            if isinstance(nested, str) and nested.strip():
                return nested.strip()
    return None


def _build_compact_part_text(part: Dict[str, Any], part_type: Any) -> str:
    """Summarize an unrecognized content part as compact JSON text.

    Keeps the part's type, up to 12 sorted key names, and a truncated copy
    of any embedded text so information is preserved without bloating the
    context.
    """
    part_keys = sorted(str(k) for k in part.keys())
    payload = {'type': str(part_type or 'unknown'), 'keys': part_keys[:12]}
    text = part.get('text')
    if isinstance(text, str) and text.strip():
        payload['text'] = text.strip()[:MAX_FALLBACK_PART_TEXT_CHARS]
    return json.dumps(payload, ensure_ascii=False)


def extract_generate_cfg(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Translate an OpenAI-style request payload into a generate_cfg dict.

    Known sampling keys are copied through, llama.cpp's 'repeat_penalty' is
    renamed to 'repetition_penalty', and thinking/reasoning defaults are
    applied under 'extra_body'.  Bug fix: the function previously mutated
    the caller's payload['extra_body'] (and its nested chat_template_kwargs)
    in place; it now operates on shallow copies.
    """
    cfg: Dict[str, Any] = {}
    keys = ('temperature', 'top_p', 'top_k', 'presence_penalty', 'frequency_penalty')
    for key in keys:
        value = payload.get(key)
        if value is not None:
            cfg[key] = value
    repeat_penalty = payload.get('repeat_penalty')
    if repeat_penalty is not None:
        # llama.cpp naming -> HF/qwen naming.
        cfg['repetition_penalty'] = repeat_penalty
    # Copy before applying defaults so the incoming payload stays untouched.
    extra_body = dict(payload.get('extra_body') or {})
    chat_template_kwargs = dict(extra_body.get('chat_template_kwargs') or {})
    chat_template_kwargs.setdefault('enable_thinking', True)
    extra_body['chat_template_kwargs'] = chat_template_kwargs
    extra_body.setdefault('reasoning_format', payload.get('reasoning_format', 'deepseek'))
    extra_body.setdefault('reasoning_budget', -1)
    cfg['extra_body'] = extra_body
    max_tokens = payload.get('max_tokens')
    if isinstance(max_tokens, int) and max_tokens > 0:
        cfg['max_tokens'] = max_tokens
    return cfg if cfg else {'temperature': 0.7, 'top_p': 0.9, 'max_tokens': 512}


def to_content_items(content: Any) -> Union[str, List[ContentItem]]:
    """Convert OpenAI message content into qwen-agent ContentItem objects.

    Strings pass through unchanged; list parts are mapped to text or image
    items, with unknown part types preserved as a compact JSON summary.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)
    items: List[ContentItem] = []
    for part in content:
        if not isinstance(part, dict):
            items.append(ContentItem(text=str(part)))
            continue
        part_type = part.get('type')
        if part_type in (None, 'text', 'input_text'):
            text = part.get('text', '')
            if text:
                items.append(ContentItem(text=str(text)))
            continue
        image_uri = _extract_image_uri(part)
        if image_uri:
            items.append(ContentItem(image=image_uri))
            continue
        # Unknown part type: keep a compact summary so nothing is dropped.
        items.append(ContentItem(text=_build_compact_part_text(part, part_type)))
    return items if items else ''


def to_qwen_messages(openai_messages: Sequence[Dict[str, Any]]) -> List[Message]:
    """Convert OpenAI chat messages to qwen Messages, dropping other roles."""
    qwen_messages: List[Message] = []
    for item in openai_messages:
        role = str(item.get('role', '')).strip()
        if role in {'system', 'user', 'assistant'}:
            qwen_messages.append(Message(role=role, content=to_content_items(item.get('content', ''))))
    return qwen_messages


def content_to_text(content: Any) -> str:
    """Flatten message content (string, list of parts, or objects) to text."""
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return str(content)
    texts: List[str] = []
    for item in content:
        if isinstance(item, str):
            texts.append(item)
        elif isinstance(item, dict) and item.get('text'):
            texts.append(str(item['text']))
        elif hasattr(item, 'text'):
            # ContentItem-like objects expose .text.
            texts.append(str(item.text))
    return '\n'.join(texts).strip()


def extract_answer_and_reasoning(messages: Sequence[Message]) -> Dict[str, str]:
    """Collect the final answer and accumulated reasoning from a run batch.

    The last non-empty assistant content wins as 'answer'; every non-empty
    reasoning_content chunk is joined in order as 'reasoning'.
    """
    answer = ''
    reasoning_parts: List[str] = []
    for message in messages:
        if getattr(message, 'role', '') != 'assistant':
            continue
        content_text = content_to_text(message.get('content', ''))
        if content_text:
            answer = content_text
        reasoning_text = content_to_text(message.get('reasoning_content', ''))
        if reasoning_text:
            reasoning_parts.append(reasoning_text)
    return {'answer': answer, 'reasoning': '\n'.join(reasoning_parts).strip()}


# ==========================================
# Core endpoint implementations
# ==========================================
def run_chat_completion(payload: Dict[str, Any], model_server: str, timeout_sec: int) -> Dict[str, str]:
    """Run a non-streaming chat completion.

    Intercepts /cloud | /local routing directives, builds the agent, drains
    the run generator, and returns model id, answer, and reasoning text.
    Raises RuntimeError when the agent produces no output at all.
    """
    openai_messages = payload.get('messages', [])
    # Intercept routing directives before the messages reach the model.
    route_signal = extract_routing_command(openai_messages)
    requested_model = route_signal if route_signal else payload.get('model')
    agent = build_agent(model_server, timeout_sec, extract_generate_cfg(payload), model_id=requested_model)
    qwen_messages = to_qwen_messages(openai_messages)
    final_batch = None
    for batch in agent.run(qwen_messages):
        final_batch = batch
    if not final_batch:
        raise RuntimeError('未收到模型输出')
    texts = extract_answer_and_reasoning(final_batch)
    return {'model': agent.llm.model, 'answer': texts['answer'], 'reasoning': texts['reasoning']}


def build_sse_chunk(chat_id: str, created: int, model: str, delta: Dict[str, Any],
                    finish_reason: Optional[str] = None,
                    timings: Optional[Dict[str, Any]] = None) -> bytes:
    """Encode one OpenAI-compatible chat.completion.chunk as an SSE frame."""
    chunk = {
        'id': chat_id,
        'object': 'chat.completion.chunk',
        'created': created,
        'model': model,
        'choices': [{'index': 0, 'delta': delta, 'finish_reason': finish_reason}],
    }
    if timings:
        chunk['timings'] = timings
    return f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n".encode('utf-8')


def text_delta(previous: str, current: str) -> str:
    """Return the newly appended suffix, or the full text on a rewrite."""
    return current[len(previous):] if current.startswith(previous) else current


def count_text_tokens(model_server: str, timeout_sec: int, text: str) -> int:
    """Count tokens via the llama.cpp /tokenize endpoint; 0 on any failure.

    Bug fix: the original bare `except:` also swallowed KeyboardInterrupt
    and SystemExit; narrowed to `except Exception`.
    """
    try:
        # The tokenize endpoint lives beside, not under, the /v1 prefix.
        if model_server.endswith('/v1'):
            url = f"{model_server[:-3]}/tokenize"
        else:
            url = f"{model_server.rstrip('/')}/tokenize"
        response = requests.post(url, json={'content': text}, timeout=timeout_sec)
        return len(response.json().get('tokens', []))
    except Exception:
        # Best effort only: timing stats are cosmetic.
        return 0


def build_live_timings(token_count: int, elapsed_sec: float) -> Dict[str, Any]:
    """Build a llama.cpp-style 'timings' dict for live throughput display."""
    # Guard against division by zero on the very first tick.
    safe_elapsed = max(elapsed_sec, 0.001)
    return {'prompt_n': 0, 'prompt_ms': 0, 'predicted_n': token_count,
            'predicted_ms': safe_elapsed * 1000.0,
            'predicted_per_second': token_count / safe_elapsed, 'cache_n': 0}


def merge_generated_text(reasoning: str, answer: str) -> str:
    """Join reasoning and answer for token counting (either may be empty)."""
    return f"{reasoning}\n{answer}" if reasoning and answer else (reasoning or answer)


def stream_chat_completion(payload: Dict[str, Any], model_server: str, timeout_sec: int) -> Iterable[bytes]:
    """Stream a chat completion as OpenAI-compatible SSE frames.

    Emits a role bootstrap chunk, incremental reasoning/content deltas,
    throttled live timing chunks, a stop chunk, and the [DONE] sentinel.
    """
    openai_messages = payload.get('messages', [])
    # Intercept routing directives before the messages reach the model.
    route_signal = extract_routing_command(openai_messages)
    requested_model = route_signal if route_signal else payload.get('model')
    agent = build_agent(model_server, timeout_sec, extract_generate_cfg(payload), model_id=requested_model)
    qwen_messages = to_qwen_messages(openai_messages)
    now, chat_id = int(time.time()), f'chatcmpl-{uuid.uuid4().hex}'
    current_model = agent.llm.model
    yield build_sse_chunk(chat_id, now, current_model, {'role': 'assistant'})
    previous_answer, previous_reasoning = '', ''
    started_at = time.perf_counter()
    last_timing_at, last_reported_tokens, last_counted_text = started_at, -1, ''
    for batch in agent.run(qwen_messages):
        texts = extract_answer_and_reasoning(batch)
        answer, reasoning = texts['answer'], texts['reasoning']
        reasoning_inc = text_delta(previous_reasoning, reasoning)
        if reasoning_inc:
            yield build_sse_chunk(chat_id, now, current_model, {'reasoning_content': reasoning_inc})
        answer_inc = text_delta(previous_answer, answer)
        if answer_inc:
            yield build_sse_chunk(chat_id, now, current_model, {'content': answer_inc})
        # Timing logic: probe the tokenizer at most once per interval and
        # only when the generated text actually changed.
        gen_text = merge_generated_text(reasoning, answer)
        cur_t = time.perf_counter()
        if gen_text and gen_text != last_counted_text and (cur_t - last_timing_at) >= TIMINGS_EMIT_INTERVAL_SEC:
            t_count = count_text_tokens(model_server, timeout_sec, gen_text)
            if t_count != last_reported_tokens:
                yield build_sse_chunk(chat_id, now, current_model, {},
                                      timings=build_live_timings(t_count, cur_t - started_at))
                last_reported_tokens = t_count
            last_counted_text, last_timing_at = gen_text, cur_t
        previous_answer, previous_reasoning = answer, reasoning
    yield build_sse_chunk(chat_id, now, current_model, {}, 'stop')
    yield b'data: [DONE]\n\n'


# ==========================================
# Response builders
# ==========================================
def build_non_stream_response(answer: str, model: str, reasoning: str = '') -> Dict[str, Any]:
    """Build an OpenAI-compatible non-streaming chat.completion JSON body."""
    now = int(time.time())
    message = {'role': 'assistant', 'content': answer}
    if reasoning:
        message['reasoning_content'] = reasoning
    return {
        'id': f'chatcmpl-{uuid.uuid4().hex}',
        'object': 'chat.completion',
        'created': now,
        'model': model,
        'choices': [{
            'index': 0,
            'message': message,
            'finish_reason': 'stop',
        }],
        # Token accounting is not tracked on this path; report zeros.
        'usage': {'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0},
    }


def sse_lines(answer: str, model: str, reasoning: str = '') -> Iterable[bytes]:
    """Replay a complete answer as a minimal SSE stream (compat shim)."""
    now = int(time.time())
    chat_id = f'chatcmpl-{uuid.uuid4().hex}'
    # 1. Role bootstrap chunk.
    yield build_sse_chunk(chat_id, now, model, {'role': 'assistant'})
    # 2. Chain-of-thought chunk (if any).
    if reasoning:
        yield build_sse_chunk(chat_id, now, model, {'reasoning_content': reasoning})
    # 3. Body chunk.
    yield build_sse_chunk(chat_id, now, model, {'content': answer})
    # 4. Stop chunk and done sentinel.
    yield build_sse_chunk(chat_id, now, model, {}, 'stop')
    yield b'data: [DONE]\n\n'