343 lines
16 KiB
Python
343 lines
16 KiB
Python
# --- main.py (v8.0 - The Ground-Up Rewrite) ---
|
||
# [!! 已修改 - 增加了智能错误提示 !!]
|
||
|
||
import logging, sys, time, traceback, io, zipfile, requests, re, os, tempfile, shutil
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Dict, Any
|
||
from urllib.parse import quote
|
||
from flask import Flask, request, send_file, render_template, Response, jsonify
|
||
|
||
try:
|
||
from mutagen.flac import FLAC, Picture
|
||
from mutagen.mp3 import MP3
|
||
from mutagen.id3 import ID3, APIC
|
||
MUTAGEN_AVAILABLE = True
|
||
except ImportError:
|
||
MUTAGEN_AVAILABLE = False
|
||
|
||
try:
|
||
from music_api import NeteaseAPI, APIException, search_music, playlist_detail, album_detail
|
||
from cookie_manager import CookieManager, CookieException, CookieManager
|
||
import qr_login
|
||
from music_downloader import MusicDownloader, DownloadException
|
||
except ImportError as e:
|
||
print(f"导入模块失败: {e}\n请确保所有依赖模块存在且可用")
|
||
sys.exit(1)
|
||
|
||
@dataclass
class APIConfig:
    """Settings consumed when the Flask application is created and started."""

    # Interface and port handed to ``app.run`` by the script entry point.
    host: str = '0.0.0.0'
    port: int = 5000
    # Forwarded to ``app.run(debug=...)``.
    debug: bool = False
    # Name of the logging level intended for the service logger.
    log_level: str = 'INFO'
    # Value sent in the ``Access-Control-Allow-Origin`` response header.
    cors_origins: str = '*'
|
||
|
||
class APIResponse:
    """Builders for the JSON envelope shared by every endpoint.

    Both helpers return a ``(json_response, status_code)`` tuple that a Flask
    view can return directly.
    """

    @staticmethod
    def success(data: Any = None, message: str = 'success'):
        """Build a 200 reply with ``success=True`` carrying ``data`` and ``message``."""
        envelope = {'success': True, 'message': message, 'data': data}
        return jsonify(envelope), 200

    @staticmethod
    def error(message: str, status_code: int = 400):
        """Build a failure reply with ``success=False``; status defaults to 400."""
        envelope = {'success': False, 'message': message}
        return jsonify(envelope), status_code
|
||
|
||
class MusicAPIService:
    """Request helpers and ZIP packaging shared by the Flask views.

    The instance owns a logger, a ``CookieManager``, a ``NeteaseAPI`` client
    and a ``MusicDownloader`` (all imported from the project's own modules).
    """

    # Quality identifiers accepted from clients mapped to their display labels.
    _QUALITY_NAMES = {
        'standard': "标准",
        'exhigh': "极高",
        'lossless': "无损",
        'hires': "Hi-Res",
        'sky': "环绕声",
        'jyeffect': "高清环绕",
        'jymaster': "母带",
    }

    def __init__(self, config: "APIConfig"):
        """Store ``config`` and create the logger and collaborator objects."""
        self.config = config
        self.logger = self._setup_logger()
        self.cookie_manager = CookieManager()
        self.netease_api = NeteaseAPI()
        self.downloader = MusicDownloader()
        if not MUTAGEN_AVAILABLE:
            self.logger.warning("`mutagen` 库未安装, 打包文件将不含独立封面。请运行 `pip install mutagen`")

    def _setup_logger(self) -> logging.Logger:
        """Return the ``music_api`` logger, configured once with a stream handler.

        The level comes from ``config.log_level``; unknown names fall back to
        INFO so a typo in the configuration cannot break start-up.
        """
        logger = logging.getLogger('music_api')
        level = getattr(logging, str(self.config.log_level).upper(), None)
        logger.setLevel(level if isinstance(level, int) else logging.INFO)
        # Guard against stacking handlers when the service is built twice.
        if not logger.handlers:
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
            handler = logging.StreamHandler()
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def _get_cookies_from_request(self) -> Dict[str, str]:
        """Return the cookies to use for the current request.

        A ``cookie`` value supplied by the client wins; otherwise the cookies
        known to the server-side ``CookieManager`` are used. An empty dict is
        returned when those cannot be read.
        """
        user_cookie_str = self._safe_get_request_data().get('cookie')
        if user_cookie_str:
            self.logger.debug("正在使用前端提供的用户Cookie")
            return self.cookie_manager.parse_cookie_string(user_cookie_str)

        try:
            self.logger.debug("未使用用户Cookie,降级到服务器 cookie.txt")
            return self.cookie_manager.parse_cookies()
        except CookieException:
            self.logger.error("读取服务器 cookie.txt 失败")
            return {}

    def _extract_id(self, id_or_url, id_type):
        """Pull the numeric ID for ``id_type`` out of a URL, or echo the input.

        Recognised shapes are ``<id_type>?...id=<digits>`` (the ``id``
        parameter may appear anywhere in the query string) and
        ``/<id_type>/<digits>``. Anything else is returned as a stripped
        string, so a bare ID passes through unchanged.
        """
        text = str(id_or_url).strip()
        kind = re.escape(str(id_type))
        patterns = (
            # The optional group skips earlier query parameters; it must end
            # in '&' so that e.g. ``userid=`` is never mistaken for ``id=``.
            rf'{kind}\?(?:[^#\s]*?&)?id=(\d+)',
            rf'/{kind}/(\d+)',
        )
        for pattern in patterns:
            match = re.search(pattern, text)
            if match:
                return match.group(1)
        return text

    def _get_quality_display_name(self, quality):
        """Return the display label for ``quality``, or ``quality`` itself if unknown."""
        return self._QUALITY_NAMES.get(quality, quality)

    def _safe_get_request_data(self):
        """Return the request parameters as a plain dict.

        Order of preference: a non-empty JSON object body, then form fields,
        then query-string arguments. JSON bodies that are not objects (lists,
        strings, numbers) are ignored because every caller uses ``.get``.
        """
        payload = request.get_json(silent=True)
        if isinstance(payload, dict) and payload:
            return payload
        # ``to_dict()`` flattens the multi-dict to one value per key.
        return request.form.to_dict() or request.args.to_dict()

    def _create_zip_response(self, memory_file, filename):
        """Wrap the bytes of ``memory_file`` in an ``application/zip`` attachment.

        Characters that are illegal in file names are stripped and the name is
        percent-encoded into an RFC 5987 style ``filename*`` parameter.
        """
        response = Response(memory_file.getvalue(), mimetype='application/zip')
        safe_filename = re.sub(r'[\\/*?:"<>|]', "", filename)
        encoded_filename = quote(safe_filename)
        response.headers['Content-Disposition'] = f"attachment; filename*=UTF-8''{encoded_filename}"
        return response

    @staticmethod
    def _sniff_image_mime(data):
        """Return ``image/png`` for PNG-signed bytes, otherwise ``image/jpeg``."""
        if data[:8] == b'\x89PNG\r\n\x1a\n':
            return 'image/png'
        return 'image/jpeg'

    def _embed_cover_on_disk(self, audio_path, cover_path, file_type):
        """Embed the image at ``cover_path`` as front cover of ``audio_path``.

        Only ``flac`` and ``mp3`` are handled. The call is a no-op when
        mutagen is unavailable or the cover file is missing; any failure is
        logged and swallowed so a bad cover never aborts the packaging.
        """
        if not MUTAGEN_AVAILABLE or not cover_path or not os.path.exists(cover_path):
            return
        try:
            with open(cover_path, 'rb') as f:
                cover_data = f.read()
            mime = self._sniff_image_mime(cover_data)

            if file_type == 'flac':
                audio = FLAC(audio_path)
                pic = Picture()
                pic.type = 3  # 3 = front cover
                pic.mime = mime
                pic.data = cover_data
                audio.add_picture(pic)
            elif file_type == 'mp3':
                audio = MP3(audio_path, ID3=ID3)
                if audio.tags is None:
                    audio.add_tags()
                audio.tags.add(APIC(encoding=3, mime=mime, type=3, desc='Cover', data=cover_data))
            else:
                return
            audio.save()
        except Exception as e:
            self.logger.error(f"在磁盘嵌入封面失败 ({file_type}): {e}")

    def _download_to_file(self, url, dest_path, timeout):
        """Stream ``url`` into ``dest_path``; raises on HTTP error statuses."""
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(dest_path, 'wb') as f:
                # iter_content applies content decoding, unlike reading r.raw.
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

    def _add_collection_cover(self, zf, cover_url):
        """Best-effort: store the collection artwork as ``cover.jpg`` in the archive."""
        try:
            resp = requests.get(cover_url, timeout=10)
            # Without this an HTML error page would be saved as the cover.
            resp.raise_for_status()
            zf.writestr('cover.jpg', resp.content)
        except Exception as e:
            self.logger.warning(f"下载合集封面失败: {e}")

    def _describe_track_failure(self, track, song_id, exc, cookies):
        """Build the message logged and archived when a track cannot be packaged."""
        prefix = f"歌曲 '{track.get('name')}' (ID: {song_id}) 处理失败: "
        reason = str(exc)
        if "无可用的下载链接" in reason or "播放链接" in reason:
            # Heuristic only: a missing MUSIC_U cookie is taken to mean the
            # user is not logged in. Imperfect, but better than no hint.
            if not (cookies or {}).get('MUSIC_U'):
                return prefix + "需要会员权限"
            return prefix + "权限不足或Cookie过期"
        return prefix + reason

    def _add_track_to_zip(self, zf, song_id, quality, cookies, temp_dir):
        """Download one song (plus cover and lyric) and add it to ``zf``."""
        music_info = self.downloader.get_music_info(int(song_id), quality, cookies)
        if not music_info.download_url:
            raise DownloadException("获取下载链接失败")

        base_fn = self.downloader._sanitize_filename(f"{music_info.artists} - {music_info.name}")
        audio_filename = f"{base_fn}.{music_info.file_type}"
        audio_path = os.path.join(temp_dir, audio_filename)
        cover_path = os.path.join(temp_dir, f"cover_{song_id}.jpg")

        try:
            self._download_to_file(music_info.download_url, audio_path, timeout=120)

            if music_info.pic_url:
                try:
                    self._download_to_file(music_info.pic_url, cover_path, timeout=20)
                    self._embed_cover_on_disk(audio_path, cover_path, music_info.file_type)
                except Exception as e:
                    # A missing cover must not cost the user the audio file.
                    self.logger.warning(f"下载或嵌入封面失败 for song {song_id}: {e}")

            zf.write(audio_path, arcname=audio_filename)

            if music_info.lyric:
                zf.writestr(f"{base_fn}.lrc", music_info.lyric.encode('utf-8'))
        finally:
            # Free disk space per track instead of keeping a whole playlist
            # in the temp directory until the end of the request.
            for leftover in (audio_path, cover_path):
                try:
                    os.remove(leftover)
                except OSError:
                    # Never created or still locked: the final rmtree handles it.
                    pass

    def _package_tracks_as_zip(self, tracks, quality, collection_name, collection_cover_url, cookies: Dict[str, str]):
        """Package ``tracks`` into a ZIP and return it as a download response.

        Args:
            tracks: iterable of dicts; entries without an ``id`` are skipped.
            quality: quality identifier passed to the downloader.
            collection_name: used for the archive file name.
            collection_cover_url: optional artwork stored as ``cover.jpg``.
            cookies: cookies passed to the downloader.

        A track that fails does not abort the archive; instead an
        ``_error_log_ID_<id>.txt`` entry describing the failure is added.
        The archive is assembled in memory.
        """
        temp_dir = tempfile.mkdtemp()
        self.logger.info(f"创建临时目录: {temp_dir}")
        zip_buffer = io.BytesIO()

        try:
            with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
                if collection_cover_url:
                    self._add_collection_cover(zf, collection_cover_url)

                total = len(tracks)
                for position, track in enumerate(tracks, start=1):
                    song_id = track.get('id')
                    if not song_id:
                        continue
                    self.logger.info(f"[{position}/{total}] 打包中: {track.get('name')}")

                    try:
                        self._add_track_to_zip(zf, song_id, quality, cookies, temp_dir)
                    except Exception as e:
                        error_msg = self._describe_track_failure(track, song_id, e, cookies)
                        self.logger.error(error_msg)
                        zf.writestr(f"_error_log_ID_{song_id}.txt", error_msg.encode('utf-8'))

            zip_buffer.seek(0)
            # Avoid a literal "None" in the file name when the name is missing.
            display_name = collection_name or 'music'
            download_name = f"{display_name} [{self._get_quality_display_name(quality)}].zip"
            return self._create_zip_response(zip_buffer, download_name)
        finally:
            # Cleanup problems must not replace the response or the real error.
            shutil.rmtree(temp_dir, ignore_errors=True)
            self.logger.info(f"删除临时目录: {temp_dir}")
|
||
|
||
# Module-level singletons used by every view below: the settings object,
# the Flask application, and the service built from those settings.
config = APIConfig()
app = Flask(__name__)
api_service = MusicAPIService(config)
|
||
|
||
@app.after_request
def after_request(response: Response) -> Response:
    """Append the CORS headers to every outgoing response.

    ``Content-Disposition`` is exposed so that browser code can read the
    download file name from cross-origin responses.
    """
    cors_headers = (
        ('Access-Control-Allow-Origin', config.cors_origins),
        ('Access-Control-Expose-Headers', 'Content-Disposition'),
        ('Access-Control-Allow-Headers', 'Content-Type'),
        ('Access-Control-Allow-Methods', 'GET, POST'),
    )
    for header_name, header_value in cors_headers:
        response.headers.add(header_name, header_value)
    return response
|
||
|
||
@app.route('/')
def index():
    """Render the ``index.html`` template for the site root."""
    page = render_template('index.html')
    return page
|
||
|
||
@app.route('/login/qr/generate', methods=['POST'])
def qr_generate():
    """Ask ``qr_login`` for a new QR login key and relay the result.

    Responds 200 with the result when it reports success, otherwise 500 with
    the reported message (or a default one). Any exception also yields 500.
    """
    try:
        outcome = qr_login.api_generate_qr_key()
        if not outcome['success']:
            return APIResponse.error(outcome.get('message', '生成二维码失败'), 500)
        return APIResponse.success(outcome)
    except Exception as e:
        return APIResponse.error(f"生成二维码异常: {e}", 500)
|
||
|
||
@app.route('/login/qr/check', methods=['POST'])
def qr_check():
    """Relay the status that ``qr_login`` reports for the given ``qr_key``.

    Responds 400 when ``qr_key`` is missing and 500 when the check raises.
    """
    qr_key = api_service._safe_get_request_data().get('qr_key')
    if not qr_key:
        return APIResponse.error("参数 'qr_key' 不能为空")

    try:
        status = qr_login.api_check_qr_status(qr_key)
        return APIResponse.success(status)
    except Exception as e:
        return APIResponse.error(f"检查状态异常: {e}", 500)
|
||
|
||
@app.route('/song', methods=['POST'])
def get_song_info():
    """Resolve a song URL or ID into metadata and a download link.

    Request fields: ``url`` (required; song URL or bare ID), ``level``
    (quality identifier, default ``lossless``) and optional ``cookie``.

    Responds 400 when ``url`` is missing or contains no usable numeric ID,
    and 502 when the lookup itself fails. When the failure text indicates a
    missing download/play link, the message is replaced with a hint that
    depends on whether the client sent a cookie.
    """
    data = api_service._safe_get_request_data()
    url = data.get('url')
    level = data.get('level', 'lossless')
    if not url:
        return APIResponse.error("参数 'url' 不能为空")

    # Whether the client *sent* a cookie; used only to pick the error hint.
    user_was_logged_in = bool(data.get('cookie'))

    music_id = api_service._extract_id(url, 'song')
    try:
        numeric_id = int(music_id)
    except ValueError:
        # Bad input is a client error, not an upstream failure.
        return APIResponse.error("参数 'url' 无效: 无法解析歌曲ID", 400)

    try:
        cookies = api_service._get_cookies_from_request()
        music_info = api_service.downloader.get_music_info(numeric_id, level, cookies)

        # Treat a missing size like zero instead of failing on the comparison.
        file_size = music_info.file_size or 0
        response_data = {
            'id': music_id,
            'name': music_info.name,
            'ar_name': music_info.artists,
            'al_name': music_info.album,
            'pic': music_info.pic_url,
            'lyric': music_info.lyric,
            'url': music_info.download_url,
            'size': f"{file_size/1024/1024:.2f} MB" if file_size > 0 else "N/A",
            'level': api_service._get_quality_display_name(level),
            'file_type': music_info.file_type,
        }
        return APIResponse.success(response_data)
    except Exception as e:
        # Keep the traceback server-side, as the download handlers do.
        api_service.logger.error(f"解析歌曲异常: {e}\n{traceback.format_exc()}")

        error_msg = str(e)
        if "无可用的下载链接" in error_msg or "播放链接" in error_msg:
            if not user_was_logged_in:
                error_msg = "解析失败:此音质或歌曲需要会员权限。请点击右上角登录黑胶会员账号后再试。"
            else:
                error_msg = "解析失败:您的账号可能不是黑胶会员,Cookie已过期,或歌曲已下架。"

        return APIResponse.error(error_msg, 502)
|
||
|
||
|
||
@app.route('/search', methods=['POST'])
def search_music_api():
    """Search for ``keyword`` (required) and return up to 30 results.

    Responds 400 when ``keyword`` is missing and 502 on ``APIException``.
    """
    payload = api_service._safe_get_request_data()
    keyword = payload.get('keyword')
    if not keyword:
        return APIResponse.error("参数 'keyword' 不能为空")

    try:
        cookies = api_service._get_cookies_from_request()
        results = search_music(keyword, cookies, limit=30)
        return APIResponse.success(results)
    except APIException as e:
        return APIResponse.error(str(e), 502)
|
||
|
||
@app.route('/playlist', methods=['POST'])
def get_playlist():
    """Return playlist details for the ``id`` field (playlist URL or bare ID).

    Responds 400 when ``id`` is missing and 502 on ``APIException``.
    """
    payload = api_service._safe_get_request_data()
    raw_id = payload.get('id')
    if not raw_id:
        return APIResponse.error("参数 'id' 不能为空")

    playlist_id = api_service._extract_id(raw_id, 'playlist')
    try:
        cookies = api_service._get_cookies_from_request()
        details = playlist_detail(playlist_id, cookies)
        return APIResponse.success({'playlist': details})
    except APIException as e:
        return APIResponse.error(str(e), 502)
|
||
|
||
@app.route('/album', methods=['POST'])
def get_album():
    """Return album details for the ``id`` field (album URL or bare ID).

    Responds 400 when ``id`` is missing and 502 on ``APIException``.
    """
    payload = api_service._safe_get_request_data()
    raw_id = payload.get('id')
    if not raw_id:
        return APIResponse.error("参数 'id' 不能为空")

    album_id = api_service._extract_id(raw_id, 'album')
    try:
        cookies = api_service._get_cookies_from_request()
        details = album_detail(album_id, cookies)
        return APIResponse.success({'album': details})
    except APIException as e:
        return APIResponse.error(str(e), 502)
|
||
|
||
@app.route('/download_song_zip', methods=['POST'])
def download_song_zip():
    """Package a single song as a ZIP download.

    Request fields: ``id`` (required; song URL or bare ID), ``quality``
    (default ``lossless``) and optional ``cookie``.

    Responds 400 when ``id`` is missing and 500 on any failure. When the
    failure text indicates a missing download/play link, the message is
    replaced with a hint that depends on whether the client sent a cookie.
    """
    data = api_service._safe_get_request_data()
    raw_id = data.get('id')
    quality = data.get('quality', 'lossless')
    if not raw_id:
        return APIResponse.error("参数 'id' 不能为空", 400)

    # Whether the client *sent* a cookie; used only to pick the error hint.
    user_was_logged_in = bool(data.get('cookie'))
    # Accept share URLs as well as bare IDs, like the playlist/album handlers.
    song_id = api_service._extract_id(raw_id, 'song')

    try:
        cookies = api_service._get_cookies_from_request()
        # Looked up here only to name the archive after the song.
        music_info = api_service.downloader.get_music_info(int(song_id), quality, cookies)
        song_as_track = [{'id': song_id, 'name': music_info.name}]
        return api_service._package_tracks_as_zip(song_as_track, quality, music_info.name, None, cookies)
    except Exception as e:
        # format_exc must be *called*; the bare name only logs a function repr.
        api_service.logger.error(f"打包单曲异常: {e}\n{traceback.format_exc()}")

        error_msg = str(e)
        if "无可用的下载链接" in error_msg or "播放链接" in error_msg:
            if not user_was_logged_in:
                error_msg = "打包失败:此音质或歌曲需要会员权限。请点击右上角登录黑胶会员账号后再试。"
            else:
                error_msg = "打包失败:您的账号可能不是黑胶会员,Cookie已过期,或歌曲已下架。"
        else:
            error_msg = f"打包失败: {str(e)}"

        return APIResponse.error(error_msg, 500)
|
||
|
||
@app.route('/download_playlist_zip', methods=['POST'])
def download_playlist_zip():
    """Package every track of a playlist as a ZIP download.

    Request fields: ``id`` (required; playlist URL or bare ID) and
    ``quality`` (default ``lossless``). Responds 400 when ``id`` is missing,
    404 when the playlist is unavailable or has no tracks, 500 otherwise.
    """
    payload = api_service._safe_get_request_data()
    raw_id = payload.get('id')
    quality = payload.get('quality', 'lossless')
    if not raw_id:
        return APIResponse.error("参数 'id' 不能为空", 400)

    playlist_id = api_service._extract_id(raw_id, 'playlist')
    try:
        cookies = api_service._get_cookies_from_request()
        playlist_info = playlist_detail(playlist_id, cookies)
        if not playlist_info or not playlist_info.get('tracks'):
            return APIResponse.error("歌单信息获取失败或歌单为空", 404)

        return api_service._package_tracks_as_zip(
            playlist_info['tracks'],
            quality,
            playlist_info.get('name'),
            playlist_info.get('coverImgUrl'),
            cookies,
        )
    except Exception as e:
        api_service.logger.error(f"打包歌单异常: {e}\n{traceback.format_exc()}")
        return APIResponse.error(f"打包失败: {str(e)}", 500)
|
||
|
||
@app.route('/download_album_zip', methods=['POST'])
def download_album_zip():
    """Package every song of an album as a ZIP download.

    Request fields: ``id`` (required; album URL or bare ID) and ``quality``
    (default ``lossless``). Responds 400 when ``id`` is missing, 404 when
    the album is unavailable or has no songs, 500 otherwise.
    """
    payload = api_service._safe_get_request_data()
    raw_id = payload.get('id')
    quality = payload.get('quality', 'lossless')
    if not raw_id:
        return APIResponse.error("参数 'id' 不能为空", 400)

    album_id = api_service._extract_id(raw_id, 'album')
    try:
        cookies = api_service._get_cookies_from_request()
        album_info = album_detail(album_id, cookies)
        if not album_info or not album_info.get('songs'):
            return APIResponse.error("专辑信息获取失败或专辑为空", 404)

        return api_service._package_tracks_as_zip(
            album_info['songs'],
            quality,
            album_info.get('name'),
            album_info.get('coverImgUrl'),
            cookies,
        )
    except Exception as e:
        api_service.logger.error(f"打包专辑异常: {e}\n{traceback.format_exc()}")
        return APIResponse.error(f"打包失败: {str(e)}", 500)
|
||
|
||
if __name__ == '__main__':
    # Print the start-up banner, then hand control to Flask's built-in server.
    banner_rule = "=" * 60
    print("\n" + banner_rule)
    print("🚀 网易云音乐工具箱 (v8.1 - 重构稳定版) 启动中...")
    print(" [!!] 已集成用户Cookie和QR登录API")
    print(f"📡 服务地址: http://{config.host}:{config.port}")
    print(banner_rule)
    app.run(host=config.host, port=config.port, debug=config.debug)