import os import subprocess import concurrent.futures import time import sys # ================= 配置区域 ================= # 目标码率: '192k' 或 '320k' (推荐 320k) BITRATE = '320k' # 并发线程数 (根据你的CPU核心数自动设定,也可以手动填数字,比如 4) MAX_WORKERS = os.cpu_count() # =========================================== def check_ffmpeg(): """检查 ffmpeg 是否可用""" try: subprocess.run(["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) return True except (subprocess.CalledProcessError, FileNotFoundError): return False def convert_single_file(file_info): """单个文件转换逻辑""" wav_path, mp3_path = file_info # 如果 MP3 已经存在且大小不为0,跳过 if os.path.exists(mp3_path) and os.path.getsize(mp3_path) > 0: return "skipped" # 调用 ffmpeg 转换 # -i 输入 # -b:a 码率 # -map_metadata 0 保留元数据 # -y 覆盖输出 # -v error 静默模式,只报错 cmd = [ "ffmpeg", "-i", wav_path, "-codec:a", "libmp3lame", "-b:a", BITRATE, "-map_metadata", "0", "-y", "-v", "error", mp3_path ] try: result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if result.returncode == 0: return "success" else: return f"error: {result.stderr.decode('utf-8', errors='ignore')}" except Exception as e: return f"exception: {str(e)}" def main(): print("=============================================") print(f" WAV 转 MP3 批量压缩工具 (多线程版)") print(f" 目标码率: {BITRATE}") print("=============================================") if not check_ffmpeg(): print("❌ 错误:找不到 ffmpeg。") print("请下载 ffmpeg.exe 并将其放到本脚本同一目录下,或者添加到系统环境变量。") return target_dir = input("\n请拖入要压缩的文件夹路径: ").strip().strip('"').strip("'") if not os.path.exists(target_dir): print("路径不存在。") return # 1. 扫描所有 wav 文件 print(f"\n正在扫描 WAV 文件...") tasks = [] wav_files_list = [] # 用于后续统计和删除 for root, dirs, files in os.walk(target_dir): for file in files: if file.lower().endswith(".wav"): wav_path = os.path.join(root, file) # 构造 MP3 路径 (同目录下,后缀变 .mp3) mp3_path = os.path.splitext(wav_path)[0] + ".mp3" tasks.append((wav_path, mp3_path)) wav_files_list.append(wav_path) total_files = len(tasks) if total_files == 0: print("没有找到 WAV 文件。") return print(f"找到 {total_files} 个 WAV 文件,准备开始转换...") print(f"火力全开,使用 {MAX_WORKERS} 个线程并发处理...") start_time = time.time() # 2. 多线程执行转换 success_count = 0 skipped_count = 0 error_count = 0 with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: # 提交任务 future_to_file = {executor.submit(convert_single_file, task): task for task in tasks} # 处理结果进度条 finished = 0 for future in concurrent.futures.as_completed(future_to_file): result = future.result() finished += 1 if result == "success": success_count += 1 elif result == "skipped": skipped_count += 1 else: error_count += 1 # 如果出错,打印出错的文件名 wav_p = future_to_file[future][0] print(f"\n[失败] {os.path.basename(wav_p)} -> {result}") # 简单的进度显示 print(f"进度: {finished}/{total_files} | 成功: {success_count} | 跳过: {skipped_count} | 失败: {error_count}", end="\r") end_time = time.time() duration = end_time - start_time print(f"\n\n转换完成!耗时: {duration:.2f} 秒") # 3. 清理旧文件询问 if success_count > 0 or skipped_count > 0: print("="*40) print("【清理阶段】") print(f"转换已完成。你现在的文件夹里同时存在 .wav 和 .mp3 文件。") choice = input(f"⚠️ 是否删除原有的 {len(wav_files_list)} 个 WAV 文件以释放空间?(输入 'DELETE' 确认): ").strip() if choice == 'DELETE': print("正在删除 WAV 源文件...") deleted_num = 0 for wav_p in wav_files_list: try: # 再次确认对应的 MP3 真的存在,防止误删 mp3_p = os.path.splitext(wav_p)[0] + ".mp3" if os.path.exists(mp3_p) and os.path.getsize(mp3_p) > 0: os.remove(wav_p) deleted_num += 1 else: print(f"[跳过删除] 对应的 MP3 不存在或大小异常: {wav_p}") except Exception as e: print(f"[删除失败] {wav_p}: {e}") print(f"清理完毕!共删除了 {deleted_num} 个 WAV 文件。") else: print("已保留 WAV 原文件。") if __name__ == "__main__": main()