Spaces:
Paused
Paused
| import gradio as gr | |
| from fastapi import FastAPI, UploadFile, File, Request | |
| from fastapi.responses import FileResponse, HTMLResponse | |
| import uuid | |
| import os | |
| import json | |
| from datetime import datetime | |
| from typing import Dict, List | |
| import shutil | |
| import asyncio | |
| from contextlib import asynccontextmanager | |
| # Initialize data storage | |
| peers: Dict[str, Dict] = {} | |
| jobs: List[Dict] = [] | |
| # Create directories | |
| os.makedirs("results", exist_ok=True) | |
| os.makedirs("client", exist_ok=True) | |
| # Client code | |
| CLIENT_CODE = '''import requests | |
| import subprocess | |
| import time | |
| import os | |
| import sys | |
| from datetime import datetime | |
| # Configuration | |
| PEER_ID = f"peer-{os.getenv('COMPUTERNAME', 'unknown')}-{datetime.now().strftime('%Y%m%d%H%M%S')}" | |
| SERVER_URL = "https://your-username-your-space.hf.space" # Replace with actual Space URL | |
| def check_gpu(): | |
| """Check GPU availability""" | |
| try: | |
| result = subprocess.run(['nvidia-smi', '--query-gpu=utilization.gpu', '--format=csv,noheader,nounits'], | |
| capture_output=True, text=True) | |
| if result.returncode == 0: | |
| gpu_usage = int(result.stdout.strip()) | |
| return gpu_usage < 20 # GPU is idle if usage < 20% | |
| except: | |
| print("GPU not found. Running in CPU mode.") | |
| return False | |
| def register_peer(): | |
| """Register peer with server""" | |
| try: | |
| response = requests.post(f"{SERVER_URL}/api/peers/register", params={"peer_id": PEER_ID}) | |
| if response.status_code == 200: | |
| print(f"โ Peer registered: {PEER_ID}") | |
| return True | |
| except Exception as e: | |
| print(f"โ Server connection failed: {e}") | |
| return False | |
| def generate_image_cpu(prompt, output_path): | |
| """Generate test image using CPU""" | |
| from PIL import Image, ImageDraw, ImageFont | |
| img = Image.new('RGB', (512, 512), color='white') | |
| draw = ImageDraw.Draw(img) | |
| # Draw prompt text | |
| text = f"Prompt: {prompt[:50]}..." | |
| draw.text((10, 10), text, fill='black') | |
| draw.text((10, 40), f"Generated by: {PEER_ID}", fill='gray') | |
| draw.text((10, 70), f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", fill='gray') | |
| img.save(output_path) | |
| print(f"๐ Test image generated: {output_path}") | |
| def main(): | |
| print("๐ Starting P2P GPU Client...") | |
| if not register_peer(): | |
| print("Server registration failed. Exiting.") | |
| return | |
| while True: | |
| try: | |
| # Heartbeat | |
| requests.post(f"{SERVER_URL}/api/peers/heartbeat", params={"peer_id": PEER_ID}) | |
| # Request job | |
| response = requests.get(f"{SERVER_URL}/api/jobs/request", params={"peer_id": PEER_ID}) | |
| if response.status_code == 200: | |
| job_data = response.json() | |
| if job_data.get("job"): | |
| job = job_data["job"] | |
| job_id = job["id"] | |
| prompt = job["prompt"] | |
| print(f"\\n๐ New job received: {prompt}") | |
| # Generate image | |
| output_path = f"{job_id}.png" | |
| if check_gpu(): | |
| print("๐ฎ Generating with GPU...") | |
| # Actual GPU generation code would go here | |
| generate_image_cpu(prompt, output_path) | |
| else: | |
| print("๐ป Generating with CPU...") | |
| generate_image_cpu(prompt, output_path) | |
| # Upload result | |
| with open(output_path, 'rb') as f: | |
| files = {'file': (output_path, f, 'image/png')} | |
| response = requests.post( | |
| f"{SERVER_URL}/api/jobs/result", | |
| params={"job_id": job_id}, | |
| files=files | |
| ) | |
| if response.status_code == 200: | |
| print("โ Result uploaded successfully") | |
| # Clean up | |
| os.remove(output_path) | |
| time.sleep(10) # Check every 10 seconds | |
| except KeyboardInterrupt: | |
| print("\\n๐ Shutting down") | |
| break | |
| except Exception as e: | |
| print(f"โ ๏ธ Error: {e}") | |
| time.sleep(30) | |
| if __name__ == "__main__": | |
| # Check required packages | |
| try: | |
| import PIL | |
| except ImportError: | |
| print("Installing required packages...") | |
| subprocess.run([sys.executable, "-m", "pip", "install", "pillow", "requests"]) | |
| main() | |
| ''' | |
| # Create client files | |
| with open("client/peer_agent.py", "w", encoding="utf-8") as f: | |
| f.write(CLIENT_CODE) | |
| with open("client/requirements.txt", "w") as f: | |
| f.write("requests\npillow\n") | |
| with open("client/README.md", "w", encoding="utf-8") as f: | |
| f.write("""# P2P GPU Client for Windows | |
| ## Installation | |
| 1. Install Python 3.8+ | |
| 2. Run `pip install -r requirements.txt` | |
| 3. Update SERVER_URL in `peer_agent.py` with actual Hugging Face Space URL | |
| 4. Run `python peer_agent.py` | |
| ## GPU Support | |
| - Automatically detects NVIDIA GPU if available | |
| - Falls back to CPU mode for testing | |
| """) | |
| # FastAPI app with lifespan | |
| async def lifespan(app: FastAPI): | |
| # Startup | |
| print("Starting P2P GPU Hub...") | |
| yield | |
| # Shutdown | |
| print("Shutting down P2P GPU Hub...") | |
| app = FastAPI(lifespan=lifespan) | |
| # API endpoints | |
| async def get_status(): | |
| """Get system status""" | |
| active_peers = sum(1 for p in peers.values() | |
| if (datetime.now() - p['last_seen']).seconds < 60) | |
| pending_jobs = sum(1 for j in jobs if j['status'] == 'pending') | |
| completed_jobs = sum(1 for j in jobs if j['status'] == 'completed') | |
| recent_results = [ | |
| {"filename": j['filename'], "prompt": j['prompt']} | |
| for j in jobs[-10:] if j['status'] == 'completed' and 'filename' in j | |
| ] | |
| return { | |
| "active_peers": active_peers, | |
| "pending_jobs": pending_jobs, | |
| "completed_jobs": completed_jobs, | |
| "recent_results": recent_results | |
| } | |
| async def register_peer(peer_id: str): | |
| """Register a peer""" | |
| peers[peer_id] = { | |
| "status": "idle", | |
| "last_seen": datetime.now(), | |
| "jobs_completed": 0 | |
| } | |
| return {"status": "registered", "peer_id": peer_id} | |
| async def heartbeat(peer_id: str): | |
| """Update peer status""" | |
| if peer_id in peers: | |
| peers[peer_id]["last_seen"] = datetime.now() | |
| return {"status": "alive"} | |
| return {"status": "unregistered"} | |
| async def submit_job(request: Request): | |
| """Submit a job""" | |
| data = await request.json() | |
| job_id = str(uuid.uuid4()) | |
| job = { | |
| "id": job_id, | |
| "prompt": data.get("prompt", ""), | |
| "status": "pending", | |
| "created_at": datetime.now() | |
| } | |
| jobs.append(job) | |
| return {"job_id": job_id, "status": "submitted"} | |
| async def request_job(peer_id: str): | |
| """Request a job for processing""" | |
| for job in jobs: | |
| if job["status"] == "pending": | |
| job["status"] = "assigned" | |
| job["peer_id"] = peer_id | |
| job["assigned_at"] = datetime.now() | |
| return {"job": job} | |
| return {"job": None} | |
| async def submit_result(job_id: str, file: UploadFile = File(...)): | |
| """Submit job result""" | |
| filename = f"{job_id}.png" | |
| file_path = f"results/{filename}" | |
| with open(file_path, "wb") as buffer: | |
| shutil.copyfileobj(file.file, buffer) | |
| for job in jobs: | |
| if job["id"] == job_id: | |
| job["status"] = "completed" | |
| job["filename"] = filename | |
| job["completed_at"] = datetime.now() | |
| if "peer_id" in job and job["peer_id"] in peers: | |
| peers[job["peer_id"]]["jobs_completed"] += 1 | |
| break | |
| return {"status": "success", "filename": filename} | |
| async def get_result(filename: str): | |
| """Get generated image""" | |
| file_path = f"results/{filename}" | |
| if os.path.exists(file_path): | |
| return FileResponse(file_path) | |
| return {"error": "File not found"} | |
| async def get_client_file(filename: str): | |
| """Download client file""" | |
| file_path = f"client/{filename}" | |
| if os.path.exists(file_path): | |
| return FileResponse(file_path, filename=filename) | |
| return {"error": "File not found"} | |
| # Gradio interface functions | |
| def gradio_submit_job(prompt): | |
| """Submit job through Gradio""" | |
| if not prompt: | |
| return "Please enter a prompt" | |
| job_id = str(uuid.uuid4()) | |
| job = { | |
| "id": job_id, | |
| "prompt": prompt, | |
| "status": "pending", | |
| "created_at": datetime.now() | |
| } | |
| jobs.append(job) | |
| return f"Job submitted successfully! Job ID: {job_id}" | |
| def gradio_get_status(): | |
| """Get status through Gradio""" | |
| active_peers = sum(1 for p in peers.values() | |
| if (datetime.now() - p['last_seen']).seconds < 60) | |
| pending = sum(1 for j in jobs if j['status'] == 'pending') | |
| completed = sum(1 for j in jobs if j['status'] == 'completed') | |
| status_text = f"""### System Status | |
| - Active Peers: {active_peers} | |
| - Pending Jobs: {pending} | |
| - Completed Jobs: {completed} | |
| ### Recent Jobs | |
| """ | |
| # Add recent jobs | |
| recent_jobs = jobs[-5:][::-1] # Last 5 jobs, reversed | |
| for job in recent_jobs: | |
| status_text += f"\n- **{job['id'][:8]}...**: {job['prompt'][:50]}... ({job['status']})" | |
| return status_text | |
| def gradio_get_gallery(): | |
| """Get completed images for gallery""" | |
| image_files = [] | |
| for job in jobs[-20:]: # Last 20 jobs | |
| if job['status'] == 'completed' and 'filename' in job: | |
| file_path = f"results/{job['filename']}" | |
| if os.path.exists(file_path): | |
| image_files.append((file_path, job['prompt'])) | |
| return image_files | |
| # Create Gradio interface | |
| with gr.Blocks(title="P2P GPU Image Generation Hub") as demo: | |
| gr.Markdown("# ๐ค P2P GPU Image Generation Hub") | |
| gr.Markdown("Distributed image generation using idle GPUs from peer nodes") | |
| with gr.Tabs(): | |
| with gr.Tab("Submit Job"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| prompt_input = gr.Textbox( | |
| label="Image Prompt", | |
| placeholder="Describe the image you want to generate...", | |
| lines=3 | |
| ) | |
| submit_btn = gr.Button("Submit Job", variant="primary") | |
| result_text = gr.Textbox(label="Result", interactive=False) | |
| submit_btn.click( | |
| fn=gradio_submit_job, | |
| inputs=prompt_input, | |
| outputs=result_text | |
| ) | |
| with gr.Tab("System Status"): | |
| status_display = gr.Markdown() | |
| refresh_btn = gr.Button("Refresh Status") | |
| refresh_btn.click( | |
| fn=gradio_get_status, | |
| outputs=status_display | |
| ) | |
| # Auto-refresh status on load | |
| demo.load(fn=gradio_get_status, outputs=status_display) | |
| with gr.Tab("Gallery"): | |
| gallery = gr.Gallery( | |
| label="Generated Images", | |
| show_label=True, | |
| elem_id="gallery", | |
| columns=3, | |
| rows=2, | |
| height="auto" | |
| ) | |
| refresh_gallery_btn = gr.Button("Refresh Gallery") | |
| refresh_gallery_btn.click( | |
| fn=gradio_get_gallery, | |
| outputs=gallery | |
| ) | |
| # Auto-load gallery on tab load | |
| demo.load(fn=gradio_get_gallery, outputs=gallery) | |
| with gr.Tab("Download Client"): | |
| gr.Markdown(""" | |
| ## Windows Client Setup | |
| 1. Download the client files: | |
| - [peer_agent.py](/api/client/peer_agent.py) | |
| - [requirements.txt](/api/client/requirements.txt) | |
| - [README.md](/api/client/README.md) | |
| 2. Install Python 3.8 or higher | |
| 3. Install requirements: | |
| ```bash | |
| pip install -r requirements.txt | |
| ``` | |
| 4. Update the SERVER_URL in peer_agent.py with this Space's URL | |
| 5. Run the client: | |
| ```bash | |
| python peer_agent.py | |
| ``` | |
| The client will automatically detect GPU availability and start processing jobs. | |
| """) | |
| # Mount Gradio app to FastAPI | |
| app = gr.mount_gradio_app(app, demo, path="/") | |
| # For Hugging Face Spaces | |
| if __name__ == "__main__": | |
| demo.launch() |