hive / app.py
Paulhayes's picture
Update app.py
909114b verified
# === BOOTSTRAP + SAFE DATASETS SHIM (must be line 1) =========================
# Load model directly
from transformers import AutoTokenizer, AutoModelForCausalLM
tokenizer = AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
model = AutoModelForCausalLM.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")
messages = [
{"role": "user", "content": "Who are you?"},
]
inputs = tokenizer.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_dict=True,
return_tensors="pt",
).to(model.device)
outputs = model.generate(**inputs, max_new_tokens=40)
print(tokenizer.decode(outputs[0][inputs["input_ids"].shape[-1]:]))
import os, sys, subprocess, importlib.util, importlib.machinery, types
import sys # make sure this import is near the top of your file
# near the other imports
import os
# Defaults that work for both CLI and UI; can be overridden via env if you want
DEFAULT_EFFECTIVE_ROLE = os.getenv("EFFECTIVE_ROLE", "user")
DEFAULT_CALLER_ID_CLI = os.getenv("CALLER_ID_CLI", "cli")
DEFAULT_CALLER_ID_UI = os.getenv("CALLER_ID_UI", "ui")
def chat_once(hive: "Hive", text: str, *, role: str = DEFAULT_EFFECTIVE_ROLE, caller_id: str = DEFAULT_CALLER_ID_CLI):
"""
Thin wrapper so all calls to Hive.chat() include the required args.
"""
return hive.chat(text, effective_role=role, caller_id=caller_id)
try:
import gradio as gr
except Exception as e:
gr = None
print(f"[ui] Gradio import failed: {e}")
# --- SAFE loader for SentenceTransformer to avoid "meta tensor" crashes ---
import os
from sentence_transformers import SentenceTransformer
def load_sentence_transformer_safely(model_name: str, device: str = "cpu"):
"""
Load a SentenceTransformer without going through an accelerate/empty-weights
path that can yield 'meta' tensors. Never call .to(device) after creation.
"""
# Guardrails — avoid accelerate meta in some containers
os.environ.pop("ACCELERATE_USE_DEEPSPEED", None)
os.environ.pop("ACCELERATE_MIXED_PRECISION", None)
# First attempt: construct directly on the requested device
try:
st = SentenceTransformer(
model_name,
device=device, # <-- construct directly on the device
trust_remote_code=True, # some ST models need this
)
return st
except NotImplementedError as e:
# If we still hit the meta-copy error, load on CPU and keep it there.
print(f"[warn] ST meta-tensor error on device={device}: {e}. Falling back to CPU.")
st = SentenceTransformer(
model_name,
device="cpu",
trust_remote_code=True,
)
return st
def _read_line(prompt="> "):
# Avoid prompting when there’s no interactive terminal (e.g., Hugging Face space)
if not sys.stdin or not sys.stdin.isatty():
prompt = ""
try:
return input(prompt)
except EOFError:
return None
def _pip_install(pkgs):
try:
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q"] + pkgs)
except Exception as e:
print("Bootstrap warning:", e, flush=True)
def _datasets_ok():
try:
spec = importlib.util.find_spec("datasets")
if spec is None:
return False
import datasets as _ds
if getattr(_ds, "__spec__", None) is None:
return False
for name in ("Dataset","DatasetDict","IterableDataset","IterableDatasetDict","Value"):
if not hasattr(_ds, name):
return False
return True
except Exception:
return False
# Try to ensure a real, recent datasets exists
if not _datasets_ok():
_pip_install(["datasets>=2.16,<3"])
# If still not OK, hard-stub it so imports don’t crash
if not _datasets_ok():
ds = types.ModuleType("datasets")
ds.__file__ = "<stub:datasets>"
ds.__path__ = []
ds.__spec__ = importlib.machinery.ModuleSpec("datasets", loader=None, is_package=True)
class _Base: # simple no-op base
def __init__(self, *a, **k): pass
class Dataset(_Base):
def map(self,*a,**k): return self
def filter(self,*a,**k): return self
def select(self,*a,**k): return self
def shuffle(self,*a,**k): return self
def train_test_split(self,*a,**k): return {"train": self, "test": self}
class IterableDataset(_Base): pass
class DatasetDict(dict): pass
class IterableDatasetDict(dict): pass
class Value:
def __init__(self, dtype="string"): self.dtype = dtype
ds.Dataset = Dataset
ds.DatasetDict = DatasetDict
ds.IterableDataset = IterableDataset
ds.IterableDatasetDict = IterableDatasetDict
ds.Value = Value
sys.modules["datasets"] = ds
# ============================================================================
# === LIGHTWEIGHT ST IMPORT + FALLBACK ========================================
# IMPORTANT: avoid "from sentence_transformers import SentenceTransformer"
# (that import triggers cross_encoder -> datasets)
try:
from sentence_transformers.SentenceTransformer import SentenceTransformer
_ST_AVAILABLE = True
except Exception as _e:
print("[WARN] sentence-transformers direct import failed:", _e, flush=True)
SentenceTransformer = None
_ST_AVAILABLE = False
def get_embedder(model_name: str = "sentence-transformers/all-MiniLM-L6-v2", device: str | None = None):
"""
Returns a SentenceTransformer if available, otherwise a pure-Transformers
fallback with mean pooling and L2 normalization (API-compatible encode()).
"""
if _ST_AVAILABLE:
try:
return SentenceTransformer(model_name, device=device)
except Exception as e:
print("[WARN] ST model init failed, falling back to plain Transformers:", e, flush=True)
# Fallback: plain Transformers embedder
from transformers import AutoTokenizer, AutoModel
import torch
tok = AutoTokenizer.from_pretrained(model_name)
mdl = AutoModel.from_pretrained(model_name)
if device:
mdl.to(device)
mdl.eval()
class _Embedder:
def encode(self, texts, convert_to_tensor=False, normalize_embeddings=True, batch_size=32, **kw):
if isinstance(texts, str):
texts = [texts]
out_chunks = []
with torch.no_grad():
for i in range(0, len(texts), batch_size):
batch = texts[i:i+batch_size]
enc = tok(batch, padding=True, truncation=True, return_tensors="pt")
if device:
enc = {k: v.to(device) for k, v in enc.items()}
last_hidden = mdl(**enc).last_hidden_state # [B, T, H]
mask = enc["attention_mask"].unsqueeze(-1) # [B, T, 1]
pooled = (last_hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9) # mean pooling
if normalize_embeddings:
pooled = torch.nn.functional.normalize(pooled, p=2, dim=1)
out_chunks.append(pooled.detach().cpu())
embs = torch.cat(out_chunks, dim=0)
return embs if convert_to_tensor else embs.numpy()
return _Embedder()
# ============================================================================
# -*- coding: utf-8 -*-
"""
Hive — FULL COMBINED (Original-first, Tutor+ • SAFE v5 BACKGROUND)
- Embeds your fixed base (as `hive_base`)
- Tutor+ (retrieval + gentle phonics/CEFR/essay review)
- Instant boot: `datasets` stubbed at import
- **Background-only** staged condensed-curves builder:
• Streams dataset text -> embeds -> FAISS -> prunes caches
• No UI elements; runs automatically when needed and on a schedule
"""
import os, sys, types, threading, time, base64, json, re, shutil
from pathlib import Path
from typing import List, Dict, Optional
# ---------- Storage policy ----------
MAX_GB = float(os.getenv("HIVE_MAX_CACHE_GB", "8")) # cache budget (GB)
HF_HOME = Path(os.getenv("HF_HOME", str(Path.home() / ".cache" / "huggingface")))
TRANSFORMERS_CACHE = Path(os.getenv("TRANSFORMERS_CACHE", str(HF_HOME / "transformers")))
DATASETS_CACHE = Path(os.getenv("HF_DATASETS_CACHE", str(HF_HOME / "datasets")))
ALLOW_KEEP = [os.getenv("HIVE_MODEL_ID","TinyLlama/TinyLlama-1.1B-Chat-v1.0").split("/")[-1],
os.getenv("HIVE_EMB_ID","sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2").split("/")[-1]]
def _dir_size_bytes(p: Path) -> int:
total = 0
if not p.exists(): return 0
for root, _, files in os.walk(p):
for f in files:
try: total += (Path(root)/f).stat().st_size
except Exception: pass
return total
def _prune_path_to_limit(root: Path, max_bytes: int, allow_keep=None, log=None):
if not root.exists(): return
files = []
for rp, _, fns in os.walk(root):
for fn in fns:
fp = Path(rp)/fn
try:
st = fp.stat()
rel = str(fp).lower()
if allow_keep and any(k.lower() in rel for k in allow_keep):
continue
files.append((st.st_mtime, st.st_size, fp))
except Exception:
pass
files.sort() # oldest first
size = _dir_size_bytes(root)
if log: log(f"[prune] {root} size: {size/1e9:.2f} GB; target: {max_bytes/1e9:.2f} GB")
i = 0
while size > max_bytes and i < len(files):
_, s, fp = files[i]
try:
fp.unlink()
size -= s
if log: log(f"[prune] delete {fp} (-{s/1e6:.1f} MB)")
except Exception as e:
if log: log(f"[prune] skip {fp}: {e}")
i += 1
def enforce_cache_budget(log=None):
max_bytes = int(MAX_GB * (1024**3))
for p in [TRANSFORMERS_CACHE, DATASETS_CACHE]:
_prune_path_to_limit(Path(p), max_bytes, allow_keep=ALLOW_KEEP, log=log)
# ---------- Stub datasets at import for instant boot ----------
if os.getenv("HIVE_DISABLE_DATASETS", "1").lower() in ("1","true","yes","on"):
import importlib.machinery as _mach
ds = types.ModuleType("datasets")
# Provide a proper ModuleSpec so importlib.util.find_spec("datasets") does not crash
ds.__spec__ = _mach.ModuleSpec("datasets", loader=None)
ds.__path__ = [] # mark as package-like for safety
ds.__version__ = "0.0-stub"
class _Empty:
def __iter__(self): return iter([])
def __getitem__(self, k): return []
def map(self, *a, **k): return self
def filter(self, *a, **k): return self
def select(self, *a, **k): return self
def shuffle(self, *a, **k): return self
def train_test_split(self, *a, **k): return {"train": self, "test": self}
def to_pandas(self): import pandas as pd; return pd.DataFrame()
# Expose symbols sentence-transformers expects to import
ds.Dataset = _Empty
ds.IterableDataset = _Empty
ds.DatasetDict = dict
def _disabled_load_dataset(*args, **kwargs):
print("[datasets] disabled via HIVE_DISABLE_DATASETS; returning empty dataset.")
return _Empty()
ds.load_dataset = _disabled_load_dataset
sys.modules["datasets"] = ds
os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1")
os.environ.setdefault("HIVE_DOWNLOAD_DATASETS_ON_START", "0") # handled by staged builder
os.environ.setdefault("HIVE_DATASETS_LIST", "wi_locness")
os.environ.setdefault("HIVE_BUILD_CONDENSED_CURVES_ON_START", "1")
os.environ.setdefault("HIVE_CURVES_TARGET_MIN", "10000") # target #items in index
os.environ.setdefault("HIVE_CURVES_RECHECK_SECS", "1800") # 30 minutes
os.environ.setdefault("HIVE_STAGE_BATCH", "128")
os.environ.setdefault("HIVE_STAGE_SAVE_EVERY", "512")
os.environ.setdefault("HIVE_STAGE_MAX_DOCS_PER_DATASET", "5000")
# ---------- Load original base from Base64 ----------
_HIVE_BASE_B64 = """"""
_HIVE_BASE_SOURCE = base64.b64decode(_HIVE_BASE_B64.encode("utf-8")).decode("utf-8", errors="strict")
hive_base = types.ModuleType("hive_base")
hive_base.__dict__["__name__"] = "hive_base"
exec(compile(_HIVE_BASE_SOURCE, "hive_tinyllama_hf.py", "exec"), hive_base.__dict__)
sys.modules["hive_base"] = hive_base
# ---------- Compatibility Monkey Patch for Hive.chat ----------
# Some UI callbacks may call Hive.chat(...) without the required keyword-only
# args added in newer versions. Patch it so defaults are injected, preventing
# errors like: Hive.chat() missing 'effective_role' and 'caller_id'.
try:
DEFAULT_EFFECTIVE_ROLE = os.getenv("EFFECTIVE_ROLE", "user")
DEFAULT_CALLER_ID_CLI = os.getenv("CALLER_ID_CLI", "cli")
DEFAULT_CALLER_ID_UI = os.getenv("CALLER_ID_UI", "ui")
if hasattr(hive_base, "Hive"):
_Hive = hive_base.Hive
if hasattr(_Hive, "chat"):
_orig_chat = _Hive.chat
def _chat_compat(self, *args, **kwargs):
# Inject defaults only if missing
if "effective_role" not in kwargs or kwargs.get("effective_role") is None:
kwargs["effective_role"] = DEFAULT_EFFECTIVE_ROLE
if "caller_id" not in kwargs or kwargs.get("caller_id") is None:
# Prefer UI caller id if we are in the web app
if os.getenv("RUNNING_IN_UI", "0") == "1":
kwargs["caller_id"] = DEFAULT_CALLER_ID_UI
else:
kwargs["caller_id"] = DEFAULT_CALLER_ID_CLI
return _orig_chat(self, *args, **kwargs)
# Tag to avoid double-patching
if getattr(_Hive.chat, "_compat_patched", False) is False:
_chat_compat._compat_patched = True
_Hive.chat = _chat_compat
except Exception as _e:
# Do not fail the app if patching isn't possible
pass
# -------------------------------------------------------------
# ---------- Tutor+ Layer & Curves builder (no UI) ----------
try:
import numpy as np
except Exception:
np = None
try:
import faiss
_FAISS = True
except Exception:
faiss = None; _FAISS = False
try:
from sentence_transformers import SentenceTransformer
except Exception:
SentenceTransformer = None
def ENV(k, d=None, cast=str):
v = os.getenv(k, None)
if v is None: return d
if cast is bool: return str(v).lower() in ("1","true","yes","on")
if cast is int:
try: return int(v)
except: return int(float(v))
return v
CFG = {
"CURVE_DIR": ENV("HIVE_CURVE_DIR","./state/curves"),
"EMB_ID": ENV("HIVE_EMB_ID","sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"),
"EMB_LOCAL_DIR": ENV("HIVE_EMB_LOCAL_DIR",""),
"KNN_TOPK": ENV("HIVE_KNN_TOPK","8", int),
"KNN_LAMBDA": float(ENV("HIVE_KNN_LAMBDA","0.35")),
"PHONICS_MAX_LINES": ENV("HIVE_PHONICS_MAX_LINES","2", int),
"STAGE_BATCH": ENV("HIVE_STAGE_BATCH","128", int),
"STAGE_SAVE_EVERY": ENV("HIVE_STAGE_SAVE_EVERY","512", int),
"STAGE_MAX_DOCS_PER_DATASET": ENV("HIVE_STAGE_MAX_DOCS_PER_DATASET","5000", int),
"CURVES_TARGET_MIN": ENV("HIVE_CURVES_TARGET_MIN","10000", int),
"CURVES_RECHECK_SECS": ENV("HIVE_CURVES_RECHECK_SECS","1800", int),
}
try: os.makedirs(CFG["CURVE_DIR"], exist_ok=True)
except Exception: pass
class _EmbMux:
def __init__(self):
if SentenceTransformer is None:
self.model=None; self.dim=384
else:
p=CFG["EMB_LOCAL_DIR"].strip()
self.model = SentenceTransformer(p) if (p and os.path.isdir(p)) else SentenceTransformer(CFG["EMB_ID"])
try:
v=self.model.encode(["hi"], normalize_embeddings=True)
self.dim=int(getattr(v,"shape",[1,384])[1])
except Exception:
self.dim=384
def encode(self, texts: List[str]):
if self.model is None:
dim=getattr(self,"dim",384); out=[]
for t in texts:
h=abs(hash(t))%1000003; vec=[0.0]*dim; vec[h%dim]=1.0; out.append(vec)
return out
return self.model.encode(texts, normalize_embeddings=True)
class _CurveStore:
def __init__(self, root:str):
self.root=root
try: os.makedirs(self.root, exist_ok=True)
except Exception: pass
self.emb=_EmbMux()
self.dim=getattr(self.emb,"dim",384)
self.idx_path=os.path.join(self.root,"faiss.index")
self.meta_path=os.path.join(self.root,"meta.jsonl")
self.idx=self._load()
def _load(self):
if not _FAISS or faiss is None: return None
if os.path.exists(self.idx_path):
try: return faiss.read_index(self.idx_path)
except Exception: pass
return faiss.IndexFlatIP(self.dim)
def save_index(self):
if _FAISS and self.idx is not None:
try: faiss.write_index(self.idx, self.idx_path)
except Exception: pass
def add_vectors(self, vecs, metas: List[Dict]):
if not _FAISS or self.idx is None or np is None: return 0
try:
v=np.asarray(vecs, dtype="float32")
self.idx.add(v)
with open(self.meta_path,"a",encoding="utf-8") as f:
for m in metas: f.write(json.dumps(m, ensure_ascii=False)+"\n")
return len(metas)
except Exception:
return 0
def add_texts(self, texts: List[str], tag="stage", scope="general"):
if not texts: return 0
vecs=self.emb.encode(texts)
metas=[{"scope":scope, "tag":tag, "text":t[:500]} for t in texts]
n=self.add_vectors(vecs, metas); self.save_index(); return n
@property
def count(self)->int:
try:
return int(self.idx.ntotal) if (self.idx is not None) else 0
except Exception:
return 0
def _extract_text(rec: dict):
for k in ("text","sentence","sentences","content","input","inputs","prompt","source","article","document","review","body"):
if k in rec and isinstance(rec[k], str) and rec[k].strip():
return rec[k]
if k in rec and isinstance(rec[k], list) and rec[k] and isinstance(rec[k][0], str):
return " ".join(rec[k])
parts=[]
for k,v in rec.items():
if isinstance(v,str) and 5<=len(v)<=2000: parts.append(v)
return " ".join(parts) if parts else ""
def _iter_texts(name: str, max_docs:int):
# un-stub datasets
if "datasets" in sys.modules and getattr(sys.modules["datasets"].load_dataset, "__name__", "") == "_disabled_load_dataset":
del sys.modules["datasets"]
import datasets as _ds
for split in ("train","validation","test"):
# prefer streaming
try:
ds = _ds.load_dataset(name, split=split, streaming=True)
cnt=0
for rec in ds:
try:
txt = _extract_text(rec)
if txt:
yield txt
cnt += 1
if cnt >= max_docs: return
except Exception: continue
except Exception:
try:
ds = _ds.load_dataset(name, split=split)
cnt=0
for rec in ds:
try:
txt = _extract_text(rec)
if txt:
yield txt
cnt += 1
if cnt >= max_docs: return
except Exception: continue
except Exception:
continue
def build_condensed_curves(datasets_csv=None, curve_dir=None, log_cb=None):
def log(m):
if log_cb: log_cb(m)
enforce_cache_budget(log)
curve_dir = curve_dir or CFG["CURVE_DIR"]
names = [x.strip() for x in (datasets_csv or os.getenv("HIVE_DATASETS_LIST","wi_locness")).split(",") if x.strip()]
max_docs = int(CFG["STAGE_MAX_DOCS_PER_DATASET"])
batch = int(CFG["STAGE_BATCH"])
save_every = int(CFG["STAGE_SAVE_EVERY"])
store = _CurveStore(curve_dir)
total_added=0
for name in names:
log(f"[staged] building condensed curves from '{name}' (max {max_docs} docs)…")
buf=[]; added=0; last_save=0
for txt in _iter_texts(name, max_docs):
buf.append(txt)
if len(buf) >= batch:
n = store.add_texts(buf, tag=f"ds:{name}", scope="general")
added += n; total_added += n; buf = []
if added - last_save >= save_every:
store.save_index(); enforce_cache_budget(log); last_save = added
log(f"[staged] progress '{name}': {added} items … (index={store.count})")
if buf:
n = store.add_texts(buf, tag=f"ds:{name}", scope="general")
added += n; total_added += n; buf = []
store.save_index(); enforce_cache_budget(log)
log(f"[staged] '{name}' done: {added} items condensed (index={store.count}).")
log(f"[staged] total added: {total_added} (index={store.count})")
return True
# Background supervisor: runs at start and on interval; only builds if below target
def _should_build(curve_dir=None)->bool:
store=_CurveStore(curve_dir or CFG["CURVE_DIR"])
return store.count < int(CFG["CURVES_TARGET_MIN"])
def _background_supervisor(log_cb=None):
def log(m):
if log_cb: log_cb(m)
interval = int(CFG["CURVES_RECHECK_SECS"])
while True:
try:
if _should_build():
log("[staged] target not met; starting condensed-curves build…")
build_condensed_curves(log_cb=log)
else:
log("[staged] target met; no build needed.")
except Exception as e:
log(f"[staged] supervisor error: {e}")
time.sleep(interval)
def _kickoff_background_if_enabled(log_cb=None):
if os.getenv("HIVE_BUILD_CONDENSED_CURVES_ON_START","0").lower() not in ("1","true","yes","on"):
return
threading.Thread(target=_background_supervisor, args=(log_cb,), daemon=True).start()
# ---------- Tutor+ light additions (no UI) ----------
def _ipa_or_hyphenate(text:str)->str:
try:
import eng_to_ipa as ipa
ipa_text=ipa.convert(text)
if ipa_text and ipa_text!=text:
return f"{text} /{ipa_text}/"
except Exception:
pass
try:
import pyphen
dic=pyphen.Pyphen(lang="en"); return dic.inserted(text)
except Exception:
return text
def _gentle_phonics_block(text:str, max_lines:int)->str:
import re as _re
words=_re.findall(r"[A-Za-z][A-Za-z\-']{2,}", text or "")
words=sorted(set(words), key=lambda w:(-len(w), w.lower()))
picks=words[:max_lines]
if not picks: return ""
return "\n".join([f"- {_ipa_or_hyphenate(w)}" for w in picks])
def _route_intent(txt:str)->str:
import re as _re
if _re.search(r"\b(spell|spelling|how\s+do\s+you\s+spell)\b", txt or "", _re.I): return "direct_spell"
if _re.search(r"\b(pronounc(e|iation)|ipa|phonics|how\s+do\s+you\s+say)\b", txt or "", _re.I): return "pronounce"
if _re.search(r"\b(essay|review|evaluate|feedback|improv(e|ements?)|revise|critique|proofread\s+my\s+essay)\b", txt or "", _re.I): return "essay_review"
if _re.search(r"\b(grammar|correct|fix|proofread|mistakes?)\b", txt or "", _re.I): return "direct_grammar"
return "tutor"
BaseHive = getattr(hive_base, "Hive", object)
class Hive(BaseHive):
def __init__(self, *a, **k):
super().__init__(*a, **k)
def chat(self, message:str, *a, **k)->str:
mode=_route_intent(message or "")
try:
reply=super().chat(message, *a, **k)
except Exception as e:
reply=f"[Base chat failed: {e}]"
if isinstance(reply,str) and (mode in ("pronounce","direct_spell") or (mode=="tutor" and len(reply.split())<=40)):
hints=_gentle_phonics_block(reply, int(CFG["PHONICS_MAX_LINES"]))
if hints: reply += "\n\n**Phonics hints (brief)**\n" + hints
return reply
# ---------- Entrypoint ----------
def build_ui():
# Respect user's base UI if present; we do NOT add any tabs here.
try:
import gradio as gr
except Exception:
return None
for name in ("build_ui","launch_ui","get_ui","make_ui"):
if hasattr(hive_base, name):
try:
ui = getattr(hive_base, name)()
return ui if isinstance(ui, gr.Blocks) else None
except Exception:
pass
return None
# --- BOTTOM OF FILE: replace your old `if __name__ == "__main__":` with this ---
# --- BOTTOM OF FILE: replace any old REPL/main block with this ---
import os
import sys
import time
import argparse
def _read_line(prompt="> "):
# Avoid prompting when there’s no interactive terminal (e.g., Spaces)
if not sys.stdin or not sys.stdin.isatty():
prompt = ""
try:
return input(prompt)
except EOFError:
return None
def handle_user_input(s: str) -> str:
# minimal glue to your backend
global _HIVE_SINGLETON
if _HIVE_SINGLETON is None:
_HIVE_SINGLETON = Hive()
return _HIVE_SINGLETON.chat(s)
def run_cli_loop():
while True:
s = _read_line("> ")
if s is None: # No stdin / non-interactive environment
break # fall through to headless wait
s = s.strip()
if not s:
continue
reply = handle_user_input(s)
print(reply, flush=True)
def run_headless_wait():
print("APP_READY: initialized (headless). Waiting for requests...", flush=True)
while True:
time.sleep(3600)
_HIVE_SINGLETON = None # global lazy instance for CLI & UI
def build_ui():
try:
import gradio as gr
except Exception as e:
print(f"[ui] Gradio import failed: {e}")
return None
with gr.Blocks() as demo:
state = gr.State(None) # we'll lazily create Hive on first use
chatbox = gr.Chatbot(height=400)
msg = gr.Textbox(placeholder="Type your message…", label="Message")
send = gr.Button("Send")
def on_send(user_msg, st, history):
global _HIVE_SINGLETON
if _HIVE_SINGLETON is None:
_HIVE_SINGLETON = Hive() # <- construct here, not at import time
if not user_msg.strip():
return history, ""
reply = _HIVE_SINGLETON.chat(user_msg)
history = (history or []) + [[user_msg, reply]]
return history, ""
send.click(on_send, inputs=[msg, state, chatbox], outputs=[chatbox, msg])
return demo
if __name__ == "__main__":
import os, sys, argparse
parser = argparse.ArgumentParser()
parser.add_argument("--ui", action="store_true", help="Force-launch Gradio UI")
args = parser.parse_args()
# Detect headless container (no interactive stdin/TTY)
HEADLESS = (not sys.stdin) or (not sys.stdin.isatty())
# Decide whether to bring up the web UI:
# - if user passed --ui
# - OR if we're headless (no stdin)
# - OR if FORCE_UI env var is set (FORCE_UI=1/true/yes)
force_env = os.getenv("FORCE_UI", "").lower() in ("1", "true", "yes")
WANTS_UI = args.ui or HEADLESS or force_env
if WANTS_UI:
ui = build_ui()
if ui is None:
print("Gradio not installed; falling back to CLI.")
run_cli_loop()
if HEADLESS:
run_headless_wait()
else:
# Start any background tasks after app is live (ignore if not defined)
try:
_kickoff_background_if_enabled(log_cb=print)
except NameError:
pass
# Respect platform port if provided
port = int(os.getenv("PORT", "7860"))
os.environ.setdefault("RUNNING_IN_UI","1"); ui.queue().launch(server_name="0.0.0.0", server_port=port)
else:
# CLI mode when a TTY exists
print("Hive (Original-first, Tutor+ • SAFE v5 BACKGROUND) ready. Type to chat.")
try:
_kickoff_background_if_enabled(log_cb=print)
except NameError:
pass
run_cli_loop()
# If the CLI loop ended because stdin vanished, keep app alive
if HEADLESS:
run_headless_wait()