596 lines
20 KiB
Python
596 lines
20 KiB
Python
"""音乐下载器模块 (已修改为支持Cookie参数)
|
||
|
||
提供网易云音乐下载功能,包括:
|
||
- 音乐信息获取
|
||
- 文件下载到本地
|
||
- 内存下载
|
||
- 音乐标签写入
|
||
- 异步下载支持
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import asyncio
|
||
import aiohttp
|
||
import aiofiles
|
||
from io import BytesIO
|
||
from typing import Dict, List, Optional, Tuple, Any, Union
|
||
from pathlib import Path
|
||
from dataclasses import dataclass
|
||
from enum import Enum
|
||
|
||
import requests
|
||
from mutagen.flac import FLAC
|
||
from mutagen.mp3 import MP3
|
||
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TDRC, TRCK, APIC
|
||
from mutagen.mp4 import MP4
|
||
|
||
# 注意:这里我们不再需要 CookieManager
|
||
from music_api import NeteaseAPI, APIException
|
||
# from cookie_manager import CookieManager
|
||
|
||
|
||
class AudioFormat(Enum):
|
||
"""音频格式枚举"""
|
||
MP3 = "mp3"
|
||
FLAC = "flac"
|
||
M4A = "m4a"
|
||
UNKNOWN = "unknown"
|
||
|
||
|
||
class QualityLevel(Enum):
|
||
"""音质等级枚举"""
|
||
STANDARD = "standard" # 标准
|
||
EXHIGH = "exhigh" # 极高
|
||
LOSSLESS = "lossless" # 无损
|
||
HIRES = "hires" # Hi-Res
|
||
SKY = "sky" # 沉浸环绕声
|
||
JYEFFECT = "jyeffect" # 高清环绕声
|
||
JYMASTER = "jymaster" # 超清母带
|
||
|
||
|
||
@dataclass
|
||
class MusicInfo:
|
||
"""音乐信息数据类"""
|
||
id: int
|
||
name: str
|
||
artists: str
|
||
album: str
|
||
pic_url: str
|
||
duration: int
|
||
track_number: int
|
||
download_url: str
|
||
file_type: str
|
||
file_size: int
|
||
quality: str
|
||
lyric: str = ""
|
||
tlyric: str = ""
|
||
|
||
|
||
@dataclass
|
||
class DownloadResult:
|
||
"""下载结果数据类"""
|
||
success: bool
|
||
file_path: Optional[str] = None
|
||
file_size: int = 0
|
||
error_message: str = ""
|
||
music_info: Optional[MusicInfo] = None
|
||
|
||
|
||
class DownloadException(Exception):
|
||
"""下载异常类"""
|
||
pass
|
||
|
||
|
||
class MusicDownloader:
|
||
"""音乐下载器主类"""
|
||
|
||
def __init__(self, download_dir: str = "downloads", max_concurrent: int = 3):
|
||
"""
|
||
初始化音乐下载器
|
||
|
||
Args:
|
||
download_dir: 下载目录
|
||
max_concurrent: 最大并发下载数
|
||
"""
|
||
self.download_dir = Path(download_dir)
|
||
self.download_dir.mkdir(exist_ok=True)
|
||
self.max_concurrent = max_concurrent
|
||
|
||
# 初始化依赖
|
||
# self.cookie_manager = CookieManager() # <- 已移除
|
||
self.api = NeteaseAPI()
|
||
|
||
# 支持的文件格式
|
||
self.supported_formats = {
|
||
'mp3': AudioFormat.MP3,
|
||
'flac': AudioFormat.FLAC,
|
||
'm4a': AudioFormat.M4A
|
||
}
|
||
|
||
def _sanitize_filename(self, filename: str) -> str:
|
||
"""清理文件名,移除非法字符
|
||
|
||
Args:
|
||
filename: 原始文件名
|
||
|
||
Returns:
|
||
清理后的安全文件名
|
||
"""
|
||
# 移除或替换非法字符
|
||
illegal_chars = r'[<>:"/\\|?*]'
|
||
filename = re.sub(illegal_chars, '_', filename)
|
||
|
||
# 移除前后空格和点
|
||
filename = filename.strip(' .')
|
||
|
||
# 限制长度
|
||
if len(filename) > 200:
|
||
filename = filename[:200]
|
||
|
||
return filename or "unknown"
|
||
|
||
def _determine_file_extension(self, url: str, content_type: str = "") -> str:
|
||
"""根据URL和Content-Type确定文件扩展名
|
||
|
||
Args:
|
||
url: 下载URL
|
||
content_type: HTTP Content-Type头
|
||
|
||
Returns:
|
||
文件扩展名
|
||
"""
|
||
# 首先尝试从URL获取
|
||
if '.flac' in url.lower():
|
||
return '.flac'
|
||
elif '.mp3' in url.lower():
|
||
return '.mp3'
|
||
elif '.m4a' in url.lower():
|
||
return '.m4a'
|
||
|
||
# 从Content-Type获取
|
||
content_type = content_type.lower()
|
||
if 'flac' in content_type:
|
||
return '.flac'
|
||
elif 'mpeg' in content_type or 'mp3' in content_type:
|
||
return '.mp3'
|
||
elif 'mp4' in content_type or 'm4a' in content_type:
|
||
return '.m4a'
|
||
|
||
return '.mp3' # 默认
|
||
|
||
def get_music_info(self, music_id: int, quality: str, cookies: Dict[str, str]) -> MusicInfo:
|
||
"""获取音乐详细信息
|
||
|
||
Args:
|
||
music_id: 音乐ID
|
||
quality: 音质等级
|
||
cookies: [!! 新增 !!] 用户Cookie字典
|
||
|
||
Returns:
|
||
音乐信息对象
|
||
|
||
Raises:
|
||
DownloadException: 获取信息失败时抛出
|
||
"""
|
||
try:
|
||
# 获取cookies
|
||
# cookies = self.cookie_manager.parse_cookies() # <- 已移除, 使用传入的cookies
|
||
|
||
# 获取音乐URL信息
|
||
url_result = self.api.get_song_url(music_id, quality, cookies)
|
||
if not url_result.get('data') or not url_result['data']:
|
||
raise DownloadException(f"无法获取音乐ID {music_id} 的播放链接 (可能需要会员Cookie)")
|
||
|
||
song_data = url_result['data'][0]
|
||
download_url = song_data.get('url', '')
|
||
if not download_url:
|
||
raise DownloadException(f"音乐ID {music_id} 无可用的下载链接")
|
||
|
||
# 获取音乐详情
|
||
detail_result = self.api.get_song_detail(music_id)
|
||
if not detail_result.get('songs') or not detail_result['songs']:
|
||
raise DownloadException(f"无法获取音乐ID {music_id} 的详细信息")
|
||
|
||
song_detail = detail_result['songs'][0]
|
||
|
||
# 获取歌词
|
||
lyric_result = self.api.get_lyric(music_id, cookies)
|
||
lyric = lyric_result.get('lrc', {}).get('lyric', '') if lyric_result else ''
|
||
tlyric = lyric_result.get('tlyric', {}).get('lyric', '') if lyric_result else ''
|
||
|
||
# 构建艺术家字符串
|
||
artists = '/'.join(artist['name'] for artist in song_detail.get('ar', []))
|
||
|
||
# 创建MusicInfo对象
|
||
music_info = MusicInfo(
|
||
id=music_id,
|
||
name=song_detail.get('name', '未知歌曲'),
|
||
artists=artists or '未知艺术家',
|
||
album=song_detail.get('al', {}).get('name', '未知专辑'),
|
||
pic_url=song_detail.get('al', {}).get('picUrl', ''),
|
||
duration=song_detail.get('dt', 0) // 1000, # 转换为秒
|
||
track_number=song_detail.get('no', 0),
|
||
download_url=download_url,
|
||
file_type=song_data.get('type', 'mp3').lower(),
|
||
file_size=song_data.get('size', 0),
|
||
quality=quality,
|
||
lyric=lyric,
|
||
tlyric=tlyric
|
||
)
|
||
|
||
return music_info
|
||
|
||
except APIException as e:
|
||
raise DownloadException(f"API调用失败: {e}")
|
||
except Exception as e:
|
||
raise DownloadException(f"获取音乐信息时发生错误: {e}")
|
||
|
||
def download_music_file(self, music_id: int, quality: str, cookies: Dict[str, str]) -> DownloadResult:
|
||
"""下载音乐文件到本地
|
||
|
||
Args:
|
||
music_id: 音乐ID
|
||
quality: 音质等级
|
||
cookies: [!! 新增 !!] 用户Cookie字典
|
||
|
||
Returns:
|
||
下载结果对象
|
||
"""
|
||
try:
|
||
# 获取音乐信息
|
||
music_info = self.get_music_info(music_id, quality, cookies)
|
||
|
||
# 生成文件名
|
||
filename = f"{music_info.artists} - {music_info.name}"
|
||
safe_filename = self._sanitize_filename(filename)
|
||
|
||
# 确定文件扩展名
|
||
file_ext = self._determine_file_extension(music_info.download_url)
|
||
file_path = self.download_dir / f"{safe_filename}{file_ext}"
|
||
|
||
# 检查文件是否已存在
|
||
if file_path.exists():
|
||
return DownloadResult(
|
||
success=True,
|
||
file_path=str(file_path),
|
||
file_size=file_path.stat().st_size,
|
||
music_info=music_info
|
||
)
|
||
|
||
# 下载文件
|
||
response = requests.get(music_info.download_url, stream=True, timeout=30)
|
||
response.raise_for_status()
|
||
|
||
# 写入文件
|
||
with open(file_path, 'wb') as f:
|
||
for chunk in response.iter_content(chunk_size=8192):
|
||
if chunk:
|
||
f.write(chunk)
|
||
|
||
# 写入音乐标签
|
||
self._write_music_tags(file_path, music_info)
|
||
|
||
return DownloadResult(
|
||
success=True,
|
||
file_path=str(file_path),
|
||
file_size=file_path.stat().st_size,
|
||
music_info=music_info
|
||
)
|
||
|
||
except DownloadException:
|
||
raise
|
||
except requests.RequestException as e:
|
||
return DownloadResult(
|
||
success=False,
|
||
error_message=f"下载请求失败: {e}"
|
||
)
|
||
except Exception as e:
|
||
return DownloadResult(
|
||
success=False,
|
||
error_message=f"下载过程中发生错误: {e}"
|
||
)
|
||
|
||
async def download_music_file_async(self, music_id: int, quality: str, cookies: Dict[str, str]) -> DownloadResult:
|
||
"""异步下载音乐文件到本地
|
||
|
||
Args:
|
||
music_id: 音乐ID
|
||
quality: 音质等级
|
||
cookies: [!! 新增 !!] 用户Cookie字典
|
||
|
||
Returns:
|
||
下载结果对象
|
||
"""
|
||
try:
|
||
# 获取音乐信息(同步操作)
|
||
music_info = self.get_music_info(music_id, quality, cookies)
|
||
|
||
# 生成文件名
|
||
filename = f"{music_info.artists} - {music_info.name}"
|
||
safe_filename = self._sanitize_filename(filename)
|
||
|
||
# 确定文件扩展名
|
||
file_ext = self._determine_file_extension(music_info.download_url)
|
||
file_path = self.download_dir / f"{safe_filename}{file_ext}"
|
||
|
||
# 检查文件是否已存在
|
||
if file_path.exists():
|
||
return DownloadResult(
|
||
success=True,
|
||
file_path=str(file_path),
|
||
file_size=file_path.stat().st_size,
|
||
music_info=music_info
|
||
)
|
||
|
||
# 异步下载文件
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(music_info.download_url) as response:
|
||
response.raise_for_status()
|
||
|
||
async with aiofiles.open(file_path, 'wb') as f:
|
||
async for chunk in response.content.iter_chunked(8192):
|
||
await f.write(chunk)
|
||
|
||
# 写入音乐标签
|
||
self._write_music_tags(file_path, music_info)
|
||
|
||
return DownloadResult(
|
||
success=True,
|
||
file_path=str(file_path),
|
||
file_size=file_path.stat().st_size,
|
||
music_info=music_info
|
||
)
|
||
|
||
except DownloadException:
|
||
raise
|
||
except aiohttp.ClientError as e:
|
||
return DownloadResult(
|
||
success=False,
|
||
error_message=f"异步下载请求失败: {e}"
|
||
)
|
||
except Exception as e:
|
||
return DownloadResult(
|
||
success=False,
|
||
error_message=f"异步下载过程中发生错误: {e}"
|
||
)
|
||
|
||
def download_music_to_memory(self, music_id: int, quality: str, cookies: Dict[str, str]) -> Tuple[bool, BytesIO, MusicInfo]:
|
||
"""下载音乐到内存
|
||
|
||
Args:
|
||
music_id: 音乐ID
|
||
quality: 音质等级
|
||
cookies: [!! 新增 !!] 用户Cookie字典
|
||
|
||
Returns:
|
||
(是否成功, 音乐数据流, 音乐信息)
|
||
|
||
Raises:
|
||
DownloadException: 下载失败时抛出
|
||
"""
|
||
try:
|
||
# 获取音乐信息
|
||
music_info = self.get_music_info(music_id, quality, cookies)
|
||
|
||
# 下载到内存
|
||
response = requests.get(music_info.download_url, timeout=30)
|
||
response.raise_for_status()
|
||
|
||
# 创建BytesIO对象
|
||
audio_data = BytesIO(response.content)
|
||
|
||
return True, audio_data, music_info
|
||
|
||
except DownloadException:
|
||
raise
|
||
except requests.RequestException as e:
|
||
raise DownloadException(f"下载到内存失败: {e}")
|
||
except Exception as e:
|
||
raise DownloadException(f"内存下载过程中发生错误: {e}")
|
||
|
||
async def download_batch_async(self, music_ids: List[int], quality: str, cookies: Dict[str, str]) -> List[DownloadResult]:
|
||
"""批量异步下载音乐
|
||
|
||
Args:
|
||
music_ids: 音乐ID列表
|
||
quality: 音质等级
|
||
cookies: [!! 新增 !!] 用户Cookie字典
|
||
|
||
Returns:
|
||
下载结果列表
|
||
"""
|
||
semaphore = asyncio.Semaphore(self.max_concurrent)
|
||
|
||
async def download_with_semaphore(music_id: int) -> DownloadResult:
|
||
async with semaphore:
|
||
# 传递cookies
|
||
return await self.download_music_file_async(music_id, quality, cookies)
|
||
|
||
tasks = [download_with_semaphore(music_id) for music_id in music_ids]
|
||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||
|
||
# 处理异常结果
|
||
processed_results = []
|
||
for i, result in enumerate(results):
|
||
if isinstance(result, Exception):
|
||
processed_results.append(DownloadResult(
|
||
success=False,
|
||
error_message=f"下载音乐ID {music_ids[i]} 时发生异常: {result}"
|
||
))
|
||
else:
|
||
processed_results.append(result)
|
||
|
||
return processed_results
|
||
|
||
def _write_music_tags(self, file_path: Path, music_info: MusicInfo) -> None:
|
||
"""写入音乐标签信息
|
||
|
||
Args:
|
||
file_path: 音乐文件路径
|
||
music_info: 音乐信息
|
||
"""
|
||
try:
|
||
file_ext = file_path.suffix.lower()
|
||
|
||
if file_ext == '.mp3':
|
||
self._write_mp3_tags(file_path, music_info)
|
||
elif file_ext == '.flac':
|
||
self._write_flac_tags(file_path, music_info)
|
||
elif file_ext == '.m4a':
|
||
self._write_m4a_tags(file_path, music_info)
|
||
|
||
except Exception as e:
|
||
print(f"写入音乐标签失败: {e}")
|
||
|
||
def _write_mp3_tags(self, file_path: Path, music_info: MusicInfo) -> None:
|
||
"""写入MP3标签"""
|
||
try:
|
||
audio = MP3(str(file_path), ID3=ID3)
|
||
|
||
# 添加ID3标签
|
||
audio.tags.add(TIT2(encoding=3, text=music_info.name))
|
||
audio.tags.add(TPE1(encoding=3, text=music_info.artists))
|
||
audio.tags.add(TALB(encoding=3, text=music_info.album))
|
||
|
||
if music_info.track_number > 0:
|
||
audio.tags.add(TRCK(encoding=3, text=str(music_info.track_number)))
|
||
|
||
# 下载并添加封面
|
||
if music_info.pic_url:
|
||
try:
|
||
pic_response = requests.get(music_info.pic_url, timeout=10)
|
||
pic_response.raise_for_status()
|
||
audio.tags.add(APIC(
|
||
encoding=3,
|
||
mime='image/jpeg',
|
||
type=3,
|
||
desc='Cover',
|
||
data=pic_response.content
|
||
))
|
||
except:
|
||
pass # 封面下载失败不影响主流程
|
||
|
||
audio.save()
|
||
except Exception as e:
|
||
print(f"写入MP3标签失败: {e}")
|
||
|
||
def _write_flac_tags(self, file_path: Path, music_info: MusicInfo) -> None:
|
||
"""写入FLAC标签"""
|
||
try:
|
||
audio = FLAC(str(file_path))
|
||
|
||
audio['TITLE'] = music_info.name
|
||
audio['ARTIST'] = music_info.artists
|
||
audio['ALBUM'] = music_info.album
|
||
|
||
if music_info.track_number > 0:
|
||
audio['TRACKNUMBER'] = str(music_info.track_number)
|
||
|
||
# 下载并添加封面
|
||
if music_info.pic_url:
|
||
try:
|
||
pic_response = requests.get(music_info.pic_url, timeout=10)
|
||
pic_response.raise_for_status()
|
||
|
||
from mutagen.flac import Picture
|
||
picture = Picture()
|
||
picture.type = 3 # Cover (front)
|
||
picture.mime = 'image/jpeg'
|
||
picture.desc = 'Cover'
|
||
picture.data = pic_response.content
|
||
audio.add_picture(picture)
|
||
except:
|
||
pass # 封面下载失败不影响主流程
|
||
|
||
audio.save()
|
||
except Exception as e:
|
||
print(f"写入FLAC标签失败: {e}")
|
||
|
||
def _write_m4a_tags(self, file_path: Path, music_info: MusicInfo) -> None:
|
||
"""写入M4A标签"""
|
||
try:
|
||
audio = MP4(str(file_path))
|
||
|
||
audio['\xa9nam'] = music_info.name
|
||
audio['\xa9ART'] = music_info.artists
|
||
audio['\xa9alb'] = music_info.album
|
||
|
||
if music_info.track_number > 0:
|
||
audio['trkn'] = [(music_info.track_number, 0)]
|
||
|
||
# 下载并添加封面
|
||
if music_info.pic_url:
|
||
try:
|
||
pic_response = requests.get(music_info.pic_url, timeout=10)
|
||
pic_response.raise_for_status()
|
||
audio['covr'] = [pic_response.content]
|
||
except:
|
||
pass # 封面下载失败不影响主流程
|
||
|
||
audio.save()
|
||
except Exception as e:
|
||
print(f"写入M4A标签失败: {e}")
|
||
|
||
def get_download_progress(self, music_id: int, quality: str, cookies: Dict[str, str]) -> Dict[str, Any]:
|
||
"""获取下载进度信息
|
||
|
||
Args:
|
||
music_id: 音乐ID
|
||
quality: 音质等级
|
||
cookies: [!! 新增 !!] 用户Cookie字典
|
||
|
||
Returns:
|
||
包含进度信息的字典
|
||
"""
|
||
try:
|
||
music_info = self.get_music_info(music_id, quality, cookies)
|
||
|
||
filename = f"{music_info.artists} - {music_info.name}"
|
||
safe_filename = self._sanitize_filename(filename)
|
||
file_ext = self._determine_file_extension(music_info.download_url)
|
||
file_path = self.download_dir / f"{safe_filename}{file_ext}"
|
||
|
||
if file_path.exists():
|
||
current_size = file_path.stat().st_size
|
||
progress = (current_size / music_info.file_size * 100) if music_info.file_size > 0 else 0
|
||
|
||
return {
|
||
'music_id': music_id,
|
||
'filename': safe_filename + file_ext,
|
||
'total_size': music_info.file_size,
|
||
'current_size': current_size,
|
||
'progress': min(progress, 100),
|
||
'completed': current_size >= music_info.file_size
|
||
}
|
||
else:
|
||
return {
|
||
'music_id': music_id,
|
||
'filename': safe_filename + file_ext,
|
||
'total_size': music_info.file_size,
|
||
'current_size': 0,
|
||
'progress': 0,
|
||
'completed': False
|
||
}
|
||
|
||
except Exception as e:
|
||
return {
|
||
'music_id': music_id,
|
||
'error': str(e),
|
||
'progress': 0,
|
||
'completed': False
|
||
}
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 测试代码
|
||
# downloader = MusicDownloader() # <- 初始化会失败,因为现在需要cookies
|
||
print("音乐下载器模块 (已修改为支持Cookie参数)")
|
||
print("支持的功能:")
|
||
print("- 同步下载")
|
||
print("- 异步下载")
|
||
print("- 批量下载")
|
||
print("- 内存下载")
|
||
print("- 音乐标签写入")
|
||
print("- 下载进度跟踪")
|
||
print("\n[注意] 本模块现在依赖外部传入Cookie字典。") |