Files
SFX-Library-Automation/wav_to_mp3.py
2025-11-20 14:28:23 +08:00

156 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import subprocess
import concurrent.futures
import time
import sys
# ================= 配置区域 =================
# 目标码率: '192k' 或 '320k' (推荐 320k)
BITRATE = '320k'
# 并发线程数 (根据你的CPU核心数自动设定也可以手动填数字比如 4)
MAX_WORKERS = os.cpu_count()
# ===========================================
def check_ffmpeg():
"""检查 ffmpeg 是否可用"""
try:
subprocess.run(["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def convert_single_file(file_info):
"""单个文件转换逻辑"""
wav_path, mp3_path = file_info
# 如果 MP3 已经存在且大小不为0跳过
if os.path.exists(mp3_path) and os.path.getsize(mp3_path) > 0:
return "skipped"
# 调用 ffmpeg 转换
# -i 输入
# -b:a 码率
# -map_metadata 0 保留元数据
# -y 覆盖输出
# -v error 静默模式,只报错
cmd = [
"ffmpeg", "-i", wav_path,
"-codec:a", "libmp3lame",
"-b:a", BITRATE,
"-map_metadata", "0",
"-y", "-v", "error",
mp3_path
]
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
return "success"
else:
return f"error: {result.stderr.decode('utf-8', errors='ignore')}"
except Exception as e:
return f"exception: {str(e)}"
def main():
print("=============================================")
print(f" WAV 转 MP3 批量压缩工具 (多线程版)")
print(f" 目标码率: {BITRATE}")
print("=============================================")
if not check_ffmpeg():
print("❌ 错误:找不到 ffmpeg。")
print("请下载 ffmpeg.exe 并将其放到本脚本同一目录下,或者添加到系统环境变量。")
return
target_dir = input("\n请拖入要压缩的文件夹路径: ").strip().strip('"').strip("'")
if not os.path.exists(target_dir):
print("路径不存在。")
return
# 1. 扫描所有 wav 文件
print(f"\n正在扫描 WAV 文件...")
tasks = []
wav_files_list = [] # 用于后续统计和删除
for root, dirs, files in os.walk(target_dir):
for file in files:
if file.lower().endswith(".wav"):
wav_path = os.path.join(root, file)
# 构造 MP3 路径 (同目录下,后缀变 .mp3)
mp3_path = os.path.splitext(wav_path)[0] + ".mp3"
tasks.append((wav_path, mp3_path))
wav_files_list.append(wav_path)
total_files = len(tasks)
if total_files == 0:
print("没有找到 WAV 文件。")
return
print(f"找到 {total_files} 个 WAV 文件,准备开始转换...")
print(f"火力全开,使用 {MAX_WORKERS} 个线程并发处理...")
start_time = time.time()
# 2. 多线程执行转换
success_count = 0
skipped_count = 0
error_count = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 提交任务
future_to_file = {executor.submit(convert_single_file, task): task for task in tasks}
# 处理结果进度条
finished = 0
for future in concurrent.futures.as_completed(future_to_file):
result = future.result()
finished += 1
if result == "success":
success_count += 1
elif result == "skipped":
skipped_count += 1
else:
error_count += 1
# 如果出错,打印出错的文件名
wav_p = future_to_file[future][0]
print(f"\n[失败] {os.path.basename(wav_p)} -> {result}")
# 简单的进度显示
print(f"进度: {finished}/{total_files} | 成功: {success_count} | 跳过: {skipped_count} | 失败: {error_count}", end="\r")
end_time = time.time()
duration = end_time - start_time
print(f"\n\n转换完成!耗时: {duration:.2f}")
# 3. 清理旧文件询问
if success_count > 0 or skipped_count > 0:
print("="*40)
print("【清理阶段】")
print(f"转换已完成。你现在的文件夹里同时存在 .wav 和 .mp3 文件。")
choice = input(f"⚠️ 是否删除原有的 {len(wav_files_list)} 个 WAV 文件以释放空间?(输入 'DELETE' 确认): ").strip()
if choice == 'DELETE':
print("正在删除 WAV 源文件...")
deleted_num = 0
for wav_p in wav_files_list:
try:
# 再次确认对应的 MP3 真的存在,防止误删
mp3_p = os.path.splitext(wav_p)[0] + ".mp3"
if os.path.exists(mp3_p) and os.path.getsize(mp3_p) > 0:
os.remove(wav_p)
deleted_num += 1
else:
print(f"[跳过删除] 对应的 MP3 不存在或大小异常: {wav_p}")
except Exception as e:
print(f"[删除失败] {wav_p}: {e}")
print(f"清理完毕!共删除了 {deleted_num} 个 WAV 文件。")
else:
print("已保留 WAV 原文件。")
if __name__ == "__main__":
main()