| import requests |
| import time |
| import os |
| import logging |
| from tqdm import tqdm |
| from dotenv import load_dotenv |
|
|
| |
# Pull environment variables (the GitHub token) in from a local .env file.
load_dotenv()

# --- Configuration ---
GITHUB_TOKEN = os.getenv("REAL_TOKEN")  # personal access token for the GitHub API
timeout_duration = 10                   # seconds before an API request times out
output_file = "python_files.txt"        # collected download URLs, one per line
sha_file = "seen_shas.txt"              # blob SHAs already recorded (dedup log)
line_number_file = "line_number.txt"    # crawl checkpoint: last processed repo index

# Timestamped log lines for progress and error reporting.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
|
|
|
|
def fetch_python_files_from_repo(repo_url, seen_shas):
    """Fetch Python file download URLs from a repository's top-level contents.

    Appends each new file's download URL to ``output_file`` and its SHA to
    ``sha_file``. ``seen_shas`` is mutated in place so duplicates are skipped
    across repositories and across restarts.

    Args:
        repo_url: Full repository URL, e.g. "https://github.com/owner/repo".
        seen_shas: Set of blob SHAs already recorded; updated in place.

    Returns:
        True if the repository contents were fetched (HTTP 200), False on
        network failure or any non-200/non-403 status.
    """
    repo_name = repo_url.split("https://github.com/")[-1]
    contents_url = f"https://api.github.com/repos/{repo_name}/contents"
    headers = {"Authorization": f"token {GITHUB_TOKEN}"}

    try:
        response = requests.get(contents_url, headers=headers, timeout=timeout_duration)
    except requests.RequestException as exc:
        # A timeout or connection error previously crashed the whole crawl;
        # treat it as a failed repo and move on instead.
        logging.error(f"Request for {repo_name} failed: {exc}")
        return False

    if response.status_code == 200:
        for file_data in response.json():
            name = file_data["name"]
            # Only .py files, excluding setup.py boilerplate.
            if not name.endswith(".py") or name.endswith("setup.py"):
                continue
            # Size window filters out trivial stubs and huge generated files.
            if not 1000 <= file_data.get("size", 0) <= 100000:
                continue
            file_sha = file_data.get("sha")
            if file_sha in seen_shas:
                logging.info(f"Skipping {name} (SHA already seen)")
                continue
            with open(output_file, "a") as file:
                file.write(f"{file_data['download_url']}\n")
            with open(sha_file, "a") as sha_log:
                sha_log.write(f"{file_sha}\n")
            seen_shas.add(file_sha)
        return True

    if response.status_code == 403:
        # Rate limited. The original code logged that it would wait, then
        # called exit(), leaving the retry unreachable. Actually sleep until
        # the reset time GitHub advertises (clamped to >= 0), then retry.
        reset_time = int(response.headers.get("X-RateLimit-Reset", time.time()))
        wait_time = max(reset_time - int(time.time()), 0)
        logging.warning(
            f"Rate limit hit! Waiting {wait_time // 60} minutes until reset..."
        )
        time.sleep(wait_time + 1)
        return fetch_python_files_from_repo(repo_url, seen_shas)

    logging.error(f"Failed to fetch {repo_name}. Status: {response.status_code}")
    return False
|
|
|
|
def read_repositories_from_file(file_path):
    """Return the non-empty, whitespace-stripped lines of *file_path* as a list."""
    urls = []
    with open(file_path, "r") as handle:
        for raw_line in handle:
            url = raw_line.strip()
            if url:
                urls.append(url)
    return urls
|
|
|
|
def read_seen_shas(file_path):
    """Load previously recorded SHAs from *file_path*; empty set if absent."""
    if not os.path.exists(file_path):
        return set()
    with open(file_path, "r") as handle:
        content = handle.read()
    return set(content.splitlines())
|
|
|
|
def get_last_line_number(file_path):
    """Return the checkpointed line number stored in *file_path*, or 0 if missing."""
    if not os.path.exists(file_path):
        return 0
    with open(file_path, "r") as handle:
        return int(handle.read().strip())
|
|
|
|
def save_last_line_number(file_path, line_number):
    """Persist *line_number* to *file_path*, overwriting any previous checkpoint."""
    with open(file_path, "w") as handle:
        print(line_number, end="", file=handle)
|
|
|
|
| |
# ---- Driver: resume from the checkpoint and crawl each repository ----
repositories = read_repositories_from_file("repositories.txt")
seen_shas = read_seen_shas(sha_file)
last_line_number = get_last_line_number(line_number_file)

with tqdm(
    total=len(repositories) - last_line_number, desc="Processing Repositories"
) as pbar:
    for index, repo in enumerate(
        repositories[last_line_number:], start=last_line_number
    ):
        fetch_python_files_from_repo(repo, seen_shas)
        # Advance the bar and checkpoint even when the fetch fails: the error
        # is already logged inside the fetch, and skipping the checkpoint
        # (the old `continue`) left the bar short of its total and forced a
        # re-crawl of everything after the failure on restart — pointless,
        # since SHA dedup would skip those files anyway.
        pbar.update(1)
        save_last_line_number(line_number_file, index + 1)

logging.info(f"Python files saved to {output_file}")
|
|