reachy_phone_home / web_log_ui.py
itsMarco-G's picture
Add phone home ROI and web status UI
3be6312
raw
history blame
7.38 kB
import argparse
import json
import logging
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import List, Tuple
import cv2
import numpy as np
from reachy_mini import ReachyMini
from reachy_phone_home.main import TrackerConfig, run_tracker
LOG_LOCK = threading.Lock()
FRAME_LOCK = threading.Lock()
VIDEO_LOCK = threading.Lock()
LOG_LINES: List[Tuple[int, str]] = []
LOG_SEQ = 0
MAX_LINES = 1000
LATEST_FRAME = None
VIDEO_ENABLED = False
def _append_log(line: str) -> None:
global LOG_SEQ
with LOG_LOCK:
LOG_SEQ += 1
LOG_LINES.append((LOG_SEQ, line))
if len(LOG_LINES) > MAX_LINES:
LOG_LINES.pop(0)
class _LogWriter:
def write(self, data: str) -> int:
text = data.strip()
if text:
for line in text.splitlines():
_append_log(line)
return len(data)
def flush(self) -> None:
return None
class _QueueHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
try:
msg = self.format(record)
_append_log(msg)
except Exception:
pass
class LogHandler(BaseHTTPRequestHandler):
def log_message(self, format: str, *args) -> None:
return None
def _send_json(self, payload: dict) -> None:
body = json.dumps(payload).encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self) -> None:
if self.path.startswith("/debug"):
enabled = False
if "?" in self.path:
query = self.path.split("?", 1)[1]
for part in query.split("&"):
if part.startswith("enabled="):
enabled = part.split("=", 1)[1] in ("1", "true", "yes", "on")
break
global VIDEO_ENABLED
with VIDEO_LOCK:
VIDEO_ENABLED = enabled
current = VIDEO_ENABLED
self._send_json({"enabled": current})
return
if self.path.startswith("/stream.mjpg"):
self.send_response(200)
self.send_header("Content-Type", "multipart/x-mixed-replace; boundary=frame")
self.send_header("Cache-Control", "no-cache, no-store, must-revalidate")
self.end_headers()
try:
while True:
with VIDEO_LOCK:
enabled = VIDEO_ENABLED
if not enabled:
placeholder = _placeholder_frame("Debug video disabled")
ok, jpg = cv2.imencode(".jpg", placeholder)
if ok:
self.wfile.write(b"--frame\r\n")
self.wfile.write(b"Content-Type: image/jpeg\r\n\r\n")
self.wfile.write(jpg.tobytes())
self.wfile.write(b"\r\n")
time.sleep(0.25)
continue
with FRAME_LOCK:
frame = LATEST_FRAME.copy() if LATEST_FRAME is not None else None
if frame is not None:
ok, jpg = cv2.imencode(".jpg", frame)
if ok:
self.wfile.write(b"--frame\r\n")
self.wfile.write(b"Content-Type: image/jpeg\r\n\r\n")
self.wfile.write(jpg.tobytes())
self.wfile.write(b"\r\n")
else:
placeholder = _placeholder_frame("Waiting for camera frames...")
ok, jpg = cv2.imencode(".jpg", placeholder)
if ok:
self.wfile.write(b"--frame\r\n")
self.wfile.write(b"Content-Type: image/jpeg\r\n\r\n")
self.wfile.write(jpg.tobytes())
self.wfile.write(b"\r\n")
time.sleep(0.1)
except Exception:
pass
return
if self.path.startswith("/logs"):
since = 0
if "?" in self.path:
query = self.path.split("?", 1)[1]
for part in query.split("&"):
if part.startswith("since="):
try:
since = int(part.split("=", 1)[1])
except ValueError:
since = 0
with LOG_LOCK:
lines = [item for item in LOG_LINES if item[0] > since]
last_id = LOG_LINES[-1][0] if LOG_LINES else since
self._send_json(
{
"lines": [{"id": i, "text": t} for i, t in lines],
"last_id": last_id,
}
)
return
if self.path == "/" or self.path.startswith("/index"):
html = Path(__file__).with_name("web_log_ui.html").read_bytes()
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(html)))
self.end_headers()
self.wfile.write(html)
return
self.send_response(404)
self.end_headers()
def _start_tracker() -> None:
def _frame_hook(frame) -> None:
with VIDEO_LOCK:
if not VIDEO_ENABLED:
return
global LATEST_FRAME
with FRAME_LOCK:
LATEST_FRAME = frame.copy()
config = TrackerConfig(no_display=True)
try:
with ReachyMini() as reachy:
run_tracker(
reachy,
config,
frame_hook=_frame_hook,
frame_hook_enabled=lambda: VIDEO_ENABLED,
)
except Exception:
logging.exception("Tracker crashed")
def _placeholder_frame(text: str):
canvas = np.full((360, 640, 3), 40, dtype=np.uint8)
cv2.putText(
canvas,
text,
(20, 190),
cv2.FONT_HERSHEY_SIMPLEX,
0.8,
(200, 200, 200),
2,
cv2.LINE_AA,
)
return canvas
def main() -> None:
parser = argparse.ArgumentParser(description="Local web UI for Reachy Phone Home logs")
parser.add_argument("--host", type=str, default="127.0.0.1")
parser.add_argument("--port", type=int, default=8088)
# No display in the web UI; video is streamed via MJPEG.
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
root_logger = logging.getLogger()
root_logger.addHandler(_QueueHandler())
import sys
sys.stdout = _LogWriter()
sys.stderr = _LogWriter()
server = ThreadingHTTPServer((args.host, args.port), LogHandler)
print(f"[phone_home] Web UI running at http://{args.host}:{args.port}")
tracker_thread = threading.Thread(target=_start_tracker, daemon=True)
tracker_thread.start()
try:
server.serve_forever()
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()