# reachy_phone_home/reachy_phone_home_app.py
"""Reachy phone tracking + phone-use detection using YOLO26l (no look-down logic)."""
from __future__ import annotations
import argparse
import logging
import math
import os
import time
from pathlib import Path
import cv2
from ultralytics import YOLO
from reachy_mini import ReachyMini
from reachy_mini.utils import create_head_pose
try:
from reachy_phone_home.movements import MovementScheduler, SituationMovements
except ModuleNotFoundError:
    # Fallback when run as a plain script from a repo checkout: add the repo root
    # to sys.path so the package import above succeeds.
    import sys
repo_root = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(repo_root))
from reachy_phone_home.movements import MovementScheduler, SituationMovements
PHONE_CLASS_ID = 67 # COCO "cell phone"
PERSON_CLASS_ID = 0
class PhoneFollower:
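    """Keep the head pointed at a tracked pixel target, ignoring small frame-to-frame jitter."""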
def __init__(self, move_threshold_px: int = 60, head_duration: float = 0.6) -> None:
self.move_threshold_px = int(move_threshold_px)
self.head_duration = float(head_duration)
self.last_x = None
self.last_y = None
def update_box(self, reachy: ReachyMini, box, y_ratio: float = 0.5) -> None:
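        """Look at a point inside the box: horizontal centre, vertical position set by y_ratio."""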
x1, y1, x2, y2 = _box_xyxy(box)
cx = int((x1 + x2) / 2)
cy = int(y1 + (y2 - y1) * y_ratio)
self.update_xy(reachy, cx, cy)
def update_xy(self, reachy: ReachyMini, cx: int, cy: int) -> None:
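        """Command the head toward (cx, cy) only when it has moved at least move_threshold_px on some axis."""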
if self.last_x is not None and self.last_y is not None:
if (
abs(cx - self.last_x) < self.move_threshold_px
and abs(cy - self.last_y) < self.move_threshold_px
):
return
reachy.look_at_image(cx, cy, duration=self.head_duration, perform_movement=True)
self.last_x = cx
self.last_y = cy
def _box_xyxy(box) -> tuple[int, int, int, int]:
xyxy = box.xyxy[0].tolist()
return int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3])
def _overlaps(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> bool:
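    """Axis-aligned bounding-box intersection test (shared edges count as overlapping)."""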
ax1, ay1, ax2, ay2 = a
bx1, by1, bx2, by2 = b
return not (ax2 < bx1 or ax1 > bx2 or ay2 < by1 or ay1 > by2)
def _pad_box(box: tuple[int, int, int, int], pad_ratio: float) -> tuple[int, int, int, int]:
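    """Expand the box by pad_ratio of its width/height on every side."""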
x1, y1, x2, y2 = box
bw = max(1, x2 - x1)
bh = max(1, y2 - y1)
px = int(bw * pad_ratio)
py = int(bh * pad_ratio)
return x1 - px, y1 - py, x2 + px, y2 + py
def _clamp_box(
box: tuple[int, int, int, int], frame_w: int, frame_h: int
) -> tuple[int, int, int, int]:
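    """Clamp box corners to the frame bounds, swapping them if they arrive inverted."""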
x1, y1, x2, y2 = box
x1 = max(0, min(frame_w - 1, x1))
x2 = max(0, min(frame_w - 1, x2))
y1 = max(0, min(frame_h - 1, y1))
y2 = max(0, min(frame_h - 1, y2))
if x2 < x1:
x1, x2 = x2, x1
if y2 < y1:
y1, y2 = y2, y1
return x1, y1, x2, y2
def _resolve_weights(weights: str, logger: logging.Logger) -> str:
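    """Resolve a known YOLO26 alias (yolo26n/s/m/l) to a cached .pt file, downloading it on first use; any other value is returned unchanged."""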
if weights in ("yolo26l", "yolo26m", "yolo26s", "yolo26n"):
filename = f"{weights}.pt"
cache_root = os.getenv("HF_HOME") or str(Path.home() / ".cache" / "reachy_phone_home")
cache_dir = Path(cache_root) / "models"
cache_path = cache_dir / filename
if not cache_path.exists():
cache_dir.mkdir(parents=True, exist_ok=True)
url = f"https://huggingface.co/Ultralytics/YOLO26/resolve/main/{filename}"
logger.info("Downloading %s to %s", url, cache_path)
_download_file(url, cache_path)
return str(cache_path)
return weights
def _download_file(url: str, dest: Path) -> None:
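    """Fetch url into dest in a single request (no resume or checksum verification)."""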
    import shutil
    import urllib.request
    # Stream the response to disk rather than buffering the whole file in memory.
    with urllib.request.urlopen(url) as response, open(dest, "wb") as out_file:
        shutil.copyfileobj(response, out_file)
def main() -> None:
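    """Run phone detection, head tracking, and expressive reactions on the Reachy Mini camera feed."""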
logging.getLogger("reachy_mini.media.audio_base").setLevel(logging.ERROR)
parser = argparse.ArgumentParser(description="Reachy phone use tracker (YOLO26l)")
parser.add_argument("--weights", type=str, default="yolo26l")
parser.add_argument("--conf", type=float, default=0.15)
parser.add_argument("--process-every", type=int, default=1)
parser.add_argument("--imgsz", type=int, default=640)
parser.add_argument("--head-duration", type=float, default=1.2)
parser.add_argument("--move-threshold-px", type=int, default=60)
parser.add_argument("--no-head", action="store_true")
parser.add_argument("--phone-use-confirm-sec", type=float, default=0.5)
parser.add_argument("--phone-use-clear-sec", type=float, default=0.5)
parser.add_argument("--phone-not-seen-clear-sec", type=float, default=1.0)
parser.add_argument("--pad", type=float, default=0.1)
parser.add_argument("--missing-neutral-sec", type=float, default=0.5)
parser.add_argument("--neutral-duration", type=float, default=1.2)
parser.add_argument("--person-y-ratio", type=float, default=0.3)
parser.add_argument("--look-down-after-sec", type=float, default=5.0)
parser.add_argument("--look-down-duration", type=float, default=1.2)
parser.add_argument("--look-down-z-mm", type=float, default=8.0)
parser.add_argument("--look-down-pitch-deg", type=float, default=30.0)
parser.add_argument("--look-down-window-sec", type=float, default=5.0)
parser.add_argument("--person-search-window-sec", type=float, default=5.0)
parser.add_argument("--no-antenna", action="store_true")
parser.add_argument("--antenna-angry-left", type=float, default=-2.6)
parser.add_argument("--antenna-angry-right", type=float, default=2.6)
parser.add_argument("--antenna-neutral-left", type=float, default=0.0)
parser.add_argument("--antenna-neutral-right", type=float, default=0.0)
parser.add_argument("--antenna-transition-sec", type=float, default=0.5)
parser.add_argument("--antenna-relax-sec", type=float, default=1.0)
parser.add_argument("--antenna-happy-amp", type=float, default=0.2)
parser.add_argument("--antenna-happy-duration", type=float, default=0.5)
parser.add_argument("--good-job-heartbeats", type=int, default=3)
parser.add_argument("--phone-use-bad-sec", type=float, default=10.0)
parser.add_argument("--movement-restore-sec", type=float, default=0.6)
parser.add_argument("--display", action="store_true", help="Show the OpenCV window")
parser.add_argument("--no-display", action="store_true", help="Disable the OpenCV window")
parser.set_defaults(no_display=True)
args = parser.parse_args()
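    # The preview window is off by default (no_display defaults to True); --display turns it on.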
if args.display:
args.no_display = False
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("yolo26l_phone_use_tracker")
weights = _resolve_weights(args.weights, logger)
model = YOLO(weights)
with ReachyMini() as reachy:
follower = PhoneFollower(
move_threshold_px=args.move_threshold_px,
head_duration=args.head_duration,
)
movements = SituationMovements(reachy)
scheduler = MovementScheduler(
movements,
good_job_heartbeats=args.good_job_heartbeats,
phone_use_bad_sec=args.phone_use_bad_sec,
phone_use_clear_sec=args.phone_use_clear_sec,
restore_head_duration=args.movement_restore_sec,
)
if not args.no_display:
cv2.namedWindow("YOLO26L Phone Use")
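        # Loop state: phone-use debounce timers, tracking mode, antenna mood, heartbeat bookkeeping.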
last_phone_use = 0.0
last_phone_use_state = False
phone_use_start = None
phone_use_stop = None
frame_count = 0
last_phone_label = None
phone_label_miss = 0
last_antenna_mode = "neutral"
last_phone_seen = time.time()
neutral_active = False
look_down_active = False
person_track_start = None
missing_start = None
last_heartbeat = time.time()
last_track_state = None
oscillate_start = None
last_oscillate_update = 0.0
look_down_cycles = 0
last_prompt = 0.0
mode = "tracking_phone"
mode_start = time.time()
while True:
frame = reachy.media.get_frame()
if frame is None:
continue
frame_count += 1
if args.process_every > 1 and frame_count % max(1, args.process_every) != 0:
if not args.no_display:
cv2.imshow("YOLO26L Phone Use", frame)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
continue
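            # Run detection restricted to the COCO "person" and "cell phone" classes.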
results = model(
frame,
verbose=False,
classes=[PERSON_CLASS_ID, PHONE_CLASS_ID],
conf=args.conf,
imgsz=args.imgsz,
)
boxes = results[0].boxes if results else None
person_boxes = []
phone_boxes = []
if boxes is not None and hasattr(boxes, "cls"):
for i in range(len(boxes)):
cls = int(boxes.cls[i].item())
if cls == PERSON_CLASS_ID:
person_boxes.append(boxes[i])
elif cls == PHONE_CLASS_ID:
phone_boxes.append(boxes[i])
phone_seen = len(phone_boxes) > 0
phone_use = False
best_phone = None
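            # Phone-use heuristic: any phone whose padded box overlaps the padded box
            # of the most confident person counts as "using the phone".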
if person_boxes and phone_boxes:
best_person = max(
person_boxes,
key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0,
)
pxyxy = _pad_box(_box_xyxy(best_person), args.pad)
for phbox in phone_boxes:
phxyxy = _pad_box(_box_xyxy(phbox), args.pad)
if _overlaps(pxyxy, phxyxy):
phone_use = True
break
if phone_boxes:
best_phone = max(
phone_boxes,
key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0,
)
if not args.no_head:
follower.update_box(reachy, best_phone, y_ratio=0.5)
last_phone_seen = time.time()
neutral_active = False
look_down_active = False
person_track_start = None
missing_start = None
look_down_cycles = 0
last_prompt = 0.0
mode = "tracking_phone"
mode_start = time.time()
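            # Phone lost for missing_neutral_sec: return the head to neutral and fall back to person tracking.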
if not phone_seen and time.time() - last_phone_seen >= args.missing_neutral_sec:
if missing_start is None:
missing_start = time.time()
if not neutral_active:
reachy.goto_target(head=create_head_pose(), duration=args.neutral_duration)
neutral_active = True
if mode == "tracking_phone":
mode = "tracking_person"
mode_start = time.time()
look_down_active = False
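            # While tracking a person, follow the most confident detection; after
            # person_search_window_sec without a phone, switch to the look-down search.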
if mode == "tracking_person":
if person_boxes and not args.no_head:
best_person = max(
person_boxes,
key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0,
)
follower.update_box(reachy, best_person, y_ratio=args.person_y_ratio)
if time.time() - mode_start >= args.person_search_window_sec:
mode = "looking_down"
mode_start = time.time()
look_down_active = False
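            # Look-down search: tilt the head toward a phone placed in front of the robot,
            # log a prompt every 10 s after two cycles, then return to person tracking.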
if mode == "looking_down":
if not look_down_active:
reachy.goto_target(
head=create_head_pose(
z=args.look_down_z_mm,
pitch=args.look_down_pitch_deg,
mm=True,
degrees=True,
),
duration=args.look_down_duration,
)
look_down_active = True
look_down_cycles += 1
if look_down_cycles >= 2 and time.time() - last_prompt >= 10.0:
logger.info("Can you please put the phone in front of me?")
last_prompt = time.time()
if time.time() - mode_start >= args.look_down_window_sec:
mode = "tracking_person"
mode_start = time.time()
look_down_active = False
reachy.goto_target(head=create_head_pose(), duration=args.neutral_duration)
if phone_seen:
track_state = "tracking_phone"
elif mode == "looking_down":
track_state = "looking_down"
elif mode == "tracking_person":
track_state = "tracking_person"
else:
track_state = "searching_phone"
if track_state != last_track_state:
logger.info("[state] %s", track_state)
last_track_state = track_state
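            # Debounce phone use: confirm after phone_use_confirm_sec of continuous overlap,
            # clear after phone_use_clear_sec without overlap (or phone_not_seen_clear_sec
            # once the phone disappears entirely).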
if phone_use:
last_phone_use = time.time()
phone_use_stop = None
if phone_use_start is None:
phone_use_start = time.time()
if not last_phone_use_state and time.time() - phone_use_start >= args.phone_use_confirm_sec:
logger.info("[state] phone use detected")
last_phone_use_state = True
else:
phone_use_start = None
if phone_seen:
if phone_use_stop is None:
phone_use_stop = time.time()
if last_phone_use_state and time.time() - phone_use_stop >= args.phone_use_clear_sec:
logger.info("[state] phone use stopped")
last_phone_use_state = False
else:
phone_use_stop = None
if last_phone_use_state and time.time() - last_phone_use >= args.phone_not_seen_clear_sec:
logger.info("[state] phone use stopped")
last_phone_use_state = False
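            # Every 10 s: log a heartbeat, trigger a short antenna wiggle while calmly
            # tracking the phone, and let the movement scheduler react to the state.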
if time.time() - last_heartbeat >= 10.0:
logger.info("[heartbeat] phone_detected=%s", "yes" if phone_seen else "no")
if (
not args.no_antenna
and not last_phone_use_state
and last_track_state == "tracking_phone"
and phone_seen
):
oscillate_start = time.time()
last_heartbeat = time.time()
scheduler.on_heartbeat(
phone_tracked=(last_track_state == "tracking_phone"),
phone_use=last_phone_use_state,
)
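            # Antenna mood: "angry" posture during confirmed phone use, relax back to
            # neutral otherwise, with a longer transition when leaving the angry pose.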
if not args.no_antenna:
if last_phone_use_state:
antenna_mode = "angry"
elif phone_seen:
antenna_mode = "tracking"
else:
antenna_mode = "neutral"
if antenna_mode != last_antenna_mode:
if antenna_mode == "angry":
reachy.goto_target(
antennas=[args.antenna_angry_left, args.antenna_angry_right],
duration=args.antenna_transition_sec,
)
else:
duration = (
args.antenna_relax_sec
if last_antenna_mode == "angry"
else args.antenna_transition_sec
)
reachy.goto_target(
antennas=[args.antenna_neutral_left, args.antenna_neutral_right],
duration=duration,
)
last_antenna_mode = antenna_mode
if oscillate_start is not None and antenna_mode == "tracking":
elapsed = time.time() - oscillate_start
if elapsed <= args.antenna_happy_duration:
now = time.time()
if now - last_oscillate_update >= 0.05:
t = elapsed / args.antenna_happy_duration
val = args.antenna_happy_amp * math.sin(-math.pi / 2 + math.pi * t)
reachy.set_target(antennas=(val, -val))
last_oscillate_update = now
else:
oscillate_start = None
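            # Optional preview: draw the padded person/phone boxes, a sticky phone label,
            # and a red "PHONE USE" banner while use is detected.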
if not args.no_display:
display = frame.copy()
frame_h, frame_w = display.shape[:2]
if person_boxes:
best_person = max(
person_boxes,
key=lambda b: float(b.conf[0].item()) if hasattr(b, "conf") else 0.0,
)
x1, y1, x2, y2 = _clamp_box(
_pad_box(_box_xyxy(best_person), args.pad), frame_w, frame_h
)
cv2.rectangle(display, (x1, y1), (x2, y2), (0, 165, 255), 2)
if hasattr(best_person, "conf"):
conf = float(best_person.conf[0].item())
cv2.putText(
display,
f"{conf:.2f}",
(x1, max(20, y1 - 6)),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 165, 255),
2,
)
if best_phone is not None:
x1, y1, x2, y2 = _clamp_box(
_pad_box(_box_xyxy(best_phone), args.pad), frame_w, frame_h
)
cv2.rectangle(display, (x1, y1), (x2, y2), (0, 255, 0), 2)
conf = float(best_phone.conf[0].item()) if hasattr(best_phone, "conf") else None
label = f"phone {conf:.2f}" if conf is not None else "phone"
last_phone_label = (x1, y1, label)
phone_label_miss = 0
else:
phone_label_miss += 1
if phone_label_miss >= 5:
last_phone_label = None
if last_phone_label is not None:
x1, y1, label = last_phone_label
cv2.putText(
display,
label,
(x1, max(20, y1 - 6)),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(0, 255, 0),
2,
cv2.LINE_AA,
)
if last_phone_use_state:
cv2.rectangle(display, (5, 5), (230, 55), (0, 0, 255), -1)
cv2.putText(
display,
"PHONE USE",
(12, 42),
cv2.FONT_HERSHEY_SIMPLEX,
1.2,
(255, 255, 255),
3,
)
cv2.imshow("YOLO26L Phone Use", display)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
if not args.no_display:
cv2.destroyAllWindows()
if __name__ == "__main__":
main()