commit d7a08501712b7d727e658627293e1cb6964cefce Author: laowang Date: Wed Nov 5 14:22:18 2025 +0800 Initial commit diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..db6079e --- /dev/null +++ b/.dockerignore @@ -0,0 +1,22 @@ +# Git +.git +.gitignore + +# 本地Cookie (!! 绝对禁止打包 !!) +cookie.txt +*.bak + +# 文档 +README.md +使用文档.md +*.png +*.jpg + +# Python 缓存 +__pycache__/ +*.pyc +*.pyo + +# IDE/系统 +.vscode/ +.DS_Store \ No newline at end of file diff --git a/.env b/.env new file mode 100644 index 0000000..8434d0d --- /dev/null +++ b/.env @@ -0,0 +1,8 @@ +# 对应命令行参数的 level +LEVEL=lossless + +# 对应命令行参数的 level +MODE=api + +# 对应命令行参数的 url +URL=http://127.0.0.1:5000 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1887ae9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +.idea +.vscode + +venv +.venv + +__pycache__ +*.pyc +*.pyo +*.pyd diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..2d3e8c1 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,24 @@ +# [源: 1] 使用一个轻量级的 Python 基础镜像 +FROM python:3.9-alpine + +# 设置时区 +ENV TZ=Asia/Shanghai +RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone + +# [源: 1] 设置工作目录 +WORKDIR /app + +# [源: 1] 复制并安装依赖 +# (这里假设你的 requirements.txt 已经包含了 qrcode) +COPY requirements.txt requirements.txt +RUN pip3 install --no-cache-dir -r requirements.txt + +# [源: 1] 复制 *所有* 需要的文件 (得益于 .dockerignore) +# 这会复制 main.py, music_api.py, templates/, static/ 等 +COPY . . + +# [源: 2] 暴露端口 +EXPOSE 5000 + +# [源: 2] [!! 优化 !!] 直接运行 main.py, 不再需要 entrypoint.sh +CMD ["python", "main.py"] \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e0e1bc5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Suxiaoqinx + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..c6eb852 --- /dev/null +++ b/README.md @@ -0,0 +1,378 @@ +# 📖 网易云音乐工具箱 (v9.x - Web 应用重构版) + + + +本项目是基于原版 `Netease_url` 项目的深度重构,旨在将其从一个依赖本地 `cookie.txt` 的工具,升级为一个**支持多用户、无状态后端、客户端认证**的现代化 Web 应用。 + + + +## ✨ V9 核心功能升级 + + + +相较于原版,V9 架构带来了质的飞跃: + +- **用户认证系统**: + - **扫码登录**:移除本地脚本,实现了完整的 API 驱动的扫码登录流程。 + - **客户端凭证**:用户 Cookie **不再**存储于服务器。登录成功后,Cookie 凭证安全地存储在**用户自己的浏览器 `localStorage`** 中。 +- **无状态 (Stateless) 后端**: + - 服务器**不保存任何用户状态**。所有 API 请求均通过 `localStorage` 自动携带 Cookie 凭证进行鉴权。 + - **高并发支持**:天然支持多用户同时登录和使用,Cookie 互相隔离,绝不串号。 +- **全局播放器**: + - 实现了一个固定在底部的**全局 APlayer 播放器**。 + - 音乐播放在页面内导航、搜索、解析时**不会中断**,提供了现代音乐站的体验。 +- **高级 UI/UX**: + - **智能错误提示**:当解析高音质失败时,系统会智能判断用户是否未登录,并提示“请登录黑胶会员”。 + - **用户设置**:增加了“自动播放”开关,并将其偏好存储在 `localStorage`。 + - **安全加固**:在 F12 控制台增加了醒目的安全警告,防止用户被“社会工程学”攻击。 + + + +## 🚀 部署模式 (重要) + + + +V9 架构支持两种部署模式,灵活性极高: + + + +### 1. 🌍 服务器部署模式 (多用户) + + + +这是标准的线上部署模式。 + +1. **清空 `cookie.txt`**:将 `cookie.txt` 文件内容设置为空。 +2. **启动服务**:`python main.py`。 +3. **用户流程**: + - 访问者**必须**点击右上角的“登录”按钮。 + - 通过 App 扫码登录。 + - Cookie 自动保存在他们各自的浏览器中,全程不接触你的服务器。 + + + +### 2. 💻 本地开发模式 (单用户) + + + +这是你自己本地调试时使用的便捷模式。 + +1. **填写 `cookie.txt`**:在 `cookie.txt` 中填入你自己的黑胶会员 Cookie。 +2. **启动服务**:`python main.py`。 +3. **开发者流程**: + - 你**无需**点击“登录”按钮。 + - 当你进行解析、下载等操作时,前端 `index.html` 会发送一个“空”的 Cookie。 + - 后端 `main.py` 的 `_get_cookies_from_request` 函数会检测到 Cookie 为空,并**自动降级 (Fallback)** 去读取 `cookie.txt` 中的内容来完成请求。 + + + +## 🔌 API 接口文档 (V9) + + + +原版的 API 文档 已失效。以下是当前 `main.py` 提供的有效接口。 + +**请求说明**: + +- 所有需要鉴权的接口(如下载、解析高音质),都**必须**在 JSON 请求体中包含 `cookie` 字段。 +- 前端 `index.html` 的 `getApiPayload` 函数会自动完成这一操作。 + +------ + + + +### /login/qr/generate + + + +- **功能**:生成一个用于扫码登录的二维码 Key 和 Base64 图像。 + +- **方法**:`POST` + +- **请求 (JSON)**:`{}` (无参数) + +- **响应 (JSON)**: + + JSON + + ``` + { + "success": true, + "data": { + "qr_key": "xxx-xxx-xxx", + "qr_img_b64": "data:image/png;base64,..." + } + } + ``` + +------ + + + +### /login/qr/check + + + +- **功能**:轮询检查二维码的扫码状态。 + +- **方法**:`POST` + +- **请求 (JSON)**: + + JSON + + ``` + { + "qr_key": "xxx-xxx-xxx" + } + ``` + +- **响应 (JSON)** (code: `801`=等待, `802`=已扫码, `803`=成功, `800`=过期): + + JSON + + ``` + { + "success": true, + "data": { + "code": 803, + "cookie": "MUSIC_U=...; __csrf=...;", + "message": "登录成功" + } + } + ``` + +------ + + + +### /search + + + +- **功能**:搜索歌曲。 + +- **方法**:`POST` + +- **请求 (JSON)**: + + JSON + + ``` + { + "keyword": "蓝莲花", + "cookie": "...(来自 localStorage)" + } + ``` + +------ + + + +### /song + + + +- **功能**:解析单曲详情和播放链接。 + +- **方法**:`POST` + +- **请求 (JSON)**: + + JSON + + ``` + { + "url": "歌曲ID或链接", + "level": "lossless", + "cookie": "...(来自 localStorage)" + } + ``` + +------ + + + +### /playlist + + + +- **功能**:解析歌单详情。 + +- **方法**:`POST` + +- **请求 (JSON)**: + + JSON + + ``` + { + "id": "歌单ID或链接", + "cookie": "...(来自 localStorage)" + } + ``` + +------ + + + +### /album + + + +- **功能**:解析专辑详情。 + +- **方法**:`POST` + +- **请求 (JSON)**: + + JSON + + ``` + { + "id": "专辑ID或链接", + "cookie": "...(来自 localStorage)" + } + ``` + +------ + + + +### /download_song_zip + + + +- **功能**:打包下载单曲(含封面、歌词)。 + +- **方法**:`POST` + +- **请求 (JSON)**: + + JSON + + ``` + { + "id": "歌曲ID", + "quality": "lossless", + "cookie": "...(来自 localStorage)" + } + ``` + +- **响应**:`application/zip` 文件流。 + +------ + + + +### /download_playlist_zip + + + +- **功能**:打包下载整个歌单。 + +- **方法**:`POST` + +- **请求 (JSON)**: + + JSON + + ``` + { + "id": "歌单ID", + "quality": "lossless", + "cookie": "...(来自 localStorage)" + } + ``` + +- **响应**:`application/zip` 文件流。 + +------ + + + +### /download_album_zip + + + +- **功能**:打包下载整个专辑。 + +- **方法**:`POST` + +- **请求 (JSON)**: + + JSON + + ``` + { + "id": "专辑ID", + "quality": "lossless", + "cookie": "...(来自 localStorage)" + } + ``` + +- **响应**:`application/zip` 文件流。 + + + +## 📝 架构与开发说明 + + + + + +### V9 核心架构 + + + +本项目成功的关键在于**“责任分离”**: + +1. **`main.py` (服务层)**:作为“总指挥”。它**只负责**编排请求 (接收 HTTP、解析参数) 和响应 (返回 JSON 或 Zip)。它**不**包含任何Cookie管理或API请求的硬编码逻辑。 +2. **`qr_login.py` (认证模块)**:**只负责**生成和检查二维码。 +3. **`music_api.py` (API 模块)**:**只负责**调用网易云的 *JSON* API (如搜索、歌单详情)。它必须是“无状态的”,并接收 `cookies` 作为参数。 +4. **`music_downloader.py` (下载模块)**:**只负责**获取 *文件* API (如歌曲 URL、歌词),并处理打包逻辑。它也必须接收 `cookies` 参数。 +5. **`cookie_manager.py` (工具模块)**:**只负责**解析 Cookie 字符串 (`parse_cookie_string`) 和读取 `cookie.txt` (作为备用)。 +6. **`index.html` (客户端)**:作为“用户界面”和“**凭证保险箱**”。它**全权负责**存储 `userCookie`,并在**每一次** API 调用时主动提供它。 + + + +### 项目结构 (V9) + + + +``` +Netease_url/ +├── main.py # [!! 重构 !!] Flask 主程序, 路由层 +├── music_api.py # API 核心模块 (基本不变, 仅被调用) +├── music_downloader.py # [!! 重构 !!] 下载模块 (已解耦, 需传入 cookie) +├── cookie_manager.py # Cookie 管理工具 (现主要用于解析) +├── qr_login.py # [!! 重构 !!] 从脚本变为 QR 登录 API 模块 +├── templates/ +│ └── index.html # [!! 重构 !!] V9 前端应用 +├── static/ # [!! 新增 !!] 静态文件目录 +│ └── favicon.ico # [!! 新增 !!] 网站图标 +├── requirements.txt +├── cookie.txt # (可选) 仅用于本地开发模式 +└── README_v9.md # (本文档) +``` + + + +### 未来升级的基石 (v10 展望) + + + +基于 V9 的“无状态后端 + 客户端凭证”架构,我们为下一阶段的升级打下了完美的基础: + +- **实现“我的音乐”**: + - 后端:在 `music_api.py` 中新增 `get_user_playlists(uid, cookies)` 函数。 + - 后端:在 `main.py` 中新增 `/my/playlists` 路由,它调用上述函数。 + - 前端:在登录成功后,`fetch('/my/playlists')` 并在 `index.html` 中渲染一个新区域。 +- **实现“真实进度条”**: + - 后端:`main.py` 引入 `Flask-SocketIO`。 + - 后端:`_package_tracks_as_zip` 在 `for` 循环中 `socketio.emit` 真实进度。 + - 前端:`handleDownloadClick` 不再使用假进度 `setInterval`,而是 `socket.on('progress', ...)` 来更新进度条。 +- **实现“歌单即时播放”**: + - 前端:在 `renderCollection` 中增加“播放全部”按钮。 + - 前端:点击后,循环 `tracks` 列表,为每首歌调用 `/song` 接口。 + - 前端:将所有返回的 `data.url` 组装成一个列表,一次性 `globalAPlayer.list.add([...])`。 \ No newline at end of file diff --git a/cookie.txt b/cookie.txt new file mode 100644 index 0000000..5000680 --- /dev/null +++ b/cookie.txt @@ -0,0 +1 @@ +MUSIC_U= \ No newline at end of file diff --git a/cookie_manager.py b/cookie_manager.py new file mode 100644 index 0000000..9275b00 --- /dev/null +++ b/cookie_manager.py @@ -0,0 +1,469 @@ +"""Cookie管理器模块 + +提供网易云音乐Cookie管理功能,包括: +- Cookie文件读取和写入 +- Cookie格式验证和解析 +- Cookie有效性检查 +- 自动过期处理 +""" + +import os +import json +import time +from typing import Dict, Optional, List, Tuple, Any +from pathlib import Path +from dataclasses import dataclass +from datetime import datetime, timedelta +import logging + + +@dataclass +class CookieInfo: + """Cookie信息数据类""" + name: str + value: str + domain: str = "" + path: str = "/" + expires: Optional[int] = None + secure: bool = False + http_only: bool = False + + +class CookieException(Exception): + """Cookie相关异常类""" + pass + + +class CookieManager: + """Cookie管理器主类""" + + def __init__(self, cookie_file: str = "cookie.txt"): + """ + 初始化Cookie管理器 + + Args: + cookie_file: Cookie文件路径 + """ + self.cookie_file = Path(cookie_file) + self.logger = logging.getLogger(__name__) + + # 网易云音乐相关的重要Cookie字段 + self.important_cookies = { + 'MUSIC_U', # 用户标识 + 'MUSIC_A', # 用户认证 + '__csrf', # CSRF令牌 + 'NMTID', # 设备标识 + 'WEVNSM', # 会话管理 + 'WNMCID', # 客户端标识 + } + + # 确保cookie文件存在 + self._ensure_cookie_file_exists() + + def _ensure_cookie_file_exists(self) -> None: + """确保Cookie文件存在""" + if not self.cookie_file.exists(): + self.cookie_file.touch() + self.logger.info(f"创建Cookie文件: {self.cookie_file}") + + def read_cookie(self) -> str: + """读取Cookie文件内容 + + Returns: + Cookie字符串内容 + + Raises: + CookieException: 读取失败时抛出 + """ + try: + if not self.cookie_file.exists(): + self.logger.warning(f"Cookie文件不存在: {self.cookie_file}") + return "" + + content = self.cookie_file.read_text(encoding='utf-8').strip() + + if not content: + self.logger.warning("Cookie文件为空") + return "" + + self.logger.debug(f"成功读取Cookie文件,长度: {len(content)}") + return content + + except UnicodeDecodeError as e: + raise CookieException(f"Cookie文件编码错误: {e}") + except PermissionError as e: + raise CookieException(f"没有权限读取Cookie文件: {e}") + except Exception as e: + raise CookieException(f"读取Cookie文件失败: {e}") + + def write_cookie(self, cookie_content: str) -> bool: + """写入Cookie到文件 + + Args: + cookie_content: Cookie内容字符串 + + Returns: + 是否写入成功 + + Raises: + CookieException: 写入失败时抛出 + """ + try: + if not cookie_content or not cookie_content.strip(): + raise CookieException("Cookie内容不能为空") + + # 验证Cookie格式 + if not self.validate_cookie_format(cookie_content): + raise CookieException("Cookie格式无效") + + # 写入文件 + self.cookie_file.write_text(cookie_content.strip(), encoding='utf-8') + + self.logger.info(f"成功写入Cookie到文件: {self.cookie_file}") + return True + + except PermissionError as e: + raise CookieException(f"没有权限写入Cookie文件: {e}") + except Exception as e: + raise CookieException(f"写入Cookie文件失败: {e}") + + def parse_cookies(self) -> Dict[str, str]: + """解析Cookie字符串为字典 + + Returns: + Cookie字典 + + Raises: + CookieException: 解析失败时抛出 + """ + try: + cookie_content = self.read_cookie() + if not cookie_content: + return {} + + return self.parse_cookie_string(cookie_content) + + except Exception as e: + raise CookieException(f"解析Cookie失败: {e}") + + def parse_cookie_string(self, cookie_string: str) -> Dict[str, str]: + """解析Cookie字符串 + + Args: + cookie_string: Cookie字符串 + + Returns: + Cookie字典 + """ + if not cookie_string or not cookie_string.strip(): + return {} + + cookies = {} + + try: + # 处理多种Cookie格式 + cookie_string = cookie_string.strip() + + # 分割Cookie项 + cookie_pairs = [] + if ';' in cookie_string: + cookie_pairs = cookie_string.split(';') + elif '\n' in cookie_string: + cookie_pairs = cookie_string.split('\n') + else: + cookie_pairs = [cookie_string] + + for pair in cookie_pairs: + pair = pair.strip() + if not pair or '=' not in pair: + continue + + # 分割键值对 + key, value = pair.split('=', 1) + key = key.strip() + value = value.strip() + + if key and value: + cookies[key] = value + + self.logger.debug(f"解析得到 {len(cookies)} 个Cookie项") + return cookies + + except Exception as e: + self.logger.error(f"解析Cookie字符串失败: {e}") + return {} + + def validate_cookie_format(self, cookie_string: str) -> bool: + """验证Cookie格式是否有效 + + Args: + cookie_string: Cookie字符串 + + Returns: + 是否格式有效 + """ + if not cookie_string or not cookie_string.strip(): + return False + + try: + # 尝试解析Cookie + cookies = self.parse_cookie_string(cookie_string) + + # 检查是否至少包含一个有效的Cookie + if not cookies: + return False + + # 检查Cookie名称是否合法 + for name, value in cookies.items(): + if not name or not isinstance(name, str): + return False + if not isinstance(value, str): + return False + # 检查是否包含非法字符 + if any(char in name for char in [' ', '\t', '\n', '\r', ';', ',']): + return False + + return True + + except Exception: + return False + + def is_cookie_valid(self) -> bool: + """检查Cookie是否有效 + + Returns: + Cookie是否有效 + """ + try: + cookies = self.parse_cookies() + + if not cookies: + self.logger.warning("Cookie为空") + return False + + # 检查重要Cookie是否存在 + missing_cookies = self.important_cookies - set(cookies.keys()) + if missing_cookies: + self.logger.warning(f"缺少重要Cookie: {missing_cookies}") + return False + + # 检查MUSIC_U是否有效(基本验证) + music_u = cookies.get('MUSIC_U', '') + if not music_u or len(music_u) < 10: + self.logger.warning("MUSIC_U Cookie无效") + return False + + self.logger.debug("Cookie验证通过") + return True + + except Exception as e: + self.logger.error(f"Cookie验证失败: {e}") + return False + + def get_cookie_info(self) -> Dict[str, Any]: + """获取Cookie详细信息 + + Returns: + 包含Cookie信息的字典 + """ + try: + cookies = self.parse_cookies() + + info = { + 'file_path': str(self.cookie_file), + 'file_exists': self.cookie_file.exists(), + 'file_size': self.cookie_file.stat().st_size if self.cookie_file.exists() else 0, + 'cookie_count': len(cookies), + 'is_valid': self.is_cookie_valid(), + 'important_cookies_present': list(self.important_cookies & set(cookies.keys())), + 'missing_important_cookies': list(self.important_cookies - set(cookies.keys())), + 'all_cookie_names': list(cookies.keys()) + } + + # 添加文件修改时间 + if self.cookie_file.exists(): + mtime = self.cookie_file.stat().st_mtime + info['last_modified'] = datetime.fromtimestamp(mtime).isoformat() + + return info + + except Exception as e: + return { + 'error': str(e), + 'file_path': str(self.cookie_file), + 'file_exists': False, + 'is_valid': False + } + + def backup_cookie(self, backup_suffix: str = None) -> str: + """备份Cookie文件 + + Args: + backup_suffix: 备份文件后缀,默认使用时间戳 + + Returns: + 备份文件路径 + + Raises: + CookieException: 备份失败时抛出 + """ + try: + if not self.cookie_file.exists(): + raise CookieException("Cookie文件不存在,无法备份") + + if backup_suffix is None: + backup_suffix = datetime.now().strftime("%Y%m%d_%H%M%S") + + backup_path = self.cookie_file.with_suffix(f".{backup_suffix}.bak") + + # 复制文件内容 + content = self.cookie_file.read_text(encoding='utf-8') + backup_path.write_text(content, encoding='utf-8') + + self.logger.info(f"Cookie备份成功: {backup_path}") + return str(backup_path) + + except Exception as e: + raise CookieException(f"备份Cookie文件失败: {e}") + + def restore_cookie(self, backup_path: str) -> bool: + """从备份恢复Cookie + + Args: + backup_path: 备份文件路径 + + Returns: + 是否恢复成功 + + Raises: + CookieException: 恢复失败时抛出 + """ + try: + backup_file = Path(backup_path) + if not backup_file.exists(): + raise CookieException(f"备份文件不存在: {backup_path}") + + # 读取备份内容 + backup_content = backup_file.read_text(encoding='utf-8') + + # 验证备份内容 + if not self.validate_cookie_format(backup_content): + raise CookieException("备份文件中的Cookie格式无效") + + # 写入当前Cookie文件 + self.write_cookie(backup_content) + + self.logger.info(f"从备份恢复Cookie成功: {backup_path}") + return True + + except Exception as e: + raise CookieException(f"恢复Cookie失败: {e}") + + def clear_cookie(self) -> bool: + """清空Cookie文件 + + Returns: + 是否清空成功 + """ + try: + if self.cookie_file.exists(): + self.cookie_file.write_text("", encoding='utf-8') + self.logger.info("Cookie文件已清空") + return True + + except Exception as e: + self.logger.error(f"清空Cookie文件失败: {e}") + return False + + def update_cookie(self, new_cookies: Dict[str, str]) -> bool: + """更新Cookie + + Args: + new_cookies: 新的Cookie字典 + + Returns: + 是否更新成功 + """ + try: + if not new_cookies: + raise CookieException("新Cookie不能为空") + + # 读取现有Cookie + existing_cookies = self.parse_cookies() + + # 合并Cookie + existing_cookies.update(new_cookies) + + # 转换为Cookie字符串 + cookie_string = '; '.join(f"{k}={v}" for k, v in existing_cookies.items()) + + # 写入文件 + return self.write_cookie(cookie_string) + + except Exception as e: + self.logger.error(f"更新Cookie失败: {e}") + return False + + def get_cookie_for_request(self) -> Dict[str, str]: + """获取用于HTTP请求的Cookie字典 + + Returns: + 适用于requests库的Cookie字典 + """ + try: + cookies = self.parse_cookies() + + # 过滤掉空值 + filtered_cookies = {k: v for k, v in cookies.items() if k and v} + + return filtered_cookies + + except Exception as e: + self.logger.error(f"获取请求Cookie失败: {e}") + return {} + + def format_cookie_string(self, cookies: Dict[str, str]) -> str: + """将Cookie字典格式化为字符串 + + Args: + cookies: Cookie字典 + + Returns: + Cookie字符串 + """ + if not cookies: + return "" + + return '; '.join(f"{k}={v}" for k, v in cookies.items() if k and v) + + def __str__(self) -> str: + """字符串表示""" + info = self.get_cookie_info() + return f"CookieManager(file={info['file_path']}, valid={info['is_valid']}, count={info['cookie_count']})" + + def __repr__(self) -> str: + """详细字符串表示""" + return self.__str__() + + +if __name__ == "__main__": + # 测试代码 + import sys + + # 配置日志 + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + manager = CookieManager() + + print("Cookie管理器模块") + print("支持的功能:") + print("- Cookie文件读写") + print("- Cookie格式验证") + print("- Cookie有效性检查") + print("- Cookie备份和恢复") + print("- Cookie信息查看") + + # 显示当前Cookie信息 + info = manager.get_cookie_info() + print(f"\n当前Cookie状态: {manager}") + print(f"详细信息: {info}") diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..7368b1c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,10 @@ +services: + netease-url: + build: + context: . + dockerfile: Dockerfile + env_file: + - .env + ports: + - "5000:5000" + restart: unless-stopped diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..f522926 --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,9 @@ +version: "3.9" + +services: + netease-url: + image: hiyuelin/netease-url:latest + container_name: netease-url + ports: + - "56315:5000" + restart: unless-stopped diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 0000000..e039143 --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,2 @@ +#!/bin/sh +python3 main.py --level "${LEVEL:-lossless}" --mode "${MODE:-api}" --url "${URL:-''}" diff --git a/main.py b/main.py new file mode 100644 index 0000000..883c77a --- /dev/null +++ b/main.py @@ -0,0 +1,343 @@ +# --- main.py (v8.0 - The Ground-Up Rewrite) --- +# [!! 已修改 - 增加了智能错误提示 !!] + +import logging, sys, time, traceback, io, zipfile, requests, re, os, tempfile, shutil +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Any +from urllib.parse import quote +from flask import Flask, request, send_file, render_template, Response, jsonify + +try: + from mutagen.flac import FLAC, Picture + from mutagen.mp3 import MP3 + from mutagen.id3 import ID3, APIC + MUTAGEN_AVAILABLE = True +except ImportError: + MUTAGEN_AVAILABLE = False + +try: + from music_api import NeteaseAPI, APIException, search_music, playlist_detail, album_detail + from cookie_manager import CookieManager, CookieException, CookieManager + import qr_login + from music_downloader import MusicDownloader, DownloadException +except ImportError as e: + print(f"导入模块失败: {e}\n请确保所有依赖模块存在且可用") + sys.exit(1) + +@dataclass +class APIConfig: + host: str = '0.0.0.0'; port: int = 5000; debug: bool = False + log_level: str = 'INFO'; cors_origins: str = '*' + +class APIResponse: + @staticmethod + def success(data: Any = None, message: str = 'success'): return jsonify({'success': True, 'message': message, 'data': data}), 200 + @staticmethod + def error(message: str, status_code: int = 400): return jsonify({'success': False, 'message': message}), status_code + +class MusicAPIService: + def __init__(self, config: APIConfig): + self.config = config + self.logger = self._setup_logger() + self.cookie_manager = CookieManager() + self.netease_api = NeteaseAPI() + self.downloader = MusicDownloader() + if not MUTAGEN_AVAILABLE: self.logger.warning("`mutagen` 库未安装, 打包文件将不含独立封面。请运行 `pip install mutagen`") + + def _setup_logger(self): + logger = logging.getLogger('music_api'); logger.setLevel(logging.INFO) + if not logger.handlers: + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + ch = logging.StreamHandler(); ch.setFormatter(formatter); logger.addHandler(ch) + return logger + + def _get_cookies_from_request(self) -> Dict[str, str]: + data = self._safe_get_request_data() + user_cookie_str = data.get('cookie') + + if user_cookie_str: + self.logger.debug("正在使用前端提供的用户Cookie") + return self.cookie_manager.parse_cookie_string(user_cookie_str) + + try: + self.logger.debug("未使用用户Cookie,降级到服务器 cookie.txt") + return self.cookie_manager.parse_cookies() + except CookieException: + self.logger.error("读取服务器 cookie.txt 失败") + return {} + + def _extract_id(self, id_or_url, id_type): + patterns = [rf'{id_type}\?id=(\d+)', rf'\/{id_type}\/(\d+)'] + for p in patterns: + match = re.search(p, str(id_or_url)) + if match: return match.group(1) + return str(id_or_url).strip() + + def _get_quality_display_name(self, quality): + names = {'standard':"标准",'exhigh':"极高",'lossless':"无损",'hires':"Hi-Res",'sky':"环绕声",'jyeffect':"高清环绕",'jymaster':"母带"} + return names.get(quality, quality) + + def _safe_get_request_data(self): + return request.get_json(silent=True) or dict(request.form) or dict(request.args) + + def _create_zip_response(self, memory_file, filename): + response = Response(memory_file.getvalue(), mimetype='application/zip') + safe_filename = re.sub(r'[\\/*?:"<>|]', "", filename) + encoded_filename = quote(safe_filename) + response.headers['Content-Disposition'] = f"attachment; filename*=UTF-8''{encoded_filename}" + return response + + def _embed_cover_on_disk(self, audio_path, cover_path, file_type): + if not MUTAGEN_AVAILABLE or not cover_path or not os.path.exists(cover_path): return + try: + with open(cover_path, 'rb') as f: + cover_data = f.read() + + if file_type == 'flac': + audio = FLAC(audio_path) + pic = Picture(); pic.type = 3; pic.mime = "image/jpeg"; pic.data = cover_data + audio.add_picture(pic) + elif file_type == 'mp3': + audio = MP3(audio_path, ID3=ID3) + if audio.tags is None: audio.add_tags() + audio.tags.add(APIC(encoding=3, mime='image/jpeg', type=3, desc='Cover', data=cover_data)) + else: return + audio.save() + except Exception as e: + self.logger.error(f"在磁盘嵌入封面失败 ({file_type}): {e}") + + def _package_tracks_as_zip(self, tracks, quality, collection_name, collection_cover_url, cookies: Dict[str, str]): + temp_dir = tempfile.mkdtemp() + self.logger.info(f"创建临时目录: {temp_dir}") + zip_buffer = io.BytesIO() + + try: + with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: + if collection_cover_url: + try: zf.writestr('cover.jpg', requests.get(collection_cover_url, timeout=10).content) + except Exception: pass + + for i, track in enumerate(tracks): + song_id = track.get('id') + if not song_id: continue + self.logger.info(f"[{i+1}/{len(tracks)}] 打包中: {track.get('name')}") + + try: + music_info = self.downloader.get_music_info(int(song_id), quality, cookies) + if not music_info.download_url: raise DownloadException("获取下载链接失败") + + base_fn = self.downloader._sanitize_filename(f"{music_info.artists} - {music_info.name}") + audio_filename = f"{base_fn}.{music_info.file_type}" + audio_path = os.path.join(temp_dir, audio_filename) + + with requests.get(music_info.download_url, stream=True, timeout=120) as r: + r.raise_for_status() + with open(audio_path, 'wb') as f: + for chunk in r.iter_content(chunk_size=8192): + f.write(chunk) + + if music_info.pic_url: + cover_path = os.path.join(temp_dir, f"cover_{song_id}.jpg") + try: + with requests.get(music_info.pic_url, stream=True, timeout=20) as r: + r.raise_for_status() + with open(cover_path, 'wb') as f: + shutil.copyfileobj(r.raw, f) + self._embed_cover_on_disk(audio_path, cover_path, music_info.file_type) + except Exception as e: + self.logger.warning(f"下载或嵌入封面失败 for song {song_id}: {e}") + + zf.write(audio_path, arcname=audio_filename) + + if music_info.lyric: + zf.writestr(f"{base_fn}.lrc", music_info.lyric.encode('utf-8')) + + except Exception as e: + # [!! 已修改 - 歌单打包时的错误提示 !!] + error_msg = f"歌曲 '{track.get('name')}' (ID: {song_id}) 处理失败: {e}" + if ("无可用的下载链接" in str(e) or "播放链接" in str(e)): + # 检查传入的 cookies 字典是否为空或没有 MUSIC_U,来*猜测*是否为登录问题 + # 这是一个不完美的猜测,但优于没有 + if not cookies.get('MUSIC_U'): + error_msg = f"歌曲 '{track.get('name')}' (ID: {song_id}) 处理失败: 需要会员权限" + else: + error_msg = f"歌曲 '{track.get('name')}' (ID: {song_id}) 处理失败: 权限不足或Cookie过期" + + self.logger.error(error_msg) + zf.writestr(f"_error_log_ID_{song_id}.txt", error_msg.encode('utf-8')) + + zip_buffer.seek(0) + download_name = f"{collection_name} [{self._get_quality_display_name(quality)}].zip" + return self._create_zip_response(zip_buffer, download_name) + finally: + shutil.rmtree(temp_dir) + self.logger.info(f"删除临时目录: {temp_dir}") + +config = APIConfig(); app = Flask(__name__) +api_service = MusicAPIService(config) + +@app.after_request +def after_request(response: Response) -> Response: + response.headers.add('Access-Control-Allow-Origin', config.cors_origins) + response.headers.add('Access-Control-Expose-Headers', 'Content-Disposition') + response.headers.add('Access-Control-Allow-Headers', 'Content-Type') + response.headers.add('Access-Control-Allow-Methods', 'GET, POST') + return response + +@app.route('/') +def index(): return render_template('index.html') + +@app.route('/login/qr/generate', methods=['POST']) +def qr_generate(): + try: + result = qr_login.api_generate_qr_key() + if result['success']: + return APIResponse.success(result) + else: + return APIResponse.error(result.get('message', '生成二维码失败'), 500) + except Exception as e: + return APIResponse.error(f"生成二维码异常: {e}", 500) + +@app.route('/login/qr/check', methods=['POST']) +def qr_check(): + data = api_service._safe_get_request_data() + qr_key = data.get('qr_key') + if not qr_key: + return APIResponse.error("参数 'qr_key' 不能为空") + + try: + result = qr_login.api_check_qr_status(qr_key) + return APIResponse.success(result) + except Exception as e: + return APIResponse.error(f"检查状态异常: {e}", 500) + +@app.route('/song', methods=['POST']) +def get_song_info(): + data = api_service._safe_get_request_data(); url = data.get('url'); level = data.get('level', 'lossless') + # [!! 已修改 !!] 检查用户是否*发送*了cookie + user_was_logged_in = bool(data.get('cookie')) + + if not url: return APIResponse.error("参数 'url' 不能为空") + + music_id = api_service._extract_id(url, 'song') + try: + cookies = api_service._get_cookies_from_request() + music_info = api_service.downloader.get_music_info(int(music_id), level, cookies) + + response_data = { + 'id': music_id, 'name': music_info.name, 'ar_name': music_info.artists, + 'al_name': music_info.album, 'pic': music_info.pic_url, 'lyric': music_info.lyric, + 'url': music_info.download_url, + 'size': f"{music_info.file_size/1024/1024:.2f} MB" if music_info.file_size > 0 else "N/A", + 'level': api_service._get_quality_display_name(level), + 'file_type': music_info.file_type + } + return APIResponse.success(response_data) + except Exception as e: + # [!! 已修改 - 智能错误处理 !!] + error_msg = str(e) + if "无可用的下载链接" in error_msg or "播放链接" in error_msg: + if not user_was_logged_in: + error_msg = "解析失败:此音质或歌曲需要会员权限。请点击右上角登录黑胶会员账号后再试。" + else: + error_msg = "解析失败:您的账号可能不是黑胶会员,Cookie已过期,或歌曲已下架。" + + return APIResponse.error(error_msg, 502) + + +@app.route('/search', methods=['POST']) +def search_music_api(): + data = api_service._safe_get_request_data(); keyword = data.get('keyword') + if not keyword: return APIResponse.error("参数 'keyword' 不能为空") + try: + cookies = api_service._get_cookies_from_request() + return APIResponse.success(search_music(keyword, cookies, limit=30)) + except APIException as e: return APIResponse.error(str(e), 502) + +@app.route('/playlist', methods=['POST']) +def get_playlist(): + data = api_service._safe_get_request_data(); pid = data.get('id') + if not pid: return APIResponse.error("参数 'id' 不能为空") + playlist_id = api_service._extract_id(pid, 'playlist') + try: + cookies = api_service._get_cookies_from_request() + return APIResponse.success({'playlist': playlist_detail(playlist_id, cookies)}) + except APIException as e: return APIResponse.error(str(e), 502) + +@app.route('/album', methods=['POST']) +def get_album(): + data = api_service._safe_get_request_data(); aid = data.get('id') + if not aid: return APIResponse.error("参数 'id' 不能为空") + album_id = api_service._extract_id(aid, 'album') + try: + cookies = api_service._get_cookies_from_request() + return APIResponse.success({'album': album_detail(album_id, cookies)}) + except APIException as e: return APIResponse.error(str(e), 502) + +@app.route('/download_song_zip', methods=['POST']) +def download_song_zip(): + data = api_service._safe_get_request_data() + song_id = data.get('id'); quality = data.get('quality', 'lossless') + # [!! 已修改 !!] 检查用户是否*发送*了cookie + user_was_logged_in = bool(data.get('cookie')) + + if not song_id: return APIResponse.error("参数 'id' 不能为空", 400) + + try: + cookies = api_service._get_cookies_from_request() + music_info = api_service.downloader.get_music_info(int(song_id), quality, cookies) + song_as_track = [{'id': song_id, 'name': music_info.name}] + return api_service._package_tracks_as_zip(song_as_track, quality, music_info.name, None, cookies) + except Exception as e: + api_service.logger.error(f"打包单曲异常: {e}\n{traceback.format_exc}") + + # [!! 已修改 - 智能错误处理 !!] + error_msg = str(e) + if "无可用的下载链接" in error_msg or "播放链接" in error_msg: + if not user_was_logged_in: + error_msg = "打包失败:此音质或歌曲需要会员权限。请点击右上角登录黑胶会员账号后再试。" + else: + error_msg = "打包失败:您的账号可能不是黑胶会员,Cookie已过期,或歌曲已下架。" + else: + error_msg = f"打包失败: {str(e)}" + + return APIResponse.error(error_msg, 500) + +@app.route('/download_playlist_zip', methods=['POST']) +def download_playlist_zip(): + data = api_service._safe_get_request_data(); pid = data.get('id'); quality = data.get('quality', 'lossless') + if not pid: return APIResponse.error("参数 'id' 不能为空", 400) + playlist_id = api_service._extract_id(pid, 'playlist') + try: + cookies = api_service._get_cookies_from_request() + playlist_info = playlist_detail(playlist_id, cookies) + if not playlist_info or not playlist_info.get('tracks'): return APIResponse.error("歌单信息获取失败或歌单为空", 404) + return api_service._package_tracks_as_zip( + playlist_info['tracks'], quality, playlist_info.get('name'), + playlist_info.get('coverImgUrl'), cookies + ) + except Exception as e: + api_service.logger.error(f"打包歌单异常: {e}\n{traceback.format_exc()}"); return APIResponse.error(f"打包失败: {str(e)}", 500) + +@app.route('/download_album_zip', methods=['POST']) +def download_album_zip(): + data = api_service._safe_get_request_data(); aid = data.get('id'); quality = data.get('quality', 'lossless') + if not aid: return APIResponse.error("参数 'id' 不能为空", 400) + album_id = api_service._extract_id(aid, 'album') + try: + cookies = api_service._get_cookies_from_request() + album_info = album_detail(album_id, cookies) + if not album_info or not album_info.get('songs'): return APIResponse.error("专辑信息获取失败或专辑为空", 404) + return api_service._package_tracks_as_zip( + album_info['songs'], quality, album_info.get('name'), + album_info.get('coverImgUrl'), cookies + ) + except Exception as e: + api_service.logger.error(f"打包专辑异常: {e}\n{traceback.format_exc()}"); return APIResponse.error(f"打包失败: {str(e)}", 500) + +if __name__ == '__main__': + print("\n" + "="*60); print("🚀 网易云音乐工具箱 (v8.1 - 重构稳定版) 启动中..."); + print(" [!!] 已集成用户Cookie和QR登录API"); + print(f"📡 服务地址: http://{config.host}:{config.port}"); print("="*60) + app.run(host=config.host, port=config.port, debug=config.debug) \ No newline at end of file diff --git a/music_api.log b/music_api.log new file mode 100644 index 0000000..e69de29 diff --git a/music_api.py b/music_api.py new file mode 100644 index 0000000..4cac887 --- /dev/null +++ b/music_api.py @@ -0,0 +1,673 @@ +"""网易云音乐API模块 + +提供网易云音乐相关API接口的封装,包括: +- 音乐URL获取 +- 歌曲详情获取 +- 歌词获取 +- 搜索功能 +- 歌单和专辑详情 +- 二维码登录 +""" + +import json +import urllib.parse +import time +from random import randrange +from typing import Dict, List, Optional, Tuple, Any +from hashlib import md5 +from enum import Enum + +import requests +from cryptography.hazmat.primitives import padding +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + + +class QualityLevel(Enum): + """音质等级枚举""" + STANDARD = "standard" # 标准音质 + EXHIGH = "exhigh" # 极高音质 + LOSSLESS = "lossless" # 无损音质 + HIRES = "hires" # Hi-Res音质 + SKY = "sky" # 沉浸环绕声 + JYEFFECT = "jyeffect" # 高清环绕声 + JYMASTER = "jymaster" # 超清母带 + + +# 常量定义 +class APIConstants: + """API相关常量""" + AES_KEY = b"e82ckenh8dichen8" + USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36 Chrome/91.0.4472.164 NeteaseMusicDesktop/2.10.2.200154' + REFERER = 'https://music.163.com/' + + # API URLs + SONG_URL_V1 = "https://interface3.music.163.com/eapi/song/enhance/player/url/v1" + SONG_DETAIL_V3 = "https://interface3.music.163.com/api/v3/song/detail" + LYRIC_API = "https://interface3.music.163.com/api/song/lyric" + SEARCH_API = 'https://music.163.com/api/cloudsearch/pc' + PLAYLIST_DETAIL_API = 'https://music.163.com/api/v6/playlist/detail' + ALBUM_DETAIL_API = 'https://music.163.com/api/v1/album/' + QR_UNIKEY_API = 'https://interface3.music.163.com/eapi/login/qrcode/unikey' + QR_LOGIN_API = 'https://interface3.music.163.com/eapi/login/qrcode/client/login' + + # 默认配置 + DEFAULT_CONFIG = { + "os": "pc", + "appver": "", + "osver": "", + "deviceId": "pyncm!" + } + + DEFAULT_COOKIES = { + "os": "pc", + "appver": "", + "osver": "", + "deviceId": "pyncm!" + } + + +class CryptoUtils: + """加密工具类""" + + @staticmethod + def hex_digest(data: bytes) -> str: + """将字节数据转换为十六进制字符串""" + return "".join([hex(d)[2:].zfill(2) for d in data]) + + @staticmethod + def hash_digest(text: str) -> bytes: + """计算MD5哈希值""" + return md5(text.encode("utf-8")).digest() + + @staticmethod + def hash_hex_digest(text: str) -> str: + """计算MD5哈希值并转换为十六进制字符串""" + return CryptoUtils.hex_digest(CryptoUtils.hash_digest(text)) + + @staticmethod + def encrypt_params(url: str, payload: Dict[str, Any]) -> str: + """加密请求参数""" + url_path = urllib.parse.urlparse(url).path.replace("/eapi/", "/api/") + digest = CryptoUtils.hash_hex_digest(f"nobody{url_path}use{json.dumps(payload)}md5forencrypt") + params = f"{url_path}-36cd479b6b5-{json.dumps(payload)}-36cd479b6b5-{digest}" + + # AES加密 + padder = padding.PKCS7(algorithms.AES(APIConstants.AES_KEY).block_size).padder() + padded_data = padder.update(params.encode()) + padder.finalize() + cipher = Cipher(algorithms.AES(APIConstants.AES_KEY), modes.ECB()) + encryptor = cipher.encryptor() + enc = encryptor.update(padded_data) + encryptor.finalize() + + return CryptoUtils.hex_digest(enc) + + +class HTTPClient: + """HTTP客户端类""" + + @staticmethod + def post_request(url: str, params: str, cookies: Dict[str, str]) -> str: + """发送POST请求并返回文本响应""" + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER, + } + + request_cookies = APIConstants.DEFAULT_COOKIES.copy() + request_cookies.update(cookies) + + try: + response = requests.post(url, headers=headers, cookies=request_cookies, + data={"params": params}, timeout=30) + response.raise_for_status() + return response.text + except requests.RequestException as e: + raise APIException(f"HTTP请求失败: {e}") + + @staticmethod + def post_request_full(url: str, params: str, cookies: Dict[str, str]) -> requests.Response: + """发送POST请求并返回完整响应对象""" + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER, + } + + request_cookies = APIConstants.DEFAULT_COOKIES.copy() + request_cookies.update(cookies) + + try: + response = requests.post(url, headers=headers, cookies=request_cookies, + data={"params": params}, timeout=30) + response.raise_for_status() + return response + except requests.RequestException as e: + raise APIException(f"HTTP请求失败: {e}") + + +class APIException(Exception): + """API异常类""" + pass + + +class NeteaseAPI: + """网易云音乐API主类""" + + def __init__(self): + self.http_client = HTTPClient() + self.crypto_utils = CryptoUtils() + + def get_song_url(self, song_id: int, quality: str, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌曲播放URL + + Args: + song_id: 歌曲ID + quality: 音质等级 (standard, exhigh, lossless, hires, sky, jyeffect, jymaster) + cookies: 用户cookies + + Returns: + 包含歌曲URL信息的字典 + + Raises: + APIException: API调用失败时抛出 + """ + try: + config = APIConstants.DEFAULT_CONFIG.copy() + config["requestId"] = str(randrange(20000000, 30000000)) + + payload = { + 'ids': [song_id], + 'level': quality, + 'encodeType': 'flac', + 'header': json.dumps(config), + } + + if quality == 'sky': + payload['immerseType'] = 'c51' + + params = self.crypto_utils.encrypt_params(APIConstants.SONG_URL_V1, payload) + response_text = self.http_client.post_request(APIConstants.SONG_URL_V1, params, cookies) + + result = json.loads(response_text) + if result.get('code') != 200: + raise APIException(f"获取歌曲URL失败: {result.get('message', '未知错误')}") + + return result + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析响应数据失败: {e}") + + def get_song_detail(self, song_id: int) -> Dict[str, Any]: + """获取歌曲详细信息 + + Args: + song_id: 歌曲ID + + Returns: + 包含歌曲详细信息的字典 + + Raises: + APIException: API调用失败时抛出 + """ + try: + data = {'c': json.dumps([{"id": song_id, "v": 0}])} + response = requests.post(APIConstants.SONG_DETAIL_V3, data=data, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('code') != 200: + raise APIException(f"获取歌曲详情失败: {result.get('message', '未知错误')}") + + return result + except requests.RequestException as e: + raise APIException(f"获取歌曲详情请求失败: {e}") + except json.JSONDecodeError as e: + raise APIException(f"解析歌曲详情响应失败: {e}") + + def get_lyric(self, song_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌词信息 + + Args: + song_id: 歌曲ID + cookies: 用户cookies + + Returns: + 包含歌词信息的字典 + + Raises: + APIException: API调用失败时抛出 + """ + try: + data = { + 'id': song_id, + 'cp': 'false', + 'tv': '0', + 'lv': '0', + 'rv': '0', + 'kv': '0', + 'yv': '0', + 'ytv': '0', + 'yrv': '0' + } + + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER + } + + response = requests.post(APIConstants.LYRIC_API, data=data, + headers=headers, cookies=cookies, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('code') != 200: + raise APIException(f"获取歌词失败: {result.get('message', '未知错误')}") + + return result + except requests.RequestException as e: + raise APIException(f"获取歌词请求失败: {e}") + except json.JSONDecodeError as e: + raise APIException(f"解析歌词响应失败: {e}") + + def search_music(self, keywords: str, cookies: Dict[str, str], limit: int = 10) -> List[Dict[str, Any]]: + """搜索音乐 + + Args: + keywords: 搜索关键词 + cookies: 用户cookies + limit: 返回数量限制 + + Returns: + 歌曲信息列表 + + Raises: + APIException: API调用失败时抛出 + """ + try: + data = {'s': keywords, 'type': 1, 'limit': limit} + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER + } + + response = requests.post(APIConstants.SEARCH_API, data=data, + headers=headers, cookies=cookies, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('code') != 200: + raise APIException(f"搜索失败: {result.get('message', '未知错误')}") + + songs = [] + for item in result.get('result', {}).get('songs', []): + song_info = { + 'id': item['id'], + 'name': item['name'], + 'artists': '/'.join(artist['name'] for artist in item['ar']), + 'album': item['al']['name'], + 'picUrl': item['al']['picUrl'] + } + songs.append(song_info) + + return songs + except requests.RequestException as e: + raise APIException(f"搜索请求失败: {e}") + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析搜索响应失败: {e}") + + def get_playlist_detail(self, playlist_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌单详情 + + Args: + playlist_id: 歌单ID + cookies: 用户cookies + + Returns: + 歌单详情信息 + + Raises: + APIException: API调用失败时抛出 + """ + try: + data = {'id': playlist_id} + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER + } + + response = requests.post(APIConstants.PLAYLIST_DETAIL_API, data=data, + headers=headers, cookies=cookies, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('code') != 200: + raise APIException(f"获取歌单详情失败: {result.get('message', '未知错误')}") + + playlist = result.get('playlist', {}) + info = { + 'id': playlist.get('id'), + 'name': playlist.get('name'), + 'coverImgUrl': playlist.get('coverImgUrl'), + 'creator': playlist.get('creator', {}).get('nickname', ''), + 'trackCount': playlist.get('trackCount'), + 'description': playlist.get('description', ''), + 'tracks': [] + } + + # 获取所有trackIds并分批获取详细信息 + track_ids = [str(t['id']) for t in playlist.get('trackIds', [])] + for i in range(0, len(track_ids), 100): + batch_ids = track_ids[i:i+100] + song_data = {'c': json.dumps([{'id': int(sid), 'v': 0} for sid in batch_ids])} + + song_resp = requests.post(APIConstants.SONG_DETAIL_V3, data=song_data, + headers=headers, cookies=cookies, timeout=30) + song_resp.raise_for_status() + + song_result = song_resp.json() + for song in song_result.get('songs', []): + info['tracks'].append({ + 'id': song['id'], + 'name': song['name'], + 'artists': '/'.join(artist['name'] for artist in song['ar']), + 'album': song['al']['name'], + 'picUrl': song['al']['picUrl'] + }) + + return info + except requests.RequestException as e: + raise APIException(f"获取歌单详情请求失败: {e}") + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析歌单详情响应失败: {e}") + + def get_album_detail(self, album_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取专辑详情 + + Args: + album_id: 专辑ID + cookies: 用户cookies + + Returns: + 专辑详情信息 + + Raises: + APIException: API调用失败时抛出 + """ + try: + url = f'{APIConstants.ALBUM_DETAIL_API}{album_id}' + headers = { + 'User-Agent': APIConstants.USER_AGENT, + 'Referer': APIConstants.REFERER + } + + response = requests.get(url, headers=headers, cookies=cookies, timeout=30) + response.raise_for_status() + + result = response.json() + if result.get('code') != 200: + raise APIException(f"获取专辑详情失败: {result.get('message', '未知错误')}") + + album = result.get('album', {}) + info = { + 'id': album.get('id'), + 'name': album.get('name'), + 'coverImgUrl': self.get_pic_url(album.get('pic')), + 'artist': album.get('artist', {}).get('name', ''), + 'publishTime': album.get('publishTime'), + 'description': album.get('description', ''), + 'songs': [] + } + + for song in result.get('songs', []): + info['songs'].append({ + 'id': song['id'], + 'name': song['name'], + 'artists': '/'.join(artist['name'] for artist in song['ar']), + 'album': song['al']['name'], + 'picUrl': self.get_pic_url(song['al'].get('pic')) + }) + + return info + except requests.RequestException as e: + raise APIException(f"获取专辑详情请求失败: {e}") + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析专辑详情响应失败: {e}") + + def netease_encrypt_id(self, id_str: str) -> str: + """网易云加密图片ID算法 + + Args: + id_str: 图片ID字符串 + + Returns: + 加密后的字符串 + """ + import base64 + import hashlib + + magic = list('3go8&$8*3*3h0k(2)2') + song_id = list(id_str) + + for i in range(len(song_id)): + song_id[i] = chr(ord(song_id[i]) ^ ord(magic[i % len(magic)])) + + m = ''.join(song_id) + md5_bytes = hashlib.md5(m.encode('utf-8')).digest() + result = base64.b64encode(md5_bytes).decode('utf-8') + result = result.replace('/', '_').replace('+', '-') + + return result + + def get_pic_url(self, pic_id: Optional[int], size: int = 300) -> str: + """获取网易云加密歌曲/专辑封面直链 + + Args: + pic_id: 封面ID + size: 图片尺寸 + + Returns: + 图片URL + """ + if pic_id is None: + return '' + + enc_id = self.netease_encrypt_id(str(pic_id)) + return f'https://p3.music.126.net/{enc_id}/{pic_id}.jpg?param={size}y{size}' + + +class QRLoginManager: + """二维码登录管理器""" + + def __init__(self): + self.http_client = HTTPClient() + self.crypto_utils = CryptoUtils() + + def generate_qr_key(self) -> Optional[str]: + """生成二维码的key + + Returns: + 成功返回unikey,失败返回None + + Raises: + APIException: API调用失败时抛出 + """ + try: + config = APIConstants.DEFAULT_CONFIG.copy() + config["requestId"] = str(randrange(20000000, 30000000)) + + payload = { + 'type': 1, + 'header': json.dumps(config) + } + + params = self.crypto_utils.encrypt_params(APIConstants.QR_UNIKEY_API, payload) + response = self.http_client.post_request_full(APIConstants.QR_UNIKEY_API, params, {}) + + result = json.loads(response.text) + if result.get('code') == 200: + return result.get('unikey') + else: + raise APIException(f"生成二维码key失败: {result.get('message', '未知错误')}") + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析二维码key响应失败: {e}") + + def create_qr_login(self) -> Optional[str]: + """创建登录二维码并在控制台显示 + + Returns: + 成功返回unikey,失败返回None + """ + try: + import qrcode + + unikey = self.generate_qr_key() + if not unikey: + print("生成二维码key失败") + return None + + # 创建二维码 + qr = qrcode.QRCode() + qr.add_data(f'https://music.163.com/login?codekey={unikey}') + qr.make(fit=True) + + # 在控制台显示二维码 + qr.print_ascii(tty=True) + print("\n请使用网易云音乐APP扫描上方二维码登录") + return unikey + except ImportError: + print("请安装qrcode库: pip install qrcode") + return None + except Exception as e: + print(f"创建二维码失败: {e}") + return None + + def check_qr_login(self, unikey: str) -> Tuple[int, Dict[str, str]]: + """检查二维码登录状态 + + Args: + unikey: 二维码key + + Returns: + (登录状态码, cookie字典) + + Raises: + APIException: API调用失败时抛出 + """ + try: + config = APIConstants.DEFAULT_CONFIG.copy() + config["requestId"] = str(randrange(20000000, 30000000)) + + payload = { + 'key': unikey, + 'type': 1, + 'header': json.dumps(config) + } + + params = self.crypto_utils.encrypt_params(APIConstants.QR_LOGIN_API, payload) + response = self.http_client.post_request_full(APIConstants.QR_LOGIN_API, params, {}) + + result = json.loads(response.text) + cookie_dict = {} + + if result.get('code') == 803: + # 登录成功,提取cookie + all_cookies = response.headers.get('Set-Cookie', '').split(', ') + for cookie_str in all_cookies: + if 'MUSIC_U=' in cookie_str: + cookie_dict['MUSIC_U'] = cookie_str.split('MUSIC_U=')[1].split(';')[0] + + return result.get('code', -1), cookie_dict + except (json.JSONDecodeError, KeyError) as e: + raise APIException(f"解析登录状态响应失败: {e}") + + def qr_login(self) -> Optional[str]: + """完整的二维码登录流程 + + Returns: + 成功返回cookie字符串,失败返回None + """ + try: + unikey = self.create_qr_login() + if not unikey: + return None + + while True: + code, cookies = self.check_qr_login(unikey) + + if code == 803: + print("\n登录成功!") + return f"MUSIC_U={cookies['MUSIC_U']};os=pc;appver=8.9.70;" + elif code == 801: + print("\r等待扫码...", end='') + elif code == 802: + print("\r扫码成功,请在手机上确认登录...", end='') + else: + print(f"\n登录失败,错误码:{code}") + return None + + time.sleep(2) + except KeyboardInterrupt: + print("\n用户取消登录") + return None + except Exception as e: + print(f"\n登录过程中发生错误: {e}") + return None + + +# 向后兼容的函数接口 +def url_v1(song_id: int, level: str, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌曲URL(向后兼容)""" + api = NeteaseAPI() + return api.get_song_url(song_id, level, cookies) + + +def name_v1(song_id: int) -> Dict[str, Any]: + """获取歌曲详情(向后兼容)""" + api = NeteaseAPI() + return api.get_song_detail(song_id) + + +def lyric_v1(song_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌词(向后兼容)""" + api = NeteaseAPI() + return api.get_lyric(song_id, cookies) + + +def search_music(keywords: str, cookies: Dict[str, str], limit: int = 10) -> List[Dict[str, Any]]: + """搜索音乐(向后兼容)""" + api = NeteaseAPI() + return api.search_music(keywords, cookies, limit) + + +def playlist_detail(playlist_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取歌单详情(向后兼容)""" + api = NeteaseAPI() + return api.get_playlist_detail(playlist_id, cookies) + + +def album_detail(album_id: int, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取专辑详情(向后兼容)""" + api = NeteaseAPI() + return api.get_album_detail(album_id, cookies) + + +def get_pic_url(pic_id: Optional[int], size: int = 300) -> str: + """获取图片URL(向后兼容)""" + api = NeteaseAPI() + return api.get_pic_url(pic_id, size) + + +def qr_login() -> Optional[str]: + """二维码登录(向后兼容)""" + manager = QRLoginManager() + return manager.qr_login() + + +if __name__ == "__main__": + # 测试代码 + print("网易云音乐API模块") + print("支持的功能:") + print("- 歌曲URL获取") + print("- 歌曲详情获取") + print("- 歌词获取") + print("- 音乐搜索") + print("- 歌单详情") + print("- 专辑详情") + print("- 二维码登录") diff --git a/music_downloader.py b/music_downloader.py new file mode 100644 index 0000000..e7a3cef --- /dev/null +++ b/music_downloader.py @@ -0,0 +1,596 @@ +"""音乐下载器模块 (已修改为支持Cookie参数) + +提供网易云音乐下载功能,包括: +- 音乐信息获取 +- 文件下载到本地 +- 内存下载 +- 音乐标签写入 +- 异步下载支持 +""" + +import os +import re +import asyncio +import aiohttp +import aiofiles +from io import BytesIO +from typing import Dict, List, Optional, Tuple, Any, Union +from pathlib import Path +from dataclasses import dataclass +from enum import Enum + +import requests +from mutagen.flac import FLAC +from mutagen.mp3 import MP3 +from mutagen.id3 import ID3, TIT2, TPE1, TALB, TDRC, TRCK, APIC +from mutagen.mp4 import MP4 + +# 注意:这里我们不再需要 CookieManager +from music_api import NeteaseAPI, APIException +# from cookie_manager import CookieManager + + +class AudioFormat(Enum): + """音频格式枚举""" + MP3 = "mp3" + FLAC = "flac" + M4A = "m4a" + UNKNOWN = "unknown" + + +class QualityLevel(Enum): + """音质等级枚举""" + STANDARD = "standard" # 标准 + EXHIGH = "exhigh" # 极高 + LOSSLESS = "lossless" # 无损 + HIRES = "hires" # Hi-Res + SKY = "sky" # 沉浸环绕声 + JYEFFECT = "jyeffect" # 高清环绕声 + JYMASTER = "jymaster" # 超清母带 + + +@dataclass +class MusicInfo: + """音乐信息数据类""" + id: int + name: str + artists: str + album: str + pic_url: str + duration: int + track_number: int + download_url: str + file_type: str + file_size: int + quality: str + lyric: str = "" + tlyric: str = "" + + +@dataclass +class DownloadResult: + """下载结果数据类""" + success: bool + file_path: Optional[str] = None + file_size: int = 0 + error_message: str = "" + music_info: Optional[MusicInfo] = None + + +class DownloadException(Exception): + """下载异常类""" + pass + + +class MusicDownloader: + """音乐下载器主类""" + + def __init__(self, download_dir: str = "downloads", max_concurrent: int = 3): + """ + 初始化音乐下载器 + + Args: + download_dir: 下载目录 + max_concurrent: 最大并发下载数 + """ + self.download_dir = Path(download_dir) + self.download_dir.mkdir(exist_ok=True) + self.max_concurrent = max_concurrent + + # 初始化依赖 + # self.cookie_manager = CookieManager() # <- 已移除 + self.api = NeteaseAPI() + + # 支持的文件格式 + self.supported_formats = { + 'mp3': AudioFormat.MP3, + 'flac': AudioFormat.FLAC, + 'm4a': AudioFormat.M4A + } + + def _sanitize_filename(self, filename: str) -> str: + """清理文件名,移除非法字符 + + Args: + filename: 原始文件名 + + Returns: + 清理后的安全文件名 + """ + # 移除或替换非法字符 + illegal_chars = r'[<>:"/\\|?*]' + filename = re.sub(illegal_chars, '_', filename) + + # 移除前后空格和点 + filename = filename.strip(' .') + + # 限制长度 + if len(filename) > 200: + filename = filename[:200] + + return filename or "unknown" + + def _determine_file_extension(self, url: str, content_type: str = "") -> str: + """根据URL和Content-Type确定文件扩展名 + + Args: + url: 下载URL + content_type: HTTP Content-Type头 + + Returns: + 文件扩展名 + """ + # 首先尝试从URL获取 + if '.flac' in url.lower(): + return '.flac' + elif '.mp3' in url.lower(): + return '.mp3' + elif '.m4a' in url.lower(): + return '.m4a' + + # 从Content-Type获取 + content_type = content_type.lower() + if 'flac' in content_type: + return '.flac' + elif 'mpeg' in content_type or 'mp3' in content_type: + return '.mp3' + elif 'mp4' in content_type or 'm4a' in content_type: + return '.m4a' + + return '.mp3' # 默认 + + def get_music_info(self, music_id: int, quality: str, cookies: Dict[str, str]) -> MusicInfo: + """获取音乐详细信息 + + Args: + music_id: 音乐ID + quality: 音质等级 + cookies: [!! 新增 !!] 用户Cookie字典 + + Returns: + 音乐信息对象 + + Raises: + DownloadException: 获取信息失败时抛出 + """ + try: + # 获取cookies + # cookies = self.cookie_manager.parse_cookies() # <- 已移除, 使用传入的cookies + + # 获取音乐URL信息 + url_result = self.api.get_song_url(music_id, quality, cookies) + if not url_result.get('data') or not url_result['data']: + raise DownloadException(f"无法获取音乐ID {music_id} 的播放链接 (可能需要会员Cookie)") + + song_data = url_result['data'][0] + download_url = song_data.get('url', '') + if not download_url: + raise DownloadException(f"音乐ID {music_id} 无可用的下载链接") + + # 获取音乐详情 + detail_result = self.api.get_song_detail(music_id) + if not detail_result.get('songs') or not detail_result['songs']: + raise DownloadException(f"无法获取音乐ID {music_id} 的详细信息") + + song_detail = detail_result['songs'][0] + + # 获取歌词 + lyric_result = self.api.get_lyric(music_id, cookies) + lyric = lyric_result.get('lrc', {}).get('lyric', '') if lyric_result else '' + tlyric = lyric_result.get('tlyric', {}).get('lyric', '') if lyric_result else '' + + # 构建艺术家字符串 + artists = '/'.join(artist['name'] for artist in song_detail.get('ar', [])) + + # 创建MusicInfo对象 + music_info = MusicInfo( + id=music_id, + name=song_detail.get('name', '未知歌曲'), + artists=artists or '未知艺术家', + album=song_detail.get('al', {}).get('name', '未知专辑'), + pic_url=song_detail.get('al', {}).get('picUrl', ''), + duration=song_detail.get('dt', 0) // 1000, # 转换为秒 + track_number=song_detail.get('no', 0), + download_url=download_url, + file_type=song_data.get('type', 'mp3').lower(), + file_size=song_data.get('size', 0), + quality=quality, + lyric=lyric, + tlyric=tlyric + ) + + return music_info + + except APIException as e: + raise DownloadException(f"API调用失败: {e}") + except Exception as e: + raise DownloadException(f"获取音乐信息时发生错误: {e}") + + def download_music_file(self, music_id: int, quality: str, cookies: Dict[str, str]) -> DownloadResult: + """下载音乐文件到本地 + + Args: + music_id: 音乐ID + quality: 音质等级 + cookies: [!! 新增 !!] 用户Cookie字典 + + Returns: + 下载结果对象 + """ + try: + # 获取音乐信息 + music_info = self.get_music_info(music_id, quality, cookies) + + # 生成文件名 + filename = f"{music_info.artists} - {music_info.name}" + safe_filename = self._sanitize_filename(filename) + + # 确定文件扩展名 + file_ext = self._determine_file_extension(music_info.download_url) + file_path = self.download_dir / f"{safe_filename}{file_ext}" + + # 检查文件是否已存在 + if file_path.exists(): + return DownloadResult( + success=True, + file_path=str(file_path), + file_size=file_path.stat().st_size, + music_info=music_info + ) + + # 下载文件 + response = requests.get(music_info.download_url, stream=True, timeout=30) + response.raise_for_status() + + # 写入文件 + with open(file_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + # 写入音乐标签 + self._write_music_tags(file_path, music_info) + + return DownloadResult( + success=True, + file_path=str(file_path), + file_size=file_path.stat().st_size, + music_info=music_info + ) + + except DownloadException: + raise + except requests.RequestException as e: + return DownloadResult( + success=False, + error_message=f"下载请求失败: {e}" + ) + except Exception as e: + return DownloadResult( + success=False, + error_message=f"下载过程中发生错误: {e}" + ) + + async def download_music_file_async(self, music_id: int, quality: str, cookies: Dict[str, str]) -> DownloadResult: + """异步下载音乐文件到本地 + + Args: + music_id: 音乐ID + quality: 音质等级 + cookies: [!! 新增 !!] 用户Cookie字典 + + Returns: + 下载结果对象 + """ + try: + # 获取音乐信息(同步操作) + music_info = self.get_music_info(music_id, quality, cookies) + + # 生成文件名 + filename = f"{music_info.artists} - {music_info.name}" + safe_filename = self._sanitize_filename(filename) + + # 确定文件扩展名 + file_ext = self._determine_file_extension(music_info.download_url) + file_path = self.download_dir / f"{safe_filename}{file_ext}" + + # 检查文件是否已存在 + if file_path.exists(): + return DownloadResult( + success=True, + file_path=str(file_path), + file_size=file_path.stat().st_size, + music_info=music_info + ) + + # 异步下载文件 + async with aiohttp.ClientSession() as session: + async with session.get(music_info.download_url) as response: + response.raise_for_status() + + async with aiofiles.open(file_path, 'wb') as f: + async for chunk in response.content.iter_chunked(8192): + await f.write(chunk) + + # 写入音乐标签 + self._write_music_tags(file_path, music_info) + + return DownloadResult( + success=True, + file_path=str(file_path), + file_size=file_path.stat().st_size, + music_info=music_info + ) + + except DownloadException: + raise + except aiohttp.ClientError as e: + return DownloadResult( + success=False, + error_message=f"异步下载请求失败: {e}" + ) + except Exception as e: + return DownloadResult( + success=False, + error_message=f"异步下载过程中发生错误: {e}" + ) + + def download_music_to_memory(self, music_id: int, quality: str, cookies: Dict[str, str]) -> Tuple[bool, BytesIO, MusicInfo]: + """下载音乐到内存 + + Args: + music_id: 音乐ID + quality: 音质等级 + cookies: [!! 新增 !!] 用户Cookie字典 + + Returns: + (是否成功, 音乐数据流, 音乐信息) + + Raises: + DownloadException: 下载失败时抛出 + """ + try: + # 获取音乐信息 + music_info = self.get_music_info(music_id, quality, cookies) + + # 下载到内存 + response = requests.get(music_info.download_url, timeout=30) + response.raise_for_status() + + # 创建BytesIO对象 + audio_data = BytesIO(response.content) + + return True, audio_data, music_info + + except DownloadException: + raise + except requests.RequestException as e: + raise DownloadException(f"下载到内存失败: {e}") + except Exception as e: + raise DownloadException(f"内存下载过程中发生错误: {e}") + + async def download_batch_async(self, music_ids: List[int], quality: str, cookies: Dict[str, str]) -> List[DownloadResult]: + """批量异步下载音乐 + + Args: + music_ids: 音乐ID列表 + quality: 音质等级 + cookies: [!! 新增 !!] 用户Cookie字典 + + Returns: + 下载结果列表 + """ + semaphore = asyncio.Semaphore(self.max_concurrent) + + async def download_with_semaphore(music_id: int) -> DownloadResult: + async with semaphore: + # 传递cookies + return await self.download_music_file_async(music_id, quality, cookies) + + tasks = [download_with_semaphore(music_id) for music_id in music_ids] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 处理异常结果 + processed_results = [] + for i, result in enumerate(results): + if isinstance(result, Exception): + processed_results.append(DownloadResult( + success=False, + error_message=f"下载音乐ID {music_ids[i]} 时发生异常: {result}" + )) + else: + processed_results.append(result) + + return processed_results + + def _write_music_tags(self, file_path: Path, music_info: MusicInfo) -> None: + """写入音乐标签信息 + + Args: + file_path: 音乐文件路径 + music_info: 音乐信息 + """ + try: + file_ext = file_path.suffix.lower() + + if file_ext == '.mp3': + self._write_mp3_tags(file_path, music_info) + elif file_ext == '.flac': + self._write_flac_tags(file_path, music_info) + elif file_ext == '.m4a': + self._write_m4a_tags(file_path, music_info) + + except Exception as e: + print(f"写入音乐标签失败: {e}") + + def _write_mp3_tags(self, file_path: Path, music_info: MusicInfo) -> None: + """写入MP3标签""" + try: + audio = MP3(str(file_path), ID3=ID3) + + # 添加ID3标签 + audio.tags.add(TIT2(encoding=3, text=music_info.name)) + audio.tags.add(TPE1(encoding=3, text=music_info.artists)) + audio.tags.add(TALB(encoding=3, text=music_info.album)) + + if music_info.track_number > 0: + audio.tags.add(TRCK(encoding=3, text=str(music_info.track_number))) + + # 下载并添加封面 + if music_info.pic_url: + try: + pic_response = requests.get(music_info.pic_url, timeout=10) + pic_response.raise_for_status() + audio.tags.add(APIC( + encoding=3, + mime='image/jpeg', + type=3, + desc='Cover', + data=pic_response.content + )) + except: + pass # 封面下载失败不影响主流程 + + audio.save() + except Exception as e: + print(f"写入MP3标签失败: {e}") + + def _write_flac_tags(self, file_path: Path, music_info: MusicInfo) -> None: + """写入FLAC标签""" + try: + audio = FLAC(str(file_path)) + + audio['TITLE'] = music_info.name + audio['ARTIST'] = music_info.artists + audio['ALBUM'] = music_info.album + + if music_info.track_number > 0: + audio['TRACKNUMBER'] = str(music_info.track_number) + + # 下载并添加封面 + if music_info.pic_url: + try: + pic_response = requests.get(music_info.pic_url, timeout=10) + pic_response.raise_for_status() + + from mutagen.flac import Picture + picture = Picture() + picture.type = 3 # Cover (front) + picture.mime = 'image/jpeg' + picture.desc = 'Cover' + picture.data = pic_response.content + audio.add_picture(picture) + except: + pass # 封面下载失败不影响主流程 + + audio.save() + except Exception as e: + print(f"写入FLAC标签失败: {e}") + + def _write_m4a_tags(self, file_path: Path, music_info: MusicInfo) -> None: + """写入M4A标签""" + try: + audio = MP4(str(file_path)) + + audio['\xa9nam'] = music_info.name + audio['\xa9ART'] = music_info.artists + audio['\xa9alb'] = music_info.album + + if music_info.track_number > 0: + audio['trkn'] = [(music_info.track_number, 0)] + + # 下载并添加封面 + if music_info.pic_url: + try: + pic_response = requests.get(music_info.pic_url, timeout=10) + pic_response.raise_for_status() + audio['covr'] = [pic_response.content] + except: + pass # 封面下载失败不影响主流程 + + audio.save() + except Exception as e: + print(f"写入M4A标签失败: {e}") + + def get_download_progress(self, music_id: int, quality: str, cookies: Dict[str, str]) -> Dict[str, Any]: + """获取下载进度信息 + + Args: + music_id: 音乐ID + quality: 音质等级 + cookies: [!! 新增 !!] 用户Cookie字典 + + Returns: + 包含进度信息的字典 + """ + try: + music_info = self.get_music_info(music_id, quality, cookies) + + filename = f"{music_info.artists} - {music_info.name}" + safe_filename = self._sanitize_filename(filename) + file_ext = self._determine_file_extension(music_info.download_url) + file_path = self.download_dir / f"{safe_filename}{file_ext}" + + if file_path.exists(): + current_size = file_path.stat().st_size + progress = (current_size / music_info.file_size * 100) if music_info.file_size > 0 else 0 + + return { + 'music_id': music_id, + 'filename': safe_filename + file_ext, + 'total_size': music_info.file_size, + 'current_size': current_size, + 'progress': min(progress, 100), + 'completed': current_size >= music_info.file_size + } + else: + return { + 'music_id': music_id, + 'filename': safe_filename + file_ext, + 'total_size': music_info.file_size, + 'current_size': 0, + 'progress': 0, + 'completed': False + } + + except Exception as e: + return { + 'music_id': music_id, + 'error': str(e), + 'progress': 0, + 'completed': False + } + + +if __name__ == "__main__": + # 测试代码 + # downloader = MusicDownloader() # <- 初始化会失败,因为现在需要cookies + print("音乐下载器模块 (已修改为支持Cookie参数)") + print("支持的功能:") + print("- 同步下载") + print("- 异步下载") + print("- 批量下载") + print("- 内存下载") + print("- 音乐标签写入") + print("- 下载进度跟踪") + print("\n[注意] 本模块现在依赖外部传入Cookie字典。") \ No newline at end of file diff --git a/qr_login.py b/qr_login.py new file mode 100644 index 0000000..8a9541b --- /dev/null +++ b/qr_login.py @@ -0,0 +1,152 @@ +"""网易云音乐二维码登录模块 (已修改为API) + +提供网易云音乐二维码登录功能,被 main.py 调用。 +- 二维码生成 (key 和 base64) +- 登录状态检查 (返回 code 和 cookie) +""" + +import json +import logging +import base64 +import io +import re # [!! 已添加 !!] +from random import randrange # [!! 关键修复 !!] 导入 randrange +from typing import Optional, Dict, Any, Tuple + +try: + # qrcode 库在 music_api.py 中被导入,这里我们也需要它 + import qrcode +except ImportError: + print("错误:缺少 'qrcode' 库。请运行 `pip install qrcode`") + qrcode = None + +try: + # 我们需要从 music_api 借用很多底层工具 + from music_api import ( + QRLoginManager, APIException, HTTPClient, + CryptoUtils, APIConstants + ) + from cookie_manager import CookieManager, CookieException +except ImportError as e: + print(f"导入模块失败: {e}") + print("请确保 music_api.py 和 cookie_manager.py 文件存在且可用") + + +def api_generate_qr_key() -> Dict[str, Any]: + """ + [新API] 生成二维码Key和Base64图像 + + Returns: + {'success': bool, 'qr_key': str, 'qr_img_b64': str, 'message': str} + """ + if not qrcode: + return {'success': False, 'message': "qrcode 库未安装"} + + try: + qr_manager = QRLoginManager() + unikey = qr_manager.generate_qr_key() + + if not unikey: + raise APIException("生成二维码key失败") + + # 生成二维码数据 + qr_data = f'https://music.163.com/login?codekey={unikey}' + + # 在内存中生成二维码图片 + qr = qrcode.QRCode() + qr.add_data(qr_data) + qr.make(fit=True) + img = qr.make_image(fill_color="black", back_color="white") + + # 将图片保存到 BytesIO + buffered = io.BytesIO() + img.save(buffered, format="PNG") + + # 转换为 Base64 字符串 + img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") + qr_img_b64 = f"data:image/png;base64,{img_str}" + + return { + 'success': True, + 'qr_key': unikey, + 'qr_img_b64': qr_img_b64 + } + + except Exception as e: + logging.error(f"生成二维码失败: {e}") + return {'success': False, 'message': str(e)} + + +def api_check_qr_status(unikey: str) -> Dict[str, Any]: + """ + [新API] 检查二维码登录状态 + + 此函数复制并*修复*了 music_api.py 中 check_qr_login 的逻辑, + 以便返回完整的 Cookie 字符串,而不只是 MUSIC_U。 + + Args: + unikey: 二维码key + + Returns: + {'code': int, 'cookie': Optional[str], 'message': str} + code: 800=过期, 801=等待, 802=已扫码, 803=成功 + """ + try: + http_client = HTTPClient() + crypto_utils = CryptoUtils() + config = APIConstants.DEFAULT_CONFIG.copy() + + # [!! 关键修复 !!] 现在 randrange 已经被导入,这行代码可以正常工作 + config["requestId"] = str(randrange(20000000, 30000000)) + + payload = { + 'key': unikey, + 'type': 1, + 'header': json.dumps(config) + } + + params = crypto_utils.encrypt_params(APIConstants.QR_LOGIN_API, payload) + + # 我们需要完整的 response 对象来获取 headers + response = http_client.post_request_full( + APIConstants.QR_LOGIN_API, params, {} + ) + + result = json.loads(response.text) + code = result.get('code', -1) + + cookie_string = None + + if code == 803: + # 登录成功,提取cookie + raw_cookies = response.headers.get('Set-Cookie', '') + + # 使用 re.sub 来处理 'path=/,' 这种导致错误分割的情况 + raw_cookies = re.sub(r"path=/,", "path=/", raw_cookies) + cookie_list = [c.strip() for c in raw_cookies.split(',') if c.strip()] + + final_cookie_parts = [] + for item in cookie_list: + if not item: + continue + part = item.split(';')[0] + if part.strip(): + final_cookie_parts.append(part.strip()) + + cookie_string = '; '.join(final_cookie_parts) + + if "MUSIC_U" not in cookie_string: + return {'code': -1, 'cookie': None, 'message': '登录成功但未获取到 MUSIC_U'} + + return { + 'code': code, + 'cookie': cookie_string, + 'message': result.get('message', '') + } + + except Exception as e: + # 在 `api_check_qr_status` 捕获异常时,记录详细的 traceback + logging.error(f"检查登录状态响应失败: {e}", exc_info=True) + return {'code': -1, 'cookie': None, 'message': f"检查失败: {e}"} + +# --- 删除了所有旧的交互式代码 --- \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b26adcf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,28 @@ +aiofiles +aiohappyeyeballs +aiohttp +aiosignal +attrs +blinker +certifi +cffi +charset-normalizer +click +colorama +cryptography +Flask +frozenlist +idna +itsdangerous +Jinja2 +MarkupSafe +multidict +mutagen +propcache +pycparser +pillow +qrcode +requests +urllib3 +Werkzeug +yarl diff --git a/static/favicon.ico b/static/favicon.ico new file mode 100644 index 0000000..1772d1c Binary files /dev/null and b/static/favicon.ico differ diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..e945818 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,429 @@ + + + + + + 网易云音乐工具箱 + + + + + + + + + + + + + + + +
+
+
+

一站式音乐解析与下载

+

无论是单曲、歌单还是专辑,轻松获取高品质音乐资源。

+
+ +

① 选择音质与设置

+
+ + +
+
+ + +
+
+
+ +

② 选择功能

+
+
歌曲搜索

通过关键词发现音乐

+
单曲解析

获取歌曲详细信息

+
歌单解析

批量打包下载歌单

+
专辑解析

解析并打包完整专辑

+
+ + + +
+ +
+

音质选择指南

+
+

特点:有损压缩(MP3),文件体积最小,兼容所有设备。
建议:适合节省流量或存储空间,在嘈杂环境或普通设备上听感差异不大。
+

特点:音质与CD完全相同,保留了全部声音细节。
推荐:绝大多数场景下的最佳选择,完美平衡了音质与文件大小。
+

特点:超越CD音质,达到录音室母带级别。
建议:为音乐发烧友准备。需配合专业播放器(DAC)和高端耳机才能完全发挥其潜力。
+

特点:通过算法模拟多声道音效,营造出空间感和包围感。
建议:适合搭配耳机使用,尤其在特定曲目或希望获得新奇听感时尝试。
+
+
+ +
+

网易云音乐工具箱 - 仅供学习交流

+

All music resources copyright belongs to NetEase, Inc.

+
+ +
+ +
+
+ + + + + + + + + + \ No newline at end of file diff --git a/启动.bat b/启动.bat new file mode 100644 index 0000000..67f1a16 --- /dev/null +++ b/启动.bat @@ -0,0 +1,24 @@ +@echo off +echo -------------------------------------------------- +echo ⻷Ƿ... +if exist venv ( + echo ⻷Ѵڡ +) else ( + echo ⻷ڣڴ... + python -m venv venv + echo ⻷װ... + call venv\Scripts\activate.bat + pip install -r requirements.txt + goto run_script +) + +echo ⻷... +call venv\Scripts\activate.bat + +:run_script +echo תű... +python main.py + +echo -------------------------------------------------- +echo ɣ +pause \ No newline at end of file