github-actions[bot]
GitHub deploy: 4d024c91d61f15a8b39171610ab1406915ef598d
d6703a1
import logging
import time
import uuid
from typing import Optional
from sqlalchemy.orm import Session
from open_webui.internal.db import Base, JSONField, get_db, get_db_context
from open_webui.models.users import User
from pydantic import BaseModel, ConfigDict
from sqlalchemy import BigInteger, Column, Text, JSON, Boolean
log = logging.getLogger(__name__)
####################
# Feedback DB Schema
####################
class Feedback(Base):
__tablename__ = "feedback"
id = Column(Text, primary_key=True, unique=True)
user_id = Column(Text)
version = Column(BigInteger, default=0)
type = Column(Text)
data = Column(JSON, nullable=True)
meta = Column(JSON, nullable=True)
snapshot = Column(JSON, nullable=True)
created_at = Column(BigInteger)
updated_at = Column(BigInteger)
class FeedbackModel(BaseModel):
id: str
user_id: str
version: int
type: str
data: Optional[dict] = None
meta: Optional[dict] = None
snapshot: Optional[dict] = None
created_at: int
updated_at: int
model_config = ConfigDict(from_attributes=True)
####################
# Forms
####################
class FeedbackResponse(BaseModel):
id: str
user_id: str
version: int
type: str
data: Optional[dict] = None
meta: Optional[dict] = None
created_at: int
updated_at: int
class FeedbackIdResponse(BaseModel):
id: str
user_id: str
created_at: int
updated_at: int
class LeaderboardFeedbackData(BaseModel):
"""Minimal feedback data for leaderboard computation (excludes snapshot/meta)."""
id: str
data: Optional[dict] = None
class RatingData(BaseModel):
rating: Optional[str | int] = None
model_id: Optional[str] = None
sibling_model_ids: Optional[list[str]] = None
reason: Optional[str] = None
comment: Optional[str] = None
model_config = ConfigDict(extra="allow", protected_namespaces=())
class MetaData(BaseModel):
arena: Optional[bool] = None
chat_id: Optional[str] = None
message_id: Optional[str] = None
tags: Optional[list[str]] = None
model_config = ConfigDict(extra="allow")
class SnapshotData(BaseModel):
chat: Optional[dict] = None
model_config = ConfigDict(extra="allow")
class FeedbackForm(BaseModel):
type: str
data: Optional[RatingData] = None
meta: Optional[dict] = None
snapshot: Optional[SnapshotData] = None
model_config = ConfigDict(extra="allow")
class UserResponse(BaseModel):
id: str
name: str
email: str
role: str = "pending"
last_active_at: int # timestamp in epoch
updated_at: int # timestamp in epoch
created_at: int # timestamp in epoch
model_config = ConfigDict(from_attributes=True)
class FeedbackUserResponse(FeedbackResponse):
user: Optional[UserResponse] = None
class FeedbackListResponse(BaseModel):
items: list[FeedbackUserResponse]
total: int
class ModelHistoryEntry(BaseModel):
date: str
won: int
lost: int
class ModelHistoryResponse(BaseModel):
model_id: str
history: list[ModelHistoryEntry]
class FeedbackTable:
def insert_new_feedback(
self, user_id: str, form_data: FeedbackForm, db: Optional[Session] = None
) -> Optional[FeedbackModel]:
with get_db_context(db) as db:
id = str(uuid.uuid4())
feedback = FeedbackModel(
**{
"id": id,
"user_id": user_id,
"version": 0,
**form_data.model_dump(),
"created_at": int(time.time()),
"updated_at": int(time.time()),
}
)
try:
result = Feedback(**feedback.model_dump())
db.add(result)
db.commit()
db.refresh(result)
if result:
return FeedbackModel.model_validate(result)
else:
return None
except Exception as e:
log.exception(f"Error creating a new feedback: {e}")
return None
def get_feedback_by_id(
self, id: str, db: Optional[Session] = None
) -> Optional[FeedbackModel]:
try:
with get_db_context(db) as db:
feedback = db.query(Feedback).filter_by(id=id).first()
if not feedback:
return None
return FeedbackModel.model_validate(feedback)
except Exception:
return None
def get_feedback_by_id_and_user_id(
self, id: str, user_id: str, db: Optional[Session] = None
) -> Optional[FeedbackModel]:
try:
with get_db_context(db) as db:
feedback = db.query(Feedback).filter_by(id=id, user_id=user_id).first()
if not feedback:
return None
return FeedbackModel.model_validate(feedback)
except Exception:
return None
def get_feedbacks_by_chat_id(
self, chat_id: str, db: Optional[Session] = None
) -> list[FeedbackModel]:
"""Get all feedbacks for a specific chat."""
try:
with get_db_context(db) as db:
# meta.chat_id stores the chat reference
feedbacks = (
db.query(Feedback)
.filter(Feedback.meta["chat_id"].as_string() == chat_id)
.order_by(Feedback.created_at.desc())
.all()
)
return [FeedbackModel.model_validate(fb) for fb in feedbacks]
except Exception:
return []
def get_feedback_items(
self,
filter: dict = {},
skip: int = 0,
limit: int = 30,
db: Optional[Session] = None,
) -> FeedbackListResponse:
with get_db_context(db) as db:
query = db.query(Feedback, User).join(User, Feedback.user_id == User.id)
if filter:
order_by = filter.get("order_by")
direction = filter.get("direction")
if order_by == "username":
if direction == "asc":
query = query.order_by(User.name.asc())
else:
query = query.order_by(User.name.desc())
elif order_by == "model_id":
# it's stored in feedback.data['model_id']
if direction == "asc":
query = query.order_by(
Feedback.data["model_id"].as_string().asc()
)
else:
query = query.order_by(
Feedback.data["model_id"].as_string().desc()
)
elif order_by == "rating":
# it's stored in feedback.data['rating']
if direction == "asc":
query = query.order_by(
Feedback.data["rating"].as_string().asc()
)
else:
query = query.order_by(
Feedback.data["rating"].as_string().desc()
)
elif order_by == "updated_at":
if direction == "asc":
query = query.order_by(Feedback.updated_at.asc())
else:
query = query.order_by(Feedback.updated_at.desc())
else:
query = query.order_by(Feedback.created_at.desc())
# Count BEFORE pagination
total = query.count()
if skip:
query = query.offset(skip)
if limit:
query = query.limit(limit)
items = query.all()
feedbacks = []
for feedback, user in items:
feedback_model = FeedbackModel.model_validate(feedback)
user_model = UserResponse.model_validate(user)
feedbacks.append(
FeedbackUserResponse(**feedback_model.model_dump(), user=user_model)
)
return FeedbackListResponse(items=feedbacks, total=total)
def get_all_feedbacks(self, db: Optional[Session] = None) -> list[FeedbackModel]:
with get_db_context(db) as db:
return [
FeedbackModel.model_validate(feedback)
for feedback in db.query(Feedback)
.order_by(Feedback.updated_at.desc())
.all()
]
def get_all_feedback_ids(
self, db: Optional[Session] = None
) -> list[FeedbackIdResponse]:
with get_db_context(db) as db:
return [
FeedbackIdResponse(
id=row.id,
user_id=row.user_id,
created_at=row.created_at,
updated_at=row.updated_at,
)
for row in db.query(
Feedback.id,
Feedback.user_id,
Feedback.created_at,
Feedback.updated_at,
)
.order_by(Feedback.updated_at.desc())
.all()
]
def get_feedbacks_for_leaderboard(
self, db: Optional[Session] = None
) -> list[LeaderboardFeedbackData]:
"""Fetch only id and data for leaderboard computation (excludes snapshot/meta)."""
with get_db_context(db) as db:
return [
LeaderboardFeedbackData(id=row.id, data=row.data)
for row in db.query(Feedback.id, Feedback.data).all()
]
def get_model_evaluation_history(
self, model_id: str, days: int = 30, db: Optional[Session] = None
) -> list[ModelHistoryEntry]:
"""
Get daily wins/losses for a specific model over the past N days.
If days=0, returns all time data starting from first feedback.
Returns: [{"date": "2026-01-08", "won": 5, "lost": 2}, ...]
"""
from datetime import datetime, timedelta
from collections import defaultdict
with get_db_context(db) as db:
if days == 0:
# All time - no cutoff
rows = db.query(Feedback.created_at, Feedback.data).all()
else:
cutoff = int(time.time()) - (days * 86400)
rows = (
db.query(Feedback.created_at, Feedback.data)
.filter(Feedback.created_at >= cutoff)
.all()
)
daily_counts = defaultdict(lambda: {"won": 0, "lost": 0})
first_date = None
for created_at, data in rows:
if not data:
continue
if data.get("model_id") != model_id:
continue
rating_str = str(data.get("rating", ""))
if rating_str not in ("1", "-1"):
continue
date_str = datetime.fromtimestamp(created_at).strftime("%Y-%m-%d")
if rating_str == "1":
daily_counts[date_str]["won"] += 1
else:
daily_counts[date_str]["lost"] += 1
# Track first date for this model
if first_date is None or date_str < first_date:
first_date = date_str
# Generate date range
result = []
today = datetime.now().date()
if days == 0 and first_date:
# All time: start from first feedback date
start_date = datetime.strptime(first_date, "%Y-%m-%d").date()
num_days = (today - start_date).days + 1
else:
# Fixed range
num_days = days
start_date = today - timedelta(days=days - 1)
for i in range(num_days):
d = start_date + timedelta(days=i)
date_str = d.strftime("%Y-%m-%d")
counts = daily_counts.get(date_str, {"won": 0, "lost": 0})
result.append(
ModelHistoryEntry(date=date_str, won=counts["won"], lost=counts["lost"])
)
return result
def get_feedbacks_by_type(
self, type: str, db: Optional[Session] = None
) -> list[FeedbackModel]:
with get_db_context(db) as db:
return [
FeedbackModel.model_validate(feedback)
for feedback in db.query(Feedback)
.filter_by(type=type)
.order_by(Feedback.updated_at.desc())
.all()
]
def get_feedbacks_by_user_id(
self, user_id: str, db: Optional[Session] = None
) -> list[FeedbackModel]:
with get_db_context(db) as db:
return [
FeedbackModel.model_validate(feedback)
for feedback in db.query(Feedback)
.filter_by(user_id=user_id)
.order_by(Feedback.updated_at.desc())
.all()
]
def update_feedback_by_id(
self, id: str, form_data: FeedbackForm, db: Optional[Session] = None
) -> Optional[FeedbackModel]:
with get_db_context(db) as db:
feedback = db.query(Feedback).filter_by(id=id).first()
if not feedback:
return None
if form_data.data:
feedback.data = form_data.data.model_dump()
if form_data.meta:
feedback.meta = form_data.meta
if form_data.snapshot:
feedback.snapshot = form_data.snapshot.model_dump()
feedback.updated_at = int(time.time())
db.commit()
return FeedbackModel.model_validate(feedback)
def update_feedback_by_id_and_user_id(
self,
id: str,
user_id: str,
form_data: FeedbackForm,
db: Optional[Session] = None,
) -> Optional[FeedbackModel]:
with get_db_context(db) as db:
feedback = db.query(Feedback).filter_by(id=id, user_id=user_id).first()
if not feedback:
return None
if form_data.data:
feedback.data = form_data.data.model_dump()
if form_data.meta:
feedback.meta = form_data.meta
if form_data.snapshot:
feedback.snapshot = form_data.snapshot.model_dump()
feedback.updated_at = int(time.time())
db.commit()
return FeedbackModel.model_validate(feedback)
def delete_feedback_by_id(self, id: str, db: Optional[Session] = None) -> bool:
with get_db_context(db) as db:
feedback = db.query(Feedback).filter_by(id=id).first()
if not feedback:
return False
db.delete(feedback)
db.commit()
return True
def delete_feedback_by_id_and_user_id(
self, id: str, user_id: str, db: Optional[Session] = None
) -> bool:
with get_db_context(db) as db:
feedback = db.query(Feedback).filter_by(id=id, user_id=user_id).first()
if not feedback:
return False
db.delete(feedback)
db.commit()
return True
def delete_feedbacks_by_user_id(
self, user_id: str, db: Optional[Session] = None
) -> bool:
with get_db_context(db) as db:
result = db.query(Feedback).filter_by(user_id=user_id).delete()
db.commit()
return result > 0
def delete_all_feedbacks(self, db: Optional[Session] = None) -> bool:
with get_db_context(db) as db:
result = db.query(Feedback).delete()
db.commit()
return result > 0
Feedbacks = FeedbackTable()