""" Reachy Mini Controller A centralized server that listens for Robot connections and hosts a Gradio control interface. """ import asyncio import threading import time import queue from dataclasses import dataclass from typing import Optional, Tuple, Dict import cv2 import gradio as gr import numpy as np from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, HTTPException from fastapi.responses import StreamingResponse from fastapi.staticfiles import StaticFiles import os import uvicorn from fastrtc import StreamHandler from huggingface_hub import get_token import httpx from reachy_mini.utils import create_head_pose # token = get_token() # if not token: # raise ValueError("No token found. Please set the HF_TOKEN environment variable or login to Hugging Face.") # ------------------------------------------------------------------- # 1. Configuration # ------------------------------------------------------------------- AUDIO_SAMPLE_RATE = 16000 # respeaker samplerate # Audio queue configuration MAX_AUDIO_QUEUE_SIZE = 2 # Movement step sizes NUDGE_ANGLE = 5.0 # degrees for head roll / yaw NUDGE_BODY = 0.3 # degrees for body_yaw NUDGE_PITCH = 5.0 # degrees for pitch # Video loop timing FRAME_SLEEP_S = 0.04 # 25 fps # TURN config TURN_TTL_SERVER_MS = 360_000 turn_credentials = None server_turn_credentials = None # while turn_credentials is None or server_turn_credentials is None: # try: # if turn_credentials is None: # turn_credentials = get_cloudflare_turn_credentials(hf_token=token) # if server_turn_credentials is None: # server_turn_credentials = get_cloudflare_turn_credentials(ttl=TURN_TTL_SERVER_MS, hf_token=token) # except Exception as e: # print(f"[Video] Error getting turn credentials: {e!r}") # time.sleep(1) # ------------------------------------------------------------------- # 2. 
# Data Models
# -------------------------------------------------------------------


@dataclass
class Movement:
    """A single target pose command for the robot.

    Head translation (x/y/z) and orientation (roll/pitch/yaw) are combined
    with a body rotation and optional antenna angles; ``duration`` is how
    long the robot should take to reach the pose, in seconds.
    """

    name: str
    x: float = 0
    y: float = 0
    z: float = 0
    roll: float = 0
    pitch: float = 0
    yaw: float = 0
    body_yaw: float = 0
    # None means "do not command the antennas" (see send_pose_to_robot).
    left_antenna: Optional[float] = None
    right_antenna: Optional[float] = None
    duration: float = 1.0


# -------------------------------------------------------------------
# 2b. Multi-User Authentication
# -------------------------------------------------------------------

# Token cache to prevent rate limiting: {token -> (username, expiry_time)}
_token_cache: Dict[str, Tuple[Optional[str], float]] = {}
_token_cache_lock = threading.Lock()
TOKEN_CACHE_TTL = 300  # 5 minutes


async def validate_hf_token(token: str) -> Optional[str]:
    """Validate a HuggingFace token and return the username.

    Uses a TTL cache to prevent rate limiting from the HuggingFace API.

    Args:
        token: HuggingFace API token.

    Returns:
        Username if the token is valid, None otherwise.
    """
    current_time = time.time()

    # Fast path: serve from the cache while the entry is still fresh.
    with _token_cache_lock:
        if token in _token_cache:
            username, expiry = _token_cache[token]
            if current_time < expiry:
                print(f"[Auth] Using cached token validation for user: {username}")
                return username
            # Cache expired, remove it
            del _token_cache[token]

    # Slow path: ask the HuggingFace API who owns this token.
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://huggingface.co/api/whoami-v2",
                headers={"Authorization": f"Bearer {token}"},
                timeout=5.0,
            )
            if response.status_code == 200:
                data = response.json()
                username = data.get("name")
                # Cache the result
                with _token_cache_lock:
                    _token_cache[token] = (username, current_time + TOKEN_CACHE_TTL)
                print(f"[Auth] Token validated for user: {username}")
                return username
            elif response.status_code == 429:
                print(f"[Auth] Rate limited by HuggingFace API! Status: {response.status_code}")
                return None
    except Exception as e:
        print(f"[Auth] Token validation error: {e}")
    # Any other status (401, 403, ...) is treated as "not authenticated".
    return None


async def get_user_from_websocket(websocket: "WebSocket") -> Optional[str]:
    """Extract and validate an HF token from WebSocket headers or query params.

    Args:
        websocket: WebSocket connection.

    Returns:
        Username if authenticated, None otherwise.
    """
    # Authorization header (for robot connections).
    auth_header = websocket.headers.get("authorization", "")
    if auth_header.startswith("Bearer "):
        token = auth_header[7:]  # Remove "Bearer " prefix
        return await validate_hf_token(token)

    # Token in query parameters (for browser connections).
    token_param = websocket.query_params.get("token")
    if token_param:
        return await validate_hf_token(token_param)

    # Token in cookies (HuggingFace OAuth).
    cookie_header = websocket.headers.get("cookie", "")
    if cookie_header:
        cookies = {}
        for cookie in cookie_header.split(";"):
            if "=" in cookie:
                name, value = cookie.strip().split("=", 1)
                cookies[name] = value
        token = cookies.get("token") or cookies.get("hf_token")
        if token:
            return await validate_hf_token(token)

    return None


async def get_user_from_request(request: "Request", token_param: Optional[str] = None) -> Optional[str]:
    """Extract and validate an HF token from an HTTP request.

    Args:
        request: FastAPI Request object.
        token_param: Optional token from query parameter (from Gradio OAuth).

    Returns:
        Username if authenticated, None otherwise.
    """
    # Query parameter first (passed from Gradio OAuth).
    if token_param:
        print("[DEBUG] Using token from query parameter")
        return await validate_hf_token(token_param)

    # Authorization header (for robot connections).
    auth_header = request.headers.get("authorization", "")
    if auth_header.startswith("Bearer "):
        token = auth_header[7:]
        print("[DEBUG] Found token in Authorization header, validating...")
        return await validate_hf_token(token)

    # Cookies.
    token = request.cookies.get("token") or request.cookies.get("hf_token")
    if token:
        print("[DEBUG] Found token in cookies, validating...")
        return await validate_hf_token(token)

    print("[DEBUG] No authentication found")
    return None


class UserSession:
    """Per-user session state for multi-user support.

    Stores all connection handles and state for a single user robot.
    """

    def __init__(self, user_id: str):
        self.user_id = user_id

        # Connection handles (set by the /robot websocket endpoint).
        self.robot_ws: Optional["WebSocket"] = None
        self.robot_loop: Optional[asyncio.AbstractEventLoop] = None

        # Video stream data: last JPEG frame pushed by the robot.
        # Start with an encoded black frame so /video_feed has something to serve.
        self.frame_lock = threading.Lock()
        self.black_frame = np.zeros((640, 640, 3), dtype=np.uint8)
        _, buffer = cv2.imencode(".jpg", self.black_frame)
        self.latest_frame_bytes = buffer.tobytes()
        self.latest_frame_ts = time.time()

        # Latency tracking for video (sliding window, milliseconds).
        self.video_latencies = []
        self.video_latency_window = 100

        # Audio from robot -> browser.
        self.audio_queue: "queue.Queue[Tuple[int, bytes, float]]" = queue.Queue()
        # Audio from operator -> robot.
        self.audio_to_robot_queue: "queue.Queue[bytes]" = queue.Queue()

        # Latency tracking for audio (sliding window, milliseconds).
        self.audio_latencies = []
        self.audio_latency_window = 100

        # Live pose state.
        self.pose_lock = threading.Lock()
        self.current_pose = Movement(
            name="Current",
            x=0,
            y=0,
            z=0,
            roll=0,
            pitch=0,
            yaw=0,
            body_yaw=0,
            left_antenna=0,
            right_antenna=0,
            duration=0.2,
        )

        # Robot state (joint positions).
        self.robot_state_lock = threading.Lock()
        self.latest_robot_state: Optional[dict] = None
        self.latest_robot_state_ts: float = 0.0

    # --- Connection management ---
    def set_robot_connection(self, ws: "WebSocket", loop: asyncio.AbstractEventLoop) -> None:
        """Record the robot's control websocket and the event loop it runs on."""
        self.robot_ws = ws
        self.robot_loop = loop

    def clear_robot_connection(self) -> None:
        """Forget the robot's control websocket (called on disconnect)."""
        self.robot_ws = None
        self.robot_loop = None

    # --- Video ---
    def update_frame(self, frame_bytes: bytes, robot_timestamp: Optional[float] = None) -> None:
        """Update the latest video frame and, if timestamped, record its latency."""
        receive_time = time.time()
        with self.frame_lock:
            self.latest_frame_bytes = frame_bytes
            self.latest_frame_ts = receive_time
            if robot_timestamp is not None:
                latency_ms = (receive_time - robot_timestamp) * 1000
                self.video_latencies.append(latency_ms)
                if len(self.video_latencies) > self.video_latency_window:
                    self.video_latencies.pop(0)

    def get_video_latency_stats(self) -> dict:
        """Get video latency statistics in milliseconds."""
        if not self.video_latencies:
            return {"min": 0, "max": 0, "avg": 0, "latest": 0, "count": 0}
        return {
            "min": min(self.video_latencies),
            "max": max(self.video_latencies),
            "avg": sum(self.video_latencies) / len(self.video_latencies),
            "latest": self.video_latencies[-1],
            "count": len(self.video_latencies),
        }

    # --- Audio queues ---
    @staticmethod
    def _push_bounded(q: queue.Queue, item, max_size: int, description: str) -> None:
        """Put ``item`` on ``q``, dropping oldest entries to stay within ``max_size``.

        ``description`` names the queue for debugging purposes only.
        """
        while q.qsize() >= max_size:
            try:
                q.get_nowait()
            except queue.Empty:
                break
        q.put(item)

    def push_audio_from_robot(self, audio_bytes: bytes, robot_timestamp: Optional[float] = None) -> None:
        """Push audio data from robot to the queue for browser playback."""
        self._push_bounded(
            self.audio_queue,
            (AUDIO_SAMPLE_RATE, audio_bytes, robot_timestamp if robot_timestamp is not None else time.time()),
            MAX_AUDIO_QUEUE_SIZE,
            "FROM robot",
        )

    def push_audio_to_robot(self, audio_bytes: bytes) -> None:
        """Push operator microphone audio toward the robot (bounded queue)."""
        self._push_bounded(
            self.audio_to_robot_queue,
            audio_bytes,
            MAX_AUDIO_QUEUE_SIZE,
            "TO robot",
        )

    def get_audio_to_robot_blocking(self) -> Optional[bytes]:
        """Block up to 200 ms for the next operator audio chunk; None on timeout."""
        try:
            return self.audio_to_robot_queue.get(timeout=0.2)
        except queue.Empty:
            return None

    def track_audio_latency(self, robot_timestamp: float) -> None:
        """Track audio latency when audio is about to be played."""
        playback_time = time.time()
        latency_ms = (playback_time - robot_timestamp) * 1000
        self.audio_latencies.append(latency_ms)
        if len(self.audio_latencies) > self.audio_latency_window:
            self.audio_latencies.pop(0)

    def get_audio_latency_stats(self) -> dict:
        """Get audio latency statistics in milliseconds."""
        if not self.audio_latencies:
            return {"min": 0, "max": 0, "avg": 0, "latest": 0, "count": 0}
        return {
            "min": min(self.audio_latencies),
            "max": max(self.audio_latencies),
            "avg": sum(self.audio_latencies) / len(self.audio_latencies),
            "latest": self.audio_latencies[-1],
            "count": len(self.audio_latencies),
        }

    # --- Status ---
    def get_connection_status(self) -> str:
        """Human-readable robot connection status for the UI."""
        return "✅ Robot Connected" if self.robot_ws else "🔴 Waiting for Robot..."

    # --- Robot state ---
    def update_robot_state(self, data: dict) -> None:
        """Store the latest joint-state message published by the robot."""
        with self.robot_state_lock:
            self.latest_robot_state = data
            self.latest_robot_state_ts = time.time()

    def get_latency_display(self) -> str:
        """Get formatted latency statistics for display."""
        video_stats = self.get_video_latency_stats()
        audio_stats = self.get_audio_latency_stats()
        lines = []
        if video_stats["count"] > 0:
            lines.append(
                f"📹 Video: {video_stats['latest']:.0f}ms "
                f"(avg: {video_stats['avg']:.0f}ms, max: {video_stats['max']:.0f}ms)"
            )
        if audio_stats["count"] > 0:
            lines.append(
                f"đŸŽĩ Audio: {audio_stats['latest']:.0f}ms "
                f"(avg: {audio_stats['avg']:.0f}ms, max: {audio_stats['max']:.0f}ms)"
            )
        if not lines:
            return "âąī¸ Latency: Waiting for data..."
        return "\n".join(lines)

    # --- Pose management ---
    def update_pose(
        self,
        dx: float = 0,
        dy: float = 0,
        dz: float = 0,
        droll: float = 0,
        dpitch: float = 0,
        dyaw: float = 0,
        dbody_yaw: float = 0,
    ) -> Movement:
        """Apply deltas to the current pose, clamp to safe limits, return the new pose."""
        with self.pose_lock:
            p = self.current_pose
            new = Movement(
                name="Current",
                x=p.x + dx,
                y=p.y + dy,
                z=p.z + dz,
                roll=p.roll + droll,
                pitch=p.pitch + dpitch,
                yaw=p.yaw + dyaw,
                body_yaw=p.body_yaw + dbody_yaw,
                left_antenna=p.left_antenna,
                right_antenna=p.right_antenna,
                duration=0.1,
            )
            # Clamp posed values to the robot's safe ranges.
            new.pitch = float(np.clip(new.pitch, -30, 30))
            new.yaw = float(np.clip(new.yaw, -180, 180))
            new.roll = float(np.clip(new.roll, -40, 40))
            new.body_yaw = float(np.clip(new.body_yaw, -3, 3))
            new.z = float(np.clip(new.z, -20, 50))
            new.x = float(np.clip(new.x, -50, 50))
            new.y = float(np.clip(new.y, -50, 50))
            self.current_pose = new
            return new

    def reset_pose(self) -> Movement:
        """Reset the pose to the neutral (all-zero) position."""
        with self.pose_lock:
            self.current_pose = Movement(
                name="Current",
                x=0,
                y=0,
                z=0,
                roll=0,
                pitch=0,
                yaw=0,
                body_yaw=0,
                left_antenna=0,
                right_antenna=0,
                duration=0.3,
            )
            return self.current_pose

    def get_pose_text(self) -> str:
        """Multi-line human-readable description of the current pose."""
        with self.pose_lock:
            p = self.current_pose
            return (
                "Head position:\n"
                f"  x={p.x:.1f}, y={p.y:.1f}, z={p.z:.1f}\n"
                f"  roll={p.roll:.1f}, pitch={p.pitch:.1f}, yaw={p.yaw:.1f}\n"
                "Body:\n"
                f"  body_yaw={p.body_yaw:.1f}"
            )


# -------------------------------------------------------------------
# 3. Global State
# -------------------------------------------------------------------

class GlobalState:
    """Multi-user state manager.

    Manages per-user sessions, each with its own robot connection and state.
    """

    def __init__(self):
        # Multi-user sessions: user_id -> UserSession
        self.sessions: Dict[str, UserSession] = {}
        self.sessions_lock = threading.Lock()

    def get_or_create_session(self, user_id: str) -> UserSession:
        """Get existing session or create a new one for the user."""
        with self.sessions_lock:
            if user_id not in self.sessions:
                print(f"[MultiUser] Creating new session for user: {user_id}")
                self.sessions[user_id] = UserSession(user_id)
            return self.sessions[user_id]

    def get_session(self, user_id: str) -> Optional[UserSession]:
        """Get session for user if it exists."""
        with self.sessions_lock:
            return self.sessions.get(user_id)

    def remove_session(self, user_id: str) -> None:
        """Remove a user session."""
        with self.sessions_lock:
            if user_id in self.sessions:
                print(f"[MultiUser] Removing session for user: {user_id}")
                del self.sessions[user_id]

    # Multi-user helper methods
    def get_all_sessions(self) -> list:
        """Get all active sessions."""
        with self.sessions_lock:
            return list(self.sessions.values())

    def get_session_count(self) -> int:
        """Get count of active sessions."""
        with self.sessions_lock:
            return len(self.sessions)


state = GlobalState()


# -------------------------------------------------------------------
# 4.
# Robot commands
# -------------------------------------------------------------------

def send_pose_to_robot(session: UserSession, mov: Movement, msg: str = "Move sent"):
    """Send a pose command to the robot for a specific user session.

    Args:
        session: The user's session holding the robot websocket handles.
        mov: Target pose to send.
        msg: Status label to report on success.

    Returns:
        Tuple of (pose string for the UI, human-readable status message).
    """
    print(f"[DEBUG] send_pose_to_robot called for user: {session.user_id}")
    # Snapshot the handles once: the robot may disconnect concurrently
    # (clear_robot_connection nulls these), so never re-read them after the check.
    ws = session.robot_ws
    loop = session.robot_loop
    print(f"[DEBUG] robot_ws: {ws}, robot_loop: {loop}")
    if not (ws and loop):
        print(f"[DEBUG] Robot not connected for user: {session.user_id}")
        return get_pose_string_for_session(session), "âš ī¸ Robot not connected"

    pose = create_head_pose(
        x=mov.x,
        y=mov.y,
        z=mov.z,
        roll=mov.roll,
        pitch=mov.pitch,
        yaw=mov.yaw,
        degrees=True,
        mm=True,
    )
    payload = {
        "type": "movement",
        "movement": {
            "head": pose.tolist(),
            "body_yaw": mov.body_yaw,
            "duration": mov.duration,
        },
    }
    # Antennas are optional: only commanded when both sides are given.
    if mov.left_antenna is not None and mov.right_antenna is not None:
        payload["movement"]["antennas"] = [
            np.deg2rad(mov.right_antenna),
            np.deg2rad(mov.left_antenna),
        ]

    print(f"[DEBUG] Sending payload to robot: {payload['type']}, body_yaw={payload['movement']['body_yaw']}")

    # Send to robot asynchronously (on the robot's event loop) to avoid
    # blocking UI callbacks; errors are reported via a done-callback.
    try:
        fut = asyncio.run_coroutine_threadsafe(ws.send_json(payload), loop)

        def _log_send_error(done_fut):
            try:
                done_fut.result()
                print(f"[DEBUG] Successfully sent movement to robot for {session.user_id}")
            except Exception as e:
                print(f"[Move] Failed to send movement to robot for {session.user_id}: {e!r}")

        fut.add_done_callback(_log_send_error)
    except Exception as e:
        print(f"[Move] Failed to queue movement for {session.user_id}: {e!r}")
        return get_pose_string_for_session(session), "❌ Failed to send movement"

    return get_pose_string_for_session(session), f"✅ {msg}"


# -------------------------------------------------------------------
# 6.
# FastAPI endpoints
# -------------------------------------------------------------------

app = FastAPI()

# Serve the pre-built visualizer under /viz, unzipping dist.zip on first run.
viz_dir = os.path.join(os.path.dirname(__file__), "dist")
if not os.path.exists(viz_dir) and os.path.exists(os.path.join(os.path.dirname(__file__), "dist.zip")):
    # unzip the dist.zip file
    import zipfile
    zip_path = os.path.join(os.path.dirname(__file__), "dist.zip")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(os.path.dirname(__file__))
    viz_dir = os.path.join(os.path.dirname(__file__), "dist")

if os.path.exists(viz_dir):
    app.mount(
        "/viz",
        StaticFiles(directory=viz_dir, html=True),
        name="viz",
    )


@app.websocket("/robot")
async def robot_endpoint(ws: WebSocket):
    """Endpoint for the Robot to connect to (control channel)."""
    await ws.accept()

    # Authenticate robot.
    user_id = await get_user_from_websocket(ws)
    if not user_id:
        print("[Auth] Robot connection rejected - no valid token")
        await ws.close(code=1008, reason="Authentication required")
        return

    # Get or create user session and publish the connection handles.
    session = state.get_or_create_session(user_id)
    session.set_robot_connection(ws, asyncio.get_running_loop())
    print(f"[System] Robot Connected for user: {user_id}")

    try:
        # Drain messages until the robot disconnects; this socket is only
        # used server->robot (movement commands), so incoming data is ignored.
        while True:
            msg = await ws.receive()
            if msg.get("type") == "websocket.disconnect":
                break
    except Exception:
        print(f"[System] Robot Disconnected for user: {user_id}")
    finally:
        session.clear_robot_connection()


@app.websocket("/robot_state")
async def robot_state_endpoint(ws: WebSocket):
    """Endpoint for the Robot to publish joint state."""
    await ws.accept()

    # Authenticate robot.
    user_id = await get_user_from_websocket(ws)
    if not user_id:
        print("[Auth] Robot state rejected - no valid token")
        await ws.close(code=1008, reason="Authentication required")
        return

    session = state.get_or_create_session(user_id)
    print(f"[System] Robot State Connected for user: {user_id}")

    try:
        while True:
            data = await ws.receive_json()
            if isinstance(data, dict) and data.get("type") == "robot_state":
                session.update_robot_state(data)
    except Exception:
        print(f"[System] Robot State Disconnected for user: {user_id}")


@app.websocket("/joint_states")
async def joint_states_endpoint(ws: WebSocket):
    """Endpoint for the Browser to receive joint state (polled at ~33 Hz)."""
    await ws.accept()

    # Authenticate browser user.
    user_id = await get_user_from_websocket(ws)
    if not user_id:
        print("[Auth] Joint states rejected - no valid token")
        await ws.close(code=1008, reason="Authentication required")
        return

    session = state.get_or_create_session(user_id)
    print(f"[System] Browser Joint State Connected for user: {user_id}")

    try:
        while True:
            # Copy the latest state under the lock; send outside it so the
            # robot's state writer is never blocked on a slow browser.
            with session.robot_state_lock:
                data = session.latest_robot_state
            if data and (head_vals := data.get("head")):
                payload = {
                    "body_yaw": head_vals[0],
                    "angles": head_vals[1:7],
                    "antennas": data.get("antennas"),
                    "timestamp": data.get("timestamp"),
                }
                await ws.send_json(payload)
            await asyncio.sleep(0.03)
    except Exception:
        print(f"[System] Browser Joint State Disconnected for user: {user_id}")


@app.get("/video_feed")
async def video_feed(request: Request, token: Optional[str] = None):
    """Video feed endpoint - requires authentication via token parameter."""
    # Authenticate user.
    user_id = await get_user_from_request(request, token)
    print(f"[video_feed] Authenticated user_id: {user_id}")
    if not user_id:
        raise HTTPException(status_code=401, detail="Authentication required. Please log in with Hugging Face.")

    # Get user's session.
    session = state.get_session(user_id)
    if not session:
        print(f"[video_feed] No robot connected for user: {user_id}")
        raise HTTPException(status_code=404, detail="No robot connected for your account")

    def generate_user_video():
        # Sync generator: Starlette runs it in a worker thread, so the
        # blocking time.sleep() calls do not stall the event loop.
        last_timestamp = 0.0
        frame_count = 0
        start_time = time.time()
        while True:
            with session.frame_lock:
                current_bytes = session.latest_frame_bytes
                current_timestamp = session.latest_frame_ts
            if current_timestamp > last_timestamp and current_bytes is not None:
                last_timestamp = current_timestamp
                frame_count += 1
                elapsed = time.time() - start_time
                if elapsed > 1.0:
                    fps = frame_count / elapsed
                    print(f"[video_feed] User {user_id} FPS: {fps:.2f}")
                    frame_count = 0
                    start_time = time.time()
                # MJPEG multipart chunk.
                yield (
                    b"--frame\r\n"
                    b"Content-Type: image/jpeg\r\n\r\n" + current_bytes + b"\r\n"
                )
            else:
                time.sleep(FRAME_SLEEP_S)
                continue
            time.sleep(FRAME_SLEEP_S)

    return StreamingResponse(
        generate_user_video(),
        media_type="multipart/x-mixed-replace; boundary=frame",
    )


@app.get("/audio_feed")
async def audio_feed(request: Request, token: Optional[str] = None):
    """Audio feed endpoint - requires authentication via token parameter."""
    # Authenticate user.
    user_id = await get_user_from_request(request, token)
    if not user_id:
        raise HTTPException(status_code=401, detail="Authentication required. Please log in with Hugging Face.")

    # Get user's session.
    session = state.get_session(user_id)
    if not session:
        raise HTTPException(status_code=404, detail="No robot connected for your account")

    def generate_user_audio():
        # Clear old data to start fresh.
        with session.audio_queue.mutex:
            session.audio_queue.queue.clear()

        TARGET_SAMPLES = 512
        byte_buffer = bytearray()
        while True:
            try:
                sample_rate, chunk_bytes, robot_timestamp = session.audio_queue.get(timeout=1.0)
                if robot_timestamp is not None:
                    session.track_audio_latency(robot_timestamp)
                if chunk_bytes:
                    byte_buffer.extend(chunk_bytes)
            except queue.Empty:
                continue
            # Re-chunk into fixed-size pieces (16-bit samples -> 2 bytes each).
            chunk_size = TARGET_SAMPLES * 2
            while len(byte_buffer) >= chunk_size:
                out_bytes = byte_buffer[:chunk_size]
                byte_buffer = byte_buffer[chunk_size:]
                yield bytes(out_bytes)

    return StreamingResponse(
        generate_user_audio(),
        media_type="application/octet-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Content-Type-Options": "nosniff",
        },
    )


@app.websocket("/video_stream")
async def stream_endpoint(ws: WebSocket):
    """Endpoint for Robot/Sim to send video frames.

    Expected message formats:
      1. Binary only (legacy): just the JPEG frame bytes.
      2. JSON with timestamp: {"timestamp": <float>, "frame": <base64 JPEG>}.
    """
    # Hoisted out of the receive loop (originally imported per message).
    import base64
    import json

    await ws.accept()

    # Authenticate robot.
    user_id = await get_user_from_websocket(ws)
    if not user_id:
        print("[Auth] Video stream rejected - no valid token")
        await ws.close(code=1008, reason="Authentication required")
        return

    session = state.get_or_create_session(user_id)
    print(f"[Video] Stream connected for user: {user_id}")

    frame_count = 0
    start_time = time.time()
    latency_report_interval = 5.0  # Report latency stats every 5 seconds
    last_latency_report = time.time()

    try:
        while True:
            msg = await ws.receive()

            # Binary-only messages (legacy mode, no timestamp).
            data = msg.get("bytes")
            if data:
                session.update_frame(data, robot_timestamp=None)
                frame_count += 1

            # Text/JSON messages with timestamp.
            text_data = msg.get("text")
            if text_data:
                try:
                    json_data = json.loads(text_data)
                    timestamp = json_data.get("timestamp")
                    frame_b64 = json_data.get("frame")
                    if frame_b64:
                        frame_bytes = base64.b64decode(frame_b64)
                        session.update_frame(frame_bytes, robot_timestamp=timestamp)
                        frame_count += 1
                except (json.JSONDecodeError, KeyError) as e:
                    print(f"[Video] Error parsing JSON: {e}")

            # FPS reporting.
            elapsed = time.time() - start_time
            if elapsed > 1.0:
                fps = frame_count / elapsed
                print(f"[Video] Receiving FPS: {fps:.2f}")
                frame_count = 0
                start_time = time.time()

            # Latency reporting.
            if time.time() - last_latency_report > latency_report_interval:
                stats = session.get_video_latency_stats()
                if stats["count"] > 0:
                    print(
                        f"[Video Latency] User: {user_id}, "
                        f"Latest: {stats['latest']:.1f}ms, "
                        f"Avg: {stats['avg']:.1f}ms, "
                        f"Min: {stats['min']:.1f}ms, "
                        f"Max: {stats['max']:.1f}ms "
                        f"(over {stats['count']} frames)"
                    )
                last_latency_report = time.time()
    except WebSocketDisconnect as e:
        print(f"[Video] WebSocketDisconnect: code={e.code}, reason={e.reason}")
    except asyncio.CancelledError:
        print("[Video] stream_endpoint cancelled")
    except Exception as e:
        print(f"[Video] stream_endpoint closed with error: {e!r}")
    finally:
        print("[Video] stream_endpoint closed (finally)")


@app.websocket("/audio_stream")
async def audio_endpoint(ws: WebSocket):
    """Full duplex audio channel between Robot/Sim and server.

    Expected message formats from robot:
      1. Binary only (legacy): just the audio bytes.
      2. JSON with timestamp: {"timestamp": <float>, "audio": <base64 PCM>}.
    """
    # Hoisted out of the receive loop (originally imported per message).
    import base64
    import json

    await ws.accept()

    # Authenticate robot.
    user_id = await get_user_from_websocket(ws)
    if not user_id:
        print("[Auth] Audio stream rejected - no valid token")
        await ws.close(code=1008, reason="Authentication required")
        return

    session = state.get_or_create_session(user_id)
    print(f"[Audio] Stream Connected for user: {user_id}")

    latency_report_interval = 5.0  # Report latency stats every 5 seconds
    last_latency_report = time.time()

    async def robot_to_server():
        """Pump audio chunks from the robot into the session's playback queue."""
        nonlocal last_latency_report
        try:
            while True:
                data = await ws.receive()
                t = data.get("type")
                if t == "websocket.disconnect":
                    print(f"[Audio] Disconnected (recv) for user: {user_id}")
                    break
                if t == "websocket.receive":
                    # Binary-only messages (legacy mode, no timestamp).
                    if data.get("bytes"):
                        session.push_audio_from_robot(data["bytes"], robot_timestamp=None)
                    # JSON messages with timestamp.
                    elif data.get("text"):
                        text_data = data.get("text")
                        if text_data == "ping":
                            print("[Audio] Received ping")
                        else:
                            try:
                                json_data = json.loads(text_data)
                                timestamp = json_data.get("timestamp")
                                audio_b64 = json_data.get("audio")
                                if audio_b64:
                                    audio_bytes = base64.b64decode(audio_b64)
                                    session.push_audio_from_robot(audio_bytes, robot_timestamp=timestamp)
                            except (json.JSONDecodeError, KeyError):
                                pass

                # Latency reporting.
                if time.time() - last_latency_report > latency_report_interval:
                    stats = session.get_audio_latency_stats()
                    if stats["count"] > 0:
                        print(
                            f"[Audio Latency] "
                            f"Latest: {stats['latest']:.1f}ms, "
                            f"Avg: {stats['avg']:.1f}ms, "
                            f"Min: {stats['min']:.1f}ms, "
                            f"Max: {stats['max']:.1f}ms "
                            f"(over {stats['count']} chunks)"
                        )
                    last_latency_report = time.time()
        except asyncio.CancelledError:
            print("[Audio] robot_to_server cancelled")
        except Exception as e:
            print(f"[Audio] robot_to_server error: {e}")

    async def server_to_robot():
        """Forward operator audio to the robot; the blocking queue read runs in a thread."""
        loop = asyncio.get_running_loop()
        try:
            while True:
                chunk: bytes = await loop.run_in_executor(
                    None, session.get_audio_to_robot_blocking
                )
                if chunk is not None:
                    await ws.send_bytes(chunk)
        except asyncio.CancelledError:
            print("[Audio] server_to_robot cancelled")
        except Exception as e:
            print(f"[Audio] server_to_robot error: {e}")

    try:
        await asyncio.gather(robot_to_server(), server_to_robot())
    except asyncio.CancelledError:
        print("[Audio] audio_endpoint cancelled")
    finally:
        print("[Audio] Stream Closed")


@app.websocket("/browser_stream")
async def browser_stream_endpoint(ws: WebSocket):
    """Bi-directional connection for the Browser.

    - Sends Microphone data (Browser -> Robot)
    - Receives Speaker data (Robot -> Browser)
    """
    await ws.accept()

    # Authenticate browser user.
    user_id = await get_user_from_websocket(ws)
    if not user_id:
        print("[Auth] Browser stream rejected - no valid token")
        await ws.close(code=1008, reason="Authentication required")
        return

    session = state.get_or_create_session(user_id)
    print(f"[Browser] WebSocket Connected for user: {user_id}")

    # Task: Send Audio FROM Robot TO Browser.
    async def send_to_browser():
        while True:
            # get_nowait() instead of the former empty()/get(timeout=0.5)
            # pair: a blocking get() here would stall the event loop, and
            # empty()-then-get() races with other consumers of the queue.
            try:
                _, chunk_bytes, robot_timestamp = session.audio_queue.get_nowait()
            except queue.Empty:
                await asyncio.sleep(0.005)  # Tiny sleep to prevent CPU burn
                continue
            # Track latency if we have a timestamp.
            if robot_timestamp is not None:
                session.track_audio_latency(robot_timestamp)
            try:
                # Send as binary message.
                await ws.send_bytes(chunk_bytes)
            except Exception:
                break

    # Task: Receive Audio FROM Browser TO Robot.
    async def receive_from_browser():
        try:
            while True:
                data = await ws.receive_bytes()
                # Push directly to the robot's input queue.
                session.push_audio_to_robot(data)
        except Exception as e:
            print(f"[Browser] Input stream ended: {e}")

    try:
        # Run both tasks concurrently.
        await asyncio.gather(send_to_browser(), receive_from_browser())
    except Exception as e:
        print(f"[Browser] WebSocket Closed: {e}")
    finally:
        print("[Browser] Disconnected")


@app.websocket("/control")
async def control_endpoint(ws: WebSocket):
    """Keyboard control endpoint for browser -> robot movement commands."""
    # Hoisted out of the receive loop (originally imported per message).
    import json

    await ws.accept()
    user_id = await get_user_from_websocket(ws)
    if not user_id:
        print("[Auth] Control stream rejected - no valid token")
        await ws.close(code=1008, reason="Authentication required")
        return

    session = state.get_or_create_session(user_id)
    print(f"[Control] Keyboard stream connected for user: {user_id}")

    try:
        while True:
            msg = await ws.receive()
            if msg.get("type") == "websocket.disconnect":
                break
            payload = msg.get("text")
            if not payload:
                continue
            # Accept either {"action": "w"} JSON or a bare action string.
            try:
                data = json.loads(payload)
                action = str(data.get("action", "")).lower().strip()
            except Exception:
                action = payload.lower().strip()
            if not action:
                continue
            pose = apply_keyboard_action(session, action)
            await ws.send_json({"pose": pose})
    except Exception:
        print(f"[Control] Keyboard stream disconnected for user: {user_id}")


# -------------------------------------------------------------------
# 8. Movement UI helpers
# -------------------------------------------------------------------

def get_pose_string():
    """Return pose in a format JS can parse: pitch:X,yaw:Y,roll:Z,body:B.

    NOTE: legacy function — uses the first session if any exist, otherwise a
    zero pose. Should be replaced with a per-user version.
    """
    sessions = state.get_all_sessions()
    if sessions:
        # Use first session for now (temporary).
        return get_pose_string_for_session(sessions[0])
    return "pitch:0.0,yaw:0.0,roll:0.0,body:0.0"


def get_connection_status_for_user(profile: gr.OAuthProfile | None) -> str:
    """Get connection status for authenticated user."""
    if profile is None:
        return "🔴 Not authenticated"
    session = state.get_session(profile.username)
    return session.get_connection_status() if session else "🔴 Waiting for Robot..."


def get_latency_display_for_user(profile: gr.OAuthProfile | None) -> str:
    """Get latency display for authenticated user."""
    if profile is None:
        return "Not authenticated"
    session = state.get_session(profile.username)
    return session.get_latency_display() if session else "No latency data available"


def nudge_pose(profile: gr.OAuthProfile | None, dpitch=0, dyaw=0, droll=0, dbody_yaw=0, label="Move"):
    """Apply a relative pose nudge and return the pose string. Requires OAuth profile."""
    if profile is None:
        return "Not authenticated"
    session = state.get_session(profile.username)
    if not session:
        return "No robot connected"
    mov = session.update_pose(
        dpitch=dpitch,
        dyaw=dyaw,
        droll=droll,
        dbody_yaw=dbody_yaw,
    )
    send_pose_to_robot(session, mov, label)
    return get_pose_string_for_session(session)


def center_pose(profile: gr.OAuthProfile | None):
    """Reset the pose to neutral and return the pose string. Requires OAuth profile."""
    if profile is None:
        return "Not authenticated"
    session = state.get_session(profile.username)
    if not session:
        return "No robot connected"
    mov = session.reset_pose()
    send_pose_to_robot(session, mov, "Reset pose")
    return get_pose_string_for_session(session)


def get_pose_string_for_session(session: UserSession) -> str:
    """Get pose string for a specific session."""
    # Read under the pose lock so a concurrent update_pose() can't be observed mid-swap.
    with session.pose_lock:
        p = session.current_pose
    return f"pitch:{p.pitch:.1f},yaw:{p.yaw:.1f},roll:{p.roll:.1f},body:{p.body_yaw:.1f}"


def apply_keyboard_action(session: UserSession, action: str) -> str:
    """Apply one keyboard action to a user session and forward to robot."""
    print(f"[Control] Action '{action}' for user: {session.user_id}")
    print(f"[DEBUG] Session has robot_ws: {session.robot_ws is not None}, robot_loop: {session.robot_loop is not None}")

    # Dispatch table: key -> (update_pose kwargs, status label).
    action_map = {
        "w": (dict(dpitch=-NUDGE_PITCH), "W"),
        "s": (dict(dpitch=NUDGE_PITCH), "S"),
        "a": (dict(dyaw=NUDGE_ANGLE * 2), "A"),
        "d": (dict(dyaw=-NUDGE_ANGLE * 2), "D"),
        "q": (dict(droll=-NUDGE_ANGLE), "Q"),
        "e": (dict(droll=NUDGE_ANGLE), "E"),
        "j": (dict(dbody_yaw=NUDGE_BODY), "J"),
        "l": (dict(dbody_yaw=-NUDGE_BODY), "L"),
    }

    # "h" = home: reset instead of nudging.
    if action == "h":
        mov = session.reset_pose()
        send_pose_to_robot(session, mov, "Reset pose")
        return get_pose_string_for_session(session)

    config = action_map.get(action)
    if config is None:
        # Unknown key: report the current pose unchanged.
        return get_pose_string_for_session(session)

    kwargs, label = config
    mov = session.update_pose(**kwargs)
    send_pose_to_robot(session, mov, label)
    return get_pose_string_for_session(session)


# Quick action button wrapper functions
def qa_center(profile: gr.OAuthProfile | None):
    return center_pose(profile)


def qa_look_up(profile: gr.OAuthProfile | None):
    return nudge_pose(profile, dpitch=-15, label="Look Up")


def qa_curious(profile: gr.OAuthProfile | None):
    return nudge_pose(profile, dpitch=-10, droll=15, label="Curious")


def qa_excited(profile: gr.OAuthProfile | None):
    return nudge_pose(profile, dpitch=-5, droll=-10, label="Excited")
# NOTE(review): the span below was flattened by extraction — it holds, in
# order: the CUSTOM_CSS stylesheet, the KEYBOARD_VIZ_HTML_INLINE and
# GAUGES_HTML_INLINE snippets (their HTML tags appear stripped; only text
# nodes such as key letters and gauge labels remain), the APP_JS browser
# script (bi-directional mic/speaker audio over WebSocket, keyboard-control
# WebSocket with action queueing, gauge/status polling), the gr.Blocks UI
# definition with per-user token-injecting HTML factories, the FastAPI mount,
# and the uvicorn entry point.  All code and string literals are left
# byte-identical: comments cannot be inserted inside the triple-quoted
# literals without changing runtime strings, and a faithful restyle is not
# safe from this mangled view.
# NOTE(review): the generated HTML/JS passes the HF OAuth token as a URL
# query parameter (?token=...) for the audio, control, viz and video
# endpoints — query strings can leak into server/proxy logs; confirm this is
# acceptable or consider a header/subprotocol instead.
# ------------------------------------------------------------------- # 9. Gradio UI # ------------------------------------------------------------------- CUSTOM_CSS = """ /* Login view styling */ .login-container { position: fixed !important; top: 0 !important; left: 0 !important; width: 100vw !important; height: 100vh !important; display: flex !important; align-items: center !important; justify-content: center !important; padding: 20px !important; background: linear-gradient(135deg, #0f0f1a 0%, #1a1a2e 100%) !important; z-index: 9999 !important; overflow: auto !important; } .login-card { background: #16213E !important; padding: 48px !important; border-radius: 20px !important; text-align: center; max-width: 420px; box-shadow: 0 20px 60px rgba(0, 0, 0, 0.5) !important; } .login-logo { width: 80px; height: 80px; margin: 0 auto 24px; border-radius: 16px; display: block; } .login-title { color: #FF6B35 !important; margin-bottom: 12px; font-size: 2em !important; font-weight: 700 !important; } .login-description { color: #A0AEC0 !important; margin-bottom: 32px; font-size: 1em !important; line-height: 1.6 !important; } /* Dark theme overrides */ .gradio-container { background: linear-gradient(135deg, #0f0f1a 0%, #1a1a2e 100%) !important; min-height: 100vh; padding: 2rem 3rem !important; } .dark { --background-fill-primary: #16162a !important; --background-fill-secondary: #1e1e38 !important; --border-color-primary: #2d2d4a !important; --text-color-subdued: #9090b0 !important; } /* Header styling */ #header-row { background: transparent !important; border: none !important; margin-bottom: 2rem; display: flex !important; justify-content: space-between !important; align-items: flex-start !important; padding: 0 !important; } #app-title { font-size: 1.5rem !important; font-weight: 600 !important; background: linear-gradient(90deg, #fff, #888) !important; -webkit-background-clip: text !important; -webkit-text-fill-color: transparent !important; border: none !important; padding: 
0 !important; margin: 0 !important; } /* Status badge */ #status-box { flex-shrink: 0 !important; width: auto !important; max-width: 200px !important; min-width: 160px !important; background: rgba(16, 185, 129, 0.15) !important; border: 1px solid rgba(16, 185, 129, 0.4) !important; border-radius: 9999px !important; padding: 0.4rem 1rem !important; font-size: 0.875rem !important; } #status-box textarea { background: transparent !important; border: none !important; color: #10b981 !important; text-align: center !important; font-weight: 500 !important; padding: 0 !important; min-height: unset !important; height: auto !important; line-height: 1.4 !important; } /* Latency display */ #latency-box { flex-shrink: 0 !important; width: 100% !important; max-width: 100% !important; min-width: 300px !important; background: rgba(139, 92, 246, 0.15) !important; border: 1px solid rgba(139, 92, 246, 0.4) !important; border-radius: 0.75rem !important; padding: 0.5rem 1rem !important; font-size: 0.85rem !important; margin-top: 0.5rem !important; } #latency-box textarea { background: transparent !important; border: none !important; color: #a78bfa !important; text-align: left !important; font-weight: 500 !important; font-family: monospace !important; padding: 0 !important; min-height: 2.5rem !important; height: auto !important; max-height: 4rem !important; line-height: 1.4 !important; white-space: pre-wrap !important; overflow-y: auto !important; resize: none !important; } #latency-box textarea::-webkit-scrollbar { display: none !important; } #latency-box .scroll-hide { scrollbar-width: none !important; } #latency-box::-webkit-scrollbar { display: none !important; } /* Video panel */ #video-column { background: linear-gradient(145deg, #1a1a2e 0%, #16162a 100%) !important; border-radius: 1.5rem !important; border: 1px solid rgba(139, 92, 246, 0.2) !important; box-shadow: 0 8px 32px rgba(0, 0, 0, 0.4) !important; overflow: hidden !important; min-height: 600px !important; padding: 1.5rem 
!important; } #robot-video { border-radius: 1rem !important; overflow: hidden !important; box-shadow: 0 4px 16px rgba(0, 0, 0, 0.3) !important; } /* Control panel cards */ .control-card { background: linear-gradient(145deg, #1e1e38 0%, #1a1a32 100%) !important; border: 1px solid rgba(139, 92, 246, 0.15) !important; border-radius: 1rem !important; padding: 1.25rem !important; box-shadow: 0 4px 16px rgba(0, 0, 0, 0.2) !important; margin-bottom: 1.5rem !important; } /* Audio section */ #audio-section { background: linear-gradient(145deg, #1e1e38 0%, #1a1a32 100%) !important; border: 1px solid rgba(139, 92, 246, 0.15) !important; border-radius: 1rem !important; padding: 1.25rem !important; box-shadow: 0 4px 16px rgba(0, 0, 0, 0.2) !important; margin-bottom: 1.5rem !important; } /* Markdown headings inside cards */ .control-card h3, #audio-section h3 { margin-top: 0 !important; margin-bottom: 1rem !important; font-size: 1rem !important; font-weight: 600 !important; color: #e0e0ff !important; } #listen-btn { background: rgba(139, 92, 246, 0.2) !important; border: 1px solid rgba(139, 92, 246, 0.3) !important; color: #a78bfa !important; border-radius: 0.5rem !important; transition: all 0.2s !important; } #listen-btn:hover { background: rgba(139, 92, 246, 0.3) !important; } /* Quick action buttons */ .quick-btn { background: linear-gradient(145deg, #2d2d4a 0%, #25253e 100%) !important; border: 1px solid rgba(139, 92, 246, 0.3) !important; border-radius: 0.75rem !important; padding: 0.75rem !important; font-size: 0.875rem !important; font-weight: 500 !important; transition: all 0.3s ease !important; box-shadow: 0 2px 8px rgba(0, 0, 0, 0.2) !important; } .quick-btn:hover { background: linear-gradient(145deg, #8b5cf6 0%, #7c3aed 100%) !important; border-color: rgba(139, 92, 246, 0.6) !important; transform: translateY(-2px) !important; box-shadow: 0 4px 16px rgba(139, 92, 246, 0.4) !important; } /* Add consistent spacing to main content */ .gradio-container > .main { gap: 
1.5rem !important; } /* Ensure columns have proper gap */ .gradio-container .row { gap: 1.5rem !important; } /* Hide Gradio footer or make room for it */ footer { opacity: 0.5; } /* Force transparency on Gradio's inner wrappers */ .control-card .styler, .control-card .block, #audio-section .styler, #audio-section .block { background: transparent !important; border: none !important; box-shadow: none !important; } /* More specific targeting for the wrapper divs */ #audio-section > .styler > .block { background: transparent !important; padding: 0 !important; } /* Target the html-container wrapper that has padding class */ #audio-section .html-container, #audio-section .html-container.padding, .control-card .html-container, .control-card .html-container.padding { background: transparent !important; padding: 0 !important; } /* Target all nested divs inside control cards except our custom elements */ .control-card div:not(.key):not(.gauge-bar):not(.gauge-fill):not([style*="background"]), #audio-section div:not([style*="background"]) { background: transparent !important; border: none !important; } /* Hidden pose state (keep in DOM for JS) */ #pose-state { position: absolute !important; opacity: 0 !important; pointer-events: none !important; height: 0 !important; overflow: hidden !important; } """ KEYBOARD_VIZ_HTML_INLINE = """
Q
W
E
A
S
D
J
H
L
""" GAUGES_HTML_INLINE = """
Pitch
0.0°
Yaw
0.0°
Roll
0.0°
Body
0.0°
""" APP_JS = """ () => { if (window.__reachyControlsInitialized) return; window.__reachyControlsInitialized = true; // ========================================== // 1. BI-DIRECTIONAL AUDIO WITH MIC SELECTION // ========================================== // Global handles to manage hot-swapping window.currentStream = null; window.wsHandle = null; // --- Helper: Populate Mic List --- window.refreshMicList = async function() { const select = document.getElementById('mic-select'); try { const devices = await navigator.mediaDevices.enumerateDevices(); const audioInputs = devices.filter(device => device.kind === 'audioinput'); const currentVal = select.value; select.innerHTML = ''; // Clear existing // Add Default option const defaultOpt = document.createElement('option'); defaultOpt.value = ""; defaultOpt.text = "Default Microphone"; select.appendChild(defaultOpt); audioInputs.forEach(device => { const option = document.createElement('option'); option.value = device.deviceId; // If label is empty, permission isn't granted yet option.text = device.label || `Microphone ${device.deviceId.slice(0,5)}...`; select.appendChild(option); }); // Restore selection if it still exists if (currentVal) select.value = currentVal; } catch (e) { console.error("Error listing devices", e); } }; window.startAudioPlayer = async function(explicitToken) { const btn = document.getElementById('start-stream-btn'); const status = document.getElementById('audio-status'); const micSelect = document.getElementById('mic-select'); console.log("[Audio] Starting Bi-Directional Stream with injected token..."); try { // --- A. Setup Audio Context --- const AudioContext = window.AudioContext || window.webkitAudioContext; if (!window.audioCtx) { window.audioCtx = new AudioContext({ sampleRate: 16000 }); } const ctx = window.audioCtx; if (ctx.state === 'suspended') await ctx.resume(); status.innerText = "Status: Requesting Mic..."; btn.disabled = true; // --- B. 
Get Microphone (Input) --- // Check dropdown for specific device ID const selectedMicId = micSelect.value; const constraints = { audio: { deviceId: selectedMicId ? { exact: selectedMicId } : undefined, channelCount: 1, sampleRate: 16000, echoCancellation: true, noiseSuppression: true, autoGainControl: true } }; const stream = await navigator.mediaDevices.getUserMedia(constraints); window.currentStream = stream; // Save global ref // **Refresh list now that we have permission (to show labels)** await window.refreshMicList(); if (selectedMicId) micSelect.value = selectedMicId; status.innerText = "Status: Connecting WS..."; // --- C. Setup WebSocket with OAuth Token --- // Use the token passed directly from Python (injected into HTML) const hfToken = explicitToken; if (!hfToken) { console.error('[Auth] No HF token provided to startAudioPlayer'); status.innerText = "Error: Not authenticated. Please log in."; btn.disabled = false; return; } console.log(`[Auth] Using injected token: ${hfToken.substring(0, 10)}...`); // If we are restarting, reuse WS if open, or create new let ws = window.wsHandle; if (!ws || ws.readyState !== WebSocket.OPEN) { const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; // Include token as query parameter for WebSocket authentication const wsUrl = `${protocol}//${window.location.host}/browser_stream?token=${encodeURIComponent(hfToken)}`; console.log(`[Auth] Connecting to WebSocket...`); ws = new WebSocket(wsUrl); ws.binaryType = 'arraybuffer'; window.wsHandle = ws; } // --- D. 
Setup Input Processor (Mic -> WS) --- const source = ctx.createMediaStreamSource(stream); // BufferSize 1024 provides a good balance of latency vs stability const processor = ctx.createScriptProcessor(1024, 1, 1); processor.onaudioprocess = (e) => { if (ws.readyState !== WebSocket.OPEN) return; const inputData = e.inputBuffer.getChannelData(0); // Convert Float32 (-1.0 to 1.0) -> Int16 (-32768 to 32767) const int16Buffer = new Int16Array(inputData.length); for (let i = 0; i < inputData.length; i++) { let s = Math.max(-1, Math.min(1, inputData[i])); int16Buffer[i] = s < 0 ? s * 0x8000 : s * 0x7FFF; } ws.send(int16Buffer.buffer); }; source.connect(processor); processor.connect(ctx.destination); // --- E. Setup Output (WS -> Speaker) --- // Only attach listener if it's a new WS connection if (!ws.onmessage) { let nextTime = 0; ws.onopen = () => { console.log("[Audio] WebSocket Open"); status.innerText = "Status: đŸŸĸ Connected"; btn.innerText = "Microphone Active"; }; ws.onmessage = (event) => { const int16Data = new Int16Array(event.data); const floatBuffer = ctx.createBuffer(1, int16Data.length, 16000); const channelData = floatBuffer.getChannelData(0); for (let i = 0; i < int16Data.length; i++) { channelData[i] = (int16Data[i] / 32768.0); } const src = ctx.createBufferSource(); src.buffer = floatBuffer; src.connect(ctx.destination); const now = ctx.currentTime; if (nextTime < now) nextTime = now; if (nextTime > now + 0.08) { console.log("Catching up audio latency..."); nextTime = now; } src.start(nextTime); nextTime += floatBuffer.duration; }; ws.onerror = (e) => { console.error("WS Error", e); status.innerText = "Status: WebSocket Error"; btn.disabled = false; }; ws.onclose = () => { status.innerText = "Status: Disconnected"; btn.disabled = false; btn.innerText = "â–ļī¸ Reconnect"; if (window.currentStream) { window.currentStream.getTracks().forEach(track => track.stop()); } processor.disconnect(); source.disconnect(); }; } else { // If WS was already open, just 
update UI status.innerText = "Status: đŸŸĸ Connected (Mic Switched)"; btn.innerText = "Microphone Active"; } // Handle Mic Switching micSelect.onchange = async () => { console.log("Switching microphone..."); status.innerText = "Status: Switching Mic..."; // Stop current mic tracks if (window.currentStream) { window.currentStream.getTracks().forEach(t => t.stop()); } processor.disconnect(); source.disconnect(); // Restart player (will pick up new value from dropdown) await window.startAudioPlayer(); }; } catch (err) { console.error("[Audio] Setup Error:", err); status.innerText = "Error: " + err.message; btn.disabled = false; } }; // Attempt to list mics on load (will likely have empty labels until permission) setTimeout(window.refreshMicList, 1000); // ========================================== // 2. KEYBOARD & GAUGE LOGIC (Unchanged) // ========================================== const keyMap = {'w':'w','s':'s','a':'a','d':'d','q':'q','e':'e','h':'h','j':'j','l':'l'}; let controlWs = null; let controlWsReady = false; let pendingControlActions = []; const getAuthToken = () => { // Try to get token from data attribute first const container = document.getElementById('keyboard-token-container'); const token = container ? container.getAttribute('data-token') : ''; if (token) { console.log('[Control] Got keyboard token from data attribute:', token.substring(0, 15) + '...'); } else { console.log('[Control] No keyboard token available'); } return token; }; const ensureControlWs = () => { if (controlWs && (controlWs.readyState === WebSocket.OPEN || controlWs.readyState === WebSocket.CONNECTING)) { console.log('[Control] WebSocket already exists, state:', controlWs.readyState); return; } const token = getAuthToken(); if (!token) { console.log('[Control] No token available yet, cannot create WebSocket'); return; } const protocol = window.location.protocol === 'https:' ? 
'wss:' : 'ws:'; const wsUrl = `${protocol}//${window.location.host}/control?token=${encodeURIComponent(token)}`; console.log('[Control] Creating WebSocket:', wsUrl.replace(/token=[^&]+/, 'token=***')); controlWs = new WebSocket(wsUrl); controlWsReady = false; controlWs.onopen = () => { console.log('[Control] WebSocket OPENED'); controlWsReady = true; while (pendingControlActions.length > 0 && controlWs && controlWs.readyState === WebSocket.OPEN) { const action = pendingControlActions.shift(); controlWs.send(JSON.stringify({ action })); } }; controlWs.onclose = (e) => { console.log('[Control] WebSocket CLOSED', e.code, e.reason); controlWsReady = false; }; controlWs.onerror = (e) => { console.error('[Control] WebSocket ERROR', e); controlWsReady = false; }; controlWs.onmessage = (event) => { try { const payload = JSON.parse(event.data); if (payload.pose) { const poseEl = document.querySelector('#pose-state textarea'); if (poseEl) poseEl.value = payload.pose; } } catch (e) { console.warn('[Control] Failed to parse pose update', e); } }; }; const sendControlAction = (action) => { console.log('[Control] Sending action:', action); ensureControlWs(); if (!controlWs || !controlWsReady) { console.log('[Control] WS not ready, queuing action'); pendingControlActions.push(action); return; } console.log('[Control] Sending via WS:', action); controlWs.send(JSON.stringify({ action })); }; let lastPressed = {}; const REPEAT_MS = 120; document.addEventListener('keydown', (ev) => { const key = ev.key.toLowerCase(); if (!keyMap[key]) return; console.log('[Keyboard] Key pressed:', key); const keyEl = document.querySelector(`.key[data-key="${key}"]`); if (keyEl) keyEl.classList.add('active'); const now = Date.now(); if (lastPressed[key] && now - lastPressed[key] < REPEAT_MS) { console.log('[Keyboard] Key repeat ignored'); return; } lastPressed[key] = now; ev.preventDefault(); sendControlAction(keyMap[key]); }); document.addEventListener('keyup', (ev) => { const key = 
ev.key.toLowerCase(); const keyEl = document.querySelector(`.key[data-key="${key}"]`); if (keyEl) keyEl.classList.remove('active'); }); const updateGaugesFromState = () => { const poseEl = document.querySelector('#pose-state textarea'); if (!poseEl) return; const text = poseEl.value; const match = text.match(/pitch:([\\d.-]+),yaw:([\\d.-]+),roll:([\\d.-]+),body:([\\d.-]+)/); if (!match) return; const values = { pitch: parseFloat(match[1]), yaw: parseFloat(match[2]), roll: parseFloat(match[3]), body: parseFloat(match[4]) }; const gauges = { pitch: [-30, 30], yaw: [-180, 180], roll: [-40, 40], body: [-3, 3] }; Object.entries(gauges).forEach(([name, [min, max]]) => { const normalized = (values[name] - min) / (max - min); const angle = (normalized - 0.5) * 180; const needle = document.querySelector(`.gauge-needle[data-gauge="${name}"]`); if (needle) needle.setAttribute('transform', `rotate(${angle}, 36, 42)`); const display = document.querySelector(`.gauge-value[data-gauge="${name}"]`); if (display) display.textContent = values[name].toFixed(1) + '°'; }); }; setInterval(updateGaugesFromState, 100); const updateStatusStyle = () => { const statusBox = document.querySelector('#status-box'); if (!statusBox || !statusBox.querySelector('textarea')) return; const textarea = statusBox.querySelector('textarea'); const isConnected = textarea.value.includes('Connected'); statusBox.style.background = isConnected ? 'rgba(16, 185, 129, 0.15)' : 'rgba(239, 68, 68, 0.15)'; statusBox.style.borderColor = isConnected ? 'rgba(16, 185, 129, 0.4)' : 'rgba(239, 68, 68, 0.4)'; textarea.style.color = isConnected ? 
'#10b981' : '#ef4444'; }; setInterval(updateStatusStyle, 500); console.log('🎮 Controls & Mic Select Ready'); } """ # ------------------------------------------------------------------- # Gradio UI with new styling # ------------------------------------------------------------------- with gr.Blocks( title="Reachy Controller", theme=gr.themes.Base( primary_hue="violet", neutral_hue="slate", ), css=CUSTOM_CSS, js=APP_JS, ) as demo: def check_auth(profile: gr.OAuthProfile | None): """Check if user is authenticated and return visibility states.""" is_authenticated = profile is not None return { login_view: gr.update(visible=not is_authenticated), main_view: gr.update(visible=is_authenticated), } # Login View (shown when not authenticated) with gr.Column(visible=True, elem_classes="login-container") as login_view: with gr.Column(elem_classes="login-card"): gr.HTML(""" """) gr.Markdown("# Reachy Mini", elem_classes="login-title") gr.Markdown( "Sign in with your Hugging Face account to connect and control your robot remotely.", elem_classes="login-description" ) gr.LoginButton(value="🤗 Sign in with Hugging Face", variant="primary", size="lg") # Main View (shown when authenticated) with gr.Column(visible=False) as main_view: # Header with gr.Row(elem_id="header-row"): gr.Markdown("## 🤖 Reachy Mini", elem_id="app-title") gr.LoginButton(size="sm") status_box = gr.Textbox( value="🔴 Not authenticated", show_label=False, container=False, elem_id="status-box", ) with gr.Row(): # Left column - Controls with gr.Column(scale=1, min_width=280): # Small visualization window if os.path.exists(viz_dir): # Create a function that generates 3D viz iframe with OAuth token def get_viz_html(oauth_token: gr.OAuthToken | None, profile: gr.OAuthProfile | None) -> str: if oauth_token is None or profile is None: return """
 
🔒 Please log in to view 3D visualization
 
""" # Pass token as query parameter to the iframe # The JavaScript in the iframe will use this to connect to /joint_states WebSocket iframe_url = f"/viz/index.html?token={oauth_token.token}" return f"""
""" viz_html = gr.HTML(label="🤖 Reachy 3D") # Update viz HTML when page loads or user logs in demo.load(get_viz_html, inputs=[], outputs=viz_html) else: gr.Markdown("❌ Visualization not available") # Hidden pose state textbox - polls pose for JS gauges pose_state = gr.Textbox( value=get_pose_string, every=0.2, show_label=False, container=False, elem_id="pose-state", ) # Audio section with gr.Group(elem_id="audio-section"): gr.Markdown("### 🎧 Audio") # Create a function that generates audio HTML with OAuth token injected def get_audio_player_html(oauth_token: gr.OAuthToken | None, profile: gr.OAuthProfile | None) -> str: if oauth_token is None or profile is None: return """
 
🔒 Please log in to enable audio
 
Click "Sign in with Hugging Face" above
 
""" # Inject token directly into onclick handler token = oauth_token.token return f"""
Status: Stopped
""" audio_html_component = gr.HTML(label="đŸŽĩ Robot Audio") # Update audio HTML when page loads or user logs in demo.load(get_audio_player_html, inputs=[], outputs=audio_html_component) # Keyboard controls visualization (inline) with gr.Group(elem_classes="control-card"): gr.Markdown("### âŒ¨ī¸ Keyboard Controls") # Create function to inject token into keyboard controls def get_keyboard_html(oauth_token: gr.OAuthToken | None, profile: gr.OAuthProfile | None) -> str: if oauth_token is None or profile is None: return """
 
🔒 Please log in to enable keyboard controls
 
Click "Sign in with Hugging Face" above
 
""" # Inject token directly into keyboard HTML via data attribute token = oauth_token.token return f""" {KEYBOARD_VIZ_HTML_INLINE} """ keyboard_html = gr.HTML() demo.load(get_keyboard_html, inputs=[], outputs=keyboard_html) # Quick actions with gr.Group(elem_classes="control-card"): gr.Markdown("### ⚡ Quick Actions") with gr.Row(): btn_center_quick = gr.Button("🏠 Center", elem_classes="quick-btn") btn_look_up = gr.Button("👀 Look Up", elem_classes="quick-btn") with gr.Row(): btn_curious = gr.Button("🎭 Curious", elem_classes="quick-btn") btn_excited = gr.Button("🎉 Excited", elem_classes="quick-btn") # Robot orientation gauges (inline) with gr.Group(elem_classes="control-card"): gr.Markdown("### 📍 Robot Orientation") gr.HTML(GAUGES_HTML_INLINE) # Wire up quick action buttons btn_center_quick.click(fn=qa_center, outputs=[pose_state], queue=False) btn_look_up.click(fn=qa_look_up, outputs=[pose_state], queue=False) btn_curious.click(fn=qa_curious, outputs=[pose_state], queue=False) btn_excited.click(fn=qa_excited, outputs=[pose_state], queue=False) # Right column - Video with gr.Column(scale=3, elem_id="video-column"): with gr.Row(): # Main video feed with gr.Column(): gr.Markdown("### đŸŽŦ Robot Camera Feed", elem_id="video-title") # Create a function that generates video HTML with OAuth token def get_video_html(oauth_token: gr.OAuthToken | None, profile: gr.OAuthProfile | None) -> str: if oauth_token is None or profile is None: return """
 
🔒 Please log in to view video
 
Click "Sign in with Hugging Face" above
 
""" # Include token in video URL video_url = f"/video_feed?token={oauth_token.token}" return f""" """ video_html = gr.HTML(label="") # Update video HTML when page loads or user logs in demo.load(get_video_html, inputs=[], outputs=video_html) # Latency display below latency_box = gr.Textbox( value="No latency data available", show_label=False, container=False, elem_id="latency-box", lines=2, ) # Toggle views based on authentication demo.load(check_auth, inputs=[], outputs=[login_view, main_view]) # Create timers for periodic updates status_timer = gr.Timer(2) # Update every 2 seconds latency_timer = gr.Timer(1) # Update every 1 second # Wire up timers to update functions status_timer.tick(get_connection_status_for_user, inputs=[], outputs=status_box) latency_timer.tick(get_latency_display_for_user, inputs=[], outputs=latency_box) # ------------------------------------------------------------------- # 10. Mount & run # ------------------------------------------------------------------- app = gr.mount_gradio_app(app, demo, path="/") if __name__ == "__main__": print("🚀 Server starting on http://0.0.0.0:7860") print("â„šī¸ Point your Robot/Sim to: ws://:7860/robot") uvicorn.run(app, host="0.0.0.0", port=7860, proxy_headers=True, forwarded_allow_ips="*", log_level="warning")