# filename: srt_ultimate_optimizer_v4.0.py
import tkinter as tk
from tkinter import filedialog, ttk, messagebox, scrolledtext
import sv_ttk
import re
from datetime import timedelta
import os
import jieba  # Chinese word segmentation; imported but not referenced elsewhere in this version
import requests
import json
import threading
import queue
# --- Configuration ---
# Ollama service address
OLLAMA_HOST = "http://127.0.0.1:11434"
# Pacing and structural optimization parameters
TARGET_SPEED_STRICT = 4.2      # "golden" speaking rate (chars/second), nudged up slightly for a more natural feel
NATURAL_PAUSE_DURATION = 0.4   # natural pause inserted between split sentences (seconds)
FAST_SPEED_THRESHOLD = 7.5     # rate above which a line is judged "too fast"
SPLIT_CHARS_THRESHOLD = 28     # lines over this many characters are also considered for splitting
# Parameters for the time-borrowing strategy
GAP_BORROW_THRESHOLD = 0.3     # a gap must exceed this many seconds before time can be borrowed from it
GAP_SAFE_MARGIN = 0.15         # minimum gap length to leave after borrowing (seconds)
# Safety thresholds for the suturing (merge) strategy
MIN_DURATION_THRESHOLD = 0.4   # entries shorter than this (seconds) are merge candidates
MIN_CHARS_THRESHOLD = 2        # entries with fewer characters than this are merge candidates
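# A quick worked example of how these constants interact (illustrative numbers,
# not taken from a real file): a 21-character line spoken at the 4.2 chars/s
# target needs 21 / 4.2 = 5.0 s. The same text squeezed into 2.5 s runs at
# 8.4 chars/s, which exceeds FAST_SPEED_THRESHOLD and triggers the "too fast"
# handling (borrow time, shorten via LLM, or split); in 3.0 s it runs at
# 7.0 chars/s and is left unchanged.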
# --- SRT core class and parsing helpers ---
class SrtEntry:
    def __init__(self, index, start_td, end_td, text):
        self.index = index
        self.start_td = start_td
        self.end_td = end_td
        self.text = text.strip()
        self.optimization_type = "original"  # original, compressed, stretched, shortened, split

    @property
    def duration(self): return (self.end_td - self.start_td).total_seconds()

    @property
    def char_count(self): return len(re.sub(r'[\s,.?!。,、?!]', '', self.text))

    @property
    def speed(self):
        count = self.char_count
        return count / self.duration if self.duration > 0 and count > 0 else 0

    @property
    def start_str(self): return self._td_to_str(self.start_td)

    @property
    def end_str(self): return self._td_to_str(self.end_td)

    @staticmethod
    def _td_to_str(td):
        total_seconds = int(td.total_seconds())
        ms = int((td.total_seconds() - total_seconds) * 1000)
        h, m, s = total_seconds // 3600, (total_seconds % 3600) // 60, total_seconds % 60
        return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"

    def to_srt_block(self): return f"{self.index}\n{self.start_str} --> {self.end_str}\n{self.text}\n\n"

def parse_srt(content):
    entries = []
    pattern = re.compile(r'(\d+)\n(\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2},\d{3})\n([\s\S]*?)(?=\n\n|\Z)', re.MULTILINE)
    def to_td(time_str):
        h, m, s, ms = map(int, re.split('[:,]', time_str))
        return timedelta(hours=h, minutes=m, seconds=s, milliseconds=ms)
    for match in pattern.finditer(content):
        entries.append(SrtEntry(int(match.group(1)), to_td(match.group(2)), to_td(match.group(3)), match.group(4)))
    return entries
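# For reference, parse_srt() expects (and to_srt_block() emits) standard SRT
# blocks separated by a blank line, with comma-separated milliseconds, e.g.:
#
#   1
#   00:00:01,000 --> 00:00:03,500
#   这是一条示例字幕
#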
# --- GUI application ---
class App:
    def __init__(self, root):
        self.root = root
        self.root.title("终极字幕优化器 V4.0 - 融合版")
        self.root.geometry("1300x850")
        self.srt_path = ""
        self.original_entries = []
        self.optimized_entries = []
        self.processing_thread = None
        self.gui_queue = queue.Queue()
        self.is_ollama_available = False
        self.vars = {
            "ollama_model": tk.StringVar(),
            "status": tk.StringVar(value="准备就绪"),
            "use_llm": tk.BooleanVar(value=True)
        }
        self.build_ui()
        sv_ttk.set_theme("dark")
        self.root.after(100, self.process_queue)
        self.root.after(100, self.load_ollama_models)

    def build_ui(self):
        main_pane = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL)
        main_pane.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
        # --- Left-hand control panel ---
        control_frame = ttk.Frame(main_pane, width=280)
        main_pane.add(control_frame, weight=0)
        # 1. File operations
        file_frame = ttk.Labelframe(control_frame, text="1. 文件操作")
        file_frame.pack(fill=tk.X, pady=(0, 10))
        load_btn = ttk.Button(file_frame, text="加载SRT文件", command=self.load_srt, style="Accent.TButton")
        load_btn.pack(fill=tk.X, padx=5, pady=5)
        self.save_btn = ttk.Button(file_frame, text="另存为...", command=self.save_srt, state="disabled")
        self.save_btn.pack(fill=tk.X, padx=5, pady=(0, 5))
        # 2. Ollama settings
        ollama_frame = ttk.Labelframe(control_frame, text="2. LLM 设置 (可选)")
        ollama_frame.pack(fill=tk.X, pady=10)
        ttk.Label(ollama_frame, text="Ollama模型:").pack(fill=tk.X, padx=5, pady=(5, 0))
        self.model_combo = ttk.Combobox(ollama_frame, textvariable=self.vars['ollama_model'], state="readonly")
        self.model_combo.pack(fill=tk.X, padx=5, pady=5)
        llm_check = ttk.Checkbutton(ollama_frame, text="启用LLM进行文本缩减", variable=self.vars['use_llm'])
        llm_check.pack(fill=tk.X, padx=5, pady=(0, 5))
        # 3. Start optimization
        self.optimize_btn = ttk.Button(control_frame, text="开始优化", command=self.run_optimization, state="disabled")
        self.optimize_btn.pack(fill=tk.X, pady=10)
        # 4. Log output
        log_frame = ttk.Labelframe(control_frame, text="处理日志")
        log_frame.pack(fill=tk.BOTH, expand=True, pady=10)
        self.log_text = scrolledtext.ScrolledText(log_frame, wrap=tk.WORD, state="disabled", height=10)
        self.log_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        # --- Right-hand comparison tables ---
        table_container = ttk.Frame(main_pane)
        main_pane.add(table_container, weight=1)
        # Status bar
        status_bar = ttk.Frame(table_container)
        status_bar.pack(side=tk.BOTTOM, fill=tk.X, pady=(5, 0))
        self.progress_bar = ttk.Progressbar(status_bar, orient=tk.HORIZONTAL, mode='determinate')
        self.progress_bar.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 10))
        ttk.Label(status_bar, textvariable=self.vars['status']).pack(side=tk.LEFT)
        # Tables
        table_pane = ttk.PanedWindow(table_container, orient=tk.VERTICAL)
        table_pane.pack(fill=tk.BOTH, expand=True)
        left_frame = ttk.Labelframe(table_pane, text="原始字幕")
        table_pane.add(left_frame, weight=1)
        right_frame = ttk.Labelframe(table_pane, text="优化后 (蓝:借时 橙:缩减 绿:压缩/拆分)")
        table_pane.add(right_frame, weight=1)
        self.tree_orig = self.create_treeview(left_frame)
        self.tree_optim = self.create_treeview(right_frame)
        self.tree_optim.tag_configure("compressed", background="#2a5a2a")
        self.tree_optim.tag_configure("split", background="#3a6b3a")
        self.tree_optim.tag_configure("stretched", background="#2a4a7a")
        self.tree_optim.tag_configure("shortened", background="#7a5a2a")

    def create_treeview(self, parent):
        cols = ("#0", "开始", "结束", "时长", "字数", "语速", "文本")
        # Note: with show="headings" the "#0" tree column stays hidden, so the
        # entry index passed via text= in populate_tree() is not displayed.
        tree = ttk.Treeview(parent, columns=cols[1:], show="headings")
        for col in cols: tree.heading(col, text=col, anchor="w")
        tree.column("#0", width=40, anchor="center"); tree.column("开始", width=90, anchor="w"); tree.column("结束", width=90, anchor="w")
        tree.column("时长", width=50, anchor="e"); tree.column("字数", width=40, anchor="e")
        tree.column("语速", width=50, anchor="e"); tree.column("文本", width=400, anchor="w")
        vsb = ttk.Scrollbar(parent, orient="vertical", command=tree.yview)
        tree.configure(yscrollcommand=vsb.set); tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True); vsb.pack(side=tk.RIGHT, fill=tk.Y)
        return tree

    # --- GUI event handlers ---
    def load_srt(self):
        path = filedialog.askopenfilename(filetypes=[("SRT Subtitles", "*.srt")])
        if not path: return
        self.srt_path = path
        try:
            with open(path, 'r', encoding='utf-8-sig') as f: content = f.read()
            self.original_entries = parse_srt(content)
            self.populate_tree(self.tree_orig, self.original_entries)
            self.tree_optim.delete(*self.tree_optim.get_children())
            self.log_message(f"已加载: {os.path.basename(path)} ({len(self.original_entries)} 条)")
            self.optimize_btn.config(state="normal")
            self.save_btn.config(state="disabled")
        except Exception as e: messagebox.showerror("加载失败", f"无法加载或解析文件: {e}")

    def save_srt(self):
        if not self.optimized_entries: return
        original_basename = os.path.splitext(os.path.basename(self.srt_path))[0]
        save_path = filedialog.asksaveasfilename(defaultextension=".srt", initialfile=f"{original_basename}_ultimate.srt", filetypes=[("SRT Subtitles", "*.srt")])
        if not save_path: return
        try:
            with open(save_path, 'w', encoding='utf-8') as f:
                for entry in self.optimized_entries: f.write(entry.to_srt_block())
            messagebox.showinfo("保存成功", f"优化后的SRT文件已保存至:\n{save_path}")
        except Exception as e: messagebox.showerror("保存失败", f"无法保存文件: {e}")

    def populate_tree(self, tree, entries):
        tree.delete(*tree.get_children())
        for entry in entries:
            values = (entry.start_str, entry.end_str, f"{entry.duration:.2f}s", entry.char_count, f"{entry.speed:.2f}", entry.text)
            tags = (entry.optimization_type,) if entry.optimization_type != "original" else ()
            tree.insert("", "end", text=str(entry.index), values=values, tags=tags)

    # --- Threading and queue handling ---
    def run_optimization(self):
        if not self.original_entries: return
        if self.vars['use_llm'].get() and not self.is_ollama_available:
            messagebox.showerror("错误", "已勾选启用LLM但Ollama服务未连接。")
            return
        self.optimize_btn.config(state="disabled")
        self.save_btn.config(state="disabled")
        self.log_text.config(state="normal"); self.log_text.delete(1.0, tk.END); self.log_text.config(state="disabled")
        self.progress_bar['value'] = 0
        self.progress_bar['maximum'] = len(self.original_entries)
        self.processing_thread = threading.Thread(
            target=self.optimization_worker,
            args=(list(self.original_entries), self.vars['ollama_model'].get(), self.vars['use_llm'].get()),
            daemon=True
        )
        self.processing_thread.start()

    def process_queue(self):
        try:
            while True:
                msg = self.gui_queue.get_nowait()
                msg_type, data = msg.get("type"), msg.get("data")
                if msg_type == "log":
                    self.log_text.config(state="normal")
                    self.log_text.insert(tk.END, data + "\n"); self.log_text.see(tk.END)
                    self.log_text.config(state="disabled")
                elif msg_type == "error":
                    self.log_message(f"错误: {data}")
                    messagebox.showerror("Ollama 错误", data)
                elif msg_type == "models_loaded":
                    if data:
                        self.model_combo['values'] = data; self.model_combo.set(data[0])
                        self.log_message(f"Ollama连接成功, 检测到模型: {', '.join(data)}")
                        self.is_ollama_available = True
                    else: self.log_message("Ollama连接成功, 但未检测到模型。")
                elif msg_type == "progress":
                    self.progress_bar['value'] = data['current']
                    self.vars['status'].set(f"处理中... {data['current']}/{data['total']}")
                elif msg_type == "finish":
                    self.optimized_entries = data
                    self.populate_tree(self.tree_optim, self.optimized_entries)
                    self.optimize_btn.config(state="normal"); self.save_btn.config(state="normal")
                    self.vars['status'].set("优化完成!")
                    self.log_message("\n--- 所有字幕优化完成!---")
                    messagebox.showinfo("完成", f"优化完成!\n{len(self.original_entries)} 句 -> 新 {len(self.optimized_entries)}")
        except queue.Empty: pass
        finally: self.root.after(100, self.process_queue)

    def log_message(self, msg): self.gui_queue.put({"type": "log", "data": msg})

    # --- Ollama API communication ---
    def load_ollama_models(self):
        self.log_message("正在连接Ollama...")
        def _load():
            try:
                response = requests.get(f"{OLLAMA_HOST}/api/tags", timeout=5)
                response.raise_for_status()
                models_data = response.json()['models']
                model_names = [m['name'] for m in models_data]
                self.gui_queue.put({"type": "models_loaded", "data": model_names})
            except requests.exceptions.RequestException as e:
                self.gui_queue.put({"type": "log", "data": f"Ollama连接失败: {e}\n(LLM功能将不可用)"})
            except Exception as e:
                self.gui_queue.put({"type": "error", "data": f"解析Ollama模型列表时出错: {e}"})
        threading.Thread(target=_load, daemon=True).start()

    def _call_llm_for_shorten(self, text_to_shorten, model_name):
        prompt = f"""你是一个专业的视频字幕精炼师。任务是将给定的字幕文本缩短,以适配过短的时间轴,同时保持核心意义。
规则:
1. **目标是减少字数**,变得更简洁、紧凑。
2. 必须保留关键信息、专有名词和数字。
3. 只输出精炼和缩短后的文本,不要包含任何解释或引号。
---
[待缩短的文本]
{text_to_shorten}
---
[缩短后的文本]:"""
        payload = {"model": model_name, "prompt": prompt, "stream": False, "options": {'temperature': 0.1}}
        try:
            response = requests.post(f"{OLLAMA_HOST}/api/generate", json=payload, timeout=45)
            response.raise_for_status()
            response_data = response.json()
            refined_text = response_data.get('response', '').strip().replace("\n", " ")
            return re.sub(r'^["\'“‘]|["\'”’]$', '', refined_text)
        except Exception as e:
            self.log_message(f"!! LLM调用失败: {e}")
            return None
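    # For reference, the non-streaming /api/generate reply used above is a JSON
    # object roughly of the form {"model": "...", "response": "<generated text>",
    # "done": true, ...}; only the "response" field is consumed here.
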
    # --- Core optimization algorithm ---
    def optimization_worker(self, entries, model_name, use_llm):
        paced_entries = self._optimize_pacing(entries, model_name, use_llm)
        final_entries = self._perform_suturing(paced_entries)
        for i, entry in enumerate(final_entries): entry.index = i + 1
        self.gui_queue.put({"type": "finish", "data": final_entries})

    def _optimize_pacing(self, entries, model_name, use_llm):
        temp_entries = []
        total_count = len(entries)
        for i, entry in enumerate(entries):
            self.gui_queue.put({"type": "progress", "data": {"current": i + 1, "total": total_count}})
            entry.text = entry.text.replace('\n', ' ').strip()
            # --- Strategy 1: speaking rate too slow ---
            is_slow = entry.speed < TARGET_SPEED_STRICT and entry.char_count > 0
            if is_slow:
                ideal_duration = max(entry.char_count / TARGET_SPEED_STRICT, MIN_DURATION_THRESHOLD)
                new_end_td = entry.start_td + timedelta(seconds=ideal_duration)
                new_entry = SrtEntry(0, entry.start_td, new_end_td, entry.text)
                new_entry.optimization_type = "compressed"
                temp_entries.append(new_entry)
                self.log_message(f"[{i+1}] 慢速 -> 压缩时长: '{entry.text}' ({entry.duration:.2f}s -> {new_entry.duration:.2f}s)")
                continue
            # --- Strategy 2: speaking rate too fast or line too long ---
            is_fast_or_long = entry.speed > FAST_SPEED_THRESHOLD or entry.char_count > SPLIT_CHARS_THRESHOLD
            if is_fast_or_long:
                # Priority 2.1: try borrowing time from the gap before the next cue
                next_entry = entries[i + 1] if i + 1 < total_count else None
                if next_entry:
                    gap_duration = (next_entry.start_td - entry.end_td).total_seconds()
                    if gap_duration > GAP_BORROW_THRESHOLD:
                        ideal_duration = entry.char_count / TARGET_SPEED_STRICT
                        needed_time = ideal_duration - entry.duration
                        available_time = gap_duration - GAP_SAFE_MARGIN
                        time_to_borrow = min(needed_time, available_time)
                        if time_to_borrow > 0:
                            new_end_td = entry.end_td + timedelta(seconds=time_to_borrow)
                            new_entry = SrtEntry(0, entry.start_td, new_end_td, entry.text)
                            new_entry.optimization_type = "stretched"
                            temp_entries.append(new_entry)
                            self.log_message(f"[{i+1}] 快速 -> 借用时间: '{entry.text}' (借用 {time_to_borrow:.2f}s)")
                            continue
                # Priority 2.2: try LLM-based text shortening
                if use_llm and self.is_ollama_available:
                    self.log_message(f"[{i+1}] 快速 -> 尝试LLM缩减: '{entry.text}'")
                    shortened_text = self._call_llm_for_shorten(entry.text, model_name)
                    if shortened_text and len(shortened_text) < len(entry.text):
                        new_entry = SrtEntry(0, entry.start_td, entry.end_td, shortened_text)
                        if new_entry.speed <= FAST_SPEED_THRESHOLD * 1.1:  # allow a slight overshoot
                            new_entry.optimization_type = "shortened"
                            temp_entries.append(new_entry)
                            self.log_message(f"[{i+1}] -> LLM缩减成功: '{shortened_text}'")
                            continue
                        else:
                            self.log_message(f"[{i+1}] -> LLM缩减后仍过快, 放弃。")
                    else:
                        self.log_message(f"[{i+1}] -> LLM缩减失败或无效果。")
                # Priority 2.3: split the sentence at punctuation
                parts = re.split(r'([。,、?!,?!])', entry.text)
                sub_sentences = [p for p in ("".join(parts[j:j + 2]) for j in range(0, len(parts), 2)) if p.strip()]
                if len(sub_sentences) > 1:
                    self.log_message(f"[{i+1}] 快速/长句 -> 拆分: '{entry.text}'")
                    total_chars = sum(len(re.sub(r'[\s,.?!。,、?!]', '', s)) for s in sub_sentences)
                    if total_chars == 0: temp_entries.append(entry); continue
                    available_speech_time = entry.duration - (len(sub_sentences) - 1) * NATURAL_PAUSE_DURATION
                    if available_speech_time > 0.1:
                        current_start_td = entry.start_td
                        for k, sub_text in enumerate(sub_sentences):
                            sub_chars = len(re.sub(r'[\s,.?!。,、?!]', '', sub_text))
                            sub_duration_sec = (sub_chars / total_chars) * available_speech_time
                            sub_duration_td = timedelta(seconds=sub_duration_sec)
                            new_entry = SrtEntry(0, current_start_td, current_start_td + sub_duration_td, sub_text)
                            new_entry.optimization_type = "split"
                            temp_entries.append(new_entry)
                            current_start_td += sub_duration_td + timedelta(seconds=NATURAL_PAUSE_DURATION)
                        continue
            # --- Default: no change needed ---
            entry.optimization_type = "original"
            temp_entries.append(entry)
        return temp_entries
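    # Worked example of the "too fast" cascade above (illustrative numbers): a
    # 30-character line lasting 3.0 s runs at 10.0 chars/s. Its ideal duration is
    # 30 / 4.2 ≈ 7.14 s, so it needs about 4.14 s more. With a 1.0 s gap before
    # the next cue, min(4.14, 1.0 - 0.15) = 0.85 s is borrowed and the entry is
    # kept even though it is still fast; only when the gap is 0.3 s or less (or
    # there is no next cue) does the code fall through to LLM shortening and then
    # to punctuation-based splitting.
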
    def _perform_suturing(self, entries):
        if not entries: return []
        final_entries = []; merge_buffer = []
        def flush_buffer():
            nonlocal merge_buffer
            if not merge_buffer: return
            if len(merge_buffer) == 1: final_entries.append(merge_buffer[0])
            else:
                start_time = merge_buffer[0].start_td; end_time = merge_buffer[-1].end_td
                combined_text = " ".join(e.text for e in merge_buffer)
                merged_entry = SrtEntry(0, start_time, end_time, combined_text)
                merged_entry.optimization_type = "compressed"  # count sutured entries as compressed as well
                final_entries.append(merged_entry)
            merge_buffer = []
        for entry in entries:
            if entry.duration < MIN_DURATION_THRESHOLD or entry.char_count < MIN_CHARS_THRESHOLD:
                merge_buffer.append(entry)
            else: flush_buffer(); final_entries.append(entry)
        flush_buffer()
        return final_entries
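    # Example of the suturing pass (illustrative): two consecutive fragments that
    # are each shorter than 0.4 s or under 2 characters, say "好" and "的。", are
    # buffered and merged into a single cue "好 的。" spanning both time ranges;
    # the next normal-length entry flushes the buffer and is appended on its own.
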
if __name__ == "__main__":
    root = tk.Tk()
    app = App(root)
    root.mainloop()
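
# Third-party dependencies (assumed PyPI package names): sv-ttk, jieba, requests,
# e.g. `pip install sv-ttk jieba requests`; the optional LLM step also needs a
# local Ollama server reachable at OLLAMA_HOST.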