megoflow / app.py
ahmedmegahed's picture
Update app.py
ed99b16 verified
# ==============================================================================
# MegoFlow / Actora - Social Post Engagement Analyzer
# ==============================================================================
# تطبيق Gradio عربي للتنبؤ بتفاعل منشورات السوشيال وإظهار تحليل Gemini.
#
# التعليمات المختصرة للتشغيل (على Hugging Face Spaces أو محليًا):
# 1) ضع المتغيرات في .env أو في Settings → Variables:
# - GEMINI_API_KEY="YOUR_KEY"
# - USE_MOCK_MODEL="True" # غيّرها إلى "False" لتفعيل الموديل الحقيقي لاحقًا
# - MODEL_REPO="amrtweg/Actora" # افتراضيًا
# - (اختياري) ACTORA_SCALE_FACTOR="10.0" لتكبير مخرجات الموديل الحقيقي إن كانت صغيرة
#
# 2) ارفع ملف assets/logo.svg (لو غير موجود سيظهر لوجو بديل تلقائي).
#
# 3) تأكد من وجود الحزم في requirements.txt (لا تضف/تحذف حزم هنا؛
# فقط إن وجدت غير مستخدم ضع تعليق TODO داخل requirements وليس هنا).
#
# 4) شغّل التطبيق.
# ==============================================================================
import os
import re
import time
import base64 # (1) إضافة الاستيراد المفقود
import random
import logging
from typing import Dict, Optional
import gradio as gr
import bleach
from dotenv import load_dotenv
import google.generativeai as genai
# مكونات الموديل الحقيقي (تبقى موجودة حتى لو شاغلين Mock افتراضيًا)
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# ----------------------------
# الإعدادات والتهيئة العامة
# ----------------------------
load_dotenv()
GEMINI_API_KEY: Optional[str] = os.getenv("AIzaSyC5k5j7zeDWxJATTzx6zNABXwXQWvLIqw")
USE_MOCK_MODEL: bool = os.getenv("USE_MOCK_MODEL", "True").lower() in ("true", "1", "t", "yes", "y")
MODEL_REPO: str = os.getenv("MODEL_REPO", "amrtweg/Actora")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# ----------------------------
# تحميل اللوجو كـ Data URL
# ----------------------------
def load_logo_data_url(path: str = "assets/logo.svg") -> str:
"""
يقرأ ملف SVG ويحوله Base64 Data URL.
لو غير موجود، يولّد SVG بسيط Placeholder.
"""
try:
with open(path, "rb") as f:
encoded = base64.b64encode(f.read()).decode("utf-8")
return f"data:image/svg+xml;base64,{encoded}"
except FileNotFoundError:
logging.warning("لم يتم العثور على الملف assets/logo.svg — سيتم استخدام لوجو بديل.")
placeholder_svg = (
'<svg xmlns="http://www.w3.org/2000/svg" width="200" height="60">'
'<defs><linearGradient id="g" x1="0" y1="0" x2="1" y2="1">'
'<stop offset="0%" stop-color="#24b5a5"/><stop offset="100%" stop-color="#042A5C"/>'
'</linearGradient></defs>'
'<rect width="200" height="60" fill="#0f172a"/>'
'<text x="16" y="40" font-family="Cairo,Arial" font-size="28" font-weight="700" fill="url(#g)">Mego</text>'
'<text x="102" y="40" font-family="Cairo,Arial" font-size="28" font-weight="400" fill="#e2e8f0">Flow</text>'
'</svg>'
)
encoded = base64.b64encode(placeholder_svg.encode("utf-8")).decode("utf-8")
return f"data:image/svg+xml;base64,{encoded}"
# (2) توحيد اسم متغير اللوجو
LOGO_DATA_URL = load_logo_data_url()
# ----------------------------
# المنطق الخاص بالموديلات
# ----------------------------
class MockPredictor:
"""
الموديل الوهمي الافتراضي لسهولة التجربة سريعًا (لا يعتمد على إنترنت/موديلات كبيرة).
لتفعيل الموديل الحقيقي: غيّر USE_MOCK_MODEL="False" في .env أو Variables.
"""
def __init__(self, model_path: str = "mock", **kwargs):
self.is_ready = True
logging.info("MockPredictor قيد الاستخدام. (لتفعيل الحقيقي: USE_MOCK_MODEL=False)")
def predict(self, text: str) -> Dict[str, int]:
if not text or not text.strip():
return {}
# أرقام شبه عشوائية لعرض الواجهة فقط
time.sleep(0.2)
results = {
"liked": random.randint(20, 250),
"loved": random.randint(5, 90),
"haha": random.randint(0, 40),
"wow": random.randint(0, 35),
"sad": random.randint(0, 15),
"angry": random.randint(0, 10),
"comments": random.randint(5, 80),
"shares": random.randint(3, 50),
}
# إجمالي التفاعل المتوقع (بدون مضاعفة comments/shares)
results["interactions"] = sum(results[k] for k in ["liked","loved","haha","wow","sad","angry"])
return results
# ===== الموديل الحقيقي (جاهز للاستبدال بالضبط وقت ما تحب) =====
# لإلغاء الـ Mock وتفعيل هذا: USE_MOCK_MODEL="False"
def _normalize_label(s: str) -> str:
"""تبسيط اسم الليبل للمطابقة: lowercase وبدون مسافات/رموز."""
return re.sub(r"[^a-z]+", "", s.lower())
class ActoraPredictor:
"""
Predictor حقيقي يعتمد على موديل HuggingFace SequenceClassification.
- يقرأ id2label/label2id من config
- يحوّل logits لقيم موجبة باستخدام softplus
- يطابق الليبلات المرنة إلى المفاتيح القياسية
- يحسب interactions كمجموع القيم المتاحة
"""
CANONICAL_KEYS = ["liked", "loved", "haha", "wow", "sad", "angry", "comments", "shares"]
def __init__(self, model_path: str, device: Optional[str] = None, max_length: int = 256):
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
self.max_length = max_length
self.scale = float(os.getenv("ACTORA_SCALE_FACTOR", "1.0"))
logging.info(f"[ActoraPredictor] تحميل الموديل: {model_path} على {self.device}")
self.tokenizer = AutoTokenizer.from_pretrained(model_path)
self.model = AutoModelForSequenceClassification.from_pretrained(model_path).to(self.device)
self.model.eval()
id2label = getattr(self.model.config, "id2label", {}) or {}
if not id2label and hasattr(self.model.config, "label2id"):
label2id = {k.lower(): int(v) for k, v in self.model.config.label2id.items()}
else:
# تحويل id2label إلى label2id
label2id = {str(v).lower(): int(k) for k, v in id2label.items()}
self.idx_map: Dict[str, int] = {}
flattened = { _normalize_label(lbl): idx for lbl, idx in label2id.items() }
for key in self.CANONICAL_KEYS:
candidates = [key, f"{key}s", f"{key}_count", f"{key}count"]
found = None
for c in candidates:
norm = _normalize_label(c)
for flbl, idx in flattened.items():
if flbl == norm or flbl.endswith(norm) or norm in flbl:
found = idx
break
if found is not None:
break
if found is not None:
self.idx_map[key] = found
logging.info(f"[ActoraPredictor] Label map: {self.idx_map}")
self.is_ready = True
@torch.inference_mode()
def predict(self, text: str) -> Dict[str, int]:
if not self.is_ready:
raise RuntimeError("Model not ready.")
if not text or not text.strip():
return {}
enc = self.tokenizer(
text, return_tensors="pt", truncation=True, max_length=self.max_length, padding=False
)
enc = {k: v.to(self.device) for k, v in enc.items()}
out = self.model(**enc)
logits = out.logits.squeeze(0) # [num_labels]
positive = F.softplus(logits).cpu().tolist()
results: Dict[str, int] = {}
for key in self.CANONICAL_KEYS:
if key in self.idx_map:
val = positive[self.idx_map[key]] * self.scale
results[key] = int(round(float(val)))
else:
results[key] = 0
results["interactions"] = int(
sum(results[k] for k in ["liked", "loved", "haha", "wow", "sad", "angry", "comments", "shares"])
)
return results
# ===== نهاية الموديل الحقيقي =====
# ----------------------------
# عميل Gemini (مع Sanitization)
# ----------------------------
class GeminiClient:
def __init__(self, api_key: Optional[str]):
self.model = None
if api_key:
try:
genai.configure(api_key=api_key)
# نستخدم موديل سريع وحديث
self.model = genai.GenerativeModel("gemini-1.5-flash-latest")
logging.info("تم تهيئة Gemini بنجاح.")
except Exception as e:
logging.error(f"فشل تهيئة Gemini: {e}")
else:
logging.warning("GEMINI_API_KEY غير موجود. سيتم تعطيل تحليل Gemini.")
def analyze(self, text: str, metrics: Dict[str, int]) -> str:
# لو مفيش موديل (أو في وضع Mock)، نرجّع رسالة بسيطة (مع تعقيم)
if not self.model:
return "ميزة تحليل Gemini غير مفعلة. أضف المفتاح في الإعدادات لتفعيلها."
prompt = f"""
# المهمة
أنت "MegoFlow AI"، مستشار تسويق رقمي وخبير في تحليل النصوص الإعلانية. المطلوب تحليل موجز وقابل للتنفيذ.
# المعطيات
- إجمالي التفاعلات المتوقعة: {metrics.get('interactions', 0)}
- إعجابات: {metrics.get('liked', 0)}
- قلوب: {metrics.get('loved', 0)}
- تعليقات: {metrics.get('comments', 0)}
- مشاركات: {metrics.get('shares', 0)}
# نص المنشور
---
{text}
---
# المطلوب (Markdown بالعربية):
1) **تشخيص الأداء**: جملة واحدة توضح الاستقبال المتوقع.
2) **مقترحات للنمو**: 3 نقاط محددة جدًا (CTA/إعادة صياغة/زاوية مختلفة) مرتبطة بالنص.
"""
try:
response = self.model.generate_content(prompt)
# (4) إبقاء Sanitization بواسطة bleach
safe = bleach.clean(response.text or "", tags=[], strip=True)
return safe
except Exception as e:
logging.error(f"خطأ أثناء استدعاء Gemini: {e}")
return "حدث خطأ أثناء التواصل مع Gemini. حاول لاحقًا."
gemini = GeminiClient(GEMINI_API_KEY)
# ----------------------------
# تهيئة الـ Predictor المختار
# ----------------------------
if USE_MOCK_MODEL:
predictor = MockPredictor()
else:
try:
predictor = ActoraPredictor(MODEL_REPO)
except Exception as e:
logging.error(f"تعذر تحميل الموديل الحقيقي: {e}")
logging.warning("الرجوع مؤقتًا إلى MockPredictor للاستمرار في العمل.")
predictor = MockPredictor()
# ----------------------------
# واجهة المستخدم (Gradio)
# ----------------------------
APP_CSS = """
@import url('https://fonts.googleapis.com/css2?family=Cairo:wght@400;700;900&display=swap');
body { font-family: 'Cairo', sans-serif; direction: rtl; background: #0f172a; color: #e2e8f0; }
.center { max-width: 1200px; margin: 0 auto; }
.logo { text-align: center; padding: 16px 0 8px; }
.card { background: #1e293b; border: 1px solid #334155; border-radius: 14px; padding: 18px; margin-top: 12px; box-shadow: 0 6px 18px rgba(0,0,0,0.18); }
.card-header { font-size: 18px; font-weight: 800; color: #fff; border-bottom: 1px solid #334155; padding-bottom: 10px; margin-bottom: 12px; } /* (3) توحيد .card-header */
.metrics-grid { display: grid; grid-template-columns: repeat(auto-fill,minmax(120px,1fr)); gap: 10px; }
.metric-item { background: #0f172a; border: 1px solid #334155; border-radius: 10px; padding: 12px; text-align: center; }
.metric-value { font-size: 22px; font-weight: 900; color: #24b5a5; }
.metric-label { font-size: 12px; color: #94a3b8; margin-top: 6px; }
.footer { text-align: center; color: #94a3b8; margin: 24px 0; font-size: 13px; }
.tabpad { padding-top: 6px; }
"""
def render_metrics(metrics: Dict[str, int]) -> str:
if not metrics:
return "<div class='card'><div class='card-header'>المقاييس المتوقعة</div><p>لا توجد بيانات بعد. اكتب نصًا ثم اضغط تحليل.</p></div>"
# (3) استخدام .card-header بالاسم الموحد
html = "<div class='card'><div class='card-header'>المقاييس المتوقعة</div><div class='metrics-grid'>"
items = [
("إجمالي التفاعلات", metrics.get("interactions", 0)),
("أعجبني", metrics.get("liked", 0)),
("أحببته", metrics.get("loved", 0)),
("أضحكني", metrics.get("haha", 0)),
("أدهشني", metrics.get("wow", 0)),
("أحزنني", metrics.get("sad", 0)),
("أغضبني", metrics.get("angry", 0)),
("تعليقات", metrics.get("comments", 0)),
("مشاركات", metrics.get("shares", 0)),
]
for label, value in items:
html += f"<div class='metric-item'><div class='metric-value'>{value:,}</div><div class='metric-label'>{label}</div></div>"
html += "</div></div>"
return html
def handle_single(text: str):
if not text or not text.strip():
return (
"<div class='card'><div class='card-header'>المقاييس المتوقعة</div><p>يرجى إدخال نص المنشور أولًا.</p></div>",
"لا يمكن عرض التحليل بدون نص.",
[]
)
metrics = predictor.predict(text)
analysis = gemini.analyze(text, metrics)
metrics_html = render_metrics(metrics)
# بنسجل في السجل: (نص مختصر, إجمالي, وقت)
entry = [
(text[:100] + "…") if len(text) > 100 else text,
metrics.get("interactions", 0),
time.strftime("%Y-%m-%d %H:%M:%S"),
]
return metrics_html, analysis, entry
def handle_ab(a: str, b: str):
if not a.strip() or not b.strip():
msg = "<div class='card'><div class='card-header'>المقارنة</div><p>أدخل نصّين كاملين للمقارنة.</p></div>"
return msg, msg
ma = predictor.predict(a)
mb = predictor.predict(b)
return render_metrics(ma), render_metrics(mb)
def append_history(history, new_entry):
if new_entry and new_entry[0]:
return [new_entry] + (history or [])
return history or []
with gr.Blocks(css=APP_CSS, theme=gr.themes.Soft(primary_hue="teal")) as app:
with gr.Column(elem_classes="center"):
# الهيدر (يستخدم LOGO_DATA_URL الموحد) (2)
gr.HTML(f"""
<div class="logo">
<img src="{LOGO_DATA_URL}" alt="MegoFlow Logo" style="height:56px; filter: drop-shadow(0 0 10px rgba(36,181,165,0.3));" />
<div style="color:#94a3b8; font-size:14px; margin-top:6px;">حوّل نص منشورك لأرقام ورؤى قابلة للتنفيذ</div>
</div>
""")
history_state = gr.State([]) # [[text, interactions, ts], ...]
with gr.Tabs():
# (لا نغير اللغة/الترتيب)
with gr.TabItem("تحليل منشور"):
with gr.Row(elem_classes="tabpad"):
with gr.Column(scale=5):
post_box = gr.Textbox(lines=12, label="نص المنشور", placeholder="اكتب أو الصق نص المنشور هنا...")
analyze_btn = gr.Button("حلّل الآن", variant="primary")
with gr.Column(scale=5):
metrics_html = gr.HTML()
analysis_md = gr.Markdown()
with gr.TabItem("مقارنة A/B"):
with gr.Row(elem_classes="tabpad"):
with gr.Column():
a_box = gr.Textbox(lines=8, label="المنشور (أ)")
a_out = gr.HTML()
with gr.Column():
b_box = gr.Textbox(lines=8, label="المنشور (ب)")
b_out = gr.HTML()
compare_btn = gr.Button("اختبر وقارن", variant="primary")
with gr.TabItem("سجل"):
history_table = gr.DataFrame(
headers=["النص", "إجمالي التفاعلات", "وقت التحليل"],
datatype=["str", "number", "str"],
interactive=False,
row_count=8
)
gr.HTML("<div class='footer'>MegoFlow © 2025</div>")
# الأحداث
analyze_btn.click(
fn=handle_single,
inputs=[post_box],
outputs=[metrics_html, analysis_md, history_state]
).then(
fn=append_history,
inputs=[history_state, history_state],
outputs=[history_table]
)
compare_btn.click(
fn=handle_ab,
inputs=[a_box, b_box],
outputs=[a_out, b_out]
)
if __name__ == "__main__":
app.launch()