G.E.N.I.EmarkII / handler.py
nihalaninihal's picture
Update handler.py
76fb0a8 verified
# basic_handler.py
import asyncio
import base64
import json
import os
import traceback
from websockets.asyncio.client import connect
# Configuration for Gemini API
host = "generativelanguage.googleapis.com"
model = "gemini-2.0-flash-live-001" # You can change this to a different model if needed
api_key_env = os.environ.get("GOOGLE_API_KEY", "")
uri_template = f"wss://{host}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={{api_key}}"
class AudioLoop:
def __init__(self):
self.ws = None
# Queue for messages to be sent *to* Gemini
self.out_queue = asyncio.Queue()
# Queue for PCM audio received *from* Gemini
self.audio_in_queue = asyncio.Queue()
# Flag to signal shutdown
self.shutdown_event = asyncio.Event()
async def startup(self, api_key=None):
"""Send the model setup message to Gemini.
Args:
api_key: API key to use (overrides environment variable)
"""
# Use provided API key or fallback to environment variable
key = api_key or api_key_env
if not key:
raise ValueError("No API key provided and GOOGLE_API_KEY environment variable not set")
uri = uri_template.format(api_key=key)
self.ws = await connect(uri, additional_headers={"Content-Type": "application/json"})
# Absolutely minimal setup message
setup_msg = {
"setup": {
"model": f"models/{model}"
}
}
await self.ws.send(json.dumps(setup_msg))
raw_response = await self.ws.recv()
setup_response = json.loads(raw_response)
print("[AudioLoop] Setup response from Gemini:", setup_response)
async def send_realtime(self):
"""Read from out_queue and forward those messages to Gemini in real time."""
try:
while not self.shutdown_event.is_set():
# Get next message from queue with timeout
try:
msg = await asyncio.wait_for(self.out_queue.get(), 0.5)
await self.ws.send(json.dumps(msg))
except asyncio.TimeoutError:
# No message in queue, continue checking
continue
except asyncio.CancelledError:
print("[AudioLoop] send_realtime task cancelled")
except Exception as e:
print(f"[AudioLoop] Error in send_realtime: {e}")
traceback.print_exc()
finally:
print("[AudioLoop] send_realtime task ended")
async def receive_audio(self):
"""Read from Gemini websocket and process responses."""
try:
while not self.shutdown_event.is_set():
try:
raw_response = await asyncio.wait_for(self.ws.recv(), 0.5)
response = json.loads(raw_response)
# Print for debugging
print(f"[AudioLoop] Received response: {json.dumps(response)[:500]}...")
# Process audio data if present
try:
# Check for inline PCM data
if ("serverContent" in response and
"modelTurn" in response["serverContent"] and
"parts" in response["serverContent"]["modelTurn"]):
parts = response["serverContent"]["modelTurn"]["parts"]
for part in parts:
if "inlineData" in part and "data" in part["inlineData"]:
b64data = part["inlineData"]["data"]
pcm_data = base64.b64decode(b64data)
await self.audio_in_queue.put(pcm_data)
except Exception as e:
print(f"[AudioLoop] Error extracting audio: {e}")
# Handle tool calls if present
tool_call = response.pop('toolCall', None)
if tool_call:
print(f"[AudioLoop] Tool call received: {tool_call}")
# Send simple OK response for now
for fc in tool_call.get('functionCalls', []):
resp_msg = {
'tool_response': {
'function_responses': [{
'id': fc.get('id', ''),
'name': fc.get('name', ''),
'response': {'result': {'string_value': 'ok'}}
}]
}
}
await self.ws.send(json.dumps(resp_msg))
except asyncio.TimeoutError:
# No message received, continue checking
continue
except Exception as e:
print(f"[AudioLoop] Error processing message: {e}")
traceback.print_exc()
except asyncio.CancelledError:
print("[AudioLoop] receive_audio task cancelled")
except Exception as e:
print(f"[AudioLoop] Error in receive_audio: {e}")
traceback.print_exc()
finally:
print("[AudioLoop] receive_audio task ended")
async def run(self):
"""Main entry point: connects to Gemini, starts send/receive tasks."""
try:
# Initialize the connection with Gemini
await self.startup()
# Start processing tasks
try:
# Create tasks for sending and receiving data
send_task = asyncio.create_task(self.send_realtime())
receive_task = asyncio.create_task(self.receive_audio())
# Wait for shutdown event
await self.shutdown_event.wait()
# Cancel tasks
send_task.cancel()
receive_task.cancel()
# Wait for tasks to complete
await asyncio.gather(send_task, receive_task, return_exceptions=True)
finally:
# Clean up connection
try:
await self.ws.close()
print("[AudioLoop] Closed WebSocket connection")
except Exception as e:
print(f"[AudioLoop] Error closing Gemini connection: {e}")
except asyncio.CancelledError:
print("[AudioLoop] run task cancelled")
except Exception as e:
print(f"[AudioLoop] Error in run: {e}")
traceback.print_exc()
finally:
print("[AudioLoop] run task ended")
def stop(self):
"""Signal tasks to stop and clean up resources."""
self.shutdown_event.set()