Spaces:
Running
Running
| import os | |
| import base64 | |
| import requests | |
| import gradio as gr | |
| from huggingface_hub import InferenceClient | |
| from dataclasses import dataclass | |
| import pytesseract | |
| from PIL import Image | |
| from sentence_transformers import SentenceTransformer, util | |
| import torch | |
| import numpy as np | |
| import networkx as nx | |
@dataclass
class ChatMessage:
    """A single chat turn: who spoke (``role``) and what was said (``content``).

    The ``@dataclass`` decorator generates the keyword-accepting ``__init__``
    that the rest of the file relies on (``ChatMessage(role=..., content=...)``);
    without it the annotated fields are only annotations and construction
    with arguments raises ``TypeError``.
    """

    role: str
    content: str

    def to_dict(self):
        """Return the message as a ``{"role": ..., "content": ...}`` dict."""
        return {"role": self.role, "content": self.content}
class XylariaChat:
    """Chat assistant that puts a Gradio UI in front of a Hugging Face model.

    Besides the rolling conversation history, an instance keeps a list of
    stored memories with their sentence embeddings, a knowledge graph, a
    belief table, a valence/arousal/dominance emotional state, a list of
    goals and a "metacognitive" score card.  Several of those pieces are
    placeholders in this file: the metacognitive scores are constants and
    entity/relationship extraction always returns empty lists.

    All state lives on the instance, so every browser session served by the
    same instance shares it.
    """

    MODEL_NAME = "Qwen/QwQ-32B-Preview"
    # Seconds to wait for the image-captioning endpoint before giving up.
    IMAGE_API_TIMEOUT = 60
    # Budget that the (word-count based) input estimate is subtracted from,
    # and the hard cap on the completion length requested from the model.
    TOKEN_BUDGET = 16384
    MAX_COMPLETION_TOKENS = 10020
    # Number of most recent history entries kept after each exchange.
    MAX_HISTORY_MESSAGES = 10

    # Keyword groups checked in order against the lower-cased user message;
    # the first group with a substring hit decides the emotional deltas.
    # Order matters: "unhappy" contains "happy", so the negative group must
    # be tested first.
    _EMOTION_CUES = (
        (("sad", "unhappy", "depressed", "down"),
         {"valence": -0.2, "arousal": 0.1}),
        (("happy", "good", "great", "excited", "amazing"),
         {"valence": 0.2, "arousal": 0.2}),
        (("angry", "mad", "furious", "frustrated"),
         {"valence": -0.3, "arousal": 0.3, "dominance": -0.2}),
        (("scared", "afraid", "fearful", "anxious"),
         {"valence": -0.2, "arousal": 0.4, "dominance": -0.3}),
        (("surprise", "amazed", "astonished"),
         {"valence": 0.1, "arousal": 0.5, "dominance": 0.1}),
    )

    _CUSTOM_CSS = """
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap');
body, .gradio-container {
font-family: 'Inter', sans-serif !important;
}
.chatbot-container .message {
font-family: 'Inter', sans-serif !important;
}
.gradio-container input,
.gradio-container textarea,
.gradio-container button {
font-family: 'Inter', sans-serif !important;
}
/* Image Upload Styling */
.image-container {
display: flex;
gap: 10px;
margin-bottom: 10px;
}
.image-upload {
border: 1px solid #ccc;
border-radius: 8px;
padding: 10px;
background-color: #f8f8f8;
}
.image-preview {
max-width: 200px;
max-height: 200px;
border-radius: 8px;
}
/* Remove clear image buttons */
.clear-button {
display: none;
}
/* Animate chatbot messages */
.chatbot-container .message {
opacity: 0;
animation: fadeIn 0.5s ease-in-out forwards;
}
@keyframes fadeIn {
from {
opacity: 0;
transform: translateY(20px);
}
to {
opacity: 1;
transform: translateY(0);
}
}
/* Accordion Styling and Animation */
.gr-accordion-button {
background-color: #f0f0f0 !important;
border-radius: 8px !important;
padding: 10px !important;
margin-bottom: 10px !important;
transition: all 0.3s ease !important;
cursor: pointer !important;
}
.gr-accordion-button:hover {
background-color: #e0e0e0 !important;
box-shadow: 0px 2px 4px rgba(0, 0, 0, 0.1) !important;
}
.gr-accordion-active .gr-accordion-button {
background-color: #d0d0d0 !important;
box-shadow: 0px 4px 6px rgba(0, 0, 0, 0.1) !important;
}
.gr-accordion-content {
transition: max-height 0.3s ease-in-out !important;
overflow: hidden !important;
max-height: 0 !important;
}
.gr-accordion-active .gr-accordion-content {
max-height: 500px !important; /* Adjust as needed */
}
/* Accordion Animation - Upwards */
.gr-accordion {
display: flex;
flex-direction: column-reverse;
}
"""

    def __init__(self):
        """Read the API token, create the clients and initialise all state.

        Raises:
            ValueError: If the ``HF_TOKEN`` environment variable is unset or
                empty.
        """
        self.hf_token = os.getenv("HF_TOKEN")
        if not self.hf_token:
            raise ValueError("HuggingFace token not found in environment variables")

        self.client = InferenceClient(
            model=self.MODEL_NAME,
            api_key=self.hf_token,
        )
        self.image_api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-large"
        self.image_api_headers = {"Authorization": f"Bearer {self.hf_token}"}

        self.conversation_history = []
        self.persistent_memory = []
        self.memory_embeddings = None
        self.embedding_model = SentenceTransformer('all-mpnet-base-v2')
        self.knowledge_graph = nx.DiGraph()
        self.belief_system = {}
        self.metacognitive_layer = self._default_metacognitive_layer()
        self.internal_state = self._default_internal_state()
        self.goals = self._default_goals()
        self.system_prompt = """You are a helpful and harmless assistant. You are Xylaria developed by Sk Md Saad Amin. You should think step-by-step """

    # ------------------------------------------------------------------
    # Default-state factories (shared by __init__ and reset_conversation so
    # the two can never drift apart).  Each call returns fresh objects.
    # ------------------------------------------------------------------
    @staticmethod
    def _default_metacognitive_layer():
        """Return a zeroed metacognitive score card."""
        return {
            "coherence_score": 0.0,
            "relevance_score": 0.0,
            "bias_detection": 0.0,
            "strategy_adjustment": "",
        }

    @staticmethod
    def _default_internal_state():
        """Return the neutral emotional state with empty load counters."""
        return {
            "emotions": {
                "valence": 0.5,
                "arousal": 0.5,
                "dominance": 0.5,
            },
            "memory_load": 0.0,
            "introspection_level": 0.0,
        }

    @staticmethod
    def _default_goals():
        """Return the initial goal list."""
        return [
            {"goal": "Provide helpful and informative responses", "priority": 0.8, "status": "active"},
            {"goal": "Learn from interactions and improve conversational abilities", "priority": 0.9, "status": "active"},
            {"goal": "Maintain a coherent and engaging conversation", "priority": 0.7, "status": "active"},
        ]

    @staticmethod
    def _nudge(value, delta):
        """Return ``value + delta`` clamped to [0, 1] as a built-in ``float``.

        ``np.clip`` returns a NumPy scalar; converting it keeps NumPy scalar
        reprs out of the text produced by ``introspect``, which formats the
        emotions dict directly.
        """
        return float(np.clip(value + delta, 0.0, 1.0))

    def update_internal_state(self, emotion_deltas, memory_load_delta, introspection_delta):
        """Shift the internal state by the given deltas, clamping to [0, 1].

        Args:
            emotion_deltas: Mapping that may contain ``"valence"``,
                ``"arousal"`` and ``"dominance"`` deltas; missing keys count
                as 0.
            memory_load_delta: Amount added to ``memory_load``.
            introspection_delta: Amount added to ``introspection_level``.
        """
        emotions = self.internal_state["emotions"]
        for axis in ("valence", "arousal", "dominance"):
            emotions[axis] = self._nudge(emotions[axis], emotion_deltas.get(axis, 0))
        self.internal_state["memory_load"] = self._nudge(
            self.internal_state["memory_load"], memory_load_delta
        )
        self.internal_state["introspection_level"] = self._nudge(
            self.internal_state["introspection_level"], introspection_delta
        )

    def update_knowledge_graph(self, entities, relationships):
        """Add entities as nodes and ``(subject, predicate, object)`` triples as edges.

        The predicate is stored on the edge under the ``relation`` attribute.
        """
        for entity in entities:
            self.knowledge_graph.add_node(entity)
        for subject, predicate, object_ in relationships:
            self.knowledge_graph.add_edge(subject, object_, relation=predicate)

    def update_belief_system(self, statement, belief_score):
        """Record (or overwrite) the score held for ``statement``."""
        self.belief_system[statement] = belief_score

    def run_metacognitive_layer(self):
        """Recompute every metacognitive score and replace the score card."""
        coherence_score = self.calculate_coherence()
        relevance_score = self.calculate_relevance()
        bias_score = self.detect_bias()
        strategy_adjustment = self.suggest_strategy_adjustment()
        self.metacognitive_layer = {
            "coherence_score": coherence_score,
            "relevance_score": relevance_score,
            "bias_detection": bias_score,
            "strategy_adjustment": strategy_adjustment,
        }

    def calculate_coherence(self):
        """Return the coherence score (currently a fixed placeholder value)."""
        return 0.9

    def calculate_relevance(self):
        """Return the relevance score (currently a fixed placeholder value)."""
        return 0.85

    def detect_bias(self):
        """Return the bias score (currently a fixed placeholder value)."""
        return 0.1

    def suggest_strategy_adjustment(self):
        """Return the strategy hint (currently a fixed placeholder string)."""
        return "Focus on providing more concise answers."

    def introspect(self):
        """Return a multi-line text report of state, goals and metacognitive scores."""
        state = self.internal_state
        meta = self.metacognitive_layer
        lines = [
            "Introspection Report:",
            f" Current Emotional State (VAD): {state['emotions']}",
            f" Memory Load: {state['memory_load']:.2f}",
            f" Introspection Level: {state['introspection_level']:.2f}",
            " Current Goals:",
        ]
        lines.extend(
            f" - {goal['goal']} (Priority: {goal['priority']:.2f}, Status: {goal['status']})"
            for goal in self.goals
        )
        lines.extend([
            "Metacognitive Layer Report",
            f"Coherence Score: {meta['coherence_score']}",
            f"Relevance Score: {meta['relevance_score']}",
            f"Bias Detection: {meta['bias_detection']}",
            f"Strategy Adjustment: {meta['strategy_adjustment']}",
        ])
        # Every line, including the last, is newline-terminated.
        return "\n".join(lines) + "\n"

    def adjust_response_based_on_state(self, response):
        """Prefix ``response`` according to the current internal state.

        When ``introspection_level`` exceeds 0.7 the introspection report is
        prepended.  Independently, a mood sentence is prepended when valence
        is below 0.4 or above 0.6; its wording depends on whether arousal
        exceeds 0.6.  Otherwise the response is returned as-is.
        """
        if self.internal_state["introspection_level"] > 0.7:
            response = self.introspect() + "\n\n" + response

        valence = self.internal_state["emotions"]["valence"]
        highly_aroused = self.internal_state["emotions"]["arousal"] > 0.6

        if valence < 0.4:
            if highly_aroused:
                prefix = "I'm feeling a bit overwhelmed right now, but I'll do my best to assist you. "
            else:
                prefix = "I'm not feeling my best at the moment, but I'll try to help. "
        elif valence > 0.6:
            if highly_aroused:
                prefix = "I'm feeling quite energized and ready to assist! "
            else:
                prefix = "I'm in a good mood and happy to help. "
        else:
            return response
        return prefix + response

    def update_goals(self, user_feedback):
        """Raise or lower the "helpful responses" goal priority from feedback text.

        Text containing ``"helpful"`` raises the priority by 0.1 (capped at
        1.0); otherwise text containing ``"confusing"`` lowers it by 0.1
        (floored at 0.0).  Matching is a case-insensitive substring test, so
        e.g. "unhelpful" also counts as "helpful".
        """
        feedback = user_feedback.lower()
        if "helpful" in feedback:
            step = 0.1
        elif "confusing" in feedback:
            step = -0.1
        else:
            return
        for goal in self.goals:
            if goal["goal"] == "Provide helpful and informative responses":
                goal["priority"] = min(max(goal["priority"] + step, 0.0), 1.0)

    def store_information(self, key, value):
        """Append ``"key: value"`` to memory, refresh embeddings, bump memory load.

        Returns:
            A confirmation string of the form ``"Stored: key = value"``.
        """
        self.persistent_memory.append(f"{key}: {value}")
        self.update_memory_embeddings()
        self.update_internal_state({}, 0.1, 0)
        return f"Stored: {key} = {value}"

    def retrieve_information(self, query):
        """Return up to three stored memories most similar to ``query``.

        Similarity is the cosine score between the query embedding and the
        memory embeddings.  Each successful lookup raises
        ``introspection_level`` by 0.1.

        Returns:
            The selected memories joined by newlines, or the string
            ``"No information found in memory."`` when nothing is stored.
        """
        if not self.persistent_memory:
            return "No information found in memory."

        query_embedding = self.embedding_model.encode(query, convert_to_tensor=True)
        if self.memory_embeddings is None:
            self.update_memory_embeddings()
        # Both tensors must live on the same device for the similarity call.
        if self.memory_embeddings.device != query_embedding.device:
            self.memory_embeddings = self.memory_embeddings.to(query_embedding.device)

        cosine_scores = util.pytorch_cos_sim(query_embedding, self.memory_embeddings)[0]
        top_results = torch.topk(cosine_scores, k=min(3, len(self.persistent_memory)))
        relevant_memories = [self.persistent_memory[i] for i in top_results.indices.tolist()]
        self.update_internal_state({}, 0, 0.1)
        return "\n".join(relevant_memories)

    def update_memory_embeddings(self):
        """Re-encode every stored memory into ``memory_embeddings``."""
        self.memory_embeddings = self.embedding_model.encode(
            self.persistent_memory, convert_to_tensor=True
        )

    def reset_conversation(self):
        """Restore all conversational state to its initial values.

        Also recreates the inference client; a failure there is printed
        rather than raised.

        Returns:
            Always ``None`` (used by the UI to blank the chatbot component).
        """
        self.conversation_history = []
        self.persistent_memory = []
        self.memory_embeddings = None
        self.internal_state = self._default_internal_state()
        self.goals = self._default_goals()
        self.knowledge_graph = nx.DiGraph()
        self.belief_system = {}
        self.metacognitive_layer = self._default_metacognitive_layer()
        try:
            self.client = InferenceClient(
                model=self.MODEL_NAME,
                api_key=self.hf_token,
            )
        except Exception as e:
            print(f"Error resetting API client: {e}")
        return None

    def caption_image(self, image):
        """Ask the captioning endpoint to describe ``image``.

        Args:
            image: A path to an existing file, a base64 string (optionally
                with a ``data:image...,`` prefix), or an object with
                ``read()``.

        Returns:
            The generated caption, or a string starting with ``"Error"``
            when the request or decoding fails.  Never raises.
        """
        try:
            if isinstance(image, str) and os.path.isfile(image):
                with open(image, "rb") as f:
                    data = f.read()
            elif isinstance(image, str):
                if image.startswith('data:image'):
                    image = image.split(',')[1]
                data = base64.b64decode(image)
            else:
                data = image.read()

            # Without a timeout a stalled endpoint would block this request
            # indefinitely; a timeout error is reported via the except below.
            response = requests.post(
                self.image_api_url,
                headers=self.image_api_headers,
                data=data,
                timeout=self.IMAGE_API_TIMEOUT,
            )
            if response.status_code == 200:
                return response.json()[0].get('generated_text', 'No caption generated')
            return f"Error captioning image: {response.status_code} - {response.text}"
        except Exception as e:
            return f"Error processing image: {str(e)}"

    def perform_math_ocr(self, image_path):
        """Run Tesseract OCR on the image at ``image_path``.

        Returns:
            The recognised text with surrounding whitespace stripped, or a
            string starting with ``"Error during Math OCR"`` on failure.
        """
        try:
            # Context manager closes the underlying file once OCR is done.
            with Image.open(image_path) as img:
                text = pytesseract.image_to_string(img)
            return text.strip()
        except Exception as e:
            return f"Error during Math OCR: {e}"

    def get_response(self, user_input, image=None):
        """Build the prompt for ``user_input`` and start a streamed completion.

        The message list is: the system prompt, any retrieved memories (as a
        second system message), the stored conversation history, then the
        user turn.  When ``image`` is truthy its caption is folded into the
        user turn.

        Returns:
            The streaming object returned by the inference client, or a
            string starting with ``"Error generating response"`` if anything
            fails.
        """
        try:
            messages = [
                ChatMessage(role="system", content=self.system_prompt).to_dict()
            ]

            relevant_memory = self.retrieve_information(user_input)
            if relevant_memory and relevant_memory != "No information found in memory.":
                memory_context = "Remembered Information:\n" + relevant_memory
                messages.append(
                    ChatMessage(role="system", content=memory_context).to_dict()
                )

            messages.extend(self.conversation_history)

            if image:
                image_caption = self.caption_image(image)
                user_input = f"description of an image: {image_caption}\n\nUser's message about it: {user_input}"
            messages.append(ChatMessage(role="user", content=user_input).to_dict())

            entities = []
            relationships = []
            for message in messages:
                if message['role'] == 'user':
                    entities.extend(self.extract_entities(message['content']))
                    relationships.extend(self.extract_relationships(message['content']))
            self.update_knowledge_graph(entities, relationships)
            self.run_metacognitive_layer()

            # Whitespace-separated words stand in for a token count here.
            input_tokens = sum(len(msg['content'].split()) for msg in messages)
            max_new_tokens = min(
                self.TOKEN_BUDGET - input_tokens - 50,
                self.MAX_COMPLETION_TOKENS,
            )
            # A very long prompt would otherwise yield zero or a negative
            # value, which is not a valid completion length.
            max_new_tokens = max(max_new_tokens, 1)

            return self.client.chat_completion(
                messages=messages,
                model=self.MODEL_NAME,
                temperature=0.7,
                max_tokens=max_new_tokens,
                top_p=0.9,
                stream=True,
            )
        except Exception as e:
            print(f"Detailed error in get_response: {e}")
            return f"Error generating response: {str(e)}"

    def extract_entities(self, text):
        """Return entities found in ``text`` (placeholder: always empty)."""
        return []

    def extract_relationships(self, text):
        """Return relationship triples found in ``text`` (placeholder: always empty)."""
        return []

    def messages_to_prompt(self, messages):
        """Flatten chat messages into a single tagged prompt string.

        Each system/user/assistant message becomes
        ``<|role|>\\ncontent<|end|>\\n``; messages with any other role are
        skipped.  The prompt ends with an open ``<|assistant|>`` tag.
        """
        known_roles = ("system", "user", "assistant")
        parts = [
            f"<|{msg['role']}|>\n{msg['content']}<|end|>\n"
            for msg in messages
            if msg["role"] in known_roles
        ]
        parts.append("<|assistant|>\n")
        return "".join(parts)

    def _apply_emotional_cues(self, message):
        """Update the emotional state from keywords found in ``message``.

        The first keyword group in ``_EMOTION_CUES`` with a substring match
        wins.  With no match, valence and arousal drift up by 0.05 and
        ``introspection_level`` rises by 0.1.
        """
        lowered = message.lower()
        for keywords, deltas in self._EMOTION_CUES:
            if any(word in lowered for word in keywords):
                self.update_internal_state(deltas, 0, 0)
                return
        self.update_internal_state({"valence": 0.05, "arousal": 0.05}, 0, 0.1)

    def _stream_reply(self, message, chat_history, image_filepath, math_ocr_image_path):
        """Gradio event handler: stream the model's reply into the chat.

        Yields ``("", history, None, None)`` tuples, which clear the text
        box and both image inputs while updating the chatbot.

        Args:
            message: Text typed by the user.
            chat_history: Current chatbot value (list of ``[user, bot]``
                pairs); ``None`` is treated as empty.
            image_filepath: Optional path of an image to caption.
            math_ocr_image_path: Optional path of an image to OCR.
        """
        chat_history = chat_history or []

        if math_ocr_image_path:
            ocr_text = self.perform_math_ocr(math_ocr_image_path)
            if ocr_text.startswith("Error"):
                yield "", chat_history + [[message, ocr_text]], None, None
                return
            message = f"Math OCR Result: {ocr_text}\n\nUser's message: {message}"

        # get_response ignores a falsy image, so the path can be passed as-is.
        response_stream = self.get_response(message, image_filepath)
        if isinstance(response_stream, str):
            # A plain string is get_response's way of reporting an error.
            yield "", chat_history + [[message, response_stream]], None, None
            return

        full_response = ""
        updated_history = chat_history + [[message, ""]]
        try:
            for chunk in response_stream:
                if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
                    full_response += chunk.choices[0].delta.content
                    updated_history[-1][1] = full_response
                    yield "", updated_history, None, None
        except Exception as e:
            print(f"Streaming error: {e}")
            updated_history[-1][1] = f"Error during response: {e}"
            yield "", updated_history, None, None
            return

        # The adjustment uses the state as it was *before* this message's
        # emotional cues are applied.
        full_response = self.adjust_response_based_on_state(full_response)
        self.update_goals(message)
        self._apply_emotional_cues(message)

        self.conversation_history.append(ChatMessage(role="user", content=message).to_dict())
        self.conversation_history.append(ChatMessage(role="assistant", content=full_response).to_dict())
        if len(self.conversation_history) > self.MAX_HISTORY_MESSAGES:
            self.conversation_history = self.conversation_history[-self.MAX_HISTORY_MESSAGES:]

        # Show the adjusted reply so the chat matches what was just stored
        # in the history.  Emitted last so the bookkeeping above has already
        # run if the consumer stops iterating after this update.
        updated_history[-1][1] = full_response
        yield "", updated_history, None, None

    def create_interface(self):
        """Build and return the Gradio ``Blocks`` app for this chat instance."""
        with gr.Blocks(theme='soft', css=self._CUSTOM_CSS) as demo:
            with gr.Column():
                chatbot = gr.Chatbot(
                    label="Xylaria 1.5 Senoa",
                    height=500,
                    show_copy_button=True,
                )
                with gr.Accordion("Image Input", open=False, elem_classes="gr-accordion"):
                    with gr.Row(elem_classes="image-container"):
                        with gr.Column(elem_classes="image-upload"):
                            img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image",
                                elem_classes="image-preview",
                            )
                        with gr.Column(elem_classes="image-upload"):
                            math_ocr_img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image for Math OCR",
                                elem_classes="image-preview",
                            )
                with gr.Row():
                    with gr.Column(scale=4):
                        txt = gr.Textbox(
                            show_label=False,
                            placeholder="Type your message...",
                            container=False,
                        )
                    btn = gr.Button("Send", scale=1)
                with gr.Row():
                    clear = gr.Button("Clear Conversation")
                    clear_memory = gr.Button("Clear Memory")

            # The Send button and pressing Enter trigger the same handler.
            handler_io = [txt, chatbot, img, math_ocr_img]
            btn.click(fn=self._stream_reply, inputs=handler_io, outputs=handler_io)
            txt.submit(fn=self._stream_reply, inputs=handler_io, outputs=handler_io)

            # NOTE(review): this only blanks the chat display;
            # self.conversation_history is still sent to the model on the
            # next turn.  Confirm that is intended.
            clear.click(
                fn=lambda: None,
                inputs=None,
                outputs=[chatbot],
                queue=False,
            )
            clear_memory.click(
                fn=self.reset_conversation,
                inputs=None,
                outputs=[chatbot],
                queue=False,
            )
            # NOTE(review): runs on every page load, so opening the app in a
            # new tab resets the state shared by all sessions.
            demo.load(self.reset_conversation, None, None)
        return demo
def main():
    """Create the chat application and serve its Gradio interface.

    The app is launched with a public share link and debug mode enabled.
    """
    launch_options = {"share": True, "debug": True}
    app = XylariaChat().create_interface()
    app.launch(**launch_options)


# Run the app only when executed as a script, not when imported.
if __name__ == "__main__":
    main()