Tools / main.py
jebin2's picture
issue
413ae2e
raw
history blame
10.5 kB
from fastapi import FastAPI, Request, File, UploadFile, HTTPException, Form, Query
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from jinja2 import Environment, FileSystemLoader, select_autoescape
from custom_logger import logger_config
from image.image_base import ImageBase
from pydantic import BaseModel
from image.converter import Converter
from image.remove_metadata import RemoveMetadata
from image.remove_background import RemoveBackground
import mimetypes
app = FastAPI(title="Tools Collection", description="Collection of utility tools")
# Startup event to cleanup old files
@app.on_event("startup")
async def startup_cleanup():
"""Run cleanup of files older than 2 days on server startup"""
try:
image_base = ImageBase()
deleted_count = image_base.cleanup_old_files(max_age_days=2)
if deleted_count > 0:
logger_config.info(f"Startup cleanup: Removed {deleted_count} old file(s)")
else:
logger_config.info("Startup cleanup: No old files to remove")
except Exception as e:
logger_config.warning(f"Startup cleanup failed: {e}")
# Mount static/image at /image
app.mount("/image/javascript", StaticFiles(directory="image/javascript"), name="image")
# Templates
template_dirs = [".", "./image"]
env = Environment(
loader=FileSystemLoader(template_dirs),
autoescape=select_autoescape(['html', 'xml'])
)
# Available features
FEATURES = {
"image": {
"name": "Image Tools",
"description": "HEIC to PNG/JPG conversion and metadata removal",
"icon": "๐Ÿ–ผ๏ธ",
"features": ["convert", "remove_metadata", "remove_background"],
"folder": "image",
"tags": ["image", "heic", "png", "jpg", "convert", "metadata"]
},
"pdf": {
"name": "PDF Tools",
"description": "Convert images to PDF, merge PDFs, and more",
"icon": "๐Ÿ“„",
"features": ["images_to_pdf"],
"folder": "pdf",
"tags": ["pdf", "merge", "convert", "images", "document"],
"coming_soon": True
},
"video": {
"name": "Video Tools",
"description": "Video player with playlist management - add videos and convert via โ‹ฎ menu",
"icon": "๐ŸŽฌ",
"features": ["player"],
"folder": "video",
"tags": ["video", "player", "media", "watch", "convert", "playlist"]
}
}
# Routes
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
template = env.get_template("index.html") # From tool/
html_content = template.render(request=request)
return HTMLResponse(content=html_content)
@app.get("/image/convert", response_class=HTMLResponse)
async def image_tools(request: Request):
template = env.get_template("image/convert.html") # From tool/image/
html_content = template.render(request=request)
return HTMLResponse(content=html_content)
@app.get("/image/remove_metadata", response_class=HTMLResponse)
async def image_tools(request: Request):
template = env.get_template("image/remove_metadata.html") # From tool/image/
html_content = template.render(request=request)
return HTMLResponse(content=html_content)
@app.get("/image/remove_background", response_class=HTMLResponse)
async def image_tools(request: Request):
template = env.get_template("image/remove_background.html") # From tool/image/
html_content = template.render(request=request)
return HTMLResponse(content=html_content)
@app.get("/video/player")
async def video_player():
return RedirectResponse(url="https://jebin2.github.io/JellyJump/player.html", status_code=302)
@app.post("/image/upload")
async def upload_image(
id: str = Form(...),
image: UploadFile = File(...)
):
try:
image_base = ImageBase()
image_base.upload(id, image)
# Return success response
return JSONResponse({
"success": True,
"message": "Image uploaded successfully"
})
except ValueError as ve:
logger_config.error(f"Validation error: {str(ve)}")
raise HTTPException(
status_code=400,
detail=str(ve)
)
except Exception as e:
logger_config.error(f"Unexpected error during upload: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@app.post("/image/convert")
async def convert_image(
id: str = Form(...),
to_format: str = Form(...)
):
try:
# Check if input is SVG - PIL cannot read SVG files
if id.lower().endswith('.svg'):
raise ValueError("SVG files cannot be converted. SVG is only available as an output format.")
converter = Converter()
output_path = converter.convert_image(id, to_format)
# Check if conversion failed
if output_path is None:
raise ValueError("Image conversion failed. The file may be corrupted or in an unsupported format.")
# Return success response
return JSONResponse({
"success": True,
"message": "Image converted successfully",
"new_filename": output_path.split("/")[-1]
})
except ValueError as ve:
logger_config.error(f"Validation error: {str(ve)}")
raise HTTPException(
status_code=400,
detail=str(ve)
)
except Exception as e:
logger_config.error(f"Unexpected error during upload: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@app.post("/image/remove_metadata")
async def remove_metadata(
id: str = Form(...)
):
try:
# Check if input is SVG - PIL cannot read SVG files
if id.lower().endswith('.svg'):
raise ValueError("SVG files cannot be processed. SVG is a vector format without embedded metadata.")
removeMetadata = RemoveMetadata()
output_path, metadata = removeMetadata.process(id)
# Check if processing failed
if output_path is None:
raise ValueError("Metadata removal failed. The file may be corrupted or in an unsupported format.")
# Return success response
return JSONResponse({
"success": True,
"message": "Metadata removed successfully",
"new_filename": output_path.split("/")[-1],
"other_info": metadata
})
except ValueError as ve:
logger_config.error(f"Validation error: {str(ve)}")
raise HTTPException(
status_code=400,
detail=str(ve)
)
except Exception as e:
logger_config.error(f"Unexpected error during remove_metadata: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@app.post("/image/remove_background")
async def remove_background(
id: str = Form(...)
):
try:
removeBackground = RemoveBackground()
output_path = removeBackground.process(id)
# Return success response
return JSONResponse({
"success": True,
"message": "Image uploaded successfully",
"new_filename": output_path.split("/")[-1]
})
except ValueError as ve:
logger_config.error(f"Validation error: {str(ve)}")
raise HTTPException(
status_code=400,
detail=str(ve)
)
except Exception as e:
logger_config.error(f"Unexpected error during remove_background: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@app.get("/image/download")
async def download_converted_image(
id: str = Query(...)
):
try:
image_base = ImageBase()
file_path = image_base.download_url(id)
mime_type, _ = mimetypes.guess_type(file_path)
return FileResponse(file_path, media_type=mime_type, filename=id)
except ValueError as ve:
logger_config.error(f"Validation error: {str(ve)}")
raise HTTPException(
status_code=400,
detail=str(ve)
)
except Exception as e:
logger_config.error(f"Unexpected error during upload: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
class DeleteRequest(BaseModel):
ids: list[str]
@app.post("/image/delete")
async def delete_images(request: DeleteRequest):
try:
image_base = ImageBase()
image_base.delete(request.ids)
# Return success response
return JSONResponse({
"success": True,
"message": "Image deleted successfully"
})
except ValueError as ve:
logger_config.error(f"Validation error: {str(ve)}")
raise HTTPException(
status_code=400,
detail=str(ve)
)
except Exception as e:
logger_config.error(f"Unexpected error during upload: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@app.get("/api/features")
async def get_features():
"""Get available features"""
return {"features": FEATURES}
@app.get("/api/search")
async def search_features(q: str = ""):
"""Search features by name, description, or tags"""
if not q:
return {"features": FEATURES}
q = q.lower()
filtered_features = {}
for key, feature in FEATURES.items():
# Search in name, description, and tags
search_text = f"{feature['name']} {feature['description']} {' '.join(feature.get('tags', []))}".lower()
if q in search_text:
filtered_features[key] = feature
return {"features": filtered_features}
@app.get("/api/status")
async def get_feature_status():
"""Get feature status"""
features_status = {}
for key in FEATURES.keys():
features_status[key] = {
"feature_name": None,
"is_busy": False,
"process_id": None
}
return features_status
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)