"""CodeZen web front end.

Serves a paginated, filterable listing of GitHub projects stored in a local
SQLite database and runs a background job that periodically refreshes that
database through ``scraper.discover_and_save_projects``.
"""

import atexit
import collections
import json
import math
import os
import re
import sqlite3
from datetime import datetime

from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, render_template, request

from scraper import setup_database, discover_and_save_projects

# ==============================================================================
# --- Automation configuration ---
# ==============================================================================

# GitHub token handed to the scraper. Prefer supplying it through the
# GITHUB_TOKEN environment variable so the secret never lives in source
# control; the placeholder below is only used when the variable is unset.
GITHUB_TOKEN = os.environ.get('GITHUB_TOKEN', 'YOUR_REAL_GITHUB_TOKEN_HERE')

# Interval between automatic scrapes, in hours.
SCHEDULE_HOURS = 4

# ==============================================================================

app = Flask(__name__)

DB_FILE = 'github_projects.db'
PER_PAGE = 12
TOP_TOPICS_COUNT = 20

# Maps a language name to the CSS class exposed to templates for it.
LANGUAGE_COLORS = {
    "Python": "bg-green-400",
    "JavaScript": "bg-yellow-400",
    "TypeScript": "bg-blue-400",
    "Go": "bg-cyan-400",
    "Rust": "bg-orange-400",
    "HTML": "bg-purple-400",
    "Vue": "bg-emerald-400",
    "Java": "bg-red-500",
    "C++": "bg-pink-500",
    "C#": "bg-indigo-500",
    "PHP": "bg-violet-500",
    "Ruby": "bg-red-600",
    "Jupyter Notebook": "bg-orange-300",
}

# Compiled once at import time instead of on every has_chinese() call.
_CHINESE_RE = re.compile(r'[\u4e00-\u9fa5]')


def get_db_connection():
    """Open a connection to DB_FILE whose rows can be indexed by column name."""
    conn = sqlite3.connect(DB_FILE)
    conn.row_factory = sqlite3.Row
    return conn


def format_stars(num):
    """Format a star count for display.

    Returns "0" for None, a one-decimal "k" form (e.g. "1.5k") for values of
    1000 or more, and the plain decimal string otherwise.
    """
    if num is None:
        return "0"
    if num >= 1000:
        return f"{num / 1000:.1f}k"
    return str(num)


def has_chinese(text):
    """Return True if *text* contains a character in the U+4E00-U+9FA5 range."""
    if not text:
        return False
    return _CHINESE_RE.search(text) is not None


@app.context_processor
def utility_processor():
    """Expose format_stars and the language colour map to every template."""
    return dict(format_stars=format_stars, language_colors=LANGUAGE_COLORS)


def scheduled_scrape_job():
    """Run one scrape cycle and print progress markers around it."""
    print(f"\n--- [ {datetime.now()} ] ---")
    print(" [Scheduler] 开始执行定时抓取任务...")
    # Pass the token through to the scraper function.
    discover_and_save_projects(GITHUB_TOKEN)
    print(" [Scheduler] 定时抓取任务执行完毕。")
    print(f"--- [ 下次运行在 {SCHEDULE_HOURS} 小时后 ] ---")


def _escape_like(term):
    """Escape LIKE metacharacters so *term* is matched literally.

    Intended for use with ``ESCAPE '\\'`` in the SQL statement.
    """
    return term.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')


def _parse_topics(raw):
    """Decode a JSON-encoded topics column into a list of strings.

    Returns an empty list for empty input, malformed JSON, or JSON that is
    not a list; non-string list elements are dropped. This keeps a bad row
    from crashing the page or polluting the topic counts.
    """
    if not raw:
        return []
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return []
    if not isinstance(parsed, list):
        return []
    return [topic for topic in parsed if isinstance(topic, str)]


def _fetch_languages(conn):
    """Return the distinct project languages, sorted ascending."""
    cursor = conn.execute(
        "SELECT DISTINCT language FROM projects "
        "WHERE language IS NOT NULL AND language != 'N/A' "
        "ORDER BY language ASC"
    )
    return [row['language'] for row in cursor.fetchall()]


def _fetch_popular_topics(conn):
    """Return the TOP_TOPICS_COUNT most frequent topics across all projects."""
    counter = collections.Counter()
    cursor = conn.execute(
        "SELECT topics FROM projects "
        "WHERE topics IS NOT NULL AND topics != '[]'"
    )
    for row in cursor:
        counter.update(_parse_topics(row['topics']))
    return [topic for topic, _count in counter.most_common(TOP_TOPICS_COUNT)]


def _build_filters(query, lang, topic):
    """Build the WHERE clause and its bound parameters for the listing.

    Returns a ``(where_sql, params)`` pair; ``where_sql`` is an empty string
    when no filter is active. User text only ever travels through bound
    parameters, and LIKE wildcards inside it are escaped.
    """
    clauses = []
    params = []
    if query:
        pattern = f'%{_escape_like(query)}%'
        clauses.append(
            "(name LIKE ? ESCAPE '\\' OR description LIKE ? ESCAPE '\\' "
            "OR readme_excerpt LIKE ? ESCAPE '\\')"
        )
        params.extend([pattern, pattern, pattern])
    if lang:
        clauses.append("language = ?")
        params.append(lang)
    if topic:
        # Topics are stored as a JSON array string, so match the quoted item.
        clauses.append("topics LIKE ? ESCAPE '\\'")
        params.append(f'%"{_escape_like(topic)}"%')
    where_sql = " WHERE " + " AND ".join(clauses) if clauses else ""
    return where_sql, params


def _pick_display_text(desc, excerpt):
    """Choose the blurb shown for a project.

    Preference order: a description containing Chinese, then the README
    excerpt, then any description, then a fixed placeholder.
    """
    if has_chinese(desc):
        return desc
    if excerpt:
        return excerpt
    if desc:
        return desc
    return '暂无简介'


def _prepare_project(row):
    """Convert a database row into the dict consumed by the templates."""
    project = dict(row)
    project['display_text'] = _pick_display_text(
        project.get('description'), project.get('readme_excerpt')
    )
    project['topics'] = _parse_topics(project.get('topics'))
    return project


@app.route('/')
def index():
    """Render the project listing.

    Query parameters read from the request:
        page  -- 1-based page number (non-integer input falls back to 1,
                 values below 1 are clamped to 1)
        q     -- free-text search over name, description and README excerpt
        sort  -- 'stars' orders by stars; any other value orders by pushed_at
        lang  -- exact language filter
        topic -- topic filter

    When the request carries an ``HX-Request`` header only the projects
    partial template is rendered; otherwise the full page is.
    """
    # Clamp so a crafted ?page=0 / ?page=-3 cannot yield a negative OFFSET.
    page = max(request.args.get('page', 1, type=int), 1)
    query = request.args.get('q', '')
    sort_by = request.args.get('sort', 'pushed_at')
    current_lang = request.args.get('lang', '')
    current_topic = request.args.get('topic', '')

    where_sql, params = _build_filters(query, current_lang, current_topic)
    # The ORDER BY text is chosen from two fixed literals, never from input.
    order_sql = " ORDER BY stars DESC" if sort_by == 'stars' else " ORDER BY pushed_at DESC"
    offset = (page - 1) * PER_PAGE

    conn = get_db_connection()
    try:
        languages = _fetch_languages(conn)
        popular_topics = _fetch_popular_topics(conn)
        total_items = conn.execute(
            f'SELECT COUNT(*) FROM projects{where_sql}', params
        ).fetchone()[0]
        rows = conn.execute(
            f"SELECT * FROM projects{where_sql}{order_sql} LIMIT ? OFFSET ?",
            params + [PER_PAGE, offset],
        ).fetchall()
    finally:
        # Release the connection even if one of the queries raised.
        conn.close()

    total_pages = math.ceil(total_items / PER_PAGE)
    processed_projects = [_prepare_project(row) for row in rows]

    template_context = dict(
        projects=processed_projects,
        search_query=query,
        current_sort=sort_by,
        current_year=datetime.now().year,
        current_page=page,
        total_pages=total_pages,
        languages=languages,
        current_lang=current_lang,
        popular_topics=popular_topics,
        current_topic=current_topic,
    )

    if request.headers.get('HX-Request'):
        return render_template('_projects_partial.html', **template_context)
    return render_template('index.html', **template_context)


def main():
    """Initialise the database, start the scrape scheduler and run the app."""
    setup_database()

    print("\n" + "="*50)
    print(" 🚀 启动 CodeZen 自动化服务...")

    scheduler = BackgroundScheduler()
    # next_run_time=now makes the first scrape fire right after start-up
    # instead of waiting a full interval.
    scheduler.add_job(
        func=scheduled_scrape_job,
        trigger="interval",
        hours=SCHEDULE_HOURS,
        misfire_grace_time=60,
        next_run_time=datetime.now(),
    )
    scheduler.start()
    print(f" ✅ 后台抓取任务已启动 (每 {SCHEDULE_HOURS} 小时运行一次)")

    def _shutdown_scheduler():
        print(" [Scheduler] 关闭调度器...")
        scheduler.shutdown()

    atexit.register(_shutdown_scheduler)
    print("="*50 + "\n")

    # SECURITY: the server listens on every interface, so the interactive
    # debugger must not be on by default. Opt in explicitly with
    # FLASK_DEBUG=1 during local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    # The reloader stays off so the scheduler is not started twice.
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled, use_reloader=False)


if __name__ == '__main__':
    main()