# Dockerfile extraction: shallow-clones the listed repositories and harvests
# valid, deduplicated Dockerfiles into a JSONL dataset.
| | import json |
| | import shutil |
| | import hashlib |
| | from pathlib import Path |
| | from git import Repo |
| | from datetime import datetime |
| | import argparse |
| |
|
| | |
# --- Paths and filtering configuration ---

# Input: metadata listing the repositories to process.
REPO_LIST_PATH = Path("data/metadata/repos_filtered.json")
# Working directory for shallow clones.
CLONE_DIR = Path("temp_repos")
# Output: one JSON record per accepted Dockerfile (JSON Lines format).
OUTPUT_FILE = Path("data/raw/dockerfiles.jsonl")
OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)

# Directories stripped from each clone before scanning (VCS metadata, docs,
# tests, caches, IDE settings) — none of them can contain wanted Dockerfiles.
REMOVE_DIRS = [".git", ".github", "docs", "tests", "__pycache__", ".idea", ".vscode"]
| |
|
def clean_repo(path: Path):
    """Remove unwanted directories (VCS metadata, docs, caches) from a clone.

    Missing directories are silently ignored, so the call is idempotent.
    """
    for junk in REMOVE_DIRS:
        shutil.rmtree(path / junk, ignore_errors=True)
| |
|
def compute_sha1(text: str) -> str:
    """Return the hex SHA-1 digest of *text* encoded as UTF-8."""
    digest = hashlib.sha1()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
| |
|
def is_valid_dockerfile(path: Path) -> bool:
    """Heuristically decide whether *path* looks like a real Dockerfile.

    Accepts files that:
      * are at most 200 kB (checked before reading, so oversized files are
        never loaded into memory),
      * contain at least 5 non-blank lines,
      * have a FROM instruction within the first 10 non-blank lines,
      * contain at least one RUN, CMD or COPY instruction.

    Returns False (never raises) for unreadable or undecodable files.
    """
    try:
        # Cheap size gate first: the original read the whole file before
        # checking st_size, wasting the read on oversized files.
        if path.stat().st_size > 200_000:
            return False
        text = path.read_text(encoding="utf-8")
        # Keep only the first whitespace-delimited token of each non-blank
        # line, lowercased — a plain prefix check would also match lines that
        # merely start with the same letters (e.g. "copying ...").
        instructions = [
            line.split(None, 1)[0].lower()
            for line in text.splitlines()
            if line.strip()
        ]
        if len(instructions) < 5:
            return False
        # A real Dockerfile declares its base image near the top...
        has_from = "from" in instructions[:10]
        # ...and actually does something with it.
        has_action = any(tok in ("run", "cmd", "copy") for tok in instructions)
        return has_from and has_action
    except (OSError, UnicodeDecodeError) as e:
        # Best-effort validation: report and reject instead of aborting the crawl.
        print(f"⚠️ Błąd walidacji pliku {path}: {e}")
        return False
| |
|
def find_dockerfiles(repo_path: Path) -> list[Path]:
    """Recursively collect every file named 'Dockerfile' (case-insensitive)."""
    matches = []
    for candidate in repo_path.rglob("*"):
        if candidate.is_file() and candidate.name.lower() == "dockerfile":
            matches.append(candidate)
    return matches
| |
|
def clone_repo(url: str, full_name: str) -> Path | None:
    """Shallow-clone *url* into CLONE_DIR and strip junk directories.

    An already-existing clone is reused as-is. Returns the clone path,
    or None when cloning fails.
    """
    clone_path = CLONE_DIR / full_name.replace("/", "__")
    if clone_path.exists():
        print(f"⚠️ Repo {full_name} już istnieje – pomijam klonowanie.")
        return clone_path
    try:
        print(f"⬇️ Klonuję {full_name}...")
        # depth=1: only the latest snapshot is needed, history is irrelevant.
        Repo.clone_from(url, clone_path, depth=1)
        clean_repo(clone_path)
        return clone_path
    except Exception as e:
        print(f"❌ Błąd klonowania {full_name}: {e}")
        return None
| |
|
def main():
    """Clone each listed repository, extract its valid Dockerfiles, and write
    deduplicated JSONL records to OUTPUT_FILE.

    Prints a saved/skipped summary when done. With --purge, each clone is
    deleted after it has been processed to conserve disk space.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--purge", action="store_true", help="Usuń repozytorium po ekstrakcji")
    args = parser.parse_args()

    with open(REPO_LIST_PATH, encoding="utf-8") as f:
        repos = json.load(f)

    saved, skipped = 0, 0
    seen_hashes = set()  # SHA-1 of normalized content: dedups across repos

    with open(OUTPUT_FILE, "w", encoding="utf-8") as out_f:
        for repo in repos:
            full_name = repo["fullName"]
            repo_path = clone_repo(repo["url"], full_name)
            if not repo_path:
                continue

            for file in find_dockerfiles(repo_path):
                if not is_valid_dockerfile(file):
                    skipped += 1
                    continue
                if _write_record(file, repo_path, full_name, seen_hashes, out_f):
                    saved += 1
                else:
                    skipped += 1

            if args.purge:
                # Reclaim disk space once this repo has been fully processed.
                shutil.rmtree(repo_path, ignore_errors=True)

    print(f"\n✅ Zapisano {saved} poprawnych Dockerfile do {OUTPUT_FILE}")
    print(f"🚫 Pominięto {skipped} plików (nieważne, błędne, zduplikowane)")


def _write_record(file: Path, repo_path: Path, full_name: str,
                  seen_hashes: set, out_f) -> bool:
    """Serialize one validated Dockerfile as a JSONL record into *out_f*.

    Content is normalized (blank lines dropped, trailing whitespace stripped)
    before hashing, so cosmetic variants dedup to one record. Returns True
    when a record was written, False for duplicates or read/serialize errors.
    """
    try:
        raw = file.read_text(encoding="utf-8").strip().splitlines()
        lines = [l.rstrip() for l in raw if l.strip()]
        file_id = compute_sha1("\n".join(lines))
        if file_id in seen_hashes:
            return False
        seen_hashes.add(file_id)
        json.dump({
            "repo": full_name,
            "path": str(file.relative_to(repo_path)),
            "file_id": file_id,
            "content": lines,
            "size_bytes": file.stat().st_size,
            "line_count": len(lines),
            "valid": True,
            "cloned_at": datetime.now().isoformat()
        }, out_f)
        out_f.write("\n")
        return True
    except Exception as e:
        print(f"⚠️ Błąd przy zapisie {file}: {e}")
        return False
| |
|
# Run the extraction pipeline only when executed as a script.
if __name__ == "__main__":
    main()
| |
|