"""AI video dubbing tool: turn an SRT subtitle file into a dubbed audio/video.

Pipeline (see ``Processor.run``):
  1. Parse the SRT file into timed entries.
  2. Synthesize one speech clip per entry with edge-tts, adapting the speech
     rate to the subtitle duration.
  3. Merge all clips onto one timeline with ffmpeg (adelay + amix + loudnorm).
  4. Optionally mux the result into a video and burn the subtitles in.

The Tkinter ``App`` class is the GUI front end; it talks to worker threads
exclusively through a ``queue.Queue`` of message dicts.
"""

import asyncio
import json
import os
import platform
import queue
import re
import subprocess
import threading
import tkinter as tk
import uuid
from datetime import timedelta
from tkinter import filedialog, ttk, scrolledtext, messagebox

import edge_tts
import sv_ttk

# --- Default configuration ---
DEFAULT_CONFIG = {
    "voice": "zh-CN-XiaoxiaoNeural",
    "volume": -16.0,
    "format": "mp3",
    "base_cps": 5.0,
    "max_concurrency": 10,
    "audition_text": "你好,这是一个声音样本试听。",
    "output_dir": "",
    "merge_video": False,
    "keep_original_audio": False,
    "video_path": "",
    "keep_intermediate_audio": False,
    "burn_subtitles": False,
    "encoder": "CPU (libx264)",
    "subtitle_fontsize": 14,
}
CONFIG_FILE = "config.json"

# Order of the fields in an ASS "Style:" line; keys of the style dict.
_ASS_STYLE_FIELDS = (
    'name', 'fontname', 'fontsize', 'primary_colour', 'secondary_colour',
    'outline_colour', 'back_colour', 'bold', 'italic', 'underline',
    'strikeout', 'scale_x', 'scale_y', 'spacing', 'angle', 'border_style',
    'outline', 'shadow', 'alignment', 'margin_l', 'margin_r', 'margin_v',
    'encoding',
)


# --- Custom widgets ---
class CollapsiblePane(ttk.Frame):
    """A frame with a header button that shows or hides ``content_frame``."""

    def __init__(self, parent, text="", initial_state='expanded'):
        """Create the pane.

        Args:
            parent: Parent widget.
            text: Caption shown on the header button.
            initial_state: ``'expanded'`` to start open; anything else starts
                collapsed.
        """
        super().__init__(parent)
        self.columnconfigure(0, weight=1)
        self.text = text
        self._variable = tk.BooleanVar(value=(initial_state == 'expanded'))
        self.button = ttk.Button(self, text=f"▼ {self.text}", command=self.toggle, style="TButton")
        self.button.grid(row=0, column=0, sticky="ew")
        self.content_frame = ttk.Frame(self, padding=(10, 5))
        self._variable.trace_add("write", self._update_button_text)
        # The trace only fires on writes, so set the arrow for the initial
        # state explicitly; otherwise a collapsed pane would show "▼".
        self._update_button_text()
        self._update_content()

    def toggle(self):
        """Flip between expanded and collapsed."""
        self._variable.set(not self.get())
        self._update_content()

    def get(self):
        """Return True when the pane is expanded."""
        return self._variable.get()

    def _update_content(self):
        """Grid or hide the content frame according to the current state."""
        if self.get():
            self.content_frame.grid(row=1, column=0, sticky="ew")
        else:
            self.content_frame.grid_remove()

    def _update_button_text(self, *args):
        """Refresh the arrow on the header button (also used as a trace callback)."""
        self.button.config(text=f"{'▼' if self.get() else '▶'} {self.text}")


# --- Helper functions and classes ---
class SrtEntry:
    """One subtitle cue: index, start/end as ``timedelta``, text and duration in seconds."""

    def __init__(self, index, start, end, text):
        self.index = int(index)
        self.start = self._to_timedelta(start)
        self.end = self._to_timedelta(end)
        self.text = text.strip()
        self.duration = (self.end - self.start).total_seconds()

    @staticmethod
    def _to_timedelta(time_str):
        """Convert ``HH:MM:SS,mmm`` (or ``HH:MM:SS.mmm``) to a ``timedelta``.

        The fractional part is interpreted as an integer number of
        milliseconds; a missing fraction counts as zero.
        """
        whole, _, fraction = time_str.replace(',', '.').partition('.')
        millis = int(fraction) if fraction else 0
        hours, minutes, seconds = map(int, whole.split(':'))
        return timedelta(hours=hours, minutes=minutes, seconds=seconds, milliseconds=millis)


def parse_srt(srt_content):
    """Parse SRT text into a list of ``SrtEntry`` objects, in file order."""
    pattern = re.compile(
        r'(\d+)\s*[\r\n]+(\d{2}:\d{2}:\d{2}[,.]\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2}[,.]\d{3})\s*[\r\n]+([\s\S]+?)(?=(?:\r\n?|\n){2,}|$)',
        re.UNICODE,
    )
    return [SrtEntry(*match.groups()) for match in pattern.finditer(srt_content)]


def srt_to_ass(srt_content, style_options):
    """Convert SRT text to an ASS subtitle document with a single style.

    Args:
        srt_content: Full text of the SRT file.
        style_options: Dict holding every key listed in ``_ASS_STYLE_FIELDS``
            except that ``fontsize`` is optional (default 40). The dict is not
            modified.

    Returns:
        The ASS document as a string.

    Raises:
        KeyError: If a required style key is missing.
    """
    # Work on a copy so the caller's dict is never mutated.
    style = dict(style_options)
    style['fontsize'] = str(style.get('fontsize', 40))  # must be a string

    header_lines = [
        "[Script Info]",
        # NOTE(review): the ';'-separated items all sit on the Title line, so
        # renderers presumably ignore ScriptType/PlayRes here. Kept unchanged
        # because the default font size may be tuned for that — confirm.
        "Title: Generated by AI Video Dubbing Tool; ScriptType: v4.00+; WrapStyle: 0; PlayResX: 1920; PlayResY: 1080",
        "",
        "[V4+ Styles]",
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding",
        "Style: " + ",".join(str(style[field]) for field in _ASS_STYLE_FIELDS),
        "",
        "[Events]",
        "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
        "",
    ]
    ass_header = "\n".join(header_lines)

    # Normalise line endings and accept both ',' and '.' as the millisecond
    # separator so this matches the same cues that parse_srt() accepts.
    normalized = srt_content.replace('\r\n', '\n').replace('\r', '\n')
    srt_pattern = re.compile(
        r'(\d+)[ \t]*\n(\d{2}:\d{2}:\d{2}[,.]\d{3})[ \t]*-->[ \t]*(\d{2}:\d{2}:\d{2}[,.]\d{3})[ \t]*\n([\s\S]*?)(?=\n\n|\Z)'
    )
    ass_lines = []
    for match in srt_pattern.finditer(normalized):
        start_time, end_time, text = match.group(2), match.group(3), match.group(4)
        # ASS uses centiseconds: drop the last millisecond digit.
        start_ass = start_time.replace(',', '.')[:-1]
        end_ass = end_time.replace(',', '.')[:-1]
        text_ass = text.strip().replace('\n', '\\N')
        ass_lines.append(f"Dialogue: 0,{start_ass},{end_ass},{style['name']},,0,0,0,,{text_ass}")
    return ass_header + "\n".join(ass_lines)


def _build_subtitle_style(fontsize):
    """Return the ASS style dict used for both the final burn-in and the preview."""
    return {
        'name': 'Default', 'fontname': '微软雅黑', 'fontsize': fontsize,
        'primary_colour': '&H00FFFFFF', 'secondary_colour': '&H000000FF',
        'outline_colour': '&H00000000', 'back_colour': '&H00000000',
        'bold': '0', 'italic': '0', 'underline': '0', 'strikeout': '0',
        'scale_x': '100', 'scale_y': '100', 'spacing': '0', 'angle': '0',
        'border_style': '1', 'outline': '2', 'shadow': '1', 'alignment': '2',
        'margin_l': '10', 'margin_r': '10', 'margin_v': '30', 'encoding': '1',
    }


def _subtitles_filter(ass_path):
    """Build the ffmpeg ``subtitles=`` filter string for *ass_path*.

    Backslashes become forward slashes and ':' is escaped, because ':' is the
    option separator inside an ffmpeg filter description.
    """
    escaped_ass_path = ass_path.replace('\\', '/').replace(':', '\\:')
    return f"subtitles='{escaped_ass_path}'"


def _subprocess_creationflags():
    """Return flags that suppress the console window on Windows, 0 elsewhere."""
    if platform.system() == "Windows":
        return subprocess.CREATE_NO_WINDOW
    return 0


def _open_path(path):
    """Open a file or directory with the platform's default application.

    Raises:
        OSError: If the platform opener cannot be launched.
    """
    system = platform.system()
    if system == "Windows":
        os.startfile(path)
    elif system == "Darwin":
        subprocess.Popen(['open', path])
    else:
        subprocess.Popen(['xdg-open', path])


# --- Core processing logic ---
class Processor:
    """Runs the whole dubbing pipeline for one SRT file on a worker thread.

    Progress, log lines and the final result are reported to the GUI through
    ``gui_queue`` as dicts with a ``"type"`` key (``log``, ``progress``,
    ``finish``). Set ``is_cancelled`` to request cancellation.
    """

    def __init__(self, srt_path, config, gui_queue):
        """Store the job parameters and derive output/cache paths.

        Args:
            srt_path: Path of the SRT file to dub.
            config: Settings dict with the keys of ``DEFAULT_CONFIG``.
            gui_queue: Queue receiving message dicts for the GUI.
        """
        self.srt_path = srt_path
        self.config = config
        self.gui_queue = gui_queue
        self.base_name = os.path.splitext(os.path.basename(srt_path))[0]
        # Fall back to the SRT file's directory when no output dir is configured.
        output_dir = self.config['output_dir'] or os.path.dirname(os.path.abspath(srt_path))
        self.output_dir = output_dir
        self.cache_dir = os.path.join(self.output_dir, f"{self.base_name}_cache")
        self.audio_output_path = os.path.join(self.output_dir, f"{self.base_name}.{self.config['format']}")
        self.is_cancelled = threading.Event()
        # Both are filled in by run(); initialised here so helpers never hit
        # an AttributeError when called out of order.
        self.srt_content = ""
        self.total_steps = 0

    def log(self, message):
        """Send a log line to the GUI."""
        self.gui_queue.put({"type": "log", "data": message})

    def update_progress(self, current, total, status_text=""):
        """Send a progress update to the GUI."""
        self.gui_queue.put({"type": "progress", "current": current, "total": total, "status": status_text})

    def _clip_path(self, index):
        """Return the cache path of the clip for subtitle *index*."""
        return os.path.join(self.cache_dir, f"{index}.mp3")

    @staticmethod
    def _is_nonempty_file(path):
        """Return True when *path* exists and has a size greater than zero."""
        return os.path.exists(path) and os.path.getsize(path) > 0

    async def _generate_clip_worker(self, entry, semaphore):
        """Synthesize the clip for one entry, with caching and retries.

        Returns:
            ``(entry.index, status)`` where status is one of ``"success"``,
            ``"skipped"``, ``"failed"`` or ``"cancelled"``.
        """
        async with semaphore:
            if self.is_cancelled.is_set():
                return (entry.index, "cancelled")
            clip_path = self._clip_path(entry.index)
            if self._is_nonempty_file(clip_path):
                self.log(f"片段 {entry.index} 缓存已存在.")
                return (entry.index, "success")

            clean_text = re.sub(r'\s+', '', entry.text)
            char_count = len(clean_text)
            if char_count < 2 or entry.duration <= 0.2:
                self.log(f"!! 警告: 片段 {entry.index} 因文本过短或时长不足而被跳过.")
                return (entry.index, "skipped")

            # Speed up or slow down relative to the baseline characters/second
            # so the speech roughly fits the subtitle duration.
            rate_p = round(((char_count / entry.duration) / self.config['base_cps'] - 1) * 100)
            rate_p = max(-90, min(200, rate_p))
            rate_s = f"+{rate_p}%" if rate_p >= 0 else f"{rate_p}%"
            self.log(f"生成片段 {entry.index}: 时长={entry.duration:.2f}s, 字数={char_count}, 速率={rate_s}")

            max_retries = 3
            for attempt in range(max_retries):
                if self.is_cancelled.is_set():
                    return (entry.index, "cancelled")
                try:
                    comm = edge_tts.Communicate(entry.text, self.config['voice'], rate=rate_s)
                    await comm.save(clip_path)
                    # Even after a successful await, verify the file is usable.
                    if not self._is_nonempty_file(clip_path):
                        raise ValueError("生成的音频文件为空或无效.")
                    return (entry.index, "success")
                except Exception as e:
                    # The library may report an error (e.g. network jitter)
                    # although the file was written; accept it in that case.
                    if self._is_nonempty_file(clip_path):
                        self.log(f"片段 {entry.index} 捕获到异常,但文件已成功生成,视为成功。")
                        return (entry.index, "success")
                    # Otherwise retry with exponential backoff: 2, 4, 8 seconds.
                    backoff_time = 2 ** (attempt + 1)
                    self.log(f"!! 错误: 片段 {entry.index} (尝试 {attempt + 1}/{max_retries}): {e}")
                    if attempt < max_retries - 1:
                        self.log(f" 将在 {backoff_time} 秒后重试...")
                        await asyncio.sleep(backoff_time)

            self.log(f"!! 严重: 片段 {entry.index} 在多次尝试后仍然失败,将被跳过。")
            return (entry.index, "failed")

    async def _generate_all_clips_async(self, entries):
        """Generate all clips concurrently and report progress.

        Raises:
            InterruptedError: If cancellation was requested.
            RuntimeError: If not a single clip was generated successfully.
        """
        self.log(f"\n--- 阶段1: 生成语音 (并发数: {self.config['max_concurrency']}) ---")
        sem = asyncio.Semaphore(self.config['max_concurrency'])
        tasks = [self._generate_clip_worker(e, sem) for e in entries]
        successful_clips = 0
        failed_clips = []
        for i, f in enumerate(asyncio.as_completed(tasks)):
            if self.is_cancelled.is_set():
                raise InterruptedError("任务取消")
            index, status = await f
            if status == "success":
                successful_clips += 1
            elif status == "failed":
                failed_clips.append(index)
            self.update_progress(i + 1, self.total_steps, f"生成片段 {i+1}/{len(entries)}")

        if failed_clips:
            self.log(f"\n!! 警告: 以下语音片段生成失败: {', '.join(map(str, sorted(failed_clips)))}")
        if successful_clips == 0:
            raise RuntimeError("没有任何语音片段成功生成,任务中止。")
        self.log(f"语音生成完成: {successful_clips} 个成功, {len(failed_clips)} 个失败。")

    def _merge_media_sync(self, entries):
        """Merge the cached clips into one audio file and optionally mux a video.

        Raises:
            RuntimeError: If no clip is available or an ffmpeg step fails.
            InterruptedError: If cancellation was requested between the steps.
        """
        self._merge_audio_clips(entries)
        if self.is_cancelled.is_set():
            raise InterruptedError("任务取消")
        if self.config['merge_video']:
            self._compose_video(entries)
        if self.config['merge_video'] and not self.config['keep_intermediate_audio']:
            try:
                self.log("删除中间音频文件...")
                os.remove(self.audio_output_path)
            except OSError:
                # Best effort: a leftover intermediate file is harmless.
                pass

    def _merge_audio_clips(self, entries):
        """Place every cached clip at its start time and mix them into one file."""
        self.log(f"\n--- 阶段2: 合并音频 ---")
        clips = sorted(
            (e for e in entries if self._is_nonempty_file(self._clip_path(e.index))),
            key=lambda e: e.index,
        )
        if not clips:
            self.log("!! 错误: 没有可用的音频片段进行合并。")
            raise RuntimeError("没有可用的音频片段。")

        cmd = ['ffmpeg', '-y']
        filters = []
        mix_inputs = []
        for i, e in enumerate(clips):
            cmd.extend(['-i', self._clip_path(e.index)])
            delay = int(e.start.total_seconds() * 1000)  # adelay takes milliseconds
            filters.append(f"[{i}:a]adelay={delay}|{delay}[a{i}]")
            mix_inputs.append(f"[a{i}]")
        inputs = "".join(mix_inputs)
        filters.extend([
            f"{inputs}amix=inputs={len(clips)}:normalize=0[merged]",
            f"[merged]loudnorm=I={self.config['volume']}:LRA=11:TP=-1.5",
        ])
        cmd.extend(['-filter_complex', ";".join(filters), self.audio_output_path])
        if self.run_ffmpeg_sync(cmd, "合并音频") != 0:
            raise RuntimeError("音频合并失败")
        self.log("音频合并成功!")

    def _compose_video(self, entries):
        """Mux the merged audio into the source video, optionally burning subtitles."""
        self.log("\n--- 阶段3: 与视频合成 ---")
        self.update_progress(len(entries), self.total_steps, "视频合成中...")
        video_path = self.config['video_path']
        v_ext = os.path.splitext(video_path)[1]
        v_out_path = os.path.join(self.output_dir, f"{self.base_name}_dubbed{v_ext}")
        cmd = ['ffmpeg', '-y', '-i', video_path, '-i', self.audio_output_path]
        video_codec, audio_codec = self.get_codecs()

        vf_options = []
        if self.config['burn_subtitles']:
            ass_path = self.prepare_ass_subtitle()
            vf_options.append(_subtitles_filter(ass_path))
            self.log(f"准备烧录字幕: {os.path.basename(ass_path)}")

        if self.config['keep_original_audio']:
            self.log("模式: 创建双音轨")
            locale = self.config['voice'].split('-')[0]
            cmd.extend([
                '-map', '0:v:0', '-map', '0:a:0', '-map', '1:a:0',
                '-metadata:s:a:0', 'language=und', '-metadata:s:a:0', 'title=Original',
                '-metadata:s:a:1', f'language={locale}', '-metadata:s:a:1', 'title=AI Dubbing',
                '-disposition:a:1', 'default',
            ])
        else:
            self.log("模式: 替换音轨")
            cmd.extend(['-map', '0:v:0', '-map', '1:a:0'])

        cmd.extend(['-c:v', video_codec, '-c:a', audio_codec])
        if vf_options:
            cmd.extend(['-vf', ",".join(vf_options)])
        cmd.append(v_out_path)
        if self.run_ffmpeg_sync(cmd, "视频合成") != 0:
            raise RuntimeError("视频合成失败")
        self.log(f"视频合成成功! 输出: {v_out_path}")

    def get_codecs(self):
        """Return ``(video_codec, audio_codec)`` for the video composition step.

        Burning subtitles requires re-encoding, so the configured encoder and
        AAC are used; otherwise both streams are copied.
        """
        encoder_map = {
            "CPU (libx264)": "libx264",
            "NVIDIA (h264_nvenc)": "h264_nvenc",
            "AMD (h264_amf)": "h264_amf",
            "Intel (h264_qsv)": "h264_qsv",
        }
        audio_codec = "aac"
        if self.config['burn_subtitles']:
            return encoder_map.get(self.config['encoder'], "libx264"), audio_codec
        return "copy", "copy"

    def prepare_ass_subtitle(self):
        """Write the ASS version of the loaded SRT into the cache dir and return its path."""
        style = _build_subtitle_style(self.config['subtitle_fontsize'])
        ass_content = srt_to_ass(self.srt_content, style)
        ass_path = os.path.join(self.cache_dir, f"{self.base_name}.ass")
        with open(ass_path, 'w', encoding='utf-8') as f:
            f.write(ass_content)
        return ass_path

    def run_ffmpeg_sync(self, cmd, stage="FFmpeg"):
        """Run an ffmpeg command, log its stderr on failure, return the exit code."""
        self.log(f"执行 {stage} 命令...")
        # errors='replace': ffmpeg may echo file names in a non-UTF-8 code
        # page; a decode error must not abort an otherwise successful run.
        process = subprocess.run(
            cmd, capture_output=True, text=True, encoding='utf-8', errors='replace',
            creationflags=_subprocess_creationflags(),
        )
        if process.returncode != 0:
            self.log(f"!! {stage} 失败,返回码: {process.returncode}")
            self.log("--- FFmpeg 错误输出 ---")
            self.log(process.stderr)
            self.log("------------------------")
        return process.returncode

    def run(self):
        """Execute the full pipeline; always ends by posting a ``finish`` message."""
        try:
            self.log("--- 开始处理 ---")
            os.makedirs(self.cache_dir, exist_ok=True)
            with open(self.srt_path, 'r', encoding='utf-8-sig') as f:
                self.srt_content = f.read()
            entries = parse_srt(self.srt_content)
            if not entries:
                raise RuntimeError("解析SRT失败")
            self.log(f"解析到 {len(entries)} 条字幕.")
            # One step per clip plus one for the optional video composition.
            self.total_steps = len(entries) + (1 if self.config['merge_video'] else 0)
            asyncio.run(self._generate_all_clips_async(entries))
            if self.is_cancelled.is_set():
                raise InterruptedError("任务取消")
            self._merge_media_sync(entries)
            self.update_progress(self.total_steps, self.total_steps, "全部完成!")
            self.log("\n--- 全部完成!---")
            self.gui_queue.put({"type": "finish", "success": True, "output_dir": self.output_dir})
        except Exception as e:
            if isinstance(e, InterruptedError):
                # Cancellation was requested by the user; nothing to report.
                pass
            elif isinstance(e, RuntimeError):
                # Pipeline errors carry a user-readable reason; surface it
                # instead of failing silently.
                self.log(f"!! 错误: {e}")
            else:
                self.log(f"发生未知严重错误: {e}")
            self.gui_queue.put({"type": "finish", "success": False})


# --- GUI application ---
class App:
    """Tkinter front end: collects settings, starts workers, renders queue messages."""

    def __init__(self, root):
        """Build the window on *root*, load the config and start the queue polling."""
        self.root = root
        self.root.title("AI视频配音合成工具 V15.3 (终极优化版)")
        self.root.geometry("800x800")
        self.load_config()
        self.gui_queue = queue.Queue()
        # Set by start_processing(); None until the first job is started.
        self.processor_thread = None
        self.vars = {
            "srt_path": tk.StringVar(),
            "output_dir": tk.StringVar(value=self.config["output_dir"]),
            "voice": tk.StringVar(value=self.config["voice"]),
            "volume": tk.DoubleVar(value=self.config["volume"]),
            "format": tk.StringVar(value=self.config["format"]),
            "status": tk.StringVar(value="准备就绪"),
            "merge_video": tk.BooleanVar(value=self.config["merge_video"]),
            "keep_original_audio": tk.BooleanVar(value=self.config["keep_original_audio"]),
            "video_path": tk.StringVar(value=self.config["video_path"]),
            "keep_intermediate_audio": tk.BooleanVar(value=self.config["keep_intermediate_audio"]),
            "burn_subtitles": tk.BooleanVar(value=self.config["burn_subtitles"]),
            "encoder": tk.StringVar(value=self.config["encoder"]),
            "subtitle_fontsize": tk.IntVar(value=self.config["subtitle_fontsize"]),
        }
        self.build_ui()
        sv_ttk.set_theme("dark")
        self.root.after(100, self.load_voices)
        self.root.after(100, self.process_queue)
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

    def log(self, message):
        """Queue a log line; it is rendered by ``process_queue`` on the GUI thread."""
        self.gui_queue.put({"type": "log", "data": message})

    def build_ui(self):
        """Create all widgets: settings panes, progress/log area and action buttons."""
        main_frame = ttk.Frame(self.root, padding=10)
        main_frame.pack(fill=tk.BOTH, expand=True)
        main_frame.rowconfigure(4, weight=1)
        main_frame.columnconfigure(0, weight=1)

        # Basic settings: SRT file and output directory.
        pane1 = CollapsiblePane(main_frame, "基础设置")
        pane1.grid(row=0, column=0, sticky="ew", pady=(0, 5))
        f1 = ttk.Frame(pane1.content_frame)
        f1.pack(fill=tk.X, expand=True, pady=(0, 5))
        ttk.Label(f1, text="SRT文件:").pack(side=tk.LEFT, padx=(0, 5))
        ttk.Entry(f1, textvariable=self.vars['srt_path']).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
        ttk.Button(f1, text="选择...", command=self.select_srt_file).pack(side=tk.LEFT)
        f2 = ttk.Frame(pane1.content_frame)
        f2.pack(fill=tk.X, expand=True)
        ttk.Label(f2, text="输出目录:").pack(side=tk.LEFT, padx=(0, 5))
        ttk.Entry(f2, textvariable=self.vars['output_dir']).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
        ttk.Button(f2, text="选择...", command=self.select_output_dir).pack(side=tk.LEFT, padx=(0, 5))
        self.open_dir_button = ttk.Button(f2, text="打开", command=self.open_output_dir, state="disabled")
        self.open_dir_button.pack(side=tk.LEFT)

        # Audio parameters: voice, loudness and output format.
        pane3 = CollapsiblePane(main_frame, "音频参数")
        pane3.grid(row=1, column=0, sticky="ew", pady=(0, 5))
        f3 = ttk.Frame(pane3.content_frame)
        f3.pack(fill=tk.X, expand=True, pady=2)
        ttk.Label(f3, text="配音员:").pack(side=tk.LEFT, padx=(0, 10))
        self.voice_combo = ttk.Combobox(f3, textvariable=self.vars['voice'], state="readonly", width=30)
        self.voice_combo.pack(side=tk.LEFT, fill=tk.X, expand=True)
        self.audition_button = ttk.Button(f3, text="试听", command=self.audition_voice, state="disabled")
        self.audition_button.pack(side=tk.LEFT, padx=(5, 0))
        f4 = ttk.Frame(pane3.content_frame)
        f4.pack(fill=tk.X, expand=True, pady=2)
        ttk.Label(f4, text="响度:").pack(side=tk.LEFT, padx=(0, 10))
        self.volume_scale = ttk.Scale(
            f4, from_=-24, to=-12, orient=tk.HORIZONTAL, variable=self.vars['volume'],
            command=lambda v: self.volume_label.config(text=f"{float(v):.1f} LUFS"),
        )
        self.volume_scale.pack(side=tk.LEFT, fill=tk.X, expand=True)
        self.volume_label = ttk.Label(f4, text=f"{self.vars['volume'].get():.1f} LUFS", width=10)
        self.volume_label.pack(side=tk.LEFT, padx=(5, 0))
        f5 = ttk.Frame(pane3.content_frame)
        f5.pack(fill=tk.X, expand=True, pady=2)
        ttk.Label(f5, text="格式:").pack(side=tk.LEFT, padx=(0, 10))
        self.format_combo = ttk.Combobox(f5, textvariable=self.vars['format'], values=["mp3", "wav", "aac"], state="readonly")
        self.format_combo.pack(side=tk.LEFT, fill=tk.X, expand=True)

        # Video composition (optional).
        pane2 = CollapsiblePane(main_frame, "视频合成 (可选)", initial_state='collapsed')
        pane2.grid(row=2, column=0, sticky="ew", pady=(0, 5))
        ttk.Checkbutton(pane2.content_frame, text="启用视频合成功能", variable=self.vars['merge_video']).pack(anchor='w')
        f6 = ttk.Frame(pane2.content_frame)
        f6.pack(fill=tk.X, expand=True, pady=5)
        ttk.Label(f6, text="视频文件:").pack(side=tk.LEFT, padx=(0, 5))
        ttk.Entry(f6, textvariable=self.vars['video_path']).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=(0, 5))
        ttk.Button(f6, text="选择...", command=self.select_video_file).pack(side=tk.LEFT)
        f7 = ttk.Frame(pane2.content_frame)
        f7.pack(fill=tk.X, expand=True)
        ttk.Checkbutton(f7, text="保留原始音轨", variable=self.vars['keep_original_audio']).pack(side=tk.LEFT)
        ttk.Checkbutton(f7, text="保留独立音频", variable=self.vars['keep_intermediate_audio']).pack(side=tk.LEFT, padx=(20, 0))

        # Subtitle burn-in (optional).
        self.pane4 = CollapsiblePane(main_frame, "字幕烧录 (可选)", initial_state='collapsed')
        self.pane4.grid(row=3, column=0, sticky="ew", pady=(0, 10))
        ttk.Checkbutton(self.pane4.content_frame, text="将字幕烧录到视频画面中", variable=self.vars['burn_subtitles']).pack(anchor='w')
        self.encoder_frame = ttk.Frame(self.pane4.content_frame)
        self.encoder_frame.pack(fill=tk.X, expand=True, pady=5)
        f_enc1 = ttk.Frame(self.encoder_frame)
        f_enc1.pack(fill=tk.X, expand=True)
        ttk.Label(f_enc1, text="编码器:").pack(side=tk.LEFT, padx=(0, 5))
        self.encoder_combo = ttk.Combobox(f_enc1, textvariable=self.vars['encoder'], state="readonly")
        self.encoder_combo.pack(side=tk.LEFT, fill=tk.X, expand=True)
        f_enc2 = ttk.Frame(self.encoder_frame)
        f_enc2.pack(fill=tk.X, expand=True, pady=5)
        ttk.Label(f_enc2, text="字幕字号:").pack(side=tk.LEFT, padx=(0, 5))
        ttk.Spinbox(f_enc2, from_=10, to=100, textvariable=self.vars['subtitle_fontsize'], width=10).pack(side=tk.LEFT)
        self.preview_button = ttk.Button(f_enc2, text="生成5秒预览", command=self.create_preview)
        self.preview_button.pack(side=tk.LEFT, padx=(10, 0))

        # Progress bar, status label and log view.
        log_pane = ttk.Frame(main_frame)
        log_pane.grid(row=4, column=0, sticky="nsew")
        log_pane.rowconfigure(1, weight=1)
        log_pane.columnconfigure(0, weight=1)
        f8 = ttk.Frame(log_pane)
        f8.pack(fill=tk.X, pady=(0, 5))
        self.progress_bar = ttk.Progressbar(f8, orient=tk.HORIZONTAL)
        self.progress_bar.pack(fill=tk.X, expand=True, side=tk.LEFT, padx=(0, 10))
        self.status_label = ttk.Label(f8, textvariable=self.vars['status'])
        self.status_label.pack(side=tk.LEFT)
        self.log_text = scrolledtext.ScrolledText(log_pane, wrap=tk.WORD, state="disabled")
        self.log_text.pack(fill=tk.BOTH, expand=True)

        # Start / cancel buttons.
        f9 = ttk.Frame(main_frame)
        f9.grid(row=5, column=0, sticky="ew", pady=(10, 0))
        f9.columnconfigure(0, weight=1)
        self.start_button = ttk.Button(f9, text="开始生成", command=self.start_processing, style="Accent.TButton")
        self.start_button.pack(side=tk.RIGHT)
        self.cancel_button = ttk.Button(f9, text="取消任务", command=self.cancel_processing, state="disabled")
        self.cancel_button.pack(side=tk.RIGHT, padx=(0, 5))

    def load_config(self):
        """Load ``CONFIG_FILE`` into ``self.config``, filling gaps from the defaults."""
        try:
            with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            loaded = None
        # Start from a copy so later writes never mutate DEFAULT_CONFIG, and
        # ignore a config file whose top-level value is not an object.
        self.config = loaded if isinstance(loaded, dict) else {}
        for key, value in DEFAULT_CONFIG.items():
            self.config.setdefault(key, value)

    def save_config(self):
        """Copy the current widget values into ``self.config`` and write it to disk."""
        for key, var in self.vars.items():
            try:
                value = var.get()
            except tk.TclError:
                # E.g. an empty font-size spinbox: keep the previous value.
                continue
            self.config[key] = value.split(' ')[0] if key == "voice" else value
        with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
            json.dump(self.config, f, indent=4, ensure_ascii=False)

    def on_closing(self):
        """Persist the settings and close the window."""
        try:
            self.save_config()
        finally:
            # The window must close even when the config cannot be saved.
            self.root.destroy()

    def select_srt_file(self):
        """Ask for an SRT file; default the output dir and auto-detect a sibling video."""
        path = filedialog.askopenfilename(filetypes=[("SRT Subtitles", "*.srt")])
        if not path:
            return
        self.vars['srt_path'].set(path)
        srt_dir = os.path.dirname(path)
        base_name = os.path.splitext(os.path.basename(path))[0]
        if not self.vars['output_dir'].get():
            self.vars['output_dir'].set(srt_dir)
        # An output directory is set at this point, so it can be opened.
        self.open_dir_button.config(state="normal")
        for ext in ['.mp4', '.mkv', '.avi', '.mov', '.webm']:
            candidate = os.path.join(srt_dir, base_name + ext)
            if os.path.exists(candidate):
                self.vars['video_path'].set(candidate)
                break

    def select_output_dir(self):
        """Ask for the output directory."""
        path = filedialog.askdirectory(title="选择输出目录")
        if path:
            self.vars['output_dir'].set(path)
            self.open_dir_button.config(state="normal")

    def open_output_dir(self):
        """Open the output directory in the system file manager."""
        path = self.vars['output_dir'].get()
        if path and os.path.isdir(path):
            try:
                _open_path(path)
            except OSError as e:
                self.log(f"!! 无法打开目录: {e}")
        else:
            messagebox.showwarning("目录无效", "输出目录不存在或无效.")

    def select_video_file(self):
        """Ask for the video file used for composition and preview."""
        path = filedialog.askopenfilename(filetypes=[("Video Files", "*.mp4 *.mkv *.avi *.mov"), ("All Files", "*.*")])
        if path:
            self.vars['video_path'].set(path)

    def audition_voice(self):
        """Synthesize the audition text with the selected voice and play it."""
        voice = self.vars['voice'].get().split(' ')[0]
        if not voice:
            return
        self.audition_button.config(state="disabled")
        self.log(f"正在试听 {voice}...")
        audition_text = self.config['audition_text']

        def _audition():
            try:
                out_dir = os.path.dirname(os.path.abspath(__file__))
                cache = os.path.join(out_dir, "_audition_temp_cache")
                os.makedirs(cache, exist_ok=True)
                tmp_file = os.path.join(cache, "_audition_temp.mp3")

                async def save():
                    comm = edge_tts.Communicate(audition_text, voice)
                    await comm.save(tmp_file)

                asyncio.run(save())
                if os.path.exists(tmp_file) and os.path.getsize(tmp_file) > 0:
                    _open_path(tmp_file)
                else:
                    raise Exception("生成的试听文件为空。")
            except Exception as e:
                self.gui_queue.put({"type": "log", "data": f"!! 试听失败: {e}"})
            finally:
                self.gui_queue.put({"type": "audition_done"})

        threading.Thread(target=_audition, daemon=True).start()

    def create_preview(self):
        """Render a short clip around the longest subtitle with burned-in subtitles."""
        srt_path = self.vars['srt_path'].get()
        video_path = self.vars['video_path'].get()
        if not srt_path or not os.path.exists(srt_path):
            messagebox.showerror("错误", "请先选择有效的SRT文件。")
            return
        if not video_path or not os.path.exists(video_path):
            messagebox.showerror("错误", "请先选择有效的视频文件。")
            return
        # Read every Tk variable here, on the GUI thread; Tk objects must not
        # be touched from the worker thread.
        try:
            fontsize = self.vars['subtitle_fontsize'].get()
        except tk.TclError:
            messagebox.showerror("错误", "字幕字号无效,请输入有效的数字.")
            return
        out_dir = self.vars['output_dir'].get() or os.path.dirname(srt_path)
        self.preview_button.config(state="disabled")
        self.log("正在生成预览切片...")

        def _preview():
            try:
                with open(srt_path, 'r', encoding='utf-8-sig') as f:
                    content = f.read()
                entries = parse_srt(content)
                if not entries:
                    raise Exception("SRT文件为空或格式错误。")
                # Preview the longest cue (first one on ties).
                preview_entry = max(entries, key=lambda e: e.duration)
                preview_file = os.path.join(out_dir, "_preview.mp4")
                ass_content = srt_to_ass(content, _build_subtitle_style(fontsize))
                ass_path = os.path.join(out_dir, "_preview.ass")
                with open(ass_path, 'w', encoding='utf-8') as f:
                    f.write(ass_content)
                # One second of padding on each side of the cue.
                start_time = max(0, preview_entry.start.total_seconds() - 1)
                duration = preview_entry.duration + 2
                vf = _subtitles_filter(ass_path)
                # NOTE(review): with -ss placed before -i, ffmpeg presumably
                # resets timestamps to zero, so the subtitles filter may show
                # cues from the start of the file instead of this window —
                # confirm and consider shifting PTS or seeking after -i.
                cmd = ['ffmpeg', '-y', '-ss', str(start_time), '-t', str(duration), '-i', video_path,
                       '-vf', vf, '-c:v', 'libx264', '-preset', 'ultrafast', '-an', preview_file]
                result = subprocess.run(
                    cmd, capture_output=True, text=True, encoding='utf-8', errors='replace',
                    creationflags=_subprocess_creationflags(),
                )
                if result.returncode != 0:
                    raise Exception(f"FFmpeg预览生成失败:\n{result.stderr}")
                if os.path.exists(preview_file):
                    _open_path(preview_file)
            except Exception as e:
                self.gui_queue.put({"type": "log", "data": f"!! 预览失败: {e}"})
            finally:
                self.gui_queue.put({"type": "preview_done"})

        threading.Thread(target=_preview, daemon=True).start()

    def start_processing(self):
        """Validate the inputs and start a ``Processor`` on a worker thread."""
        srt_path = self.vars['srt_path'].get()
        output_dir = self.vars['output_dir'].get()
        video_path = self.vars['video_path'].get()
        if not srt_path or not os.path.exists(srt_path):
            messagebox.showerror("错误", "请选择有效的SRT文件.")
            return
        if not output_dir or not os.path.isdir(output_dir):
            messagebox.showerror("错误", "请选择有效的输出目录.")
            return
        if self.vars['merge_video'].get() and (not video_path or not os.path.exists(video_path)):
            messagebox.showerror("错误", "请选择有效的视频文件进行合并.")
            return

        # Snapshot the settings before touching the buttons: an unreadable
        # variable (e.g. empty font size) must not leave the UI locked.
        current_config = self.config.copy()
        try:
            for key, var in self.vars.items():
                value = var.get()
                current_config[key] = value.split(' ')[0] if key == "voice" else value
        except tk.TclError:
            messagebox.showerror("错误", "字幕字号无效,请输入有效的数字.")
            return

        self.start_button.config(state="disabled")
        self.cancel_button.config(state="normal")
        self.log_text.config(state="normal")
        self.log_text.delete(1.0, tk.END)
        self.log_text.config(state="disabled")

        processor = Processor(srt_path, current_config, self.gui_queue)
        self.processor_thread = threading.Thread(target=processor.run, daemon=True)
        # Keep a handle on the processor so cancel_processing() can signal it.
        self.processor_thread.processor_instance = processor
        self.processor_thread.start()

    def cancel_processing(self):
        """Ask the running processor, if any, to stop."""
        thread = self.processor_thread
        if thread is not None and thread.is_alive():
            self.log("正在发送取消信号...")
            thread.processor_instance.is_cancelled.set()

    def load_voices(self):
        """Detect ffmpeg H.264 encoders and fetch the edge-tts voice list in the background."""
        self.log("正在获取可用配音员列表...")

        def _load():
            try:
                encoders = ["CPU (libx264)"]
                try:
                    result = subprocess.run(
                        ['ffmpeg', '-hide_banner', '-encoders'],
                        capture_output=True, text=True, encoding='utf-8', errors='replace',
                        timeout=5, creationflags=_subprocess_creationflags(),
                    )
                    if 'h264_nvenc' in result.stdout:
                        encoders.append("NVIDIA (h264_nvenc)")
                    if 'h264_amf' in result.stdout:
                        encoders.append("AMD (h264_amf)")
                    if 'h264_qsv' in result.stdout:
                        encoders.append("Intel (h264_qsv)")
                except (OSError, subprocess.SubprocessError):
                    # Best effort: without a working ffmpeg only the CPU
                    # encoder is offered.
                    pass
                voices = asyncio.run(edge_tts.list_voices())
                self.gui_queue.put({"type": "init_data", "voices": voices, "encoders": encoders})
            except Exception as e:
                self.gui_queue.put({"type": "log", "data": f"!! 致命错误: 获取配音员列表失败: {e}"})
                if "401" in str(e):
                    self.gui_queue.put({"type": "log", "data": "!! [建议]: 认证失败, 请升级 'pip install --upgrade edge-tts'"})

        threading.Thread(target=_load, daemon=True).start()

    def process_queue(self):
        """Drain the message queue on the GUI thread, then reschedule itself."""
        try:
            while True:
                msg = self.gui_queue.get_nowait()
                msg_type = msg.get("type")
                if msg_type == "log":
                    self.log_text.config(state="normal")
                    self.log_text.insert(tk.END, msg["data"] + "\n")
                    self.log_text.see(tk.END)
                    self.log_text.config(state="disabled")
                elif msg_type == "progress":
                    self.progress_bar['maximum'] = msg['total']
                    self.progress_bar['value'] = msg['current']
                    self.vars['status'].set(msg['status'])
                elif msg_type == "finish":
                    self.start_button.config(state="normal")
                    self.cancel_button.config(state="disabled")
                    if msg["success"]:
                        self.vars['status'].set("任务完成!")
                        self.vars['output_dir'].set(msg["output_dir"])
                        self.open_dir_button.config(state="normal")
                        messagebox.showinfo("成功", "任务已成功完成!")
                    else:
                        self.vars['status'].set("任务失败或被取消")
                        messagebox.showwarning("任务中断", "任务失败或被用户取消。")
                elif msg_type == "init_data":
                    self._apply_init_data(msg["voices"], msg["encoders"])
                elif msg_type == "audition_done":
                    self.audition_button.config(state="normal")
                    self.log("试听完毕。")
                elif msg_type == "preview_done":
                    self.preview_button.config(state="normal")
                    self.log("预览生成完毕。")
        except queue.Empty:
            pass
        finally:
            self.root.after(100, self.process_queue)

    def _apply_init_data(self, voices_list, encoders):
        """Populate the voice and encoder combo boxes from the startup probe."""
        d_names = [f"{v['ShortName']} - {v['Gender']} ({v['Locale']})" for v in voices_list]
        self.voice_combo['values'] = d_names
        saved_voice = self.config['voice']
        for i, name in enumerate(d_names):
            # Compare the exact ShortName; a prefix test could select a
            # different voice whose name merely starts with the saved one.
            if name.split(' ')[0] == saved_voice:
                self.voice_combo.current(i)
                break
        else:
            if d_names:
                self.voice_combo.current(0)
        self.log("配音员列表加载完毕。")
        self.audition_button.config(state="normal")

        self.encoder_combo['values'] = encoders
        encoder_priority = ["NVIDIA", "AMD", "Intel", "CPU"]
        best_encoder = next((enc for prio in encoder_priority for enc in encoders if prio in enc), encoders[0])
        if self.vars['encoder'].get() not in encoders:
            self.vars['encoder'].set(best_encoder)
            self.log(f"自动选择最佳编码器: {best_encoder}")
        self.log(f"检测到可用编码器: {', '.join(encoders)}")


if __name__ == "__main__":
    if platform.system() == "Windows":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    root = tk.Tk()
    app = App(root)
    root.mainloop()