# app.py — OpScanIA: DeepSeek-OCR + BioMedLM-7B (local GGUF via llama.cpp, ZeroGPU-safe) — Gradio 5
# -----------------------------------------------------------------------------------------------
# • OCR: DeepSeek-OCR (GPU ONLY inside the @spaces.GPU worker; no CUDA initialization in the main process).
# • Chat: BioMedLM-7B in GGUF format via llama.cpp (also ONLY inside the GPU worker).
# • No nested GPU calls; everything wrapped in try/except to avoid generic RuntimeErrors.
# • Hardened Spanish prompt and deterministic generation (grounded in the OCR, no hallucinations).
# • Configured via env vars: GGUF_REPO, GGUF_FILE, GGUF_LOCAL_PATH, N_CTX, N_BATCH, N_GPU_LAYERS, etc.
# • Defaults: public repo mradermacher/BioMedLM-7B-GGUF + Q4_K_M.
# -----------------------------------------------------------------------------------------------
import os, re, tempfile, traceback, glob

import gradio as gr
import torch
from PIL import Image
from transformers import AutoModel, AutoTokenizer
import spaces
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
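# Assumed dependencies (roughly, inferred from the imports above) for the Space's requirements.txt:
#   gradio, torch, transformers, pillow, spaces, huggingface_hub, llama-cpp-python
# Note: the PyPI package backing `llama_cpp` is `llama-cpp-python`; a CUDA-enabled build is
# needed if N_GPU_LAYERS > 0.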
# =========================
# CONFIG (environment)
# =========================

# --- BioMedLM-7B (GGUF / llama.cpp) ---
GGUF_REPO = os.getenv("GGUF_REPO", "mradermacher/BioMedLM-7B-GGUF").strip()
GGUF_FILE = os.getenv("GGUF_FILE", "BioMedLM-7B.Q4_K_M.gguf").strip()
GGUF_LOCAL_PATH = os.getenv("GGUF_LOCAL_PATH", "").strip()  # e.g. ./models/BioMedLM-7B.Q4_K_M.gguf

# Common candidate filenames (used if the exact name does not match)
_GGUF_CANDIDATES = [
    "BioMedLM-7B.Q4_K_M.gguf", "BioMedLM-7B.Q4_K_S.gguf",
    "BioMedLM-7B.Q5_K_M.gguf", "BioMedLM-7B.Q5_K_S.gguf",
    "BioMedLM-7B.Q6_K.gguf", "BioMedLM-7B.Q8_0.gguf",
    "BioMedLM-7B.IQ4_XS.gguf", "BioMedLM-7B.Q2_K.gguf",
    "BioMedLM-7B.f16.gguf",
    "biomedlm-7b.Q4_K_M.gguf", "biomedlm-7b.Q5_K_M.gguf",
    "biomedlm-7b.Q8_0.gguf", "biomedlm-7b-f16.gguf",
]
GGUF_CANDIDATES = [GGUF_FILE] if GGUF_FILE else _GGUF_CANDIDATES

# Performance / memory (tune for the Space GPU: T4 / A10G)
N_CTX = int(os.getenv("N_CTX", "4096"))
N_THREADS = int(os.getenv("N_THREADS", str(os.cpu_count() or 4)))
N_GPU_LAYERS = int(os.getenv("N_GPU_LAYERS", "35"))  # a 7B has ~32 layers; 35 ≈ offload all (if VRAM allows)
N_BATCH = int(os.getenv("N_BATCH", "512"))           # 512/768/1024 depending on VRAM

# Decoding (deterministic by default)
GEN_TEMPERATURE = float(os.getenv("TEMPERATURE", "0.0"))
GEN_TOP_P = float(os.getenv("TOP_P", "1.0"))
GEN_MAX_NEW_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "384"))
STOP_SEQS = ["\n###", "\nUser:", "\nAssistant:", "\nUsuario:", "\nAsistente:"]

# Token for private HF repos (optional; the default repo is public)
HF_TOKEN = os.getenv("HF_TOKEN")

# DeepSeek-OCR: optionally pin a revision/commit to avoid unexpected changes
DS_OCR_REV = os.getenv("DS_OCR_REV", None)  # e.g., "2b6f6c2..."
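# Illustrative Space configuration (set under Settings > Variables; values are examples
# mirroring the defaults above):
#   GGUF_REPO=mradermacher/BioMedLM-7B-GGUF
#   GGUF_FILE=BioMedLM-7B.Q4_K_M.gguf
#   N_CTX=4096   N_BATCH=512   N_GPU_LAYERS=35   # N_GPU_LAYERS=0 runs llama.cpp fully on CPU
#   TEMPERATURE=0.0   MAX_NEW_TOKENS=384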
# =========================
# Text / prompt utilities
# =========================
def _truncate(s: str, n=3000):
    s = (s or "")
    return s if len(s) <= n else s[:n]

def _clean_ocr(s: str) -> str:
    if not s:
        return ""
    s = re.sub(r"[^\S\r\n]+", " ", s)           # collapse runs of spaces/tabs (keep newlines)
    s = re.sub(r"(\{#Sec\d+\}|#+\w*)", " ", s)  # strip stray anchors / markdown-style headers
    s = re.sub(r"\s{2,}", " ", s)
    lines = []
    for par in s.splitlines():
        par = par.strip()
        if 0 < len(par) <= 600:
            lines.append(par)
    return "\n".join(lines)
SYSTEM_INSTR = (
    "Eres un analista clínico educativo. Responde SIEMPRE en español. "
    "Reglas: (1) Usa ÚNICAMENTE el CONTEXTO_OCR; "
    "(2) Si falta un dato, escribe literalmente: 'dato no disponible en el OCR'; "
    "(3) No inventes nada; (4) Responde en viñetas claras; "
    "(5) Cita fragmentos exactos del OCR entre comillas como evidencia."
)

FEWSHOT = """
### EJEMPLO 1
CONTEXTO_OCR:
Paciente: Juan Pérez. Medicamento: Amoxicilina 500 mg cada 8 horas por 7 días.
PREGUNTA:
¿Cuál es el medicamento y la dosis?
SALIDA_ES:
- Medicamento: **Amoxicilina**
- Dosis: **500 mg cada 8 horas por 7 días**
- Evidencia OCR: "Amoxicilina 500 mg cada 8 horas por 7 días"
### EJEMPLO 2
CONTEXTO_OCR:
Paciente: —. Indicaciones ilegibles.
PREGUNTA:
¿Hay contraindicaciones registradas?
SALIDA_ES:
- Contraindicaciones: **dato no disponible en el OCR**
- Evidencia OCR: "Indicaciones ilegibles"
""".strip()
def build_user_prompt(ocr_md, ocr_txt, user_msg):
    raw = ocr_md if (ocr_md and ocr_md.strip()) else ocr_txt
    ctx = _truncate(_clean_ocr(raw), 3000)
    question = (user_msg or "Analiza el CONTEXTO_OCR y resume lo clínicamente relevante en viñetas.").strip()
    prompt = (
        f"{FEWSHOT}\n\n"
        f"### CONTEXTO_OCR\n{(ctx if ctx else '—')}\n\n"
        f"### PREGUNTA\n{question}\n\n"
        f"### SALIDA_ES\n"
    )
    return prompt
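# The assembled prompt looks like (schematically):
#   <FEWSHOT examples>
#   ### CONTEXTO_OCR
#   <cleaned OCR, truncated to 3000 chars, or '—' if empty>
#   ### PREGUNTA
#   <user question, or the default "analyze the OCR" instruction>
#   ### SALIDA_ES
# The trailing "### SALIDA_ES" header cues the model to continue with the bulleted Spanish
# answer seen in the few-shot examples, and "\n###" in STOP_SEQS stops generation before the
# model can open a new, fabricated section.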
def _to_chatml(system_prompt, user_prompt):
    # Minimal ChatML-style message list for llama.cpp's chat API
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
# =========================
# BioMedLM-7B GGUF — llama.cpp (GPU only in the worker)
# =========================
_llm = None
_llm_name = None

def _download_gguf_path():
    """
    Search priority:
      0) GGUF_LOCAL_PATH (direct path)
      1) File uploaded to the Space (cwd): exact GGUF_FILE, or any *.gguf
      2) HF repo: GGUF_REPO + GGUF_FILE (or the candidate list)
    """
    # 0) Explicit local path
    if GGUF_LOCAL_PATH:
        p = os.path.abspath(GGUF_LOCAL_PATH)
        if os.path.exists(p):
            return p, p
        raise RuntimeError(f"GGUF_LOCAL_PATH apunta a un archivo inexistente: {p}")
    # 1) File uploaded to the Space
    if GGUF_FILE:
        local_path = os.path.join(os.getcwd(), GGUF_FILE)
        if os.path.exists(local_path):
            return local_path, f"./{GGUF_FILE}"
    found = sorted(glob.glob(os.path.join(os.getcwd(), "*.gguf")))
    if found:
        return found[0], f"./{os.path.basename(found[0])}"
    # 2) HF repo
    last_err = None
    if GGUF_REPO:
        candidates = [GGUF_FILE] if GGUF_FILE else _GGUF_CANDIDATES
        for fname in candidates:
            try:
                path = hf_hub_download(repo_id=GGUF_REPO, filename=fname, token=HF_TOKEN)
                return path, f"{GGUF_REPO}:{fname}"
            except Exception as e:
                last_err = e
    raise RuntimeError("No se encontró el GGUF. "
                       "Sube el .gguf (Files) y pon GGUF_FILE, o define GGUF_REPO+GGUF_FILE, "
                       f"o usa GGUF_LOCAL_PATH. Último error HF: {last_err}")
def _ensure_llm():
    """Initialize llama.cpp in the SAME worker; never let an exception propagate upward."""
    global _llm, _llm_name
    if _llm is not None:
        return True, f"warm (reusing {_llm_name})"
    try:
        gguf_path, used = _download_gguf_path()
        _llm = Llama(
            model_path=gguf_path,
            n_ctx=N_CTX,
            n_threads=N_THREADS,
            n_gpu_layers=N_GPU_LAYERS,
            n_batch=N_BATCH,
            verbose=False,
        )
        _llm_name = used
        return True, f"loaded {used}"
    except Exception as e:
        return False, f"[{e.__class__.__name__}] {str(e) or repr(e)}"

@spaces.GPU
def biomedlm_warmup():
    """Optional (manual) warmup; NOT called from another GPU function."""
    ok, msg = _ensure_llm()
    return ("OK::" if ok else "ERR::") + msg
@spaces.GPU
def biomedlm_chat(ocr_md, ocr_txt, user_msg,
                  temperature=GEN_TEMPERATURE, top_p=GEN_TOP_P, max_tokens=GEN_MAX_NEW_TOKENS):
    """Chat on the GPU; everything wrapped in try/except so the worker never raises a bare RuntimeError."""
    try:
        ok, msg = _ensure_llm()
        if not ok:
            return "ERR::No se pudo inicializar el modelo GGUF -> " + msg
        prompt_user = build_user_prompt(ocr_md, ocr_txt, user_msg)
        messages = _to_chatml(SYSTEM_INSTR, prompt_user)
        try:
            out = _llm.create_chat_completion(
                messages=messages,
                temperature=temperature,
                top_p=top_p,
                max_tokens=max_tokens,
                stop=STOP_SEQS,
            )
            ans = (out["choices"][0]["message"]["content"] or "").strip()
            return "OK::" + ans
        except Exception as e:
            return f"ERR::[Inferencia] {e.__class__.__name__}: {str(e) or repr(e)}"
    except Exception as e:
        return f"ERR::[Worker] {e.__class__.__name__}: {str(e) or repr(e)}"
# =========================
# DeepSeek-OCR (GPU only in the worker)
# =========================
def _load_ocr_model():
    model_name = "deepseek-ai/DeepSeek-OCR"
    tok = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    kwargs = dict(
        _attn_implementation=os.getenv("OCR_ATTN_IMPL", "flash_attention_2"),
        trust_remote_code=True,
        use_safetensors=True,
    )
    if DS_OCR_REV:
        kwargs["revision"] = DS_OCR_REV
    try:
        mdl = AutoModel.from_pretrained(model_name, **kwargs).eval()
        return tok, mdl
    except Exception as e:
        # Fallback if FlashAttention-2 is not available
        if any(k in str(e).lower() for k in ["flash_attn", "flashattention2", "flash_attention_2"]):
            kwargs["_attn_implementation"] = "eager"
            mdl = AutoModel.from_pretrained(model_name, **kwargs).eval()
            return tok, mdl
        raise

tokenizer, model = _load_ocr_model()
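# The model is loaded here on CPU in the main process; it is moved to CUDA only inside the
# @spaces.GPU worker (process_image below), matching the ZeroGPU constraint noted in the header.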
@spaces.GPU
def process_image(image, model_size, task_type, is_eval_mode):
    if image is None:
        return None, "Please upload an image first.", "Please upload an image first."
    # Move to GPU ONLY inside the worker
    if torch.cuda.is_available():
        dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
        model_device = model.to(dtype).to("cuda")
    else:
        dtype = torch.float32
        model_device = model.to(dtype)
    with tempfile.TemporaryDirectory() as output_path:
        prompt = "<image>\nFree OCR. " if task_type == "Free OCR" else "<image>\n<|grounding|>Convert the document to markdown. "
        temp_image_path = os.path.join(output_path, "temp_image.jpg")
        image.convert("RGB").save(temp_image_path)  # force RGB so RGBA/palette inputs can be saved as JPEG
        size_cfg = {
            "Tiny": (512, 512, False),
            "Small": (640, 640, False),
            "Base": (1024, 1024, False),
            "Large": (1280, 1280, False),
            "Gundam (Recommended)": (1024, 640, True),
        }
        base_size, image_size, crop_mode = size_cfg.get(model_size, (1024, 640, True))
        plain_text = model_device.infer(
            tokenizer,
            prompt=prompt,
            image_file=temp_image_path,
            output_path=output_path,
            base_size=base_size,
            image_size=image_size,
            crop_mode=crop_mode,
            save_results=True,
            test_compress=True,
            eval_mode=is_eval_mode,
        )
        image_result_path = os.path.join(output_path, "result_with_boxes.jpg")
        markdown_result_path = os.path.join(output_path, "result.mmd")
        markdown_content = "Markdown result was not generated. This is expected for 'Free OCR' task."
        if os.path.exists(markdown_result_path):
            with open(markdown_result_path, "r", encoding="utf-8") as f:
                markdown_content = f.read()
        result_image = None
        if os.path.exists(image_result_path):
            result_image = Image.open(image_result_path)
            result_image.load()  # force decode before the temporary directory is removed
        text_result = plain_text if plain_text else markdown_content
        return result_image, markdown_content, text_result
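# process_image returns (annotated PIL image or None, markdown string, plain text); in the UI
# wiring below these map 1:1 onto output_image, output_markdown and output_text, and a
# follow-up .then() snapshots the markdown/text into State as the chat context.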
# =========================
# Chat orchestrator (NO GPU)
# =========================
def biomedlm_reply(user_msg, chat_msgs, ocr_md, ocr_txt):
    try:
        res = biomedlm_chat(
            ocr_md,
            ocr_txt,
            user_msg,
            temperature=GEN_TEMPERATURE,
            top_p=GEN_TOP_P,
            max_tokens=GEN_MAX_NEW_TOKENS,
        )
        s = str(res)
        if s.startswith("OK::"):
            answer = s[4:]
            updated = (chat_msgs or []) + [
                {"role": "user", "content": user_msg or "(analizar solo OCR)"},
                {"role": "assistant", "content": answer},
            ]
            return updated, "", gr.update(value="")
        else:
            # Surface the FULL worker error message in the Debug panel
            err_msg = s[5:] if s.startswith("ERR::") else s
            updated = (chat_msgs or []) + [
                {"role": "user", "content": user_msg or ""},
                {"role": "assistant", "content": "⚠️ Error LLM (local). Revisa el panel de debug."},
            ]
            return updated, "", gr.update(value=err_msg)
    except Exception as e:
        tb = traceback.format_exc(limit=2)
        updated = (chat_msgs or []) + [
            {"role": "user", "content": user_msg or ""},
            {"role": "assistant", "content": f"⚠️ Error LLM: {e}"},
        ]
        return updated, "", gr.update(value=f"{e}\n{tb}")

def clear_chat():
    return [], "", gr.update(value="")
# =========================
# UI (Gradio 5)
# =========================
with gr.Blocks(title="OpScanIA — DeepSeek-OCR + BioMedLM-7B (GGUF)", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
# DeepSeek-OCR → Chat Clínico con **BioMedLM-7B (GGUF local, llama.cpp)**
1) **Sube una imagen** y corre **OCR** (imagen anotada, Markdown y texto).
2) **Chatea** con **BioMedLM-7B GGUF** usando automáticamente el **OCR** como contexto.
*Uso educativo; no reemplaza consejo médico.*
"""
    )
    ocr_md_state = gr.State("")
    ocr_txt_state = gr.State("")
    with gr.Row():
        with gr.Column(scale=1):
            image_input = gr.Image(
                type="pil",
                label="Upload Image",
                sources=["upload", "clipboard", "webcam"]
            )
            model_size = gr.Dropdown(
                choices=["Tiny", "Small", "Base", "Large", "Gundam (Recommended)"],
                value="Gundam (Recommended)",
                label="Model Size"
            )
            task_type = gr.Dropdown(
                choices=["Free OCR", "Convert to Markdown"],
                value="Convert to Markdown",
                label="Task Type"
            )
            eval_mode_checkbox = gr.Checkbox(
                value=False,
                label="Enable Evaluation Mode",
                info="Solo texto (más rápido). Desmárcalo para ver imagen anotada y markdown."
            )
            submit_btn = gr.Button("Process Image", variant="primary")
            warm_btn = gr.Button("Warmup BioMedLM-7B (GGUF)")
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.TabItem("Annotated Image"):
                    output_image = gr.Image(interactive=False)
                with gr.TabItem("Markdown Preview"):
                    output_markdown = gr.Markdown()
                with gr.TabItem("Markdown Source / Eval"):
                    output_text = gr.Textbox(lines=18, show_copy_button=True, interactive=False)

    with gr.Row():
        md_preview = gr.Textbox(label="Snapshot Markdown OCR", lines=10, interactive=False)
        txt_preview = gr.Textbox(label="Snapshot Texto OCR", lines=10, interactive=False)
| gr.Markdown("## Chat Clínico (BioMedLM-7B GGUF)") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| chatbot = gr.Chatbot(label="Asistente OCR (BioMedLM-7B GGUF)", type="messages", height=420) | |
| user_in = gr.Textbox( | |
| label="Mensaje", | |
| placeholder="Escribe tu consulta… (vacío = analiza solo el OCR)", | |
| lines=2 | |
| ) | |
| with gr.Row(): | |
| send_btn = gr.Button("Enviar", variant="primary") | |
| clear_btn = gr.Button("Limpiar") | |
| with gr.Column(scale=1): | |
| debug_box = gr.Textbox(label="Debug", lines=10, interactive=False) | |
    # OCR
    submit_btn.click(
        fn=process_image,
        inputs=[image_input, model_size, task_type, eval_mode_checkbox],
        outputs=[output_image, output_markdown, output_text],
    ).then(
        fn=lambda md, tx: (_truncate(md, 3000), _truncate(tx, 3000), md, tx),
        inputs=[output_markdown, output_text],
        outputs=[ocr_md_state, ocr_txt_state, md_preview, txt_preview],
    )

    # LLM warmup (downloads/loads the GGUF and creates the Llama object on the GPU)
    warm_btn.click(fn=biomedlm_warmup, outputs=[debug_box])

    # Chat
    send_btn.click(
        fn=biomedlm_reply,
        inputs=[user_in, chatbot, ocr_md_state, ocr_txt_state],
        outputs=[chatbot, user_in, debug_box]
    )
    clear_btn.click(fn=clear_chat, outputs=[chatbot, user_in, debug_box])

if __name__ == "__main__":
    demo.queue(max_size=20)
    demo.launch()