This commit is contained in:
2025-11-20 14:28:23 +08:00
commit 6e93abf7fb
4 changed files with 490 additions and 0 deletions

156
wav_to_mp3.py Normal file
View File

@@ -0,0 +1,156 @@
import os
import subprocess
import concurrent.futures
import time
import sys
# ================= 配置区域 =================
# 目标码率: '192k' 或 '320k' (推荐 320k)
BITRATE = '320k'
# 并发线程数 (根据你的CPU核心数自动设定也可以手动填数字比如 4)
MAX_WORKERS = os.cpu_count()
# ===========================================
def check_ffmpeg():
"""检查 ffmpeg 是否可用"""
try:
subprocess.run(["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def convert_single_file(file_info):
"""单个文件转换逻辑"""
wav_path, mp3_path = file_info
# 如果 MP3 已经存在且大小不为0跳过
if os.path.exists(mp3_path) and os.path.getsize(mp3_path) > 0:
return "skipped"
# 调用 ffmpeg 转换
# -i 输入
# -b:a 码率
# -map_metadata 0 保留元数据
# -y 覆盖输出
# -v error 静默模式,只报错
cmd = [
"ffmpeg", "-i", wav_path,
"-codec:a", "libmp3lame",
"-b:a", BITRATE,
"-map_metadata", "0",
"-y", "-v", "error",
mp3_path
]
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
return "success"
else:
return f"error: {result.stderr.decode('utf-8', errors='ignore')}"
except Exception as e:
return f"exception: {str(e)}"
def main():
print("=============================================")
print(f" WAV 转 MP3 批量压缩工具 (多线程版)")
print(f" 目标码率: {BITRATE}")
print("=============================================")
if not check_ffmpeg():
print("❌ 错误:找不到 ffmpeg。")
print("请下载 ffmpeg.exe 并将其放到本脚本同一目录下,或者添加到系统环境变量。")
return
target_dir = input("\n请拖入要压缩的文件夹路径: ").strip().strip('"').strip("'")
if not os.path.exists(target_dir):
print("路径不存在。")
return
# 1. 扫描所有 wav 文件
print(f"\n正在扫描 WAV 文件...")
tasks = []
wav_files_list = [] # 用于后续统计和删除
for root, dirs, files in os.walk(target_dir):
for file in files:
if file.lower().endswith(".wav"):
wav_path = os.path.join(root, file)
# 构造 MP3 路径 (同目录下,后缀变 .mp3)
mp3_path = os.path.splitext(wav_path)[0] + ".mp3"
tasks.append((wav_path, mp3_path))
wav_files_list.append(wav_path)
total_files = len(tasks)
if total_files == 0:
print("没有找到 WAV 文件。")
return
print(f"找到 {total_files} 个 WAV 文件,准备开始转换...")
print(f"火力全开,使用 {MAX_WORKERS} 个线程并发处理...")
start_time = time.time()
# 2. 多线程执行转换
success_count = 0
skipped_count = 0
error_count = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 提交任务
future_to_file = {executor.submit(convert_single_file, task): task for task in tasks}
# 处理结果进度条
finished = 0
for future in concurrent.futures.as_completed(future_to_file):
result = future.result()
finished += 1
if result == "success":
success_count += 1
elif result == "skipped":
skipped_count += 1
else:
error_count += 1
# 如果出错,打印出错的文件名
wav_p = future_to_file[future][0]
print(f"\n[失败] {os.path.basename(wav_p)} -> {result}")
# 简单的进度显示
print(f"进度: {finished}/{total_files} | 成功: {success_count} | 跳过: {skipped_count} | 失败: {error_count}", end="\r")
end_time = time.time()
duration = end_time - start_time
print(f"\n\n转换完成!耗时: {duration:.2f}")
# 3. 清理旧文件询问
if success_count > 0 or skipped_count > 0:
print("="*40)
print("【清理阶段】")
print(f"转换已完成。你现在的文件夹里同时存在 .wav 和 .mp3 文件。")
choice = input(f"⚠️ 是否删除原有的 {len(wav_files_list)} 个 WAV 文件以释放空间?(输入 'DELETE' 确认): ").strip()
if choice == 'DELETE':
print("正在删除 WAV 源文件...")
deleted_num = 0
for wav_p in wav_files_list:
try:
# 再次确认对应的 MP3 真的存在,防止误删
mp3_p = os.path.splitext(wav_p)[0] + ".mp3"
if os.path.exists(mp3_p) and os.path.getsize(mp3_p) > 0:
os.remove(wav_p)
deleted_num += 1
else:
print(f"[跳过删除] 对应的 MP3 不存在或大小异常: {wav_p}")
except Exception as e:
print(f"[删除失败] {wav_p}: {e}")
print(f"清理完毕!共删除了 {deleted_num} 个 WAV 文件。")
else:
print("已保留 WAV 原文件。")
if __name__ == "__main__":
main()