Files
CodeZen/scraper.py
2025-11-07 19:44:51 +08:00

154 lines
6.0 KiB
Python

import requests
import sqlite3
import re
import base64
from datetime import datetime, timedelta
import os
import json
# ==============================================================================
# --- ⚙️ Configuration ---
# ==============================================================================
# Only repositories created within the last SEARCH_DAYS_AGO days are searched
# (used to build the `created:>` search qualifier).
SEARCH_DAYS_AGO = 7
# Only repositories with more than MIN_STARS stars are searched
# (used to build the `stars:>` search qualifier).
MIN_STARS = 50
# Path of the SQLite database file passed to sqlite3.connect().
DB_FILE = 'github_projects.db'
# Maximum number of characters kept from a cleaned README as its excerpt.
README_EXCERPT_LENGTH = 300
# ==============================================================================
# --- Core script code ---
# ==============================================================================
# Legacy module-level session, kept for reference only;
# discover_and_save_projects() builds its own session from the token it receives.
# SESSION = requests.Session()
# SESSION.headers.update({'Authorization': f'token {GITHUB_TOKEN}'})
# Schema of the single table used by the scraper. `repo_id` is the primary key
# and `url` must be unique.
_CREATE_PROJECTS_TABLE_SQL = '''
CREATE TABLE IF NOT EXISTS projects (
repo_id INTEGER PRIMARY KEY, name TEXT NOT NULL, url TEXT NOT NULL UNIQUE,
description TEXT, readme_excerpt TEXT, language TEXT, stars INTEGER, pushed_at TIMESTAMP,
owner_avatar_url TEXT, topics TEXT
)
'''


def setup_database():
    """Create the ``projects`` table in ``DB_FILE`` if it does not exist yet.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so calling this
    function repeatedly is harmless.

    Raises:
        sqlite3.Error: if the database cannot be opened or the statement
            fails. The connection is closed before the error propagates.
    """
    print(" [Scheduler] 正在检查并设置数据库...")
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.execute(_CREATE_PROJECTS_TABLE_SQL)
        conn.commit()
    finally:
        # Always release the file handle, even when the DDL fails.
        conn.close()
def has_chinese(text):
    """Return True when *text* contains a character in the range U+4E00-U+9FA5.

    Falsy input (``None``, empty string) yields False without running the
    regular expression.
    """
    return bool(text) and re.search(r'[\u4e00-\u9fa5]', text) is not None
# File names probed first because they conventionally hold a Chinese README.
_CHINESE_README_FILENAMES = (
    "README_CN.md",
    "README_zh.md",
    "README_zh-CN.md",
    "README-zh_CN.md",
)

# Markdown/HTML noise stripped from a README before it is truncated:
# heading markers, inline images, HTML tags and leftover link brackets.
# The image alternative is bounded to a single `![alt](target)` so that text
# between two images/links on the same line is no longer swallowed.
_README_NOISE_RE = re.compile(
    r'#+\s*|!\[[^\]\n]*\]\([^)\n]*\)|<.*?>|\[|\]|\(|\)'
)


def _fetch_repo_file_text(repo_full_name, filename, session):
    """Download one repository file through the GitHub contents API.

    Returns the decoded UTF-8 text, or None when the response status is not
    200. Raises ``requests.RequestException`` on transport failures and
    ``ValueError`` when the body is not the expected JSON object or its
    ``content`` field is not valid base64.
    """
    url = f"https://api.github.com/repos/{repo_full_name}/contents/{filename}"
    response = session.get(url, timeout=10)
    if response.status_code != 200:
        return None
    payload = response.json()
    if not isinstance(payload, dict):
        raise ValueError(f"unexpected contents payload for '{filename}'")
    # `content` may be missing or null; treat both as an empty file.
    encoded = payload.get('content') or ''
    return base64.b64decode(encoded).decode('utf-8', errors='ignore')


def _make_readme_excerpt(text):
    """Strip markup noise from *text*, flatten newlines and truncate it."""
    cleaned = _README_NOISE_RE.sub('', text).strip().replace('\n', ' ')
    return cleaned[:README_EXCERPT_LENGTH]


def get_readme_content_with_excerpt(repo_full_name, session):
    """Return a short Chinese README excerpt for a repository, or None.

    Two-step scan:

    1. Probe the dedicated Chinese README file names; the first one that
       exists and contains Chinese text wins.
    2. Otherwise fall back to the default ``README.md`` and use it only if
       it contains Chinese text.

    Args:
        repo_full_name: Repository in ``owner/name`` form.
        session: Session object exposing ``get(url, timeout=...)``, used for
            the API calls.

    Returns:
        The cleaned excerpt (at most ``README_EXCERPT_LENGTH`` characters),
        or None when no README with Chinese text could be fetched.
    """
    print(f" -> 智能扫描 '{repo_full_name}'...")
    for filename in _CHINESE_README_FILENAMES:
        try:
            text = _fetch_repo_file_text(repo_full_name, filename, session)
        except (requests.RequestException, ValueError):
            # Best effort: a broken candidate must not stop the scan.
            continue
        if text is None:
            continue
        print(f" ✨ 找到中文文件: '{filename}'.")
        if has_chinese(text):
            return _make_readme_excerpt(text)

    print(" - 检查默认 README.md...")
    try:
        text = _fetch_repo_file_text(repo_full_name, "README.md", session)
    except (requests.RequestException, ValueError) as e:
        print(f" ⚠️ 抓取 README 出错: {e}")
        return None

    if text is None:
        print(" - 默认 README.md 未找到.")
        return None
    if has_chinese(text):
        print(" ✔️ 默认 README 中发现中文.")
        return _make_readme_excerpt(text)
    print(" ❌ 默认 README 中无中文.")
    return None
def get_new_hot_repos(session):
    """Search GitHub for recently created repositories sorted by stars.

    The query selects repositories created after ``SEARCH_DAYS_AGO`` days ago
    with more than ``MIN_STARS`` stars, and requests the first 100 results in
    descending star order.

    Args:
        session: Session object exposing ``get(url, params=..., timeout=...)``.

    Returns:
        The list under the ``items`` key of the response, or an empty list
        when the request fails or the response is not usable. Failures are
        printed so they can be told apart from a genuinely empty result.
    """
    search_date = (datetime.now() - timedelta(days=SEARCH_DAYS_AGO)).strftime('%Y-%m-%d')
    # Let requests do the URL encoding instead of embedding raw spaces and
    # '>' characters in a hand-built query string.
    params = {
        'q': f'created:>{search_date} stars:>{MIN_STARS}',
        'sort': 'stars',
        'order': 'desc',
        'per_page': 100,
    }
    try:
        response = session.get(
            'https://api.github.com/search/repositories',
            params=params,
            timeout=20,
        )
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        print(f" [Scheduler] ⚠️ 搜索 GitHub 仓库出错: {e}")
        return []
    if not isinstance(payload, dict):
        return []
    return payload.get('items') or []
# Parameterized insert for one row of the `projects` table.
_INSERT_PROJECT_SQL = '''
INSERT INTO projects (
repo_id, name, url, description, readme_excerpt, language,
stars, pushed_at, owner_avatar_url, topics
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
'''


def discover_and_save_projects(github_token):
    """Fetch new popular repositories and store those with Chinese content.

    A repository is stored when its description contains Chinese text or,
    failing that, when a README with Chinese text can be fetched.
    Repositories whose ``repo_id`` is already in the database are skipped.
    Each stored row is committed immediately, so progress survives a later
    failure.

    Args:
        github_token: GitHub API token. An empty value or the placeholder
            ``'111112222233333'`` aborts the run.

    Returns:
        The number of rows inserted during this run (0 when the run is
        skipped or the search returns nothing).
    """
    if not github_token or github_token == '111112222233333':
        print(" [Scheduler] 错误: GitHub Token 未设置或无效. 跳过本次抓取。")
        return 0

    # The context manager closes pooled HTTP connections on every exit path.
    with requests.Session() as session:
        session.headers.update({'Authorization': f'token {github_token}'})

        all_repos = get_new_hot_repos(session)
        if not all_repos:
            print(" [Scheduler] API 未返回任何匹配条件的项目。")
            return 0
        print(f" [Scheduler] API 返回 {len(all_repos)} 个项目. 开始过滤...")

        new_projects_count = 0
        conn = sqlite3.connect(DB_FILE)
        try:
            cursor = conn.cursor()
            for repo in all_repos:
                repo_id, repo_full_name = repo['id'], repo['full_name']
                cursor.execute("SELECT 1 FROM projects WHERE repo_id = ?", (repo_id,))
                if cursor.fetchone():
                    continue

                print(f"\n [Scheduler] 处理 [{repo_full_name}]...")
                description = repo.get('description', '')
                is_desc_chinese = has_chinese(description)
                # The README is only fetched when the description alone does
                # not qualify the repository; the shared session is reused.
                readme_excerpt = (
                    None if is_desc_chinese
                    else get_readme_content_with_excerpt(repo_full_name, session)
                )
                if not is_desc_chinese and not readme_excerpt:
                    print(f" -> 丢弃 '{repo_full_name}' (无中文内容).")
                    continue

                owner = repo.get('owner') or {}
                row = (
                    repo_id,
                    repo['name'],
                    repo['html_url'],
                    description,
                    readme_excerpt,
                    # The key can be present with a null value, in which case
                    # a .get() default would never apply.
                    repo.get('language') or 'N/A',
                    repo['stargazers_count'],
                    repo['pushed_at'],
                    owner.get('avatar_url'),
                    json.dumps(repo.get('topics', [])),
                )
                try:
                    cursor.execute(_INSERT_PROJECT_SQL, row)
                except sqlite3.IntegrityError as e:
                    # e.g. the UNIQUE(url) constraint: skip this row rather
                    # than abort the whole run.
                    conn.rollback()
                    print(f" ⚠️ 跳过 '{repo_full_name}' (数据库约束冲突: {e}).")
                    continue
                conn.commit()
                new_projects_count += 1
                print(f" 💾 已保存 '{repo_full_name}' 到数据库.")
        finally:
            # Release the database handle even if a row raised unexpectedly.
            conn.close()

    print(f"\n [Scheduler] 本轮抓取完成. 新增 {new_projects_count} 个项目.")
    return new_projects_count
# ★★★ V6.0: removed main() and the `if __name__ == '__main__'` guard ★★★