Files
2026-03-11 16:49:00 +08:00

160 lines
5.5 KiB
Python

import json
import os
import shutil
import subprocess
from pathlib import Path
from typing import Union
from qwen_agent.tools.base import BaseTool, register_tool
DEFAULT_TIMEOUT = 60
def _ensure_parent(path: Path) -> None:
parent = path.parent
parent.mkdir(parents=True, exist_ok=True)
def _build_shell_command(command: str) -> list[str]:
if os.name == 'nt':
return ['powershell.exe', '-NoProfile', '-Command', command]
return ['bash', '-lc', command]
@register_tool('filesystem', allow_overwrite=True)
class FilesystemTool(BaseTool):
description = '文件系统工具,支持目录列举、读写文件、创建目录和删除。'
parameters = {
'type': 'object',
'properties': {
'operation': {
'type': 'string',
'description': 'list|read|write|append|mkdir|remove'
},
'path': {
'type': 'string',
'description': '目标路径'
},
'content': {
'type': 'string',
'description': '写入内容,仅 write 或 append 需要'
}
},
'required': ['operation', 'path'],
}
def call(self, params: Union[str, dict], **kwargs) -> str:
params = self._verify_json_format_args(params)
operation = params['operation'].strip().lower()
target = Path(os.path.expanduser(params['path'])).resolve()
handlers = {
'list': self._list_path,
'read': self._read_file,
'write': self._write_file,
'append': self._append_file,
'mkdir': self._mkdir_path,
'remove': self._remove_path,
}
if operation not in handlers:
raise ValueError(f'不支持的 operation: {operation}')
return handlers[operation](target, params)
def _list_path(self, target: Path, params: dict) -> str:
if not target.exists():
raise FileNotFoundError(f'路径不存在: {target}')
if target.is_file():
stat = target.stat()
return json.dumps({'type': 'file', 'path': str(target), 'size': stat.st_size}, ensure_ascii=False)
items = []
for child in sorted(target.iterdir()):
item_type = 'dir' if child.is_dir() else 'file'
size = child.stat().st_size if child.is_file() else None
items.append({'name': child.name, 'type': item_type, 'size': size})
return json.dumps({'type': 'dir', 'path': str(target), 'items': items}, ensure_ascii=False, indent=2)
def _read_file(self, target: Path, params: dict) -> str:
if not target.exists() or not target.is_file():
raise FileNotFoundError(f'文件不存在: {target}')
return target.read_text(encoding='utf-8')
def _write_file(self, target: Path, params: dict) -> str:
content = params.get('content')
if content is None:
raise ValueError('write 操作必须提供 content')
_ensure_parent(target)
target.write_text(content, encoding='utf-8')
return f'写入成功: {target}'
def _append_file(self, target: Path, params: dict) -> str:
content = params.get('content')
if content is None:
raise ValueError('append 操作必须提供 content')
_ensure_parent(target)
with target.open('a', encoding='utf-8') as fp:
fp.write(content)
return f'追加成功: {target}'
def _mkdir_path(self, target: Path, params: dict) -> str:
target.mkdir(parents=True, exist_ok=True)
return f'目录已创建: {target}'
def _remove_path(self, target: Path, params: dict) -> str:
if not target.exists():
raise FileNotFoundError(f'路径不存在: {target}')
if target.is_dir():
shutil.rmtree(target)
else:
target.unlink()
return f'删除成功: {target}'
@register_tool('run_command', allow_overwrite=True)
class RunCommandTool(BaseTool):
description = '执行本机命令并返回退出码、标准输出和标准错误。'
parameters = {
'type': 'object',
'properties': {
'command': {
'type': 'string',
'description': '待执行命令'
},
'cwd': {
'type': 'string',
'description': '执行目录'
},
'timeout_sec': {
'type': 'integer',
'description': '超时时间秒数',
'default': DEFAULT_TIMEOUT
}
},
'required': ['command'],
}
def call(self, params: Union[str, dict], **kwargs) -> str:
params = self._verify_json_format_args(params)
command = params['command'].strip()
if not command:
raise ValueError('command 不能为空')
timeout_sec = int(params.get('timeout_sec', DEFAULT_TIMEOUT))
cwd_raw = params.get('cwd') or os.getcwd()
cwd = str(Path(os.path.expanduser(cwd_raw)).resolve())
completed = subprocess.run(
_build_shell_command(command),
cwd=cwd,
text=True,
capture_output=True,
timeout=timeout_sec,
check=False,
)
payload = {
'command': command,
'cwd': cwd,
'returncode': completed.returncode,
'stdout': completed.stdout,
'stderr': completed.stderr,
}
return json.dumps(payload, ensure_ascii=False, indent=2)