import base64
import json
import os
import re
import sqlite3
from datetime import datetime, timedelta

import requests

# ==============================================================================
# --- ⚙️ Configuration ---
# ==============================================================================
SEARCH_DAYS_AGO = 7
MIN_STARS = 50
DB_FILE = 'github_projects.db'
README_EXCERPT_LENGTH = 300

# ==============================================================================
# --- Core script code ---
# ==============================================================================
# NOTE: there is no module-level session; discover_and_save_projects() builds
# an authenticated requests.Session per call and passes it to the helpers.

# Matches a single character in the CJK range U+4E00..U+9FA5.
_CHINESE_CHAR_RE = re.compile(r'[\u4e00-\u9fa5]')

# Markdown/HTML noise removed before building an excerpt: heading markers,
# image links, HTML tags, and stray link brackets / parentheses.
_README_NOISE_RE = re.compile(r'#+\s*|\!\[.*\]\(.*\)|<.*?>|\[|\]|\(|\)')

# File names probed (in this order) before falling back to README.md.
_CHINESE_README_FILENAMES = (
    "README_CN.md",
    "README_zh.md",
    "README_zh-CN.md",
    "README-zh_CN.md",
)


def setup_database():
    """Create the ``projects`` table in ``DB_FILE`` if it does not exist yet."""
    print(" [Scheduler] 正在检查并设置数据库...")
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS projects (
                repo_id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                url TEXT NOT NULL UNIQUE,
                description TEXT,
                readme_excerpt TEXT,
                language TEXT,
                stars INTEGER,
                pushed_at TIMESTAMP,
                owner_avatar_url TEXT,
                topics TEXT
            )
        ''')
        conn.commit()
    finally:
        # Close even when the DDL fails so the file handle is not leaked.
        conn.close()


def has_chinese(text):
    """Return True if *text* contains at least one character in U+4E00..U+9FA5.

    Falsy input (``None`` or an empty string) yields False.
    """
    if not text:
        return False
    return _CHINESE_CHAR_RE.search(text) is not None


def _decode_contents_response(response):
    """Return the UTF-8 text of a contents-API response, or None if unusable.

    The response body is expected to be a JSON object whose ``content`` field
    holds base64 data. Invalid JSON, a non-object payload, or undecodable
    base64 all yield None instead of raising.
    """
    try:
        payload = response.json()
    except ValueError:
        return None
    if not isinstance(payload, dict):
        return None
    try:
        raw = base64.b64decode(payload.get('content') or '')
    except (ValueError, TypeError):
        # binascii.Error is a ValueError subclass; TypeError covers non-string content.
        return None
    return raw.decode('utf-8', errors='ignore')


def _build_excerpt(text):
    """Strip markup noise from *text*, flatten newlines, and truncate it."""
    cleaned = _README_NOISE_RE.sub('', text).strip().replace('\n', ' ')
    return cleaned[:README_EXCERPT_LENGTH]


def get_readme_content_with_excerpt(repo_full_name, session):
    """Find a README containing Chinese text and return a short excerpt of it.

    Two-step scan: first probe the dedicated Chinese README file names, then
    fall back to the default ``README.md``.

    Args:
        repo_full_name: Repository in ``owner/name`` form, as used in the API URL.
        session: Object providing ``get(url, timeout=...)`` (a requests session).

    Returns:
        At most ``README_EXCERPT_LENGTH`` characters of cleaned README text, or
        None when no README with Chinese text could be fetched.
    """
    print(f" -> 智能扫描 '{repo_full_name}'...")
    contents_url = f"https://api.github.com/repos/{repo_full_name}/contents"

    # Step 1: dedicated Chinese README files. Network errors are best-effort
    # here: skip the file and try the next candidate.
    for filename in _CHINESE_README_FILENAMES:
        try:
            response = session.get(f"{contents_url}/{filename}", timeout=10)
        except requests.RequestException:
            continue
        if response.status_code != 200:
            continue
        print(f" ✨ 找到中文文件: '{filename}'.")
        text = _decode_contents_response(response)
        if has_chinese(text):
            return _build_excerpt(text)

    # Step 2: the default README.
    print(" - 检查默认 README.md...")
    try:
        response = session.get(f"{contents_url}/README.md", timeout=10)
    except requests.RequestException as e:
        print(f" ⚠️ 抓取 README 出错: {e}")
        return None

    if response.status_code != 200:
        print(" - 默认 README.md 未找到.")
        return None

    text = _decode_contents_response(response)
    if has_chinese(text):
        print(" ✔️ 默认 README 中发现中文.")
        return _build_excerpt(text)

    print(" ❌ 默认 README 中无中文.")
    return None


def get_new_hot_repos(session):
    """Search GitHub for recently created repositories above the star threshold.

    Queries repositories created after ``SEARCH_DAYS_AGO`` days ago with more
    than ``MIN_STARS`` stars, sorted by stars descending, 100 per page.

    Args:
        session: Object providing ``get(url, params=..., timeout=...)``.

    Returns:
        The list under the response's ``items`` key, or an empty list when the
        request fails or the response is not usable.
    """
    search_date = (datetime.now() - timedelta(days=SEARCH_DAYS_AGO)).strftime('%Y-%m-%d')
    # Pass the query via params so ':', '>' and the space are URL-encoded.
    params = {
        'q': f'created:>{search_date} stars:>{MIN_STARS}',
        'sort': 'stars',
        'order': 'desc',
        'per_page': 100,
    }
    try:
        response = session.get(
            'https://api.github.com/search/repositories',
            params=params,
            timeout=20,
        )
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        # Report the failure rather than silently looking like "no results".
        print(f" [Scheduler] ⚠️ 搜索 API 请求失败: {e}")
        return []

    if not isinstance(payload, dict):
        return []
    return payload.get('items') or []


def _insert_project(cursor, repo, description, readme_excerpt):
    """Insert one repository row into ``projects`` using *cursor*."""
    owner = repo.get('owner') or {}
    cursor.execute('''
        INSERT INTO projects (
            repo_id, name, url, description, readme_excerpt, language,
            stars, pushed_at, owner_avatar_url, topics
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ''', (
        repo['id'],
        repo['name'],
        repo['html_url'],
        description,
        readme_excerpt,
        # The API sends the key with a null value, so a .get() default never applies.
        repo.get('language') or 'N/A',
        repo['stargazers_count'],
        repo['pushed_at'],
        owner.get('avatar_url'),
        json.dumps(repo.get('topics') or []),
    ))


def discover_and_save_projects(github_token):
    """Fetch new popular repositories and store those with Chinese content.

    A repository is kept when its description contains Chinese characters or,
    failing that, when a README with Chinese text is found. Repositories whose
    ``repo_id`` is already in the database are skipped. Each insert is
    committed immediately so earlier rows survive a later failure.

    Args:
        github_token: GitHub API token; an empty value or the placeholder
            ``'111112222233333'`` aborts the run.

    Returns:
        The number of rows newly inserted (0 when nothing was fetched or the
        token is missing).
    """
    if not github_token or github_token == '111112222233333':
        print(" [Scheduler] 错误: GitHub Token 未设置或无效. 跳过本次抓取。")
        return 0

    new_projects_count = 0
    with requests.Session() as session:
        session.headers.update({'Authorization': f'token {github_token}'})
        conn = sqlite3.connect(DB_FILE)
        try:
            cursor = conn.cursor()

            all_repos = get_new_hot_repos(session)
            if not all_repos:
                print(" [Scheduler] API 未返回任何匹配条件的项目。")
                return 0

            print(f" [Scheduler] API 返回 {len(all_repos)} 个项目. 开始过滤...")

            for repo in all_repos:
                repo_id, repo_full_name = repo['id'], repo['full_name']

                cursor.execute("SELECT 1 FROM projects WHERE repo_id = ?", (repo_id,))
                if cursor.fetchone():
                    continue

                print(f"\n [Scheduler] 处理 [{repo_full_name}]...")

                description = repo.get('description') or ''

                # The README is only fetched when the description has no Chinese.
                readme_excerpt = None
                if not has_chinese(description):
                    readme_excerpt = get_readme_content_with_excerpt(repo_full_name, session)
                    if not readme_excerpt:
                        print(f" -> 丢弃 '{repo_full_name}' (无中文内容).")
                        continue

                _insert_project(cursor, repo, description, readme_excerpt)
                conn.commit()
                new_projects_count += 1
                print(f" 💾 已保存 '{repo_full_name}' 到数据库.")

            print(f"\n [Scheduler] 本轮抓取完成. 新增 {new_projects_count} 个项目.")
        finally:
            # Always release the database handle, including on errors.
            conn.close()

    return new_projects_count


# ★★★ V6.0: main() and the `if __name__ == '__main__'` entry point were removed ★★★