Files
tts-server/srt_interactive_refiner_v1.0.py
2025-11-05 10:37:09 +08:00

395 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# filename: srt_interactive_refiner_v2.0.py
import tkinter as tk
from tkinter import filedialog, ttk, messagebox, scrolledtext
import sv_ttk
import re
from datetime import timedelta
import os
import requests
import json
import threading
import queue
import copy
# --- Configuration ---
# Base URL of the Ollama HTTP API; the /api/tags and /api/generate requests in this file are sent to it.
OLLAMA_HOST = "http://127.0.0.1:11434"
# --- SRT core class and parsing function ---
class SrtEntry:
    """One subtitle cue: a sequence number, a time span and its text.

    Attributes:
        index: Current sequence number of the cue; callers overwrite it
            when cues are renumbered.
        start_td: Cue start time as a ``timedelta``.
        end_td: Cue end time as a ``timedelta``.
        text: Cue text with leading/trailing whitespace stripped.
        original_index: Sequence number the cue was created with; defaults
            to ``index`` when not given explicitly.
    """

    def __init__(self, index, start_td, end_td, text, original_index=None):
        self.index = index
        self.start_td = start_td
        self.end_td = end_td
        self.text = text.strip()
        self.original_index = original_index if original_index is not None else index

    @property
    def start_str(self):
        """Start time formatted as ``HH:MM:SS,mmm``."""
        return self._td_to_str(self.start_td)

    @property
    def end_str(self):
        """End time formatted as ``HH:MM:SS,mmm``."""
        return self._td_to_str(self.end_td)

    @staticmethod
    def _td_to_str(td):
        """Format a ``timedelta`` as an SRT timestamp (``HH:MM:SS,mmm``).

        Uses integer arithmetic only: deriving the milliseconds from
        ``total_seconds()`` with float subtraction truncates values such as
        1.001 s to ``,000`` because of binary rounding error.
        """
        total_ms = td // timedelta(milliseconds=1)
        total_seconds, ms = divmod(total_ms, 1000)
        total_minutes, s = divmod(total_seconds, 60)
        h, m = divmod(total_minutes, 60)
        return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"

    def to_srt_block(self):
        """Return the cue serialised as one SRT block, ending with a blank line."""
        return f"{self.index}\n{self.start_str} --> {self.end_str}\n{self.text}\n\n"
def parse_srt(content):
    """Parse SRT text into a list of ``SrtEntry`` objects.

    Each cue is expected as an index line, a
    ``HH:MM:SS,mmm --> HH:MM:SS,mmm`` line, then text running up to the next
    blank line or the end of the input. Text that does not match this shape
    is skipped.

    Args:
        content: Full text of an SRT file.

    Returns:
        The parsed cues in file order; an empty list when nothing matches.
    """
    # Accept CRLF / CR line endings so content that was not read through
    # universal-newline translation still matches the "\n"-based pattern.
    normalized = content.replace('\r\n', '\n').replace('\r', '\n')
    # Trailing blanks after the index/timestamp are tolerated, and a line that
    # holds only spaces/tabs counts as the blank separator between cues
    # (otherwise the following cue would be swallowed into this cue's text).
    pattern = re.compile(
        r'(\d+)[ \t]*\n'
        r'(\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2},\d{3})[ \t]*\n'
        r'([\s\S]*?)(?=\n[ \t]*\n|\Z)'
    )

    def to_td(time_str):
        # "HH:MM:SS,mmm" -> timedelta
        h, m, s, ms = map(int, re.split('[:,]', time_str))
        return timedelta(hours=h, minutes=m, seconds=s, milliseconds=ms)

    return [
        SrtEntry(int(match.group(1)), to_td(match.group(2)), to_td(match.group(3)), match.group(4))
        for match in pattern.finditer(normalized)
    ]
# --- GUI application ---
class App:
    """Tkinter window for reviewing and refining SRT subtitles with Ollama.

    The left pane shows the cues as loaded (reference); the right pane shows
    an editable working copy plus a single-cue editor. Refinement requests
    are sent to the Ollama HTTP API on worker threads.

    Worker threads never touch Tk objects directly; they post dict messages
    (``{"type": ..., "data": ...}``) to ``gui_queue``, which ``process_queue``
    drains on the Tk main loop every 100 ms. Message types: ``status``,
    ``progress``, ``line_refined``, ``refine_failed``, ``batch_line_refined``,
    ``batch_finish``, ``models_loaded`` and ``error``.
    """

    def __init__(self, root):
        """Build the window on ``root`` and schedule queue polling and model discovery."""
        self.root = root
        self.root.title("交互式字幕编辑器 V2.0 (最终稳定版)")
        self.root.geometry("1400x800")
        self.srt_path = ""
        self.original_entries = []
        self.working_entries = []
        # 1-based position of the selected cue in working_entries / original_entries.
        self.current_selected_work_iid = None
        self.current_selected_orig_iid = None
        self.gui_queue = queue.Queue()
        self.is_ollama_available = False
        self.vars = {"ollama_model": tk.StringVar(), "status": tk.StringVar(value="准备就绪")}
        self.build_ui()
        sv_ttk.set_theme("dark")
        self.root.after(100, self.process_queue)
        self.root.after(100, self.load_ollama_models)

    def build_ui(self):
        """Create all widgets: toolbar, both cue tables, the editor and the status bar."""
        main_pane = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL)
        main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        # Left pane: toolbar + read-only reference table.
        left_pane = ttk.Frame(main_pane)
        main_pane.add(left_pane, weight=2)
        control_frame = ttk.Frame(left_pane)
        control_frame.pack(fill=tk.X, pady=(0, 10))
        self.load_btn = ttk.Button(control_frame, text="加载SRT", command=self.load_srt)
        self.load_btn.pack(side=tk.LEFT, padx=(0, 5))
        self.save_btn = ttk.Button(control_frame, text="另存为...", command=self.save_srt, state="disabled")
        self.save_btn.pack(side=tk.LEFT, padx=(0, 5))
        self.refine_all_btn = ttk.Button(control_frame, text="🚀 一键润色", command=self.refine_all_lines, state="disabled", style="Accent.TButton")
        self.refine_all_btn.pack(side=tk.LEFT, padx=(10, 5))
        ttk.Label(control_frame, text="Ollama:").pack(side=tk.LEFT, padx=(10, 5))
        self.model_combo = ttk.Combobox(control_frame, textvariable=self.vars['ollama_model'], state="readonly", width=25)
        self.model_combo.pack(side=tk.LEFT, fill=tk.X, expand=True)
        orig_frame = ttk.Labelframe(left_pane, text="原始字幕 (对照区)")
        orig_frame.pack(fill=tk.BOTH, expand=True)
        self.tree_orig = self.create_treeview(orig_frame)

        # Right pane: editable working table + single-cue editor.
        right_pane = ttk.Frame(main_pane)
        main_pane.add(right_pane, weight=3)
        work_frame = ttk.Labelframe(right_pane, text="工作区 (可编辑)")
        work_frame.pack(fill=tk.BOTH, expand=True)
        self.tree_work = self.create_treeview(work_frame)
        self.tree_work.bind("<<TreeviewSelect>>", self.on_tree_select)
        self.tree_work.tag_configure("modified", background="#3a6b3a")
        editor_frame = ttk.Labelframe(right_pane, text="单句编辑器")
        editor_frame.pack(fill=tk.X, pady=(10, 0))
        self.editor_text = scrolledtext.ScrolledText(editor_frame, height=4, wrap=tk.WORD, state="disabled")
        self.editor_text.pack(fill=tk.X, padx=5, pady=5)
        button_bar = ttk.Frame(editor_frame)
        button_bar.pack(fill=tk.X, padx=5, pady=(0, 5))
        button_bar.columnconfigure((0, 1, 2, 3), weight=1)
        self.refine_btn = ttk.Button(button_bar, text="润色", command=self.refine_current_line, state="disabled")
        self.refine_btn.grid(row=0, column=0, sticky="ew", padx=(0, 2))
        self.revert_btn = ttk.Button(button_bar, text="还原文本", command=self.revert_current_line_text, state="disabled")
        self.revert_btn.grid(row=0, column=1, sticky="ew", padx=2)
        self.apply_btn = ttk.Button(button_bar, text="✅ 应用更改", command=self.apply_editor_changes, state="disabled", style="Accent.TButton")
        self.apply_btn.grid(row=1, column=0, columnspan=2, sticky="ew", padx=(0, 2), pady=(5, 0))
        self.delete_btn = ttk.Button(button_bar, text="❌ 删除此行", command=self.delete_current_line, state="disabled")
        self.delete_btn.grid(row=1, column=2, columnspan=2, sticky="ew", padx=2, pady=(5, 0))

        # Status bar: progress bar + status text.
        status_bar = ttk.Frame(self.root)
        status_bar.pack(side=tk.BOTTOM, fill=tk.X, padx=10, pady=(0, 5))
        self.progress_bar = ttk.Progressbar(status_bar, orient=tk.HORIZONTAL, mode='determinate')
        self.progress_bar.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 10))
        ttk.Label(status_bar, textvariable=self.vars['status']).pack(side=tk.LEFT)

    def create_treeview(self, parent):
        """Create a cue table with both scrollbars inside ``parent`` and return the Treeview."""
        cols = ("#0", "开始", "结束", "文本")
        tree = ttk.Treeview(parent, columns=cols[1:], show="headings")
        for col in cols:
            tree.heading(col, text=col, anchor="w")
        tree.column("#0", width=40, anchor="center")
        tree.column("开始", width=90, anchor="w")
        tree.column("结束", width=90, anchor="w")
        tree.column("文本", width=400, anchor="w")
        vsb = ttk.Scrollbar(parent, orient="vertical", command=tree.yview)
        hsb = ttk.Scrollbar(parent, orient="horizontal", command=tree.xview)
        tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
        vsb.pack(side=tk.RIGHT, fill=tk.Y)
        hsb.pack(side=tk.BOTTOM, fill=tk.X)
        tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
        return tree

    def load_srt(self):
        """Ask for an SRT file, parse it and fill both tables.

        Cues are renumbered 1..N after parsing: the rest of the class uses
        ``original_index - 1`` as a list position and the index as a tree
        item id, so gaps, duplicates or 0-based numbering in the file would
        otherwise cause lookups of the wrong cue or errors.
        """
        path = filedialog.askopenfilename(filetypes=[("SRT Subtitles", "*.srt")])
        if not path:
            return
        self.srt_path = path
        try:
            with open(path, 'r', encoding='utf-8-sig') as f:
                content = f.read()
            entries = parse_srt(content)
            for position, entry in enumerate(entries, start=1):
                entry.index = position
                entry.original_index = position
            self.original_entries = entries
            self.working_entries = copy.deepcopy(self.original_entries)
            # Drop the old selection before repopulating, otherwise the row
            # number selected in the previous file is re-selected in the new one.
            self.current_selected_work_iid = None
            self.current_selected_orig_iid = None
            self.populate_tree(self.tree_orig, self.original_entries)
            self.repopulate_work_tree()
            self.save_btn.config(state="normal")
            self.refine_all_btn.config(state="normal" if self.is_ollama_available else "disabled")
            self.editor_text.config(state="normal")
            self.editor_text.delete(1.0, tk.END)
            self.editor_text.config(state="disabled")
            self.set_editor_buttons_state(False)
            self.vars['status'].set(f"已加载 {len(self.original_entries)} 条字幕。")
        except Exception as e:
            messagebox.showerror("加载失败", f"无法加载或解析文件: {e}")

    def populate_tree(self, tree, entries):
        """Replace the rows of ``tree`` with ``entries``, keyed by ``original_index``."""
        tree.delete(*tree.get_children())
        for entry in entries:
            values = (entry.start_str, entry.end_str, entry.text.replace('\n', ' '))
            tree.insert("", "end", text=str(entry.original_index), values=values, iid=str(entry.original_index))

    def repopulate_work_tree(self):
        """Renumber the working cues, rebuild the working table and restore the selection."""
        last_selected = self.current_selected_work_iid
        self.tree_work.delete(*self.tree_work.get_children())
        for i, entry in enumerate(self.working_entries):
            entry.index = i + 1
            values = (entry.start_str, entry.end_str, entry.text.replace('\n', ' '))
            is_modified = entry.text != self.original_entries[entry.original_index - 1].text
            tags = ("modified",) if is_modified else ()
            self.tree_work.insert("", "end", text=str(entry.index), values=values, iid=str(entry.index), tags=tags)
        if last_selected and self.tree_work.exists(str(last_selected)):
            self.tree_work.selection_set(str(last_selected))
            self.tree_work.see(str(last_selected))

    def on_tree_select(self, event):
        """Load the selected working cue into the editor and highlight its source row."""
        selection = self.tree_work.selection()
        if not selection:
            return
        work_iid = int(selection[0])
        if work_iid > len(self.working_entries):
            return
        self.current_selected_work_iid = work_iid
        orig_iid = self.working_entries[work_iid - 1].original_index
        self.current_selected_orig_iid = orig_iid
        entry_text = self.working_entries[work_iid - 1].text
        self.editor_text.config(state="normal")
        self.editor_text.delete(1.0, tk.END)
        self.editor_text.insert(tk.END, entry_text)
        self.set_editor_buttons_state(True)
        if self.tree_orig.exists(str(orig_iid)):
            self.tree_orig.selection_set(str(orig_iid))
            self.tree_orig.see(str(orig_iid))

    def set_editor_buttons_state(self, is_enabled):
        """Enable or disable the editor buttons; refine additionally requires Ollama."""
        state = "normal" if is_enabled else "disabled"
        is_ready = self.is_ollama_available and is_enabled
        self.refine_btn.config(state="normal" if is_ready else "disabled")
        self.revert_btn.config(state=state)
        self.apply_btn.config(state=state)
        self.delete_btn.config(state=state)

    def save_srt(self):
        """Apply pending editor text, then write the working cues to a user-chosen file."""
        if not self.working_entries:
            return
        self.apply_editor_changes()
        original_basename = os.path.splitext(os.path.basename(self.srt_path))[0]
        save_path = filedialog.asksaveasfilename(defaultextension=".srt", initialfile=f"{original_basename}_edited.srt", filetypes=[("SRT Subtitles", "*.srt")])
        if not save_path:
            return
        try:
            for i, entry in enumerate(self.working_entries):
                entry.index = i + 1
            with open(save_path, 'w', encoding='utf-8') as f:
                for entry in self.working_entries:
                    f.write(entry.to_srt_block())
            messagebox.showinfo("保存成功", f"文件已保存至:\n{save_path}")
        except Exception as e:
            messagebox.showerror("保存失败", f"无法保存文件: {e}")

    def apply_editor_changes(self):
        """Copy the editor text into the selected working cue if it differs."""
        if self.current_selected_work_iid is None:
            return
        entry_index_in_list = self.current_selected_work_iid - 1
        if entry_index_in_list >= len(self.working_entries):
            return
        new_text = self.editor_text.get(1.0, tk.END).strip()
        if new_text != self.working_entries[entry_index_in_list].text:
            self.working_entries[entry_index_in_list].text = new_text
            self.repopulate_work_tree()
            self.vars['status'].set(f"{self.current_selected_work_iid} 行已更新。")

    def delete_current_line(self):
        """Delete the selected working cue after confirmation and clear the editor."""
        if self.current_selected_work_iid is None:
            return
        entry_index_in_list = self.current_selected_work_iid - 1
        if not 0 <= entry_index_in_list < len(self.working_entries):
            return
        entry_to_delete = self.working_entries[entry_index_in_list]
        if messagebox.askyesno("确认删除", f"确定要删除第 {entry_to_delete.index} 行吗?\n'{entry_to_delete.text[:50]}...'"):
            self.working_entries.pop(entry_index_in_list)
            # Clear the selection first: repopulating with the old row number
            # still set would silently select the cue that moved into its place.
            self.current_selected_work_iid = None
            self.current_selected_orig_iid = None
            self.repopulate_work_tree()
            self.editor_text.config(state="normal")
            self.editor_text.delete(1.0, tk.END)
            self.editor_text.config(state="disabled")
            self.set_editor_buttons_state(False)
            self.vars['status'].set(f"原始行 {entry_to_delete.original_index} 已删除。")

    def refine_current_line(self):
        """Send the selected cue's original text to the model on a worker thread."""
        if self.current_selected_work_iid is None:
            return
        self.apply_editor_changes()
        original_text = self.original_entries[self.current_selected_orig_iid - 1].text
        self.set_buttons_state(False)
        threading.Thread(target=self._call_llm_for_refine, args=(original_text, self.vars['ollama_model'].get(), self.current_selected_work_iid), daemon=True).start()

    def refine_all_lines(self):
        """After confirmation, refine every original cue on a worker thread."""
        if not self.working_entries:
            return
        if messagebox.askyesno("确认", "即将对全部字幕进行润色,这会覆盖您当前的修改。是否继续?"):
            self.set_buttons_state(False)
            self.progress_bar['value'] = 0
            self.progress_bar['maximum'] = len(self.original_entries)
            threading.Thread(target=self._refine_all_worker, args=(self.vars['ollama_model'].get(),), daemon=True).start()

    def _refine_all_worker(self, model_name):
        """Worker thread: refine each original cue and report results via the queue.

        Status and progress are posted as queue messages instead of being
        written to Tk objects here, because Tk must only be used from the
        thread running the main loop.
        """
        entries = list(self.original_entries)
        total = len(entries)
        for i, entry in enumerate(entries):
            self.gui_queue.put({"type": "status", "data": f"正在处理 {i+1}/{total}..."})
            self.gui_queue.put({"type": "progress", "data": i + 1})
            refined_text = self._call_llm_for_refine_sync(entry.text, model_name)
            if refined_text:
                self.gui_queue.put({"type": "batch_line_refined", "data": {"orig_iid": entry.original_index, "text": refined_text}})
            else:
                # Refinement failed or returned nothing: fall back to the original text.
                self.gui_queue.put({"type": "batch_line_refined", "data": {"orig_iid": entry.original_index, "text": entry.text, "no_change": True}})
        self.gui_queue.put({"type": "batch_finish"})

    def revert_current_line_text(self):
        """Put the original text of the selected cue back into the editor."""
        if self.current_selected_orig_iid is None:
            return
        original_text = self.original_entries[self.current_selected_orig_iid - 1].text
        self.editor_text.delete(1.0, tk.END)
        self.editor_text.insert(tk.END, original_text)

    def _call_llm_for_refine(self, text, model, work_iid):
        """Worker thread: refine one cue and post ``line_refined`` or ``refine_failed``."""
        refined_text = self._call_llm_for_refine_sync(text, model)
        if refined_text:
            self.gui_queue.put({"type": "line_refined", "data": {"iid": work_iid, "text": refined_text}})
        else:
            self.gui_queue.put({"type": "refine_failed", "data": work_iid})

    def _call_llm_for_refine_sync(self, text, model):
        """Call Ollama's ``/api/generate`` and return the refined text.

        Returns the response flattened to one line with one leading/trailing
        quote character removed, or ``None`` on any error (an ``error``
        message is queued in that case).
        """
        prompt = f"""你是一个专业的视频字幕精炼师。任务是优化“待处理字幕”,使其更适合专业配音。
规则:
1. 改为流畅、专业的书面语,但必须保留所有的核心操作指令和细节。
2. 优先去除明显的口语化词汇、重复和不必要的填充词。
3. 在不影响信息完整性的前提下,可以适当缩短句子。
4. 【重要】只输出精炼后的字幕文本,不要包含任何标签、解释或引号。
---
[待处理字幕]
{text}
---
[精炼后的文本]"""
        payload = {"model": model, "prompt": prompt, "stream": False, "options": {'temperature': 0.3}}
        try:
            response = requests.post(f"{OLLAMA_HOST}/api/generate", json=payload, timeout=45)
            response.raise_for_status()
            response_data = response.json()
            refined_text = response_data.get('response', '').strip().replace("\n", " ")
            return re.sub(r'^["\'“‘]|["\'”’]$', '', refined_text)
        except Exception as e:
            self.gui_queue.put({"type": "error", "data": f"API调用失败: {e}"})
            return None

    def process_queue(self):
        """Drain ``gui_queue`` on the Tk thread and reschedule itself in 100 ms."""
        try:
            while True:
                msg = self.gui_queue.get_nowait()
                self._handle_message(msg.get("type"), msg.get("data"))
        except queue.Empty:
            pass
        finally:
            self.root.after(100, self.process_queue)

    def _handle_message(self, msg_type, data):
        """Apply one worker message to the UI (runs on the Tk thread only)."""
        if msg_type == "status":
            self.vars['status'].set(data)
        elif msg_type == "progress":
            self.progress_bar['value'] = data
        elif msg_type == "line_refined":
            if data["iid"] == self.current_selected_work_iid:
                self.editor_text.delete(1.0, tk.END)
                self.editor_text.insert(tk.END, data["text"])
            # Re-enable unconditionally: if the user selected another row while
            # the request was running, the buttons would otherwise stay disabled.
            self.set_buttons_state(True)
        elif msg_type == "batch_line_refined":
            orig_iid = data["orig_iid"]
            for work_entry in self.working_entries:
                if work_entry.original_index == orig_iid:
                    work_entry.text = data["text"]
                    values = (work_entry.start_str, work_entry.end_str, work_entry.text.replace('\n', ' '))
                    is_modified = work_entry.text != self.original_entries[orig_iid - 1].text
                    item_id = str(work_entry.index)
                    if self.tree_work.exists(item_id):
                        self.tree_work.item(item_id, values=values, tags=("modified",) if is_modified else ())
                    break
        elif msg_type == "batch_finish":
            self.repopulate_work_tree()
            self.vars['status'].set("批量润色完成!")
            self.set_buttons_state(True)
            messagebox.showinfo("完成", "所有字幕已批量润色。请检查并进行微调。")
        elif msg_type == "refine_failed":
            # Same reasoning as for "line_refined": never leave the UI locked.
            self.set_buttons_state(True)
        elif msg_type == "error":
            messagebox.showerror("Ollama 错误", data)
        elif msg_type == "models_loaded":
            if data:
                self.model_combo['values'] = data
                self.model_combo.set(data[0])
                self.is_ollama_available = True
                if self.working_entries:
                    self.refine_all_btn.config(state="normal")
                if self.current_selected_work_iid is not None:
                    self.refine_btn.config(state="normal")
            else:
                self.gui_queue.put({"type": "error", "data": "Ollama连接成功, 但未检测到任何模型。请确保您已下载模型。"})

    def set_buttons_state(self, is_enabled):
        """Enable or disable the toolbar buttons and, if a cue is selected, the editor buttons."""
        state = "normal" if is_enabled else "disabled"
        self.load_btn.config(state=state)
        self.save_btn.config(state=state)
        self.refine_all_btn.config(state=state)
        self.set_editor_buttons_state(is_enabled and self.current_selected_work_iid is not None)

    def load_ollama_models(self):
        """Start a worker thread that queries Ollama for the installed models."""
        threading.Thread(target=self._load_models_worker, daemon=True).start()

    def _load_models_worker(self):
        """Worker thread: fetch model names from ``/api/tags`` and report via the queue.

        The outcome is tracked locally. ``is_ollama_available`` is only set
        later by the Tk thread when it handles ``models_loaded``, so reading it
        here would report a failure even after a successful request.
        """
        failure_status = "Ollama连接失败润色功能不可用。"
        self.gui_queue.put({"type": "status", "data": "正在连接Ollama..."})
        model_names = []
        error_message = None
        try:
            response = requests.get(f"{OLLAMA_HOST}/api/tags", timeout=5)
            response.raise_for_status()  # raises for non-2xx status codes
            try:
                models_data = response.json()
            except ValueError:
                # JSON decode errors from json/simplejson/requests all derive from ValueError.
                error_message = "Ollama返回了无效的数据格式无法解析模型列表。"
            else:
                models = models_data.get('models') if isinstance(models_data, dict) else None
                # Skip malformed entries instead of letting the thread die on them.
                model_names = [m['name'] for m in (models or []) if isinstance(m, dict) and 'name' in m]
        except requests.exceptions.Timeout:
            error_message = f"连接Ollama超时 ({OLLAMA_HOST})。\n请检查服务是否运行且地址正确。"
        except requests.exceptions.ConnectionError:
            error_message = f"无法连接到Ollama ({OLLAMA_HOST})。\n请确保Ollama服务正在运行。"
        except requests.exceptions.RequestException as e:
            error_message = f"连接Ollama时发生网络错误: {e}"

        if error_message is not None:
            self.gui_queue.put({"type": "status", "data": failure_status})
            self.gui_queue.put({"type": "error", "data": error_message})
            return
        self.gui_queue.put({"type": "status", "data": "Ollama连接成功" if model_names else failure_status})
        # An empty list makes the "models_loaded" handler show the "no models" error.
        self.gui_queue.put({"type": "models_loaded", "data": model_names})
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the editor UI
    # to it and hand control to the Tk event loop until the window closes.
    window = tk.Tk()
    editor = App(window)
    window.mainloop()