"""Reachy phone tracking + phone-use detection using YOLO26l (no look-down logic).""" from __future__ import annotations import argparse import logging import math import os import time from pathlib import Path import cv2 from ultralytics import YOLO from reachy_mini import ReachyMini from reachy_mini.utils import create_head_pose try: from reachy_phone_home.movements import MovementScheduler, SituationMovements except ModuleNotFoundError: from pathlib import Path import sys repo_root = Path(__file__).resolve().parents[1] sys.path.insert(0, str(repo_root)) from reachy_phone_home.movements import MovementScheduler, SituationMovements PHONE_CLASS_ID = 67 # COCO "cell phone" PERSON_CLASS_ID = 0 class PhoneFollower: def __init__(self, move_threshold_px: int = 60, head_duration: float = 0.6) -> None: self.move_threshold_px = int(move_threshold_px) self.head_duration = float(head_duration) self.last_x = None self.last_y = None def update_box(self, reachy: ReachyMini, box, y_ratio: float = 0.5) -> None: x1, y1, x2, y2 = _box_xyxy(box) cx = int((x1 + x2) / 2) cy = int(y1 + (y2 - y1) * y_ratio) self.update_xy(reachy, cx, cy) def update_xy(self, reachy: ReachyMini, cx: int, cy: int) -> None: if self.last_x is not None and self.last_y is not None: if ( abs(cx - self.last_x) < self.move_threshold_px and abs(cy - self.last_y) < self.move_threshold_px ): return reachy.look_at_image(cx, cy, duration=self.head_duration, perform_movement=True) self.last_x = cx self.last_y = cy def _box_xyxy(box) -> tuple[int, int, int, int]: xyxy = box.xyxy[0].tolist() return int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3]) def _overlaps(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> bool: ax1, ay1, ax2, ay2 = a bx1, by1, bx2, by2 = b return not (ax2 < bx1 or ax1 > bx2 or ay2 < by1 or ay1 > by2) def _pad_box(box: tuple[int, int, int, int], pad_ratio: float) -> tuple[int, int, int, int]: x1, y1, x2, y2 = box bw = max(1, x2 - x1) bh = max(1, y2 - y1) px = int(bw * pad_ratio) py = int(bh * pad_ratio) return x1 - px, y1 - py, x2 + px, y2 + py def _clamp_box( box: tuple[int, int, int, int], frame_w: int, frame_h: int ) -> tuple[int, int, int, int]: x1, y1, x2, y2 = box x1 = max(0, min(frame_w - 1, x1)) x2 = max(0, min(frame_w - 1, x2)) y1 = max(0, min(frame_h - 1, y1)) y2 = max(0, min(frame_h - 1, y2)) if x2 < x1: x1, x2 = x2, x1 if y2 < y1: y1, y2 = y2, y1 return x1, y1, x2, y2 def _resolve_weights(weights: str, logger: logging.Logger) -> str: if weights in ("yolo26l", "yolo26m", "yolo26s", "yolo26n"): filename = f"{weights}.pt" cache_root = os.getenv("HF_HOME") or str(Path.home() / ".cache" / "reachy_phone_home") cache_dir = Path(cache_root) / "models" cache_path = cache_dir / filename if not cache_path.exists(): cache_dir.mkdir(parents=True, exist_ok=True) url = f"https://huggingface.co/Ultralytics/YOLO26/resolve/main/{filename}" logger.info("Downloading %s to %s", url, cache_path) _download_file(url, cache_path) return str(cache_path) return weights def _download_file(url: str, dest: Path) -> None: import urllib.request with urllib.request.urlopen(url) as response, open(dest, "wb") as out_file: out_file.write(response.read()) def main() -> None: logging.getLogger("reachy_mini.media.audio_base").setLevel(logging.ERROR) parser = argparse.ArgumentParser(description="Reachy phone use tracker (YOLO26l)") parser.add_argument("--weights", type=str, default="yolo26l") parser.add_argument("--conf", type=float, default=0.15) parser.add_argument("--process-every", type=int, default=1) 
parser.add_argument("--imgsz", type=int, default=640) parser.add_argument("--head-duration", type=float, default=1.2) parser.add_argument("--move-threshold-px", type=int, default=60) parser.add_argument("--no-head", action="store_true") parser.add_argument("--phone-use-confirm-sec", type=float, default=0.5) parser.add_argument("--phone-use-clear-sec", type=float, default=0.5) parser.add_argument("--phone-not-seen-clear-sec", type=float, default=1.0) parser.add_argument("--pad", type=float, default=0.1) parser.add_argument("--missing-neutral-sec", type=float, default=0.5) parser.add_argument("--neutral-duration", type=float, default=1.2) parser.add_argument("--person-y-ratio", type=float, default=0.3) parser.add_argument("--look-down-after-sec", type=float, default=5.0) parser.add_argument("--look-down-duration", type=float, default=1.2) parser.add_argument("--look-down-z-mm", type=float, default=8.0) parser.add_argument("--look-down-pitch-deg", type=float, default=30.0) parser.add_argument("--look-down-window-sec", type=float, default=5.0) parser.add_argument("--person-search-window-sec", type=float, default=5.0) parser.add_argument("--no-antenna", action="store_true") parser.add_argument("--antenna-angry-left", type=float, default=-2.6) parser.add_argument("--antenna-angry-right", type=float, default=2.6) parser.add_argument("--antenna-neutral-left", type=float, default=0.0) parser.add_argument("--antenna-neutral-right", type=float, default=0.0) parser.add_argument("--antenna-transition-sec", type=float, default=0.5) parser.add_argument("--antenna-relax-sec", type=float, default=1.0) parser.add_argument("--antenna-happy-amp", type=float, default=0.2) parser.add_argument("--antenna-happy-duration", type=float, default=0.5) parser.add_argument("--good-job-heartbeats", type=int, default=3) parser.add_argument("--phone-use-bad-sec", type=float, default=10.0) parser.add_argument("--movement-restore-sec", type=float, default=0.6) parser.add_argument("--display", action="store_true", help="Show the OpenCV window") parser.add_argument("--no-display", action="store_true", help="Disable the OpenCV window") parser.set_defaults(no_display=True) args = parser.parse_args() if args.display: args.no_display = False logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logger = logging.getLogger("yolo26l_phone_use_tracker") weights = _resolve_weights(args.weights, logger) model = YOLO(weights) with ReachyMini() as reachy: follower = PhoneFollower( move_threshold_px=args.move_threshold_px, head_duration=args.head_duration, ) movements = SituationMovements(reachy) scheduler = MovementScheduler( movements, good_job_heartbeats=args.good_job_heartbeats, phone_use_bad_sec=args.phone_use_bad_sec, phone_use_clear_sec=args.phone_use_clear_sec, restore_head_duration=args.movement_restore_sec, ) if not args.no_display: cv2.namedWindow("YOLO26L Phone Use") last_phone_use = 0.0 last_phone_use_state = False phone_use_start = None phone_use_stop = None frame_count = 0 last_phone_label = None phone_label_miss = 0 last_antenna_mode = "neutral" last_phone_seen = time.time() neutral_active = False look_down_active = False person_track_start = None missing_start = None last_heartbeat = time.time() last_track_state = None oscillate_start = None last_oscillate_update = 0.0 look_down_cycles = 0 last_prompt = 0.0 mode = "tracking_phone" mode_start = time.time() while True: frame = reachy.media.get_frame() if frame is None: continue frame_count += 1 if args.process_every > 1 and frame_count % 
max(1, args.process_every) != 0: if not args.no_display: cv2.imshow("YOLO26L Phone Use", frame) if cv2.waitKey(1) & 0xFF == ord("q"): break continue results = model( frame, verbose=False, classes=[PERSON_CLASS_ID, PHONE_CLASS_ID], conf=args.conf, imgsz=args.imgsz, ) boxes = results[0].boxes if results else None person_boxes = [] phone_boxes = [] if boxes is not None and hasattr(boxes, "cls"): for i in range(len(boxes)): cls = int(boxes.cls[i].item()) if cls == PERSON_CLASS_ID: person_boxes.append(boxes[i]) elif cls == PHONE_CLASS_ID: phone_boxes.append(boxes[i]) phone_seen = len(phone_boxes) > 0 phone_use = False best_phone = None if person_boxes and phone_boxes: best_person = max( person_boxes, key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0, ) pxyxy = _pad_box(_box_xyxy(best_person), args.pad) for phbox in phone_boxes: phxyxy = _pad_box(_box_xyxy(phbox), args.pad) if _overlaps(pxyxy, phxyxy): phone_use = True break if phone_boxes: best_phone = max( phone_boxes, key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0, ) if not args.no_head: follower.update_box(reachy, best_phone, y_ratio=0.5) last_phone_seen = time.time() neutral_active = False look_down_active = False person_track_start = None missing_start = None look_down_cycles = 0 last_prompt = 0.0 mode = "tracking_phone" mode_start = time.time() if not phone_seen and time.time() - last_phone_seen >= args.missing_neutral_sec: if missing_start is None: missing_start = time.time() if not neutral_active: reachy.goto_target(head=create_head_pose(), duration=args.neutral_duration) neutral_active = True if mode == "tracking_phone": mode = "tracking_person" mode_start = time.time() look_down_active = False if mode == "tracking_person": if person_boxes and not args.no_head: best_person = max( person_boxes, key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0, ) follower.update_box(reachy, best_person, y_ratio=args.person_y_ratio) if time.time() - mode_start >= args.person_search_window_sec: mode = "looking_down" mode_start = time.time() look_down_active = False if mode == "looking_down": if not look_down_active: reachy.goto_target( head=create_head_pose( z=args.look_down_z_mm, pitch=args.look_down_pitch_deg, mm=True, degrees=True, ), duration=args.look_down_duration, ) look_down_active = True look_down_cycles += 1 if look_down_cycles >= 2 and time.time() - last_prompt >= 10.0: logger.info("Can you please put the phone in front of me?") last_prompt = time.time() if time.time() - mode_start >= args.look_down_window_sec: mode = "tracking_person" mode_start = time.time() look_down_active = False reachy.goto_target(head=create_head_pose(), duration=args.neutral_duration) if phone_seen: track_state = "tracking_phone" elif mode == "looking_down": track_state = "looking_down" elif mode == "tracking_person": track_state = "tracking_person" else: track_state = "searching_phone" if track_state != last_track_state: logger.info("[state] %s", track_state) last_track_state = track_state if phone_use: last_phone_use = time.time() phone_use_stop = None if phone_use_start is None: phone_use_start = time.time() if not last_phone_use_state and time.time() - phone_use_start >= args.phone_use_confirm_sec: logger.info("[state] phone use detected") last_phone_use_state = True else: phone_use_start = None if phone_seen: if phone_use_stop is None: phone_use_stop = time.time() if last_phone_use_state and time.time() - phone_use_stop >= args.phone_use_clear_sec: logger.info("[state] phone use stopped") 
                        last_phone_use_state = False
                else:
                    phone_use_stop = None
                    if (
                        last_phone_use_state
                        and time.time() - last_phone_use >= args.phone_not_seen_clear_sec
                    ):
                        logger.info("[state] phone use stopped")
                        last_phone_use_state = False

            # Periodic heartbeat: log detection status and let the movement scheduler react.
            if time.time() - last_heartbeat >= 10.0:
                logger.info("[heartbeat] phone_detected=%s", "yes" if phone_seen else "no")
                if (
                    not args.no_antenna
                    and not last_phone_use_state
                    and last_track_state == "tracking_phone"
                    and phone_seen
                ):
                    oscillate_start = time.time()
                last_heartbeat = time.time()
                scheduler.on_heartbeat(
                    phone_tracked=(last_track_state == "tracking_phone"),
                    phone_use=last_phone_use_state,
                )

            # Antenna expression: angry while phone use persists, a happy wiggle while tracking.
            if not args.no_antenna:
                if last_phone_use_state:
                    antenna_mode = "angry"
                elif phone_seen:
                    antenna_mode = "tracking"
                else:
                    antenna_mode = "neutral"
                if antenna_mode != last_antenna_mode:
                    if antenna_mode == "angry":
                        reachy.goto_target(
                            antennas=[args.antenna_angry_left, args.antenna_angry_right],
                            duration=args.antenna_transition_sec,
                        )
                    else:
                        duration = (
                            args.antenna_relax_sec
                            if last_antenna_mode == "angry"
                            else args.antenna_transition_sec
                        )
                        reachy.goto_target(
                            antennas=[args.antenna_neutral_left, args.antenna_neutral_right],
                            duration=duration,
                        )
                    last_antenna_mode = antenna_mode
                if oscillate_start is not None and antenna_mode == "tracking":
                    elapsed = time.time() - oscillate_start
                    if elapsed <= args.antenna_happy_duration:
                        now = time.time()
                        if now - last_oscillate_update >= 0.05:
                            t = elapsed / args.antenna_happy_duration
                            val = args.antenna_happy_amp * math.sin(-math.pi / 2 + math.pi * t)
                            reachy.set_target(antennas=(val, -val))
                            last_oscillate_update = now
                    else:
                        oscillate_start = None

            # Debug overlay: best person box, best (or recent) phone box, and a phone-use banner.
            if not args.no_display:
                display = frame.copy()
                frame_h, frame_w = display.shape[:2]
                if person_boxes:
                    best_person = max(
                        person_boxes,
                        key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0,
                    )
                    x1, y1, x2, y2 = _clamp_box(
                        _pad_box(_box_xyxy(best_person), args.pad), frame_w, frame_h
                    )
                    cv2.rectangle(display, (x1, y1), (x2, y2), (0, 165, 255), 2)
                    if hasattr(best_person, "conf"):
                        conf = float(best_person.conf[0].item())
                        cv2.putText(
                            display,
                            f"{conf:.2f}",
                            (x1, max(20, y1 - 6)),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            0.5,
                            (0, 165, 255),
                            2,
                        )
                if best_phone is not None:
                    x1, y1, x2, y2 = _clamp_box(
                        _pad_box(_box_xyxy(best_phone), args.pad), frame_w, frame_h
                    )
                    cv2.rectangle(display, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    conf = float(best_phone.conf[0].item()) if hasattr(best_phone, "conf") else None
                    label = f"phone {conf:.2f}" if conf is not None else "phone"
                    last_phone_label = (x1, y1, label)
                    phone_label_miss = 0
                else:
                    phone_label_miss += 1
                    if phone_label_miss >= 5:
                        last_phone_label = None
                if last_phone_label is not None:
                    x1, y1, label = last_phone_label
                    cv2.putText(
                        display,
                        label,
                        (x1, max(20, y1 - 6)),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.6,
                        (0, 255, 0),
                        2,
                        cv2.LINE_AA,
                    )
                if last_phone_use_state:
                    cv2.rectangle(display, (5, 5), (230, 55), (0, 0, 255), -1)
                    cv2.putText(
                        display,
                        "PHONE USE",
                        (12, 42),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1.2,
                        (255, 255, 255),
                        3,
                    )
                cv2.imshow("YOLO26L Phone Use", display)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break

        if not args.no_display:
            cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
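
# Example invocations (a sketch: the filename below is a placeholder for wherever this
# script lives in the repo; the flags are the ones defined in main() above):
#
#   python yolo26l_phone_use_tracker.py --display                  # show the OpenCV debug window
#   python yolo26l_phone_use_tracker.py --conf 0.25 --imgsz 960 --process-every 2
#   python yolo26l_phone_use_tracker.py --no-head --no-antenna     # disable head tracking and antenna moves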