上传文件至「/」

This commit is contained in:
2025-11-05 10:37:09 +08:00
commit 222556cfbd
5 changed files with 1241 additions and 0 deletions

434
main_v1.py Normal file
View File

@@ -0,0 +1,434 @@
import tkinter as tk
from tkinter import filedialog, ttk, scrolledtext, messagebox
import sv_ttk
import asyncio
import edge_tts
import os
import re
import subprocess
import threading
import queue
from datetime import timedelta
import json
import uuid
import platform
# --- 默认配置 ---
DEFAULT_CONFIG = { "voice": "zh-CN-XiaoxiaoNeural", "volume": -16.0, "format": "mp3", "base_cps": 5.0, "max_concurrency": 10, "audition_text": "你好,这是一个声音样本试听。", "output_dir": "", "merge_video": False, "keep_original_audio": False, "video_path": "", "keep_intermediate_audio": False, "burn_subtitles": False, "encoder": "CPU (libx264)", "subtitle_fontsize": 14 }
CONFIG_FILE = "config.json"
# --- 自定义控件 ---
class CollapsiblePane(ttk.Frame):
def __init__(self, parent, text="", initial_state='expanded'):
super().__init__(parent); self.columnconfigure(0, weight=1); self.text = text
self._variable = tk.BooleanVar(value=(initial_state == 'expanded'))
self.button = ttk.Button(self, text=f"{self.text}", command=self.toggle, style="TButton")
self.button.grid(row=0, column=0, sticky="ew")
self.content_frame = ttk.Frame(self, padding=(10, 5))
self._variable.trace_add("write", self._update_button_text); self._update_content()
def toggle(self): self._variable.set(not self.get()); self._update_content()
def get(self): return self._variable.get()
def _update_content(self):
if self.get(): self.content_frame.grid(row=1, column=0, sticky="ew")
else: self.content_frame.grid_remove()
def _update_button_text(self, *args): self.button.config(text=f"{'' if self.get() else ''} {self.text}")
# --- 辅助函数和类 ---
class SrtEntry:
def __init__(self, index, start, end, text):
self.index=int(index); self.start=self._to_timedelta(start); self.end=self._to_timedelta(end); self.text=text.strip(); self.duration=(self.end-self.start).total_seconds()
@staticmethod
def _to_timedelta(time_str):
t=time_str.replace(',','.'); p=t.split('.'); m=p[0]; ms=int(p[1]) if len(p)>1 else 0
h,mi,s=map(int,m.split(':')); return timedelta(hours=h,minutes=mi,seconds=s,milliseconds=ms)
def parse_srt(srt_content):
entries = []; p = re.compile(r'(\d+)\s*[\r\n]+(\d{2}:\d{2}:\d{2}[,.]\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2}[,.]\d{3})\s*[\r\n]+([\s\S]+?)(?=(?:\r\n?|\n){2,}|$)', re.UNICODE)
for m in p.finditer(srt_content): entries.append(SrtEntry(*m.groups()))
return entries
def srt_to_ass(srt_content, style_options):
style_options['fontsize'] = str(style_options.get('fontsize', 40)) # 确保是字符串
ass_header = f"""[Script Info]
Title: Generated by AI Video Dubbing Tool; ScriptType: v4.00+; WrapStyle: 0; PlayResX: 1920; PlayResY: 1080
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: {style_options['name']},{style_options['fontname']},{style_options['fontsize']},{style_options['primary_colour']},{style_options['secondary_colour']},{style_options['outline_colour']},{style_options['back_colour']},{style_options['bold']},{style_options['italic']},{style_options['underline']},{style_options['strikeout']},{style_options['scale_x']},{style_options['scale_y']},{style_options['spacing']},{style_options['angle']},{style_options['border_style']},{style_options['outline']},{style_options['shadow']},{style_options['alignment']},{style_options['margin_l']},{style_options['margin_r']},{style_options['margin_v']},{style_options['encoding']}
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
ass_lines = []
srt_pattern = re.compile(r'(\d+)\n(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\n([\s\S]*?)(?=\n\n|\Z)', re.MULTILINE)
for match in srt_pattern.finditer(srt_content):
start_time, end_time, text = match.group(2), match.group(3), match.group(4)
start_ass = start_time.replace(',', '.')[:-1]; end_ass = end_time.replace(',', '.')[:-1]
text_ass = text.strip().replace('\n', '\\N')
ass_lines.append(f"Dialogue: 0,{start_ass},{end_ass},{style_options['name']},,0,0,0,,{text_ass}")
return ass_header + "\n".join(ass_lines)
# --- 核心处理逻辑 ---
class Processor:
def __init__(self, srt_path, config, gui_queue):
self.srt_path = srt_path
self.config = config
self.gui_queue = gui_queue
self.base_name = os.path.splitext(os.path.basename(srt_path))[0]
output_dir = self.config['output_dir'] or os.path.dirname(os.path.abspath(srt_path))
self.output_dir = output_dir
self.cache_dir = os.path.join(self.output_dir, f"{self.base_name}_cache")
self.audio_output_path = os.path.join(self.output_dir, f"{self.base_name}.{self.config['format']}")
self.is_cancelled = threading.Event()
def log(self, message): self.gui_queue.put({"type": "log", "data": message})
def update_progress(self, current, total, status_text=""): self.gui_queue.put({"type": "progress", "current": current, "total": total, "status": status_text})
async def _generate_clip_worker(self, entry, semaphore):
async with semaphore:
if self.is_cancelled.is_set():
return (entry.index, "cancelled")
clip_path = os.path.join(self.cache_dir, f"{entry.index}.mp3")
if os.path.exists(clip_path) and os.path.getsize(clip_path) > 0:
self.log(f"片段 {entry.index} 缓存已存在.")
return (entry.index, "success")
clean_text = re.sub(r'\s+', '', entry.text)
char_count = len(clean_text)
if char_count < 2 or entry.duration <= 0.2:
self.log(f"!! 警告: 片段 {entry.index} 因文本过短或时长不足而被跳过.")
return (entry.index, "skipped")
rate_p = round(((char_count / entry.duration) / self.config['base_cps'] - 1) * 100)
rate_p = max(-90, min(200, rate_p))
rate_s = f"+{rate_p}%" if rate_p >= 0 else f"{rate_p}%"
self.log(f"生成片段 {entry.index}: 时长={entry.duration:.2f}s, 字数={char_count}, 速率={rate_s}")
max_retries = 3
for attempt in range(max_retries):
if self.is_cancelled.is_set():
return (entry.index, "cancelled")
try:
comm = edge_tts.Communicate(entry.text, self.config['voice'], rate=rate_s)
await comm.save(clip_path)
# 即使 await 成功,也再次确认文件有效性
if not os.path.exists(clip_path) or os.path.getsize(clip_path) == 0:
raise ValueError("生成的音频文件为空或无效.")
return (entry.index, "success") # 成功后立即返回
except Exception as e:
# <--- 关键优化: 异常后复核文件是否存在 ---
# 检查是否因为网络抖动,库报错了但文件已成功写入
if os.path.exists(clip_path) and os.path.getsize(clip_path) > 0:
self.log(f"片段 {entry.index} 捕获到异常,但文件已成功生成,视为成功。")
return (entry.index, "success")
# 如果文件确实没生成,执行智能重试
backoff_time = 2 ** (attempt + 1) # 2, 4, 8 秒
self.log(f"!! 错误: 片段 {entry.index} (尝试 {attempt + 1}/{max_retries}): {e}")
if attempt < max_retries - 1:
self.log(f" 将在 {backoff_time} 秒后重试...")
await asyncio.sleep(backoff_time)
self.log(f"!! 严重: 片段 {entry.index} 在多次尝试后仍然失败,将被跳过。")
return (entry.index, "failed")
async def _generate_all_clips_async(self, entries):
self.log(f"\n--- 阶段1: 生成语音 (并发数: {self.config['max_concurrency']}) ---")
sem = asyncio.Semaphore(self.config['max_concurrency'])
tasks = [self._generate_clip_worker(e, sem) for e in entries]
successful_clips = 0
failed_clips = []
for i, f in enumerate(asyncio.as_completed(tasks)):
if self.is_cancelled.is_set():
raise InterruptedError("任务取消")
index, status = await f
if status == "success":
successful_clips += 1
elif status == "failed":
failed_clips.append(index)
self.update_progress(i + 1, self.total_steps, f"生成片段 {i+1}/{len(entries)}")
if failed_clips:
self.log(f"\n!! 警告: 以下语音片段生成失败: {', '.join(map(str, sorted(failed_clips)))}")
if successful_clips == 0:
raise RuntimeError("没有任何语音片段成功生成,任务中止。")
self.log(f"语音生成完成: {successful_clips} 个成功, {len(failed_clips)} 个失败。")
def _merge_media_sync(self, entries):
self.log(f"\n--- 阶段2: 合并音频 ---")
clips = sorted([e for e in entries if os.path.exists(os.path.join(self.cache_dir, f"{e.index}.mp3")) and os.path.getsize(os.path.join(self.cache_dir, f"{e.index}.mp3")) > 0], key=lambda e: e.index)
if not clips:
self.log("!! 错误: 没有可用的音频片段进行合并。")
raise RuntimeError("没有可用的音频片段。")
cmd=['ffmpeg','-y']
filters=[]
inputs=""
for i,e in enumerate(clips):
cmd.extend(['-i', os.path.join(self.cache_dir, f"{e.index}.mp3")])
delay=int(e.start.total_seconds()*1000)
filters.append(f"[{i}:a]adelay={delay}|{delay}[a{i}]")
inputs += f"[a{i}]"
filters.extend([f"{inputs}amix=inputs={len(clips)}:normalize=0[merged]", f"[merged]loudnorm=I={self.config['volume']}:LRA=11:TP=-1.5"])
cmd.extend(['-filter_complex', ";".join(filters), self.audio_output_path])
if self.run_ffmpeg_sync(cmd, "合并音频") != 0: raise RuntimeError("音频合并失败")
self.log("音频合并成功!")
if self.is_cancelled.is_set(): raise InterruptedError("任务取消")
if self.config['merge_video']:
self.log("\n--- 阶段3: 与视频合成 ---"); self.update_progress(len(entries), self.total_steps, "视频合成中...")
video_path=self.config['video_path']; v_ext=os.path.splitext(video_path)[1]; v_out_path=os.path.join(self.output_dir,f"{self.base_name}_dubbed{v_ext}"); cmd=['ffmpeg','-y','-i',video_path,'-i',self.audio_output_path]
video_codec, audio_codec = self.get_codecs()
vf_options = []
if self.config['burn_subtitles']:
ass_path = self.prepare_ass_subtitle(); escaped_ass_path = ass_path.replace('\\', '/').replace(':', '\\:')
vf_options.append(f"subtitles='{escaped_ass_path}'"); self.log(f"准备烧录字幕: {os.path.basename(ass_path)}")
if self.config['keep_original_audio']:
self.log("模式: 创建双音轨"); locale=self.config['voice'].split('-')[0]
cmd.extend(['-map','0:v:0','-map','0:a:0','-map','1:a:0', '-metadata:s:a:0','language=und','-metadata:s:a:0','title=Original', '-metadata:s:a:1',f'language={locale}','-metadata:s:a:1','title=AI Dubbing', '-disposition:a:1','default'])
else: self.log("模式: 替换音轨"); cmd.extend(['-map','0:v:0','-map','1:a:0'])
cmd.extend(['-c:v', video_codec, '-c:a', audio_codec])
if vf_options: cmd.extend(['-vf', ",".join(vf_options)])
cmd.append(v_out_path)
if self.run_ffmpeg_sync(cmd, "视频合成") != 0: raise RuntimeError("视频合成失败")
self.log(f"视频合成成功! 输出: {v_out_path}")
if self.config['merge_video'] and not self.config['keep_intermediate_audio']:
try: self.log("删除中间音频文件..."); os.remove(self.audio_output_path)
except OSError: pass
def get_codecs(self):
encoder_map = {"CPU (libx264)": "libx264", "NVIDIA (h264_nvenc)": "h264_nvenc", "AMD (h264_amf)": "h264_amf", "Intel (h264_qsv)": "h264_qsv"}
audio_codec = "aac"
if self.config['burn_subtitles']: return encoder_map.get(self.config['encoder'], "libx264"), audio_codec
else: return "copy", "copy"
def prepare_ass_subtitle(self):
style = {'name': 'Default','fontname': '微软雅黑','fontsize': self.config['subtitle_fontsize'],'primary_colour': '&H00FFFFFF','secondary_colour': '&H000000FF','outline_colour': '&H00000000','back_colour': '&H00000000','bold': '0','italic': '0','underline': '0','strikeout': '0','scale_x': '100','scale_y': '100','spacing': '0','angle': '0','border_style': '1','outline': '2','shadow': '1','alignment': '2','margin_l': '10','margin_r': '10','margin_v': '30','encoding': '1'}
ass_content = srt_to_ass(self.srt_content, style); ass_path = os.path.join(self.cache_dir, f"{self.base_name}.ass")
with open(ass_path, 'w', encoding='utf-8') as f: f.write(ass_content)
return ass_path
def run_ffmpeg_sync(self, cmd, stage="FFmpeg"):
self.log(f"执行 {stage} 命令...");
creationflags = 0
if platform.system() == "Windows": creationflags = subprocess.CREATE_NO_WINDOW
process = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8', creationflags=creationflags)
if process.returncode != 0:
self.log(f"!! {stage} 失败,返回码: {process.returncode}");
self.log("--- FFmpeg 错误输出 ---"); self.log(process.stderr); self.log("------------------------")
return process.returncode
def run(self):
try:
self.log("--- 开始处理 ---"); os.makedirs(self.cache_dir, exist_ok=True)
with open(self.srt_path, 'r', encoding='utf-8-sig') as f: self.srt_content=f.read()
entries = parse_srt(self.srt_content)
if not entries: raise RuntimeError("解析SRT失败")
self.log(f"解析到 {len(entries)} 条字幕.")
self.total_steps=len(entries) + (1 if self.config['merge_video'] else 0)
asyncio.run(self._generate_all_clips_async(entries))
if self.is_cancelled.is_set(): raise InterruptedError("任务取消")
self._merge_media_sync(entries)
self.update_progress(self.total_steps, self.total_steps, "全部完成!"); self.log("\n--- 全部完成!---")
self.gui_queue.put({"type":"finish", "success":True, "output_dir":self.output_dir})
except Exception as e:
if not isinstance(e, (InterruptedError, RuntimeError)): self.log(f"发生未知严重错误: {e}")
self.gui_queue.put({"type": "finish", "success": False})
# --- GUI 应用 ---
# [ App 类及以下所有代码保持不变,无需修改 ]
class App:
def __init__(self, root):
self.root = root; self.root.title("AI视频配音合成工具 V15.3 (终极优化版)"); self.root.geometry("800x800")
self.load_config(); self.gui_queue = queue.Queue()
self.vars = { "srt_path": tk.StringVar(), "output_dir": tk.StringVar(value=self.config["output_dir"]), "voice": tk.StringVar(value=self.config["voice"]), "volume": tk.DoubleVar(value=self.config["volume"]), "format": tk.StringVar(value=self.config["format"]), "status": tk.StringVar(value="准备就绪"), "merge_video": tk.BooleanVar(value=self.config["merge_video"]), "keep_original_audio": tk.BooleanVar(value=self.config["keep_original_audio"]), "video_path": tk.StringVar(value=self.config["video_path"]), "keep_intermediate_audio": tk.BooleanVar(value=self.config["keep_intermediate_audio"]), "burn_subtitles": tk.BooleanVar(value=self.config["burn_subtitles"]), "encoder": tk.StringVar(value=self.config["encoder"]), "subtitle_fontsize": tk.IntVar(value=self.config["subtitle_fontsize"]) }
self.build_ui(); sv_ttk.set_theme("dark")
self.root.after(100, self.load_voices); self.root.after(100, self.process_queue); self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
def log(self, message): self.gui_queue.put({"type": "log", "data": message})
def build_ui(self):
main_frame = ttk.Frame(self.root, padding=10); main_frame.pack(fill=tk.BOTH, expand=True); main_frame.rowconfigure(4, weight=1); main_frame.columnconfigure(0, weight=1)
pane1 = CollapsiblePane(main_frame, "基础设置"); pane1.grid(row=0, column=0, sticky="ew", pady=(0, 5))
f1=ttk.Frame(pane1.content_frame); f1.pack(fill=tk.X,expand=True,pady=(0,5)); ttk.Label(f1,text="SRT文件:").pack(side=tk.LEFT,padx=(0,5)); ttk.Entry(f1,textvariable=self.vars['srt_path']).pack(side=tk.LEFT,fill=tk.X,expand=True,padx=(0,5)); ttk.Button(f1,text="选择...",command=self.select_srt_file).pack(side=tk.LEFT)
f2=ttk.Frame(pane1.content_frame); f2.pack(fill=tk.X,expand=True); ttk.Label(f2,text="输出目录:").pack(side=tk.LEFT,padx=(0,5)); ttk.Entry(f2,textvariable=self.vars['output_dir']).pack(side=tk.LEFT,fill=tk.X,expand=True,padx=(0,5)); ttk.Button(f2,text="选择...",command=self.select_output_dir).pack(side=tk.LEFT,padx=(0,5)); self.open_dir_button=ttk.Button(f2,text="打开",command=self.open_output_dir,state="disabled"); self.open_dir_button.pack(side=tk.LEFT)
pane3 = CollapsiblePane(main_frame, "音频参数"); pane3.grid(row=1, column=0, sticky="ew", pady=(0, 5))
f3=ttk.Frame(pane3.content_frame); f3.pack(fill=tk.X,expand=True,pady=2); ttk.Label(f3,text="配音员:").pack(side=tk.LEFT,padx=(0,10)); self.voice_combo=ttk.Combobox(f3,textvariable=self.vars['voice'],state="readonly",width=30); self.voice_combo.pack(side=tk.LEFT,fill=tk.X,expand=True); self.audition_button=ttk.Button(f3,text="试听",command=self.audition_voice,state="disabled"); self.audition_button.pack(side=tk.LEFT,padx=(5,0))
f4=ttk.Frame(pane3.content_frame); f4.pack(fill=tk.X,expand=True,pady=2); ttk.Label(f4,text="响度:").pack(side=tk.LEFT,padx=(0,10)); self.volume_scale=ttk.Scale(f4,from_=-24,to=-12,orient=tk.HORIZONTAL,variable=self.vars['volume'],command=lambda v:self.volume_label.config(text=f"{float(v):.1f} LUFS")); self.volume_scale.pack(side=tk.LEFT,fill=tk.X,expand=True); self.volume_label=ttk.Label(f4,text=f"{self.vars['volume'].get():.1f} LUFS",width=10); self.volume_label.pack(side=tk.LEFT,padx=(5,0))
f5=ttk.Frame(pane3.content_frame); f5.pack(fill=tk.X,expand=True,pady=2); ttk.Label(f5,text="格式:").pack(side=tk.LEFT,padx=(0,10)); self.format_combo=ttk.Combobox(f5,textvariable=self.vars['format'],values=["mp3","wav","aac"],state="readonly"); self.format_combo.pack(side=tk.LEFT,fill=tk.X,expand=True)
pane2 = CollapsiblePane(main_frame, "视频合成 (可选)", initial_state='collapsed'); pane2.grid(row=2, column=0, sticky="ew", pady=(0, 5))
ttk.Checkbutton(pane2.content_frame, text="启用视频合成功能", variable=self.vars['merge_video']).pack(anchor='w')
f6=ttk.Frame(pane2.content_frame); f6.pack(fill=tk.X,expand=True,pady=5); ttk.Label(f6,text="视频文件:").pack(side=tk.LEFT,padx=(0,5)); ttk.Entry(f6,textvariable=self.vars['video_path']).pack(side=tk.LEFT,fill=tk.X,expand=True,padx=(0,5)); ttk.Button(f6,text="选择...",command=self.select_video_file).pack(side=tk.LEFT)
f7=ttk.Frame(pane2.content_frame); f7.pack(fill=tk.X,expand=True); ttk.Checkbutton(f7,text="保留原始音轨",variable=self.vars['keep_original_audio']).pack(side=tk.LEFT); ttk.Checkbutton(f7,text="保留独立音频",variable=self.vars['keep_intermediate_audio']).pack(side=tk.LEFT,padx=(20,0))
self.pane4 = CollapsiblePane(main_frame, "字幕烧录 (可选)", initial_state='collapsed'); self.pane4.grid(row=3, column=0, sticky="ew", pady=(0, 10))
ttk.Checkbutton(self.pane4.content_frame, text="将字幕烧录到视频画面中", variable=self.vars['burn_subtitles']).pack(anchor='w')
self.encoder_frame = ttk.Frame(self.pane4.content_frame); self.encoder_frame.pack(fill=tk.X, expand=True, pady=5)
f_enc1=ttk.Frame(self.encoder_frame); f_enc1.pack(fill=tk.X, expand=True); ttk.Label(f_enc1, text="编码器:").pack(side=tk.LEFT, padx=(0,5)); self.encoder_combo = ttk.Combobox(f_enc1, textvariable=self.vars['encoder'], state="readonly"); self.encoder_combo.pack(side=tk.LEFT, fill=tk.X, expand=True)
f_enc2=ttk.Frame(self.encoder_frame); f_enc2.pack(fill=tk.X, expand=True, pady=5); ttk.Label(f_enc2, text="字幕字号:").pack(side=tk.LEFT, padx=(0,5)); ttk.Spinbox(f_enc2, from_=10, to=100, textvariable=self.vars['subtitle_fontsize'], width=10).pack(side=tk.LEFT); self.preview_button = ttk.Button(f_enc2, text="生成5秒预览", command=self.create_preview); self.preview_button.pack(side=tk.LEFT, padx=(10,0))
log_pane=ttk.Frame(main_frame); log_pane.grid(row=4,column=0,sticky="nsew"); log_pane.rowconfigure(1,weight=1); log_pane.columnconfigure(0,weight=1)
f8=ttk.Frame(log_pane); f8.pack(fill=tk.X,pady=(0,5)); self.progress_bar=ttk.Progressbar(f8,orient=tk.HORIZONTAL); self.progress_bar.pack(fill=tk.X,expand=True,side=tk.LEFT,padx=(0,10)); self.status_label=ttk.Label(f8,textvariable=self.vars['status']); self.status_label.pack(side=tk.LEFT)
self.log_text=scrolledtext.ScrolledText(log_pane,wrap=tk.WORD,state="disabled"); self.log_text.pack(fill=tk.BOTH,expand=True)
f9=ttk.Frame(main_frame); f9.grid(row=5,column=0,sticky="ew",pady=(10,0)); f9.columnconfigure(0,weight=1)
self.start_button=ttk.Button(f9,text="开始生成",command=self.start_processing,style="Accent.TButton"); self.start_button.pack(side=tk.RIGHT)
self.cancel_button=ttk.Button(f9,text="取消任务",command=self.cancel_processing,state="disabled"); self.cancel_button.pack(side=tk.RIGHT,padx=(0,5))
def load_config(self):
try:
with open(CONFIG_FILE,'r',encoding='utf-8') as f: self.config=json.load(f)
except(FileNotFoundError,json.JSONDecodeError): self.config=DEFAULT_CONFIG
for key,value in DEFAULT_CONFIG.items(): self.config.setdefault(key,value)
def save_config(self):
for key, var in self.vars.items(): self.config[key] = var.get().split(' ')[0] if key == "voice" else var.get()
with open(CONFIG_FILE,'w',encoding='utf-8') as f: json.dump(self.config, f, indent=4, ensure_ascii=False)
def on_closing(self): self.save_config(); self.root.destroy()
def select_srt_file(self):
path=filedialog.askopenfilename(filetypes=[("SRT Subtitles","*.srt")]);
if path:
self.vars['srt_path'].set(path); srt_dir=os.path.dirname(path); base_name=os.path.splitext(os.path.basename(path))[0]
if not self.vars['output_dir'].get(): self.vars['output_dir'].set(srt_dir); self.open_dir_button.config(state="normal")
for ext in ['.mp4','.mkv','.avi','.mov','.webm']:
if os.path.exists(os.path.join(srt_dir,base_name+ext)): self.vars['video_path'].set(os.path.join(srt_dir,base_name+ext)); break
def select_output_dir(self):
path=filedialog.askdirectory(title="选择输出目录")
if path: self.vars['output_dir'].set(path); self.open_dir_button.config(state="normal")
def open_output_dir(self):
path=self.vars['output_dir'].get()
if path and os.path.isdir(path):
if platform.system() == "Windows": os.startfile(path)
else: subprocess.Popen(['open', path])
else: messagebox.showwarning("目录无效", "输出目录不存在或无效.")
def select_video_file(self):
path=filedialog.askopenfilename(filetypes=[("Video Files","*.mp4 *.mkv *.avi *.mov"),("All Files", "*.*")])
if path: self.vars['video_path'].set(path)
def audition_voice(self):
voice=self.vars['voice'].get().split(' ')[0]
if not voice: return
self.audition_button.config(state="disabled"); self.log(f"正在试听 {voice}...")
def _audition():
try:
out_dir = os.path.dirname(os.path.abspath(__file__))
cache=os.path.join(out_dir,"_audition_temp_cache"); os.makedirs(cache,exist_ok=True); tmp_file=os.path.join(cache,"_audition_temp.mp3")
async def save():
comm = edge_tts.Communicate(self.config['audition_text'], voice)
await comm.save(tmp_file)
asyncio.run(save())
if os.path.exists(tmp_file) and os.path.getsize(tmp_file) > 0:
if platform.system() == "Windows": os.startfile(tmp_file)
else: subprocess.Popen(['xdg-open', tmp_file])
else: raise Exception("生成的试听文件为空。")
except Exception as e: self.gui_queue.put({"type":"log", "data":f"!! 试听失败: {e}"})
finally: self.gui_queue.put({"type":"audition_done"})
threading.Thread(target=_audition,daemon=True).start()
def create_preview(self):
srt_path = self.vars['srt_path'].get(); video_path = self.vars['video_path'].get()
if not srt_path or not os.path.exists(srt_path): messagebox.showerror("错误","请先选择有效的SRT文件。"); return
if not video_path or not os.path.exists(video_path): messagebox.showerror("错误","请先选择有效的视频文件。"); return
self.preview_button.config(state="disabled"); self.log("正在生成预览切片...")
def _preview():
try:
with open(srt_path, 'r', encoding='utf-8-sig') as f: content=f.read()
entries = parse_srt(content)
if not entries: raise Exception("SRT文件为空或格式错误。")
preview_entry = sorted(entries, key=lambda e: e.duration, reverse=True)[0]
out_dir = self.vars['output_dir'].get() or os.path.dirname(srt_path)
preview_file = os.path.join(out_dir, "_preview.mp4")
style = {'name': 'Default','fontname': '微软雅黑','fontsize': self.vars['subtitle_fontsize'].get(),'primary_colour': '&H00FFFFFF','secondary_colour': '&H000000FF','outline_colour': '&H00000000','back_colour': '&H00000000','bold': '0','italic': '0','underline': '0','strikeout': '0','scale_x': '100','scale_y': '100','spacing': '0','angle': '0','border_style': '1','outline': '2','shadow': '1','alignment': '2','margin_l': '10','margin_r': '10','margin_v': '30','encoding': '1'}
ass_content = srt_to_ass(content, style)
ass_path = os.path.join(out_dir, "_preview.ass")
with open(ass_path, 'w', encoding='utf-8') as f: f.write(ass_content)
start_time = max(0, preview_entry.start.total_seconds() - 1)
duration = preview_entry.duration + 2
escaped_ass_path = ass_path.replace('\\', '/').replace(':', '\\:')
vf = f"subtitles='{escaped_ass_path}'"
cmd = ['ffmpeg', '-y', '-ss', str(start_time), '-t', str(duration), '-i', video_path, '-vf', vf, '-c:v', 'libx264', '-preset', 'ultrafast', '-an', preview_file]
creationflags = subprocess.CREATE_NO_WINDOW if platform.system() == "Windows" else 0
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8', creationflags=creationflags)
if result.returncode != 0: raise Exception(f"FFmpeg预览生成失败:\n{result.stderr}")
if os.path.exists(preview_file):
if platform.system() == "Windows": os.startfile(preview_file)
else: subprocess.Popen(['open', preview_file])
except Exception as e: self.gui_queue.put({"type":"log", "data":f"!! 预览失败: {e}"})
finally: self.gui_queue.put({"type":"preview_done"})
threading.Thread(target=_preview, daemon=True).start()
def start_processing(self):
if not self.vars['srt_path'].get() or not os.path.exists(self.vars['srt_path'].get()): messagebox.showerror("错误","请选择有效的SRT文件."); return
if not self.vars['output_dir'].get() or not os.path.isdir(self.vars['output_dir'].get()): messagebox.showerror("错误","请选择有效的输出目录."); return
if self.vars['merge_video'].get() and (not self.vars['video_path'].get() or not os.path.exists(self.vars['video_path'].get())): messagebox.showerror("错误","请选择有效的视频文件进行合并."); return
self.start_button.config(state="disabled"); self.cancel_button.config(state="normal")
self.log_text.config(state="normal"); self.log_text.delete(1.0,tk.END); self.log_text.config(state="disabled")
current_config=self.config.copy()
for key, var in self.vars.items(): current_config[key] = var.get().split(' ')[0] if key == "voice" else var.get()
processor=Processor(self.vars['srt_path'].get(),current_config,self.gui_queue); self.processor_thread=threading.Thread(target=processor.run,daemon=True); self.processor_thread.processor_instance=processor; self.processor_thread.start()
def cancel_processing(self):
if self.processor_thread and self.processor_thread.is_alive(): self.log("正在发送取消信号..."); self.processor_thread.processor_instance.is_cancelled.set()
def load_voices(self):
self.log("正在获取可用配音员列表...")
def _load():
try:
encoders = ["CPU (libx264)"]
try:
result = subprocess.run(['ffmpeg', '-hide_banner', '-encoders'], capture_output=True, text=True, encoding='utf-8', timeout=5)
if 'h264_nvenc' in result.stdout: encoders.append("NVIDIA (h264_nvenc)")
if 'h264_amf' in result.stdout: encoders.append("AMD (h264_amf)")
if 'h264_qsv' in result.stdout: encoders.append("Intel (h264_qsv)")
except Exception: pass
voices = asyncio.run(edge_tts.list_voices())
self.gui_queue.put({"type":"init_data", "voices":voices, "encoders":encoders})
except Exception as e:
self.gui_queue.put({"type":"log", "data":f"!! 致命错误: 获取配音员列表失败: {e}"})
if "401" in str(e): self.gui_queue.put({"type":"log", "data":"!! [建议]: 认证失败, 请升级 'pip install --upgrade edge-tts'"})
threading.Thread(target=_load, daemon=True).start()
def process_queue(self):
try:
while True:
msg=self.gui_queue.get_nowait()
msg_type = msg.get("type")
if msg_type=="log": self.log_text.config(state="normal"); self.log_text.insert(tk.END, msg["data"] + "\n"); self.log_text.see(tk.END); self.log_text.config(state="disabled")
elif msg_type=="progress": self.progress_bar['maximum']=msg['total']; self.progress_bar['value']=msg['current']; self.vars['status'].set(msg['status'])
elif msg_type=="finish":
self.start_button.config(state="normal"); self.cancel_button.config(state="disabled")
if msg["success"]: self.vars['status'].set("任务完成!"); self.vars['output_dir'].set(msg["output_dir"]); self.open_dir_button.config(state="normal"); messagebox.showinfo("成功", "任务已成功完成!")
else: self.vars['status'].set("任务失败或被取消"); messagebox.showwarning("任务中断", "任务失败或被用户取消。")
elif msg_type=="init_data":
voices_list=msg["voices"]; d_names=[f"{v['ShortName']} - {v['Gender']} ({v['Locale']})" for v in voices_list]
self.voice_combo['values']=d_names; saved_voice=self.config['voice']
for i,name in enumerate(d_names):
if name.startswith(saved_voice): self.voice_combo.current(i); break
else:
if d_names: self.voice_combo.current(0)
self.log("配音员列表加载完毕。"); self.audition_button.config(state="normal")
encoders = msg["encoders"]; self.encoder_combo['values'] = encoders
encoder_priority = ["NVIDIA", "AMD", "Intel", "CPU"]
best_encoder = next((enc for prio in encoder_priority for enc in encoders if prio in enc), encoders[0])
if self.vars['encoder'].get() not in encoders: self.vars['encoder'].set(best_encoder); self.log(f"自动选择最佳编码器: {best_encoder}")
self.log(f"检测到可用编码器: {', '.join(encoders)}")
elif msg_type=="audition_done": self.audition_button.config(state="normal"); self.log("试听完毕。")
elif msg_type=="preview_done": self.preview_button.config(state="normal"); self.log("预览生成完毕。")
except queue.Empty: pass
finally: self.root.after(100, self.process_queue)
if __name__ == "__main__":
if platform.system() == "Windows": asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
root = tk.Tk(); app = App(root); root.mainloop()