CodeZen/webapp.py
from flask import Flask, render_template, request
import sqlite3
from datetime import datetime
import math
import re
import json
import collections
import os
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
from scraper import setup_database, discover_and_save_projects
# ==============================================================================
# --- ⚙️ Automation configuration ---
# ==============================================================================
# ★★★ Put your GitHub token here ★★★
GITHUB_TOKEN = 'YOUR_REAL_GITHUB_TOKEN_HERE'
# ★★★ Interval between automatic scrapes (hours) ★★★
SCHEDULE_HOURS = 4
# ==============================================================================
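# A minimal hardening sketch (not part of the original config): read the token from
# a GITHUB_TOKEN environment variable when it is set, so the real token never has to
# be committed; the hard-coded placeholder above remains the fallback.
GITHUB_TOKEN = os.environ.get('GITHUB_TOKEN', GITHUB_TOKEN)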
app = Flask(__name__)
DB_FILE = 'github_projects.db'
PER_PAGE = 12
TOP_TOPICS_COUNT = 20
LANGUAGE_COLORS = {
    "Python": "bg-green-400", "JavaScript": "bg-yellow-400", "TypeScript": "bg-blue-400",
    "Go": "bg-cyan-400", "Rust": "bg-orange-400", "HTML": "bg-purple-400",
    "Vue": "bg-emerald-400", "Java": "bg-red-500", "C++": "bg-pink-500",
    "C#": "bg-indigo-500", "PHP": "bg-violet-500", "Ruby": "bg-red-600",
    "Jupyter Notebook": "bg-orange-300"
}
def get_db_connection():
    # sqlite3.Row lets callers and templates access columns by name.
    conn = sqlite3.connect(DB_FILE)
    conn.row_factory = sqlite3.Row
    return conn
def format_stars(num):
    if num is None: return "0"
    if num >= 1000: return f"{num / 1000:.1f}k"
    return str(num)
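# Illustrative values (comment only): format_stars(15300) -> "15.3k",
# format_stars(842) -> "842", format_stars(None) -> "0".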
def has_chinese(text):
    if not text: return False
    return re.search(r'[\u4e00-\u9fa5]', text) is not None
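# The range \u4e00-\u9fa5 is the core of the CJK Unified Ideographs block, which is
# enough to tell Chinese descriptions apart from English ones, e.g.
#   has_chinese('一个 Flask 项目') -> True
#   has_chinese('A Flask project') -> False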
@app.context_processor
def utility_processor():
    return dict(format_stars=format_stars, language_colors=LANGUAGE_COLORS)
def scheduled_scrape_job():
    print(f"\n--- [ {datetime.now()} ] ---")
    print(" [Scheduler] Starting the scheduled scrape job...")
    # Pass the token through to the scraper
    discover_and_save_projects(GITHUB_TOKEN)
    print(" [Scheduler] Scheduled scrape job finished.")
    print(f"--- [ Next run in {SCHEDULE_HOURS} hours ] ---")
@app.route('/')
def index():
    page = request.args.get('page', 1, type=int)
    query = request.args.get('q', '')
    sort_by = request.args.get('sort', 'pushed_at')
    current_lang = request.args.get('lang', '')
    current_topic = request.args.get('topic', '')
    conn = get_db_connection()
    # Distinct languages for the language filter.
    languages_cursor = conn.execute("SELECT DISTINCT language FROM projects WHERE language IS NOT NULL AND language != 'N/A' ORDER BY language ASC")
    languages = [row['language'] for row in languages_cursor.fetchall()]
    # Count topic occurrences across all stored projects to surface the most popular ones.
    topic_counter = collections.Counter()
    topics_cursor = conn.execute("SELECT topics FROM projects WHERE topics IS NOT NULL AND topics != '[]'")
    for row in topics_cursor:
        try:
            topics_list = json.loads(row['topics'])
            topic_counter.update(topics_list)
        except json.JSONDecodeError:
            continue
    popular_topics = [topic for topic, count in topic_counter.most_common(TOP_TOPICS_COUNT)]
    # Build the WHERE clause from whichever filters are active.
    where_clauses = []
    params = []
    if query:
        where_clauses.append("(name LIKE ? OR description LIKE ? OR readme_excerpt LIKE ?)")
        params.extend([f'%{query}%', f'%{query}%', f'%{query}%'])
    if current_lang:
        where_clauses.append("language = ?")
        params.append(current_lang)
    if current_topic:
        # Topics are stored as a JSON array string, so match the quoted topic name.
        where_clauses.append("topics LIKE ?")
        params.append(f'%"{current_topic}"%')
    base_where = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
    total_items_cursor = conn.execute(f'SELECT COUNT(*) FROM projects{base_where}', params)
    total_items = total_items_cursor.fetchone()[0]
    total_pages = math.ceil(total_items / PER_PAGE)
    offset = (page - 1) * PER_PAGE
    order_by_clause = " ORDER BY stars DESC" if sort_by == 'stars' else " ORDER BY pushed_at DESC"
    sql_query = f"SELECT * FROM projects{base_where}{order_by_clause} LIMIT ? OFFSET ?"
    final_params = params + [PER_PAGE, offset]
    projects_cursor = conn.execute(sql_query, final_params).fetchall()
    # Pick a display blurb for each project: prefer a Chinese description, then the
    # README excerpt, then any description at all.
    processed_projects = []
    for row in projects_cursor:
        project = dict(row)
        desc = project.get('description')
        excerpt = project.get('readme_excerpt')
        if has_chinese(desc): project['display_text'] = desc
        elif excerpt: project['display_text'] = excerpt
        elif desc: project['display_text'] = desc
        else: project['display_text'] = '暂无简介'  # "No description yet"
        if project.get('topics'):
            try: project['topics'] = json.loads(project['topics'])
            except json.JSONDecodeError: project['topics'] = []
        else: project['topics'] = []
        processed_projects.append(project)
    conn.close()
    current_year = datetime.now().year
    template_context = dict(
        projects=processed_projects,
        search_query=query,
        current_sort=sort_by,
        current_year=current_year,
        current_page=page,
        total_pages=total_pages,
        languages=languages,
        current_lang=current_lang,
        popular_topics=popular_topics,
        current_topic=current_topic
    )
    # HTMX requests only need the refreshed project grid, not the full page shell.
    if request.headers.get('HX-Request'):
        return render_template('_projects_partial.html', **template_context)
    return render_template('index.html', **template_context)
if __name__ == '__main__':
    setup_database()
    print("\n" + "="*50)
    print(" 🚀 Starting the CodeZen automation service...")
    scheduler = BackgroundScheduler()
    # next_run_time=datetime.now() triggers one scrape immediately, then every SCHEDULE_HOURS hours.
    scheduler.add_job(func=scheduled_scrape_job, trigger="interval", hours=SCHEDULE_HOURS, misfire_grace_time=60, next_run_time=datetime.now())
    scheduler.start()
    print(f" ✅ Background scrape job started (runs every {SCHEDULE_HOURS} hours)")
    atexit.register(lambda: (print(" [Scheduler] Shutting down the scheduler..."), scheduler.shutdown()))
    print("="*50 + "\n")
    # use_reloader=False keeps Flask's debug reloader from spawning a second process
    # (which would start a second scheduler).
    app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)
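# Running it (assumptions, not stated in the original: Flask and APScheduler are
# installed, and scraper.py sits alongside this file providing setup_database() and
# discover_and_save_projects()):
#   pip install flask apscheduler
#   python webapp.py
# The site is then served at http://localhost:5000/.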