Spaces:
Running
Running
| """Reachy phone tracking + phone-use detection using YOLO26l, with person-search and look-down fallback modes.""" | |
| from __future__ import annotations | |
| import argparse | |
| import logging | |
| import math | |
| import os | |
| import time | |
| from pathlib import Path | |
| import cv2 | |
| from ultralytics import YOLO | |
| from reachy_mini import ReachyMini | |
| from reachy_mini.utils import create_head_pose | |
| try: | |
| from reachy_phone_home.movements import MovementScheduler, SituationMovements | |
| except ModuleNotFoundError: | |
| from pathlib import Path | |
| import sys | |
| repo_root = Path(__file__).resolve().parents[1] | |
| sys.path.insert(0, str(repo_root)) | |
| from reachy_phone_home.movements import MovementScheduler, SituationMovements | |
# Detector class indices; main() passes both to the model's `classes=` filter
# and uses them to sort detections into phone and person lists.
PHONE_CLASS_ID = 67 # COCO "cell phone"
PERSON_CLASS_ID = 0  # presumably COCO "person" — confirm against the model's class list
class PhoneFollower:
    """Point the robot head at an image pixel while ignoring small jitter.

    The last commanded pixel is remembered. A new look-at command is only
    issued when the target has moved by at least ``move_threshold_px`` along
    the x axis or the y axis (or when nothing has been commanded yet).
    """

    def __init__(self, move_threshold_px: int = 60, head_duration: float = 0.6) -> None:
        """Store the jitter threshold and the duration passed to look-at calls."""
        self.last_x = None
        self.last_y = None
        self.head_duration = float(head_duration)
        self.move_threshold_px = int(move_threshold_px)

    def update_box(self, reachy: ReachyMini, box, y_ratio: float = 0.5) -> None:
        """Aim at a point inside ``box``.

        The target is the horizontal centre of the box, ``y_ratio`` of the
        box height below its top edge; both coordinates are truncated to int.
        """
        left, top, right, bottom = _box_xyxy(box)
        target_x = int((left + right) / 2)
        target_y = int(top + (bottom - top) * y_ratio)
        self.update_xy(reachy, target_x, target_y)

    def update_xy(self, reachy: ReachyMini, cx: int, cy: int) -> None:
        """Command a look-at for pixel ``(cx, cy)`` unless it barely moved."""
        has_previous = self.last_x is not None and self.last_y is not None
        if has_previous:
            shift_x = abs(cx - self.last_x)
            shift_y = abs(cy - self.last_y)
            # Both axes under the threshold: treat as jitter and do nothing.
            if max(shift_x, shift_y) < self.move_threshold_px:
                return
        reachy.look_at_image(cx, cy, duration=self.head_duration, perform_movement=True)
        self.last_x, self.last_y = cx, cy
def _box_xyxy(box) -> tuple[int, int, int, int]:
    """Return the first row of ``box.xyxy`` as four ints (truncated toward zero)."""
    coords = box.xyxy[0].tolist()
    x1, y1, x2, y2 = (int(coords[idx]) for idx in range(4))
    return x1, y1, x2, y2
def _overlaps(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> bool:
    """Tell whether two ``(x1, y1, x2, y2)`` boxes intersect.

    Boxes that merely share an edge or a corner count as overlapping.
    """
    a_left, a_top, a_right, a_bottom = a
    b_left, b_top, b_right, b_bottom = b
    spans_meet_x = a_right >= b_left and a_left <= b_right
    spans_meet_y = a_bottom >= b_top and a_top <= b_bottom
    return spans_meet_x and spans_meet_y
def _pad_box(box: tuple[int, int, int, int], pad_ratio: float) -> tuple[int, int, int, int]:
    """Grow ``box`` on every side by ``pad_ratio`` of its width / height.

    Width and height are floored at 1 before scaling, and the resulting
    margins are truncated to int. The result is not clipped to any frame.
    """
    left, top, right, bottom = box
    margin_x = int(max(1, right - left) * pad_ratio)
    margin_y = int(max(1, bottom - top) * pad_ratio)
    return left - margin_x, top - margin_y, right + margin_x, bottom + margin_y
def _clamp_box(
    box: tuple[int, int, int, int], frame_w: int, frame_h: int
) -> tuple[int, int, int, int]:
    """Clip ``box`` to the frame and return it with ordered corners.

    x values are limited to ``[0, frame_w - 1]`` and y values to
    ``[0, frame_h - 1]``; afterwards each pair is put in ascending order.
    """

    def _limit(value: int, size: int) -> int:
        return max(0, min(size - 1, value))

    raw_x1, raw_y1, raw_x2, raw_y2 = box
    x_low, x_high = sorted((_limit(raw_x1, frame_w), _limit(raw_x2, frame_w)))
    y_low, y_high = sorted((_limit(raw_y1, frame_h), _limit(raw_y2, frame_h)))
    return x_low, y_low, x_high, y_high
def _resolve_weights(weights: str, logger: logging.Logger) -> str:
    """Map a weights argument to something ``YOLO(...)`` can load.

    The short names ``yolo26l``/``yolo26m``/``yolo26s``/``yolo26n`` resolve to
    ``<cache>/models/<name>.pt``, where ``<cache>`` is ``$HF_HOME`` if set and
    non-empty, otherwise ``~/.cache/reachy_phone_home``. A missing file is
    downloaded first. Any other value is returned unchanged.
    """
    known_names = ("yolo26l", "yolo26m", "yolo26s", "yolo26n")
    if weights not in known_names:
        return weights

    model_file = f"{weights}.pt"
    base_dir = os.getenv("HF_HOME") or str(Path.home() / ".cache" / "reachy_phone_home")
    models_dir = Path(base_dir) / "models"
    local_path = models_dir / model_file
    if local_path.exists():
        # Already cached: no directory creation, no network access.
        return str(local_path)

    models_dir.mkdir(parents=True, exist_ok=True)
    source_url = f"https://huggingface.co/Ultralytics/YOLO26/resolve/main/{model_file}"
    logger.info("Downloading %s to %s", source_url, local_path)
    _download_file(source_url, local_path)
    return str(local_path)
def _download_file(url: str, dest: Path) -> None:
    """Download ``url`` to ``dest`` without ever leaving a partial ``dest``.

    The payload is streamed in chunks to a ``<dest>.part`` sibling file and
    renamed onto ``dest`` only after the transfer completed. This matters
    because ``_resolve_weights`` treats any existing file as a valid cached
    model, so a truncated file from an interrupted download would otherwise
    be loaded on the next run.

    Args:
        url: Source URL, opened with ``urllib.request.urlopen``.
        dest: Target path; its parent directory must already exist.

    Raises:
        OSError: If the URL cannot be opened or a file operation fails
            (``urllib.error.URLError`` is an ``OSError`` subclass). The
            ``.part`` file is removed before the exception propagates.
    """
    import shutil
    import urllib.request

    dest = Path(dest)
    partial = dest.with_name(dest.name + ".part")
    try:
        with urllib.request.urlopen(url) as response, open(partial, "wb") as out_file:
            # Chunked copy: model files are large, avoid holding them in RAM.
            shutil.copyfileobj(response, out_file)
        # Same-directory rename, so the final file appears in one step.
        partial.replace(dest)
    except BaseException:
        # Also covers Ctrl-C mid-download; never leave the stale .part behind.
        try:
            partial.unlink()
        except FileNotFoundError:
            pass
        raise
def main() -> None:
    """Run the phone tracking / phone-use detection loop on a Reachy Mini.

    Parses the command line, resolves and loads the YOLO weights, then loops:
    grab a camera frame, detect persons and phones, steer the head, update
    the debounced phone-use state, drive the antennas, and optionally draw an
    annotated OpenCV window. The only ``break`` out of the loop is the "q"
    key check inside the display branches.

    NOTE(review): the block nesting below was reconstructed from a flattened
    copy of this file. Places where the original indentation could not be
    inferred with certainty are marked inline — confirm against the canonical
    source.
    """
    # Only ERROR and above from this reachy_mini logger.
    logging.getLogger("reachy_mini.media.audio_base").setLevel(logging.ERROR)
    parser = argparse.ArgumentParser(description="Reachy phone use tracker (YOLO26l)")
    parser.add_argument("--weights", type=str, default="yolo26l")
    parser.add_argument("--conf", type=float, default=0.15)
    parser.add_argument("--process-every", type=int, default=1)
    parser.add_argument("--imgsz", type=int, default=640)
    parser.add_argument("--head-duration", type=float, default=1.2)
    parser.add_argument("--move-threshold-px", type=int, default=60)
    parser.add_argument("--no-head", action="store_true")
    parser.add_argument("--phone-use-confirm-sec", type=float, default=0.5)
    parser.add_argument("--phone-use-clear-sec", type=float, default=0.5)
    parser.add_argument("--phone-not-seen-clear-sec", type=float, default=1.0)
    parser.add_argument("--pad", type=float, default=0.1)
    parser.add_argument("--missing-neutral-sec", type=float, default=0.5)
    parser.add_argument("--neutral-duration", type=float, default=1.2)
    parser.add_argument("--person-y-ratio", type=float, default=0.3)
    # NOTE(review): --look-down-after-sec is parsed but never read below.
    parser.add_argument("--look-down-after-sec", type=float, default=5.0)
    parser.add_argument("--look-down-duration", type=float, default=1.2)
    parser.add_argument("--look-down-z-mm", type=float, default=8.0)
    parser.add_argument("--look-down-pitch-deg", type=float, default=30.0)
    parser.add_argument("--look-down-window-sec", type=float, default=5.0)
    parser.add_argument("--person-search-window-sec", type=float, default=5.0)
    parser.add_argument("--no-antenna", action="store_true")
    parser.add_argument("--antenna-angry-left", type=float, default=-2.6)
    parser.add_argument("--antenna-angry-right", type=float, default=2.6)
    parser.add_argument("--antenna-neutral-left", type=float, default=0.0)
    parser.add_argument("--antenna-neutral-right", type=float, default=0.0)
    parser.add_argument("--antenna-transition-sec", type=float, default=0.5)
    parser.add_argument("--antenna-relax-sec", type=float, default=1.0)
    parser.add_argument("--antenna-happy-amp", type=float, default=0.2)
    parser.add_argument("--antenna-happy-duration", type=float, default=0.5)
    parser.add_argument("--good-job-heartbeats", type=int, default=3)
    parser.add_argument("--phone-use-bad-sec", type=float, default=10.0)
    parser.add_argument("--movement-restore-sec", type=float, default=0.6)
    parser.add_argument("--display", action="store_true", help="Show the OpenCV window")
    parser.add_argument("--no-display", action="store_true", help="Disable the OpenCV window")
    # The window is off by default (no_display=True); only --display enables
    # it, and --display wins if both flags are given.
    parser.set_defaults(no_display=True)
    args = parser.parse_args()
    if args.display:
        args.no_display = False
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    logger = logging.getLogger("yolo26l_phone_use_tracker")
    # Short model names are resolved to a cached .pt file (downloaded if absent).
    weights = _resolve_weights(args.weights, logger)
    model = YOLO(weights)
    with ReachyMini() as reachy:
        follower = PhoneFollower(
            move_threshold_px=args.move_threshold_px,
            head_duration=args.head_duration,
        )
        movements = SituationMovements(reachy)
        scheduler = MovementScheduler(
            movements,
            good_job_heartbeats=args.good_job_heartbeats,
            phone_use_bad_sec=args.phone_use_bad_sec,
            phone_use_clear_sec=args.phone_use_clear_sec,
            restore_head_duration=args.movement_restore_sec,
        )
        if not args.no_display:
            cv2.namedWindow("YOLO26L Phone Use")
        # --- loop state ---------------------------------------------------
        # Phone-use debounce: timestamp of the last raw positive, the
        # confirmed (debounced) state, and the start of the current
        # positive / negative streak.
        last_phone_use = 0.0
        last_phone_use_state = False
        phone_use_start = None
        phone_use_stop = None
        frame_count = 0
        # Display only: last drawn phone label and consecutive frames without one.
        last_phone_label = None
        phone_label_miss = 0
        last_antenna_mode = "neutral"
        last_phone_seen = time.time()
        neutral_active = False
        look_down_active = False
        # NOTE(review): person_track_start is assigned but never read here.
        person_track_start = None
        missing_start = None
        last_heartbeat = time.time()
        last_track_state = None
        # Start time of the antenna wiggle, or None when no wiggle is running.
        oscillate_start = None
        last_oscillate_update = 0.0
        look_down_cycles = 0
        last_prompt = 0.0
        # Head mode: "tracking_phone" -> "tracking_person" <-> "looking_down".
        mode = "tracking_phone"
        mode_start = time.time()
        while True:
            frame = reachy.media.get_frame()
            if frame is None:
                continue
            frame_count += 1
            # Frame skipping: only every --process-every'th frame is run
            # through the detector; skipped frames are shown raw.
            if args.process_every > 1 and frame_count % max(1, args.process_every) != 0:
                if not args.no_display:
                    cv2.imshow("YOLO26L Phone Use", frame)
                    if cv2.waitKey(1) & 0xFF == ord("q"):
                        break
                continue
            results = model(
                frame,
                verbose=False,
                classes=[PERSON_CLASS_ID, PHONE_CLASS_ID],
                conf=args.conf,
                imgsz=args.imgsz,
            )
            boxes = results[0].boxes if results else None
            # Split detections by class id.
            person_boxes = []
            phone_boxes = []
            if boxes is not None and hasattr(boxes, "cls"):
                for i in range(len(boxes)):
                    cls = int(boxes.cls[i].item())
                    if cls == PERSON_CLASS_ID:
                        person_boxes.append(boxes[i])
                    elif cls == PHONE_CLASS_ID:
                        phone_boxes.append(boxes[i])
            phone_seen = len(phone_boxes) > 0
            # Raw per-frame signal: some padded phone box overlaps the padded
            # box of the highest-confidence person.
            phone_use = False
            best_phone = None
            if person_boxes and phone_boxes:
                best_person = max(
                    person_boxes,
                    key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0,
                )
                pxyxy = _pad_box(_box_xyxy(best_person), args.pad)
                for phbox in phone_boxes:
                    phxyxy = _pad_box(_box_xyxy(phbox), args.pad)
                    if _overlaps(pxyxy, phxyxy):
                        phone_use = True
                        break
            if phone_boxes:
                # Follow the highest-confidence phone with the head.
                best_phone = max(
                    phone_boxes,
                    key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0,
                )
                if not args.no_head:
                    follower.update_box(reachy, best_phone, y_ratio=0.5)
                # NOTE(review): assumed to run for every frame with a phone,
                # not only when head motion is enabled — confirm nesting.
                # Seeing a phone resets all "phone missing" bookkeeping.
                last_phone_seen = time.time()
                neutral_active = False
                look_down_active = False
                person_track_start = None
                missing_start = None
                look_down_cycles = 0
                last_prompt = 0.0
                mode = "tracking_phone"
                mode_start = time.time()
            # Phone missing for at least --missing-neutral-sec: go to the
            # neutral head pose once, then alternate between following the
            # person and looking down.
            if not phone_seen and time.time() - last_phone_seen >= args.missing_neutral_sec:
                if missing_start is None:
                    missing_start = time.time()
                if not neutral_active:
                    reachy.goto_target(head=create_head_pose(), duration=args.neutral_duration)
                    neutral_active = True
                if mode == "tracking_phone":
                    mode = "tracking_person"
                    mode_start = time.time()
                    look_down_active = False
                if mode == "tracking_person":
                    if person_boxes and not args.no_head:
                        best_person = max(
                            person_boxes,
                            key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0,
                        )
                        follower.update_box(reachy, best_person, y_ratio=args.person_y_ratio)
                    # After the person-search window, switch to looking down.
                    if time.time() - mode_start >= args.person_search_window_sec:
                        mode = "looking_down"
                        mode_start = time.time()
                        look_down_active = False
                if mode == "looking_down":
                    if not look_down_active:
                        # Issue the look-down pose once per look-down phase.
                        reachy.goto_target(
                            head=create_head_pose(
                                z=args.look_down_z_mm,
                                pitch=args.look_down_pitch_deg,
                                mm=True,
                                degrees=True,
                            ),
                            duration=args.look_down_duration,
                        )
                        look_down_active = True
                        look_down_cycles += 1
                        # NOTE(review): prompt assumed to be checked only when
                        # a look-down phase starts — confirm nesting.
                        # From the second look-down on, log a prompt at most
                        # every 10 seconds.
                        if look_down_cycles >= 2 and time.time() - last_prompt >= 10.0:
                            logger.info("Can you please put the phone in front of me?")
                            last_prompt = time.time()
                    # After the look-down window, return to neutral and go
                    # back to following the person.
                    if time.time() - mode_start >= args.look_down_window_sec:
                        mode = "tracking_person"
                        mode_start = time.time()
                        look_down_active = False
                        reachy.goto_target(head=create_head_pose(), duration=args.neutral_duration)
            # Derive the state name used for logging and heartbeats.
            if phone_seen:
                track_state = "tracking_phone"
            elif mode == "looking_down":
                track_state = "looking_down"
            elif mode == "tracking_person":
                track_state = "tracking_person"
            else:
                track_state = "searching_phone"
            if track_state != last_track_state:
                logger.info("[state] %s", track_state)
                last_track_state = track_state
            # Debounce the raw phone_use signal into last_phone_use_state.
            if phone_use:
                last_phone_use = time.time()
                phone_use_stop = None
                if phone_use_start is None:
                    phone_use_start = time.time()
                # Confirm only after the signal held for --phone-use-confirm-sec.
                if not last_phone_use_state and time.time() - phone_use_start >= args.phone_use_confirm_sec:
                    logger.info("[state] phone use detected")
                    last_phone_use_state = True
            else:
                phone_use_start = None
                if phone_seen:
                    # Phone visible but no overlap: clear after
                    # --phone-use-clear-sec without overlap.
                    if phone_use_stop is None:
                        phone_use_stop = time.time()
                    if last_phone_use_state and time.time() - phone_use_stop >= args.phone_use_clear_sec:
                        logger.info("[state] phone use stopped")
                        last_phone_use_state = False
                else:
                    # Phone not visible: clear once the last raw positive is
                    # older than --phone-not-seen-clear-sec.
                    phone_use_stop = None
                    if last_phone_use_state and time.time() - last_phone_use >= args.phone_not_seen_clear_sec:
                        logger.info("[state] phone use stopped")
                        last_phone_use_state = False
            # Heartbeat every 10 seconds.
            if time.time() - last_heartbeat >= 10.0:
                logger.info("[heartbeat] phone_detected=%s", "yes" if phone_seen else "no")
                # Phone tracked without phone use: start the antenna wiggle.
                if (
                    not args.no_antenna
                    and not last_phone_use_state
                    and last_track_state == "tracking_phone"
                    and phone_seen
                ):
                    oscillate_start = time.time()
                last_heartbeat = time.time()
                # NOTE(review): assumed to be called once per heartbeat, not
                # once per frame — confirm nesting.
                scheduler.on_heartbeat(
                    phone_tracked=(last_track_state == "tracking_phone"),
                    phone_use=last_phone_use_state,
                )
            if not args.no_antenna:
                if last_phone_use_state:
                    antenna_mode = "angry"
                elif phone_seen:
                    antenna_mode = "tracking"
                else:
                    antenna_mode = "neutral"
                # Antenna targets are only sent when the mode changes.
                if antenna_mode != last_antenna_mode:
                    if antenna_mode == "angry":
                        reachy.goto_target(
                            antennas=[args.antenna_angry_left, args.antenna_angry_right],
                            duration=args.antenna_transition_sec,
                        )
                    else:
                        # "tracking" and "neutral" share the neutral pose;
                        # leaving "angry" uses the relax duration instead.
                        duration = (
                            args.antenna_relax_sec
                            if last_antenna_mode == "angry"
                            else args.antenna_transition_sec
                        )
                        reachy.goto_target(
                            antennas=[args.antenna_neutral_left, args.antenna_neutral_right],
                            duration=duration,
                        )
                    last_antenna_mode = antenna_mode
                # Antenna wiggle started by a heartbeat, while tracking.
                if oscillate_start is not None and antenna_mode == "tracking":
                    elapsed = time.time() - oscillate_start
                    if elapsed <= args.antenna_happy_duration:
                        now = time.time()
                        # Send a new target at most every 0.05 s.
                        if now - last_oscillate_update >= 0.05:
                            # val goes from -amp at t=0 to +amp at t=1.
                            t = elapsed / args.antenna_happy_duration
                            val = args.antenna_happy_amp * math.sin(-math.pi / 2 + math.pi * t)
                            reachy.set_target(antennas=(val, -val))
                            last_oscillate_update = now
                    else:
                        oscillate_start = None
            if not args.no_display:
                # Draw on a copy of the frame.
                display = frame.copy()
                frame_h, frame_w = display.shape[:2]
                if person_boxes:
                    best_person = max(
                        person_boxes,
                        key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0,
                    )
                    # Draw the padded box, clipped to the frame.
                    x1, y1, x2, y2 = _clamp_box(
                        _pad_box(_box_xyxy(best_person), args.pad), frame_w, frame_h
                    )
                    cv2.rectangle(display, (x1, y1), (x2, y2), (0, 165, 255), 2)
                    if hasattr(best_person, "conf"):
                        conf = float(best_person.conf[0].item())
                        cv2.putText(
                            display,
                            f"{conf:.2f}",
                            (x1, max(20, y1 - 6)),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            0.5,
                            (0, 165, 255),
                            2,
                        )
                if best_phone is not None:
                    x1, y1, x2, y2 = _clamp_box(
                        _pad_box(_box_xyxy(best_phone), args.pad), frame_w, frame_h
                    )
                    cv2.rectangle(display, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    conf = float(best_phone.conf[0].item()) if hasattr(best_phone, "conf") else None
                    label = f"phone {conf:.2f}" if conf is not None else "phone"
                    last_phone_label = (x1, y1, label)
                    phone_label_miss = 0
                else:
                    # The last label stays visible for up to 4 processed
                    # frames without a phone and is dropped on the 5th.
                    phone_label_miss += 1
                    if phone_label_miss >= 5:
                        last_phone_label = None
                if last_phone_label is not None:
                    x1, y1, label = last_phone_label
                    cv2.putText(
                        display,
                        label,
                        (x1, max(20, y1 - 6)),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.6,
                        (0, 255, 0),
                        2,
                        cv2.LINE_AA,
                    )
                # Banner while the confirmed phone-use state is active.
                if last_phone_use_state:
                    cv2.rectangle(display, (5, 5), (230, 55), (0, 0, 255), -1)
                    cv2.putText(
                        display,
                        "PHONE USE",
                        (12, 42),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1.2,
                        (255, 255, 255),
                        3,
                    )
                cv2.imshow("YOLO26L Phone Use", display)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
    # NOTE(review): cleanup placed after the `with` block; it may have been
    # inside it originally — confirm nesting.
    if not args.no_display:
        cv2.destroyAllWindows()
| if __name__ == "__main__": | |
| main() | |