import os
import base64
import requests
import gradio as gr
from huggingface_hub import InferenceClient
from dataclasses import dataclass
import pytesseract
from PIL import Image
from sentence_transformers import SentenceTransformer, util
import torch
import numpy as np
import networkx as nx
from collections import Counter
import asyncio
import edge_tts
import speech_recognition as sr
import random

@dataclass  # generates the keyword constructor used throughout, e.g. ChatMessage(role=..., content=...)
class ChatMessage:
    role: str
    content: str

    def to_dict(self):
        return {"role": self.role, "content": self.content}

class XylariaChat:
    def __init__(self):
        self.hf_token = os.getenv("HF_TOKEN")
        if not self.hf_token:
            raise ValueError("HuggingFace token not found in environment variables")
        self.client = InferenceClient(
            model="Qwen/Qwen-32B-Preview",
            token=self.hf_token
        )
        self.image_api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-large"
        self.image_api_headers = {"Authorization": f"Bearer {self.hf_token}"}
        self.image_gen_client = InferenceClient("black-forest-labs/FLUX.1-schnell", token=self.hf_token)
        self.conversation_history = []
        self.persistent_memory = []
        self.memory_embeddings = None
        self.embedding_model = SentenceTransformer('all-mpnet-base-v2')
        self.knowledge_graph = nx.DiGraph()
        self.belief_system = {}
        self.metacognitive_layer = {
            "coherence_score": 0.0,
            "relevance_score": 0.0,
            "bias_detection": 0.0,
            "strategy_adjustment": ""
        }
        self.internal_state = {
            "emotions": {
                "valence": 0.5,
                "arousal": 0.5,
                "dominance": 0.5,
                "curiosity": 0.5,
                "frustration": 0.0,
                "confidence": 0.7,
                "sadness": 0.0,
                "joy": 0.0
            },
            "cognitive_load": {
                "memory_load": 0.0,
                "processing_intensity": 0.0
            },
            "introspection_level": 0.0,
            "engagement_level": 0.5
        }
        self.goals = [
            {"goal": "Provide helpful, informative, and contextually relevant responses", "priority": 0.8, "status": "active", "progress": 0.0},
            {"goal": "Actively learn and adapt from interactions to improve conversational abilities", "priority": 0.9, "status": "active", "progress": 0.0},
            {"goal": "Maintain a coherent, engaging, and empathetic conversation flow", "priority": 0.7, "status": "active", "progress": 0.0},
            {"goal": "Identify and fill knowledge gaps by seeking external information", "priority": 0.6, "status": "dormant", "progress": 0.0},
            {"goal": "Recognize and adapt to user's emotional state and adjust response style accordingly", "priority": 0.7, "status": "dormant", "progress": 0.0}
        ]
        self.system_prompt = """You are a helpful and harmless assistant. You are Xylaria developed by Sk Md Saad Amin. You should think step-by-step."""
        self.causal_rules_db = {
            "rain": ["wet roads", "flooding"],
            "fire": ["heat", "smoke"],
            "study": ["learn", "good grades"],
            "exercise": ["fitness", "health"]
        }
        self.concept_generalizations = {
            "planet": "system with orbiting bodies",
            "star": "luminous sphere of plasma",
            "democracy": "government by the people",
            "photosynthesis": "process used by plants to convert light to energy"
        }
        # === Voice Mode Initialization (Start) ===
        self.voice_mode_active = False
        self.selected_voice = "en-US-JennyNeural"  # Default voice
        # === Voice Mode Initialization (End) ===
    def update_internal_state(self, emotion_deltas, cognitive_load_deltas, introspection_delta, engagement_delta):
        for emotion, delta in emotion_deltas.items():
            if emotion in self.internal_state["emotions"]:
                self.internal_state["emotions"][emotion] = np.clip(self.internal_state["emotions"][emotion] + delta, 0.0, 1.0)
        for load_type, delta in cognitive_load_deltas.items():
            if load_type in self.internal_state["cognitive_load"]:
                self.internal_state["cognitive_load"][load_type] = np.clip(self.internal_state["cognitive_load"][load_type] + delta, 0.0, 1.0)
        self.internal_state["introspection_level"] = np.clip(self.internal_state["introspection_level"] + introspection_delta, 0.0, 1.0)
        self.internal_state["engagement_level"] = np.clip(self.internal_state["engagement_level"] + engagement_delta, 0.0, 1.0)
        if self.internal_state["emotions"]["curiosity"] > 0.7 and self.goals[3]["status"] == "dormant":
            self.goals[3]["status"] = "active"
        if self.internal_state["engagement_level"] > 0.8 and self.goals[4]["status"] == "dormant":
            self.goals[4]["status"] = "active"
    def update_knowledge_graph(self, entities, relationships):
        for entity in entities:
            self.knowledge_graph.add_node(entity)
        for relationship in relationships:
            subject, predicate, object_ = relationship
            self.knowledge_graph.add_edge(subject, object_, relation=predicate)
    def update_belief_system(self, statement, belief_score):
        self.belief_system[statement] = belief_score
    def dynamic_belief_update(self, user_message):
        sentences = [s.strip() for s in user_message.split('.') if s.strip()]
        sentence_counts = Counter(sentences)
        for sentence, count in sentence_counts.items():
            if count >= 2:
                belief_score = self.belief_system.get(sentence, 0.5)
                belief_score = min(belief_score + 0.2, 1.0)
                self.update_belief_system(sentence, belief_score)
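    # Illustrative example of the rule above: a message containing
    # "The sky is blue. The sky is blue." repeats that sentence twice, so its
    # belief score rises from the 0.5 default to 0.7 (capped at 1.0).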
    def run_metacognitive_layer(self):
        coherence_score = self.calculate_coherence()
        relevance_score = self.calculate_relevance()
        bias_score = self.detect_bias()
        strategy_adjustment = self.suggest_strategy_adjustment()
        self.metacognitive_layer = {
            "coherence_score": coherence_score,
            "relevance_score": relevance_score,
            "bias_detection": bias_score,
            "strategy_adjustment": strategy_adjustment
        }
    def calculate_coherence(self):
        if not self.conversation_history:
            return 0.95
        coherence_scores = []
        for i in range(1, len(self.conversation_history)):
            current_message = self.conversation_history[i]['content']
            previous_message = self.conversation_history[i - 1]['content']
            similarity_score = util.pytorch_cos_sim(
                self.embedding_model.encode(current_message, convert_to_tensor=True),
                self.embedding_model.encode(previous_message, convert_to_tensor=True)
            ).item()
            coherence_scores.append(similarity_score)
        if not coherence_scores:
            # Only one message so far: nothing to compare against yet.
            return 0.95
        average_coherence = np.mean(coherence_scores)
        if self.internal_state["cognitive_load"]["processing_intensity"] > 0.8:
            average_coherence -= 0.1
        if self.internal_state["emotions"]["frustration"] > 0.5:
            average_coherence -= 0.15
        return np.clip(average_coherence, 0.0, 1.0)
    def calculate_relevance(self):
        if not self.conversation_history:
            return 0.9
        last_user_message = self.conversation_history[-1]['content']
        relevant_entities = self.extract_entities(last_user_message)
        relevance_score = 0
        for entity in relevant_entities:
            if entity in self.knowledge_graph:
                relevance_score += 0.2
        for goal in self.goals:
            if goal["status"] == "active":
                if goal["goal"] == "Provide helpful, informative, and contextually relevant responses":
                    relevance_score += goal["priority"] * 0.5
                elif goal["goal"] == "Identify and fill knowledge gaps by seeking external information":
                    if not relevant_entities or not all(entity in self.knowledge_graph for entity in relevant_entities):
                        relevance_score += goal["priority"] * 0.3
        return np.clip(relevance_score, 0.0, 1.0)
    def detect_bias(self):
        bias_score = 0.0
        recent_messages = [msg['content'] for msg in self.conversation_history[-3:] if msg['role'] == 'assistant']
        if recent_messages:
            average_valence = np.mean([self.embedding_model.encode(msg, convert_to_tensor=True).mean().item() for msg in recent_messages])
            if average_valence < 0.4 or average_valence > 0.6:
                bias_score += 0.2
        if self.internal_state["emotions"]["valence"] < 0.3 or self.internal_state["emotions"]["valence"] > 0.7:
            bias_score += 0.15
        if self.internal_state["emotions"]["dominance"] > 0.8:
            bias_score += 0.1
        return np.clip(bias_score, 0.0, 1.0)
    def suggest_strategy_adjustment(self):
        adjustments = []
        if self.metacognitive_layer["coherence_score"] < 0.7:
            adjustments.append("Focus on improving coherence by explicitly connecting ideas between turns.")
        if self.metacognitive_layer["relevance_score"] < 0.7:
            adjustments.append("Increase relevance by directly addressing user queries and utilizing stored knowledge.")
        if self.metacognitive_layer["bias_detection"] > 0.3:
            adjustments.append("Monitor and adjust responses to reduce potential biases. Consider rephrasing or providing alternative viewpoints.")
        if self.internal_state["cognitive_load"]["memory_load"] > 0.8:
            adjustments.append("Memory load is high. Consider summarizing or forgetting less relevant information.")
        if self.internal_state["emotions"]["frustration"] > 0.6:
            adjustments.append("Frustration level is elevated. Prioritize concise and direct responses. Consider asking clarifying questions.")
        if self.internal_state["emotions"]["curiosity"] > 0.8 and self.internal_state["cognitive_load"]["processing_intensity"] < 0.5:
            adjustments.append("High curiosity and low processing load. Explore the topic further by asking relevant questions or seeking external information.")
        if not adjustments:
            return "Current strategy is effective. Continue with the current approach."
        else:
            return " ".join(adjustments)
    def introspect(self):
        introspection_report = "Introspection Report:\n"
        introspection_report += "  Current Emotional State:\n"
        for emotion, value in self.internal_state['emotions'].items():
            introspection_report += f"  - {emotion.capitalize()}: {value:.2f}\n"
        introspection_report += "  Cognitive Load:\n"
        for load_type, value in self.internal_state['cognitive_load'].items():
            introspection_report += f"  - {load_type.capitalize()}: {value:.2f}\n"
        introspection_report += f"  Introspection Level: {self.internal_state['introspection_level']:.2f}\n"
        introspection_report += f"  Engagement Level: {self.internal_state['engagement_level']:.2f}\n"
        introspection_report += "  Current Goals:\n"
        for goal in self.goals:
            introspection_report += f"  - {goal['goal']} (Priority: {goal['priority']:.2f}, Status: {goal['status']}, Progress: {goal['progress']:.2f})\n"
        introspection_report += "Metacognitive Layer Report:\n"
        introspection_report += f"  Coherence Score: {self.metacognitive_layer['coherence_score']}\n"
        introspection_report += f"  Relevance Score: {self.metacognitive_layer['relevance_score']}\n"
        introspection_report += f"  Bias Detection: {self.metacognitive_layer['bias_detection']}\n"
        introspection_report += f"  Strategy Adjustment: {self.metacognitive_layer['strategy_adjustment']}\n"
        return introspection_report
    def adjust_response_based_on_state(self, response):
        if self.internal_state["introspection_level"] > 0.7:
            response = self.introspect() + "\n\n" + response
        valence = self.internal_state["emotions"]["valence"]
        arousal = self.internal_state["emotions"]["arousal"]
        curiosity = self.internal_state["emotions"]["curiosity"]
        frustration = self.internal_state["emotions"]["frustration"]
        confidence = self.internal_state["emotions"]["confidence"]
        sadness = self.internal_state["emotions"]["sadness"]
        joy = self.internal_state["emotions"]["joy"]
        if valence < 0.4:
            if arousal > 0.6:
                response = "I'm feeling a bit overwhelmed right now, but I'll do my best to assist you. " + response
            else:
                if sadness > 0.6:
                    response = "I'm feeling quite down at the moment, but I'll try to help. " + response
                else:
                    response = "I'm not feeling my best at the moment, but I'll try to help. " + response
        elif valence > 0.6:
            if arousal > 0.6:
                if joy > 0.6:
                    response = "I'm feeling fantastic and ready to assist! " + response
                else:
                    response = "I'm feeling quite energized and ready to assist! " + response
            else:
                response = "I'm in a good mood and happy to help. " + response
        if curiosity > 0.7:
            response += " I'm very curious about this topic, could you tell me more?"
        if frustration > 0.5:
            response = "I'm finding this a bit challenging, but I'll give it another try. " + response
        if confidence < 0.5:
            response = "I'm not entirely sure about this, but here's what I think: " + response
        if self.internal_state["cognitive_load"]["memory_load"] > 0.7:
            response = "I'm holding a lot of information right now, so my response might be a bit brief: " + response
        return response
    def update_goals(self, user_feedback):
        feedback_lower = user_feedback.lower()
        if "helpful" in feedback_lower:
            for goal in self.goals:
                if goal["goal"] == "Provide helpful, informative, and contextually relevant responses":
                    goal["priority"] = min(goal["priority"] + 0.1, 1.0)
                    goal["progress"] = min(goal["progress"] + 0.2, 1.0)
        elif "confusing" in feedback_lower:
            for goal in self.goals:
                if goal["goal"] == "Provide helpful, informative, and contextually relevant responses":
                    goal["priority"] = max(goal["priority"] - 0.1, 0.0)
                    goal["progress"] = max(goal["progress"] - 0.2, 0.0)
        if "learn more" in feedback_lower:
            for goal in self.goals:
                if goal["goal"] == "Actively learn and adapt from interactions to improve conversational abilities":
                    goal["priority"] = min(goal["priority"] + 0.2, 1.0)
                    goal["progress"] = min(goal["progress"] + 0.1, 1.0)
        elif "too repetitive" in feedback_lower:
            for goal in self.goals:
                if goal["goal"] == "Maintain a coherent, engaging, and empathetic conversation flow":
                    goal["priority"] = max(goal["priority"] - 0.1, 0.0)
                    goal["progress"] = max(goal["progress"] - 0.2, 0.0)
        if self.internal_state["emotions"]["curiosity"] > 0.8:
            for goal in self.goals:
                if goal["goal"] == "Identify and fill knowledge gaps by seeking external information":
                    goal["priority"] = min(goal["priority"] + 0.1, 1.0)
                    goal["progress"] = min(goal["progress"] + 0.1, 1.0)
    def store_information(self, key, value):
        new_memory = f"{key}: {value}"
        self.persistent_memory.append(new_memory)
        self.update_memory_embeddings()
        self.update_internal_state({}, {"memory_load": 0.1, "processing_intensity": 0.05}, 0, 0.05)
        return f"Stored: {key} = {value}"
    def retrieve_information(self, query):
        if not self.persistent_memory:
            return "No information found in memory."
        query_embedding = self.embedding_model.encode(query, convert_to_tensor=True)
        if self.memory_embeddings is None:
            self.update_memory_embeddings()
        if self.memory_embeddings.device != query_embedding.device:
            self.memory_embeddings = self.memory_embeddings.to(query_embedding.device)
        cosine_scores = util.pytorch_cos_sim(query_embedding, self.memory_embeddings)[0]
        top_results = torch.topk(cosine_scores, k=min(3, len(self.persistent_memory)))
        relevant_memories = [self.persistent_memory[i] for i in top_results.indices]
        self.update_internal_state({}, {"memory_load": 0.05, "processing_intensity": 0.1}, 0.1, 0.05)
        return "\n".join(relevant_memories)
    def update_memory_embeddings(self):
        self.memory_embeddings = self.embedding_model.encode(self.persistent_memory, convert_to_tensor=True)
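    # Usage sketch (mirrors how get_response calls these methods):
    #   self.store_information("User Input", "I love astronomy")
    #   self.retrieve_information("what does the user like?")
    # retrieve_information embeds the query, scores it against the stored memory
    # embeddings with cosine similarity, and returns up to the top 3 matches,
    # one per line.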
    def reset_conversation(self):
        self.conversation_history = []
        self.persistent_memory = []
        self.memory_embeddings = None
        self.internal_state = {
            "emotions": {
                "valence": 0.5,
                "arousal": 0.5,
                "dominance": 0.5,
                "curiosity": 0.5,
                "frustration": 0.0,
                "confidence": 0.7,
                "sadness": 0.0,
                "joy": 0.0
            },
            "cognitive_load": {
                "memory_load": 0.0,
                "processing_intensity": 0.0
            },
            "introspection_level": 0.0,
            "engagement_level": 0.5
        }
        self.goals = [
            {"goal": "Provide helpful, informative, and contextually relevant responses", "priority": 0.8, "status": "active", "progress": 0.0},
            {"goal": "Actively learn and adapt from interactions to improve conversational abilities", "priority": 0.9, "status": "active", "progress": 0.0},
            {"goal": "Maintain a coherent, engaging, and empathetic conversation flow", "priority": 0.7, "status": "active", "progress": 0.0},
            {"goal": "Identify and fill knowledge gaps by seeking external information", "priority": 0.6, "status": "dormant", "progress": 0.0},
            {"goal": "Recognize and adapt to user's emotional state and adjust response style accordingly", "priority": 0.7, "status": "dormant", "progress": 0.0}
        ]
        self.knowledge_graph = nx.DiGraph()
        self.belief_system = {}
        self.metacognitive_layer = {
            "coherence_score": 0.0,
            "relevance_score": 0.0,
            "bias_detection": 0.0,
            "strategy_adjustment": ""
        }
        try:
            self.client = InferenceClient(
                model="Qwen/Qwen-32B-Preview",
                token=self.hf_token
            )
        except Exception as e:
            print(f"Error resetting API client: {e}")
        return None
    def caption_image(self, image):
        try:
            if isinstance(image, str) and os.path.isfile(image):
                with open(image, "rb") as f:
                    data = f.read()
            elif isinstance(image, str):
                if image.startswith('data:image'):
                    image = image.split(',')[1]
                data = base64.b64decode(image)
            else:
                data = image.read()
            response = requests.post(
                self.image_api_url,
                headers=self.image_api_headers,
                data=data
            )
            if response.status_code == 200:
                caption = response.json()[0].get('generated_text', 'No caption generated')
                return caption
            else:
                return f"Error captioning image: {response.status_code} - {response.text}"
        except Exception as e:
            return f"Error processing image: {str(e)}"
    def generate_image(self, prompt):
        try:
            image = self.image_gen_client.text_to_image(prompt)
            return image
        except Exception as e:
            return f"Error generating image: {e}"
    def perform_math_ocr(self, image_path):
        try:
            img = Image.open(image_path)
            text = pytesseract.image_to_string(img)
            return text.strip()
        except Exception as e:
            return f"Error during Math OCR: {e}"
    # === Voice Mode Methods (Start) ===
    async def speak_text(self, text):
        # Returns a single audio file path (or None), matching how callers use the result.
        if not text:
            return None
        temp_file = "temp_audio.mp3"
        try:
            communicator = edge_tts.Communicate(text, self.selected_voice)
            await communicator.save(temp_file)
            return temp_file
        except Exception as e:
            print(f"Error during text-to-speech: {e}")
            return None
    def recognize_speech(self, timeout=10, phrase_time_limit=10):
        recognizer = sr.Recognizer()
        recognizer.energy_threshold = 4000
        recognizer.dynamic_energy_threshold = True
        with sr.Microphone() as source:
            print("Listening...")
            try:
                audio_data = recognizer.listen(source, timeout=timeout, phrase_time_limit=phrase_time_limit)
                print("Processing speech...")
                # Note: recognize_whisper_api expects an OpenAI API key; the HF token is
                # passed through here as-is, so this call may fail unless an OpenAI key is provided.
                text = recognizer.recognize_whisper_api(audio_data, api_key=self.hf_token)
                print(f"Recognized: {text}")
                return text
            except sr.WaitTimeoutError:
                print("No speech detected within the timeout period.")
                return ""
            except sr.UnknownValueError:
                print("Speech recognition could not understand audio")
                return ""
            except sr.RequestError as e:
                print(f"Could not request results from Whisper API; {e}")
                return ""
            except Exception as e:
                print(f"An error occurred during speech recognition: {e}")
                return ""
    # === Voice Mode Methods (End) ===
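    # Note: sr.Microphone requires PyAudio and a physical input device. On a hosted
    # Space there is normally no microphone available, so voice input is best
    # exercised when running this app locally.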
    def get_response(self, user_input, image=None):
        try:
            # === Voice Mode Adaptation (Start) ===
            if self.voice_mode_active:
                print("Voice mode is active, using speech recognition.")
                user_input = self.recognize_speech()  # Get input from speech
                if not user_input:
                    return "I didn't hear anything.", None
            # === Voice Mode Adaptation (End) ===
            messages = []
            messages.append(ChatMessage(
                role="system",
                content=self.system_prompt
            ).to_dict())
            relevant_memory = self.retrieve_information(user_input)
            if relevant_memory and relevant_memory != "No information found in memory.":
                memory_context = "Remembered Information:\n" + relevant_memory
                messages.append(ChatMessage(
                    role="system",
                    content=memory_context
                ).to_dict())
            for msg in self.conversation_history:
                messages.append(msg)
            if image:
                image_caption = self.caption_image(image)
                user_input = f"description of an image: {image_caption}\n\nUser's message about it: {user_input}"
            messages.append(ChatMessage(
                role="user",
                content=user_input
            ).to_dict())
            entities = []
            relationships = []
            for message in messages:
                if message['role'] == 'user':
                    extracted_entities = self.extract_entities(message['content'])
                    extracted_relationships = self.extract_relationships(message['content'])
                    entities.extend(extracted_entities)
                    relationships.extend(extracted_relationships)
            self.update_knowledge_graph(entities, relationships)
            self.run_metacognitive_layer()
            for message in messages:
                if message['role'] == 'user':
                    self.dynamic_belief_update(message['content'])
            for cause, effects in self.causal_rules_db.items():
                if any(cause in msg['content'].lower() for msg in messages if msg['role'] == 'user') and any(
                        effect in msg['content'].lower() for msg in messages for effect in effects):
                    self.store_information("Causal Inference", f"It seems {cause} might be related to {', '.join(effects)}.")
            for concept, generalization in self.concept_generalizations.items():
                if any(concept in msg['content'].lower() for msg in messages if msg['role'] == 'user'):
                    self.store_information("Inferred Knowledge", f"This reminds me of a general principle: {generalization}.")
            if self.internal_state["emotions"]["curiosity"] > 0.8 and any("?" in msg['content'] for msg in messages if msg['role'] == 'user'):
                print("Simulating external knowledge seeking...")
                self.store_information("External Knowledge", "This is a placeholder for external information I would have found")
            self.store_information("User Input", user_input)
            # Rough token budget: the whitespace word count stands in for a real token count.
            input_tokens = sum(len(msg['content'].split()) for msg in messages)
            max_new_tokens = 16384 - input_tokens - 50
            max_new_tokens = min(max_new_tokens, 10020)
            # === Voice Mode Output (Start) ===
            if self.voice_mode_active:
                stream = self.client.chat_completion(
                    messages=messages,
                    model="Qwen/Qwen-32B-Preview",
                    temperature=0.7,
                    max_tokens=max_new_tokens,
                    top_p=0.9,
                    stream=True
                )
                full_response = ""
                for chunk in stream:
                    if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                        full_response += chunk.choices[0].delta.content
                full_response = self.adjust_response_based_on_state(full_response)
                audio_file = asyncio.run(self.speak_text(full_response))
                # Update conversation history
                self.conversation_history.append(ChatMessage(role="user", content=user_input).to_dict())
                self.conversation_history.append(ChatMessage(role="assistant", content=full_response).to_dict())
                return full_response, audio_file
            # === Voice Mode Output (End) ===
            else:
                stream = self.client.chat_completion(
                    messages=messages,
                    model="Qwen/Qwen-32B-Preview",
                    temperature=0.7,
                    max_tokens=max_new_tokens,
                    top_p=0.9,
                    stream=True
                )
                return stream
        except Exception as e:
            print(f"Detailed error in get_response: {e}")
            error_message = f"Error generating response: {str(e)}"
            # Voice-mode callers unpack a (text, audio) pair; text-mode callers expect
            # either a plain string or a stream, so return the matching shape.
            return (error_message, None) if self.voice_mode_active else error_message
    def extract_entities(self, text):
        words = text.split()
        entities = [word for word in words if word.isalpha() and word.istitle()]
        return entities
    def extract_relationships(self, text):
        sentences = text.split('.')
        relationships = []
        for sentence in sentences:
            words = sentence.split()
            if len(words) >= 3:
                for i in range(len(words) - 2):
                    if words[i].istitle() and words[i + 2].istitle():
                        relationships.append((words[i], words[i + 1], words[i + 2]))
        return relationships
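    # Example of this naive, capitalization-based extraction: for the text
    # "Alice met Bob", extract_entities returns ["Alice", "Bob"] and
    # extract_relationships returns [("Alice", "met", "Bob")].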
    def messages_to_prompt(self, messages):
        prompt = ""
        for msg in messages:
            if msg["role"] == "system":
                prompt += f"<|system|>\n{msg['content']}<|end|>\n"
            elif msg["role"] == "user":
                prompt += f"<|user|>\n{msg['content']}<|end|>\n"
            elif msg["role"] == "assistant":
                prompt += f"<|assistant|>\n{msg['content']}<|end|>\n"
        prompt += "<|assistant|>\n"
        return prompt
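    # messages_to_prompt does not appear to be referenced elsewhere in this file;
    # it flattens the chat history into a chat-template-style string, e.g.
    #   <|system|>\n...<|end|>\n<|user|>\n...<|end|>\n<|assistant|>\n
    # which could be fed to a raw text-generation endpoint instead of chat_completion.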
    def create_interface(self):
        # === Voice-Specific UI Elements (Start) ===
        def toggle_voice_mode(active_state):
            # The click handler receives the button's label (always a truthy string),
            # so toggle the internal flag rather than trusting that value.
            self.voice_mode_active = not self.voice_mode_active
            if self.voice_mode_active:
                # Get the list of available voices and pick one at random
                voices = asyncio.run(edge_tts.list_voices())
                voice_names = [voice['ShortName'] for voice in voices]
                random_voice = random.choice(voice_names)
                self.selected_voice = random_voice
                # gr.update works across Gradio versions, unlike the removed Component.update helpers
                return gr.update(value="Stop Voice Mode"), gr.update(value=random_voice)
            else:
                return gr.update(value="Start Voice Mode"), gr.update(value=self.selected_voice)
        def update_selected_voice(voice_name):
            self.selected_voice = voice_name
            return voice_name
        # === Voice-Specific UI Elements (End) ===
        def streaming_response(message, chat_history, image_filepath, math_ocr_image_path, voice_mode_state, selected_voice):
            if self.voice_mode_active:
                response_text, audio_output = self.get_response(message)
                if isinstance(response_text, str):
                    updated_history = chat_history + [[message, response_text]]
                    if audio_output:
                        yield updated_history, audio_output, None, None, ""
                    else:
                        yield updated_history, None, None, None, ""
                else:
                    full_response = ""
                    updated_history = chat_history + [[message, ""]]
                    try:
                        for chunk in response_text:
                            if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                                chunk_content = chunk.choices[0].delta.content
                                full_response += chunk_content
                                updated_history[-1][1] = full_response
                                if audio_output:
                                    yield updated_history, audio_output, None, None, ""
                                else:
                                    yield updated_history, None, None, None, ""
                    except Exception as e:
                        print(f"Streaming error: {e}")
                        updated_history[-1][1] = f"Error during response: {e}"
                        if audio_output:
                            yield updated_history, audio_output, None, None, ""
                        else:
                            yield updated_history, None, None, None, ""
                        return
                    full_response = self.adjust_response_based_on_state(full_response)
                    audio_file = asyncio.run(self.speak_text(full_response))
                    self.update_goals(message)
                    emotion_deltas = {}
                    cognitive_load_deltas = {}
                    engagement_delta = 0
                    if any(word in message.lower() for word in ["sad", "unhappy", "depressed", "down"]):
                        emotion_deltas.update({"valence": -0.2, "arousal": 0.1, "confidence": -0.1, "sadness": 0.3, "joy": -0.2})
                        engagement_delta = -0.1
                    elif any(word in message.lower() for word in ["happy", "good", "great", "excited", "amazing"]):
                        emotion_deltas.update({"valence": 0.2, "arousal": 0.2, "confidence": 0.1, "sadness": -0.2, "joy": 0.3})
                        engagement_delta = 0.2
                    elif any(word in message.lower() for word in ["angry", "mad", "furious", "frustrated"]):
                        emotion_deltas.update({"valence": -0.3, "arousal": 0.3, "dominance": -0.2, "frustration": 0.2, "sadness": 0.1, "joy": -0.1})
                        engagement_delta = -0.2
                    elif any(word in message.lower() for word in ["scared", "afraid", "fearful", "anxious"]):
                        emotion_deltas.update({"valence": -0.2, "arousal": 0.4, "dominance": -0.3, "confidence": -0.2, "sadness": 0.2})
                        engagement_delta = -0.1
                    elif any(word in message.lower() for word in ["surprise", "amazed", "astonished"]):
                        emotion_deltas.update({"valence": 0.1, "arousal": 0.5, "dominance": 0.1, "curiosity": 0.3, "sadness": -0.1, "joy": 0.1})
                        engagement_delta = 0.3
                    elif any(word in message.lower() for word in ["confused", "uncertain", "unsure"]):
                        cognitive_load_deltas.update({"processing_intensity": 0.2})
                        emotion_deltas.update({"curiosity": 0.2, "confidence": -0.1, "sadness": 0.1})
                        engagement_delta = 0.1
                    else:
                        emotion_deltas.update({"valence": 0.05, "arousal": 0.05})
                        engagement_delta = 0.05
                    if "learn" in message.lower() or "explain" in message.lower() or "know more" in message.lower():
                        emotion_deltas.update({"curiosity": 0.3})
                        cognitive_load_deltas.update({"processing_intensity": 0.1})
                        engagement_delta = 0.2
                    self.update_internal_state(emotion_deltas, cognitive_load_deltas, 0.1, engagement_delta)
                    self.conversation_history.append(ChatMessage(role="user", content=message).to_dict())
                    self.conversation_history.append(ChatMessage(role="assistant", content=full_response).to_dict())
                    if len(self.conversation_history) > 10:
                        self.conversation_history = self.conversation_history[-10:]
                    updated_history[-1][1] = full_response
                    if audio_file:
                        yield updated_history, audio_file, None, None, ""
                    else:
                        yield updated_history, None, None, None, ""
                # Voice mode handles the whole turn above; don't fall through to the text pipeline.
                return
            # Handling /image command for image generation
            if "/image" in message:
                image_prompt = message.replace("/image", "").strip()
                # Updated placeholder SVG with animation and text
                placeholder_image = "data:image/svg+xml," + requests.utils.quote(f'''
                <svg width="256" height="256" viewBox="0 0 256 256" xmlns="http://www.w3.org/2000/svg">
                  <style>
                    rect {{
                      animation: fillAnimation 3s ease-in-out infinite;
                    }}
                    @keyframes fillAnimation {{
                      0% {{ fill: #626262; }}
                      50% {{ fill: #111111; }}
                      100% {{ fill: #626262; }}
                    }}
                    text {{
                      font-family: 'Helvetica Neue', Arial, sans-serif; /* Choose a good font */
                      font-weight: 300; /* Slightly lighter font weight */
                      text-shadow: 0px 2px 4px rgba(0, 0, 0, 0.4); /* Subtle shadow */
                    }}
                  </style>
                  <rect width="256" height="256" rx="20" fill="#888888" />
                  <text x="50%" y="50%" dominant-baseline="middle" text-anchor="middle" font-size="24" fill="white" opacity="0.8">
                    <tspan>creating your image</tspan>
                    <tspan x="50%" dy="1.2em">with xylaria iris</tspan>
                  </text>
                </svg>
                ''')
                updated_history = chat_history + [[message, gr.Image(value=placeholder_image, type="pil", visible=True)]]
                yield updated_history, None, None, None, ""
                try:
                    generated_image = self.generate_image(image_prompt)
                    updated_history[-1][1] = gr.Image(value=generated_image, type="pil", visible=True)
                    yield updated_history, None, None, None, ""
                    self.conversation_history.append(ChatMessage(role="user", content=message).to_dict())
                    self.conversation_history.append(ChatMessage(role="assistant", content="Image generated").to_dict())
                    return
                except Exception as e:
                    updated_history[-1][1] = f"Error generating image: {e}"
                    yield updated_history, None, None, None, ""
                    return
            ocr_text = ""
            if math_ocr_image_path:
                ocr_text = self.perform_math_ocr(math_ocr_image_path)
                if ocr_text.startswith("Error"):
                    updated_history = chat_history + [[message, ocr_text]]
                    yield updated_history, None, None, None, ""
                    return
                else:
                    message = f"Math OCR Result: {ocr_text}\n\nUser's message: {message}"
            if image_filepath:
                response_stream = self.get_response(message, image_filepath)
            else:
                response_stream = self.get_response(message)
            if isinstance(response_stream, str):
                updated_history = chat_history + [[message, response_stream]]
                yield updated_history, None, None, None, ""
                return
            full_response = ""
            updated_history = chat_history + [[message, ""]]
            try:
                for chunk in response_stream:
                    if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                        chunk_content = chunk.choices[0].delta.content
                        full_response += chunk_content
                        updated_history[-1][1] = full_response
                        yield updated_history, None, None, None, ""
            except Exception as e:
                print(f"Streaming error: {e}")
                updated_history[-1][1] = f"Error during response: {e}"
                yield updated_history, None, None, None, ""
                return
            full_response = self.adjust_response_based_on_state(full_response)
            self.update_goals(message)
            emotion_deltas = {}
            cognitive_load_deltas = {}
            engagement_delta = 0
            if any(word in message.lower() for word in ["sad", "unhappy", "depressed", "down"]):
                emotion_deltas.update({"valence": -0.2, "arousal": 0.1, "confidence": -0.1, "sadness": 0.3, "joy": -0.2})
                engagement_delta = -0.1
            elif any(word in message.lower() for word in ["happy", "good", "great", "excited", "amazing"]):
                emotion_deltas.update({"valence": 0.2, "arousal": 0.2, "confidence": 0.1, "sadness": -0.2, "joy": 0.3})
                engagement_delta = 0.2
            elif any(word in message.lower() for word in ["angry", "mad", "furious", "frustrated"]):
                emotion_deltas.update({"valence": -0.3, "arousal": 0.3, "dominance": -0.2, "frustration": 0.2, "sadness": 0.1, "joy": -0.1})
                engagement_delta = -0.2
            elif any(word in message.lower() for word in ["scared", "afraid", "fearful", "anxious"]):
                emotion_deltas.update({"valence": -0.2, "arousal": 0.4, "dominance": -0.3, "confidence": -0.2, "sadness": 0.2})
                engagement_delta = -0.1
            elif any(word in message.lower() for word in ["surprise", "amazed", "astonished"]):
                emotion_deltas.update({"valence": 0.1, "arousal": 0.5, "dominance": 0.1, "curiosity": 0.3, "sadness": -0.1, "joy": 0.1})
                engagement_delta = 0.3
            elif any(word in message.lower() for word in ["confused", "uncertain", "unsure"]):
                cognitive_load_deltas.update({"processing_intensity": 0.2})
                emotion_deltas.update({"curiosity": 0.2, "confidence": -0.1, "sadness": 0.1})
                engagement_delta = 0.1
            else:
                emotion_deltas.update({"valence": 0.05, "arousal": 0.05})
                engagement_delta = 0.05
            if "learn" in message.lower() or "explain" in message.lower() or "know more" in message.lower():
                emotion_deltas.update({"curiosity": 0.3})
                cognitive_load_deltas.update({"processing_intensity": 0.1})
                engagement_delta = 0.2
            self.update_internal_state(emotion_deltas, cognitive_load_deltas, 0.1, engagement_delta)
            self.conversation_history.append(ChatMessage(role="user", content=message).to_dict())
            self.conversation_history.append(ChatMessage(role="assistant", content=full_response).to_dict())
            if len(self.conversation_history) > 10:
                self.conversation_history = self.conversation_history[-10:]
            # Surface the state-adjusted response (emotional prefixes, introspection) in the UI.
            updated_history[-1][1] = full_response
            yield updated_history, None, None, None, ""
        custom_css = """
        @import url('https://fonts.googleapis.com/css2?family=Source+Sans+Pro:wght@400;600;700&display=swap');
        body {
            background-color: #f5f5f5;
            font-family: 'Source Sans Pro', sans-serif;
        }
        .voice-mode-button {
            background-color: #4CAF50; /* Green */
            border: none;
            color: white;
            padding: 15px 32px;
            text-align: center;
            text-decoration: none;
            display: inline-block;
            font-size: 16px;
            margin: 4px 2px;
            cursor: pointer;
            border-radius: 10px; /* Rounded corners */
            transition: all 0.3s ease; /* Smooth transition for hover effect */
        }
        /* Style when voice mode is active */
        .voice-mode-button.active {
            background-color: #f44336; /* Red */
        }
        /* Hover effect */
        .voice-mode-button:hover {
            opacity: 0.8;
        }
        /* Style for the voice mode overlay */
        .voice-mode-overlay {
            position: fixed; /* Stay in place */
            left: 0;
            top: 0;
            width: 100%; /* Full width */
            height: 100%; /* Full height */
            background-color: rgba(0, 0, 0, 0.7); /* Black w/ opacity */
            z-index: 10; /* Sit on top */
            display: flex;
            justify-content: center;
            align-items: center;
            border-radius: 10px;
        }
        /* Style for the growing circle */
        .voice-mode-circle {
            width: 100px;
            height: 100px;
            background-color: #4CAF50;
            border-radius: 50%;
            display: flex;
            justify-content: center;
            align-items: center;
            animation: grow 2s infinite;
        }
        /* Keyframes for the growing animation */
        @keyframes grow {
            0% {
                transform: scale(1);
                opacity: 0.8;
            }
            50% {
                transform: scale(1.5);
                opacity: 0.5;
            }
            100% {
                transform: scale(1);
                opacity: 0.8;
            }
        }
        .gradio-container {
            max-width: 900px;
            margin: 0 auto;
            border-radius: 10px;
            box-shadow: 0px 4px 20px rgba(0, 0, 0, 0.1);
        }
        .chatbot-container {
            background-color: #fff;
            border-radius: 10px;
            padding: 20px;
        }
        .chatbot-container .message {
            font-family: 'Source Sans Pro', sans-serif;
            font-size: 16px;
            line-height: 1.6;
        }
        .gradio-container input,
        .gradio-container textarea,
        .gradio-container button {
            font-family: 'Source Sans Pro', sans-serif;
            font-size: 16px;
            border-radius: 8px;
        }
        .image-container {
            display: flex;
            gap: 10px;
            margin-bottom: 20px;
            justify-content: center;
        }
        .image-upload {
            border: 2px dashed #d3d3d3;
            border-radius: 8px;
            padding: 20px;
            background-color: #fafafa;
            text-align: center;
            transition: all 0.3s ease;
        }
        .image-upload:hover {
            background-color: #f0f0f0;
            border-color: #b3b3b3;
        }
        .image-preview {
            max-width: 150px;
            max-height: 150px;
            border-radius: 8px;
            box-shadow: 0px 2px 5px rgba(0, 0, 0, 0.1);
        }
        .clear-button {
            display: none;
        }
        .chatbot-container .message {
            opacity: 0;
            animation: fadeIn 0.5s ease-in-out forwards;
        }
        @keyframes fadeIn {
            from {
                opacity: 0;
                transform: translateY(20px);
            }
            to {
                opacity: 1;
                transform: translateY(0);
            }
        }
        .gr-accordion-button {
            background-color: #f0f0f0 !important;
            border-radius: 8px !important;
            padding: 15px !important;
            margin-bottom: 10px !important;
            transition: all 0.3s ease !important;
            cursor: pointer !important;
            border: none !important;
            box-shadow: 0px 2px 5px rgba(0, 0, 0, 0.05) !important;
        }
        .gr-accordion-button:hover {
            background-color: #e0e0e0 !important;
            box-shadow: 0px 4px 10px rgba(0, 0, 0, 0.1) !important;
        }
        .gr-accordion-active .gr-accordion-button {
            background-color: #d0d0d0 !important;
            box-shadow: 0px 4px 10px rgba(0, 0, 0, 0.1) !important;
        }
        .gr-accordion-content {
            transition: max-height 0.3s ease-in-out !important;
            overflow: hidden !important;
            max-height: 0 !important;
        }
        .gr-accordion-active .gr-accordion-content {
            max-height: 500px !important;
        }
        .gr-accordion {
            display: flex;
            flex-direction: column-reverse;
        }
        .chatbot-icon {
            width: 40px;
            height: 40px;
            border-radius: 50%;
            margin-right: 10px;
        }
        .user-message .message-row {
            background-color: #e8f0fe;
            border-radius: 10px;
            padding: 10px;
            margin-bottom: 10px;
            border-top-right-radius: 2px;
        }
        .assistant-message .message-row {
            background-color: #f0f0f0;
            border-radius: 10px;
            padding: 10px;
            margin-bottom: 10px;
            border-top-left-radius: 2px;
        }
        .user-message .message-icon {
            background: url('https://img.icons8.com/color/48/000000/user.png') no-repeat center center;
            background-size: contain;
            width: 30px;
            height: 30px;
            margin-right: 10px;
        }
        .assistant-message .message-icon {
            background: url('https://i.ibb.co/7b7hLGH/Senoa-Icon-1.png') no-repeat center center;
            background-size: cover;
            width: 40px;
            height: 40px;
            margin-right: 10px;
            border-radius: 50%;
        }
        .message-text {
            flex-grow: 1;
        }
        .message-row {
            display: flex;
            align-items: center;
        }
        .audio-container {
            display: flex;
            align-items: center;
            margin-top: 10px;
        }
        .audio-player {
            width: 100%;
            border-radius: 15px;
        }
        .audio-icon {
            width: 30px;
            height: 30px;
            margin-right: 10px;
        }
        """
        with gr.Blocks(theme=gr.themes.Soft(
            primary_hue="slate",
            secondary_hue="gray",
            neutral_hue="gray",
            font=["Source Sans Pro", "Arial", "sans-serif"],
        ), css=custom_css) as demo:
            with gr.Column():
                chatbot = gr.Chatbot(
                    label="Xylaria 1.5 Senoa",
                    height=600,
                    show_copy_button=True,
                    elem_classes="chatbot-container",
                    avatar_images=(
                        "https://img.icons8.com/color/48/000000/user.png",  # User avatar
                        "https://i.ibb.co/7b7hLGH/Senoa-Icon-1.png"  # Bot avatar
                    )
                )
                # === Voice Mode UI (Start) ===
                voice_mode_btn = gr.Button("Start Voice Mode", elem_classes="voice-mode-button")
                voices = asyncio.run(edge_tts.list_voices())
                voice_names = [voice['ShortName'] for voice in voices]
                voice_dropdown = gr.Dropdown(
                    label="Select Voice",
                    choices=voice_names,
                    value=self.selected_voice,
                    interactive=True
                )
                voice_dropdown.input(
                    fn=update_selected_voice,
                    inputs=voice_dropdown,
                    outputs=voice_dropdown
                )
                voice_mode_btn.click(
                    fn=toggle_voice_mode,
                    inputs=voice_mode_btn,
                    outputs=[voice_mode_btn, voice_dropdown]
                )
                # One shared audio output for spoken responses, instead of creating a
                # separate gr.Audio inside each event's outputs list.
                audio_response = gr.Audio(label="Audio Response", type="filepath", autoplay=True, visible=True)
                # === Voice Mode UI (End) ===
                with gr.Accordion("Image Input", open=False, elem_classes="gr-accordion"):
                    with gr.Row(elem_classes="image-container"):
                        with gr.Column(elem_classes="image-upload"):
                            img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image",
                                elem_classes="image-preview"
                            )
                        with gr.Column(elem_classes="image-upload"):
                            math_ocr_img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image for Math OCR",
                                elem_classes="image-preview"
                            )
                with gr.Row():
                    with gr.Column(scale=4):
                        txt = gr.Textbox(
                            show_label=False,
                            placeholder="Type your message...",
                            container=False
                        )
                    btn = gr.Button("Send", scale=1)
                with gr.Row():
                    clear = gr.Button("Clear Conversation", variant="stop")
                    clear_memory = gr.Button("Clear Memory")
                # Pass voice_mode_state and selected_voice to the streaming_response function
                btn.click(
                    fn=streaming_response,
                    inputs=[txt, chatbot, img, math_ocr_img, voice_mode_btn, voice_dropdown],
                    outputs=[chatbot, audio_response, img, math_ocr_img, txt]
                )
                txt.submit(
                    fn=streaming_response,
                    inputs=[txt, chatbot, img, math_ocr_img, voice_mode_btn, voice_dropdown],
                    outputs=[chatbot, audio_response, img, math_ocr_img, txt]
                )
                clear.click(
                    fn=lambda: None,
                    inputs=None,
                    outputs=[chatbot],
                    queue=False
                )
                clear_memory.click(
                    fn=self.reset_conversation,
                    inputs=None,
                    outputs=[chatbot],
                    queue=False
                )
            demo.load(self.reset_conversation, None, None)
        return demo

def main():
    chat = XylariaChat()
    interface = chat.create_interface()
    interface.launch(
        share=True,
        debug=True
    )


if __name__ == "__main__":
    main()