github-actions[bot]
GitHub deploy: 4d024c91d61f15a8b39171610ab1406915ef598d
d6703a1
import json
import secrets
import time
import uuid
from typing import Optional
from sqlalchemy.orm import Session
from open_webui.internal.db import Base, JSONField, get_db, get_db_context
from open_webui.models.groups import Groups
from open_webui.models.access_grants import (
AccessGrantModel,
AccessGrants,
)
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import (
BigInteger,
Boolean,
Column,
ForeignKey,
String,
Text,
JSON,
UniqueConstraint,
case,
cast,
)
from sqlalchemy import or_, func, select, and_, text
from sqlalchemy.sql import exists
####################
# Channel DB Schema
####################
class Channel(Base):
    """ORM mapping for rows of the ``channel`` table.

    The timestamp columns are plain integers; the writers in this module
    fill ``created_at`` / ``updated_at`` with ``time.time_ns()`` values.
    """

    __tablename__ = "channel"

    # Identity and owner.
    id = Column(Text, unique=True, primary_key=True)
    user_id = Column(Text)

    # Free-form type tag; this module special-cases the values "group" and "dm".
    type = Column(Text, nullable=True)

    name = Column(Text)
    description = Column(Text, nullable=True)

    # Used to indicate if the channel is private (for 'group' type channels)
    is_private = Column(Boolean, nullable=True)

    # Arbitrary JSON payloads.
    data = Column(JSON, nullable=True)
    meta = Column(JSON, nullable=True)

    # Lifecycle bookkeeping: when (and, where recorded, by whom) the row was
    # created, updated, archived or soft-deleted.
    created_at = Column(BigInteger)
    updated_at = Column(BigInteger)
    updated_by = Column(Text, nullable=True)

    archived_at = Column(BigInteger, nullable=True)
    archived_by = Column(Text, nullable=True)

    deleted_at = Column(BigInteger, nullable=True)
    deleted_by = Column(Text, nullable=True)
# Pydantic view of a Channel row plus its access grants.
# ``from_attributes=True`` lets it be validated directly from an ORM object.
class ChannelModel(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: str
    user_id: str

    type: Optional[str] = None
    name: str
    description: Optional[str] = None

    is_private: Optional[bool] = None

    data: Optional[dict] = None
    meta: Optional[dict] = None

    # Not a column on Channel; defaults to an empty list when not supplied.
    access_grants: list[AccessGrantModel] = Field(default_factory=list)

    created_at: int  # epoch timestamp (time_ns)
    updated_at: int  # epoch timestamp (time_ns)
    updated_by: Optional[str] = None

    archived_at: Optional[int] = None  # epoch timestamp (time_ns)
    archived_by: Optional[str] = None

    deleted_at: Optional[int] = None  # epoch timestamp (time_ns)
    deleted_by: Optional[str] = None
class ChannelMember(Base):
    """ORM mapping for the ``channel_member`` table (one row per user per channel)."""

    __tablename__ = "channel_member"

    id = Column(Text, unique=True, primary_key=True)

    # Plain text references; no ForeignKey is declared on these columns.
    channel_id = Column(Text, nullable=False)
    user_id = Column(Text, nullable=False)

    # This module checks for the role "manager" and writes the statuses
    # "joined" and "left".
    role = Column(Text, nullable=True)
    status = Column(Text, nullable=True)

    # Per-member flags.
    is_active = Column(Boolean, nullable=False, default=True)
    is_channel_muted = Column(Boolean, nullable=False, default=False)
    is_channel_pinned = Column(Boolean, nullable=False, default=False)

    data = Column(JSON, nullable=True)
    meta = Column(JSON, nullable=True)

    # Invitation / membership timeline (integer epoch timestamps).
    invited_at = Column(BigInteger, nullable=True)
    invited_by = Column(Text, nullable=True)
    joined_at = Column(BigInteger)
    left_at = Column(BigInteger, nullable=True)
    last_read_at = Column(BigInteger, nullable=True)

    created_at = Column(BigInteger)
    updated_at = Column(BigInteger)
# Pydantic view of a ChannelMember row; can be validated from the ORM object.
class ChannelMemberModel(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: str
    channel_id: str
    user_id: str

    role: Optional[str] = None
    status: Optional[str] = None

    is_active: bool = True
    is_channel_muted: bool = False
    is_channel_pinned: bool = False

    data: Optional[dict] = None
    meta: Optional[dict] = None

    # All timestamps below are epoch values (time_ns).
    invited_at: Optional[int] = None
    invited_by: Optional[str] = None
    joined_at: Optional[int] = None
    left_at: Optional[int] = None
    last_read_at: Optional[int] = None

    created_at: Optional[int] = None
    updated_at: Optional[int] = None
class ChannelFile(Base):
    """ORM mapping for the ``channel_file`` table linking files to channels.

    A ``(channel_id, file_id)`` pair may appear at most once, and rows are
    removed by cascade when the referenced channel, message or file row is
    deleted.
    """

    __tablename__ = "channel_file"

    id = Column(Text, primary_key=True, unique=True)
    user_id = Column(Text, nullable=False)

    channel_id = Column(
        Text,
        ForeignKey("channel.id", ondelete="CASCADE"),
        nullable=False,
    )
    # Optional link to the message the file is attached to.
    message_id = Column(
        Text,
        ForeignKey("message.id", ondelete="CASCADE"),
        nullable=True,
    )
    file_id = Column(
        Text,
        ForeignKey("file.id", ondelete="CASCADE"),
        nullable=False,
    )

    created_at = Column(BigInteger, nullable=False)
    updated_at = Column(BigInteger, nullable=False)

    __table_args__ = (
        UniqueConstraint("channel_id", "file_id", name="uq_channel_file_channel_file"),
    )
# Pydantic view of a ChannelFile row (the message_id column is not exposed).
class ChannelFileModel(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: str

    channel_id: str
    file_id: str
    user_id: str

    # Epoch timestamps. NOTE(review): add_file_to_channel_by_id in this file
    # fills these with int(time.time()) (seconds), unlike the time_ns() values
    # used for channels and members.
    created_at: int
    updated_at: int
class ChannelWebhook(Base):
    """ORM mapping for the ``channel_webhook`` table."""

    __tablename__ = "channel_webhook"

    id = Column(Text, unique=True, primary_key=True)

    # Channel the webhook belongs to and the user recorded on the row.
    channel_id = Column(Text, nullable=False)
    user_id = Column(Text, nullable=False)

    # Display attributes.
    name = Column(Text, nullable=False)
    profile_image_url = Column(Text, nullable=True)

    # Secret checked together with the id when a webhook is looked up by token.
    token = Column(Text, nullable=False)

    last_used_at = Column(BigInteger, nullable=True)
    created_at = Column(BigInteger, nullable=False)
    updated_at = Column(BigInteger, nullable=False)
# Pydantic view of a ChannelWebhook row; can be validated from the ORM object.
class ChannelWebhookModel(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: str
    channel_id: str
    user_id: str

    name: str
    profile_image_url: Optional[str] = None
    token: str

    # Epoch timestamps (time_ns).
    last_used_at: Optional[int] = None
    created_at: int
    updated_at: int
####################
# Forms
####################
# ChannelModel extended with three extra fields; nothing in this file sets them.
class ChannelResponse(ChannelModel):
    is_manager: bool = False
    write_access: bool = False

    user_count: Optional[int] = None
# Input payload for creating or updating a channel.
class ChannelForm(BaseModel):
    name: str = ""
    description: Optional[str] = None
    is_private: Optional[bool] = None

    data: Optional[dict] = None
    meta: Optional[dict] = None

    # Passed to AccessGrants.set_access_grants by the table methods.
    access_grants: Optional[list[dict]] = None

    # Members to add when a "group" or "dm" channel is created.
    group_ids: Optional[list[str]] = None
    user_ids: Optional[list[str]] = None
# ChannelForm plus the channel type, which is only supplied at creation time.
class CreateChannelForm(ChannelForm):
    type: Optional[str] = None
# Input payload for creating or updating a channel webhook.
class ChannelWebhookForm(BaseModel):
    name: str

    profile_image_url: Optional[str] = None
class ChannelTable:
def _get_access_grants(
    self, channel_id: str, db: Optional[Session] = None
) -> list[AccessGrantModel]:
    """Return the access grants stored for the channel ``channel_id``."""
    resource_type = "channel"
    return AccessGrants.get_grants_by_resource(resource_type, channel_id, db=db)
def _to_channel_model(
    self, channel: Channel, db: Optional[Session] = None
) -> ChannelModel:
    """Convert a ``Channel`` ORM row into a ``ChannelModel`` with its grants.

    The row is validated once to get its plain fields, the access grants are
    looked up by channel id, and the combined dict is validated again.
    """
    validated = ChannelModel.model_validate(channel)
    payload = validated.model_dump(exclude={"access_grants"})
    payload["access_grants"] = self._get_access_grants(payload["id"], db=db)
    return ChannelModel.model_validate(payload)
def _collect_unique_user_ids(
self,
invited_by: str,
user_ids: Optional[list[str]] = None,
group_ids: Optional[list[str]] = None,
) -> set[str]:
"""
Collect unique user ids from:
- invited_by
- user_ids
- each group in group_ids
Returns a set for efficient SQL diffing.
"""
users = set(user_ids or [])
users.add(invited_by)
for group_id in group_ids or []:
users.update(Groups.get_group_user_ids_by_id(group_id))
return users
def _create_membership_models(
    self,
    channel_id: str,
    invited_by: str,
    user_ids: set[str],
) -> list[ChannelMember]:
    """
    Build (but do not persist) one ``ChannelMember`` row per id in ``user_ids``.

    The caller is expected to pass only users that are not yet members.
    Every row is created as an active, "joined" member and all of its
    timestamps share a single ``time.time_ns()`` reading.
    """
    stamp = int(time.time_ns())

    def build_row(uid: str) -> ChannelMember:
        # Go through the pydantic model first, then hand its dump to the ORM class.
        validated = ChannelMemberModel(
            id=str(uuid.uuid4()),
            channel_id=channel_id,
            user_id=uid,
            status="joined",
            is_active=True,
            is_channel_muted=False,
            is_channel_pinned=False,
            invited_at=stamp,
            invited_by=invited_by,
            joined_at=stamp,
            left_at=None,
            last_read_at=stamp,
            created_at=stamp,
            updated_at=stamp,
        )
        return ChannelMember(**validated.model_dump())

    return [build_row(uid) for uid in user_ids]
def insert_new_channel(
    self, form_data: CreateChannelForm, user_id: str, db: Optional[Session] = None
) -> Optional[ChannelModel]:
    """Create a channel owned by ``user_id`` and return it as a ``ChannelModel``.

    The name is stored lower-cased and an empty type is stored as ``None``.
    For "group" and "dm" channels, membership rows are created for the
    creator, ``form_data.user_ids`` and the members of ``form_data.group_ids``.
    Access grants from the form are applied after the channel is committed.
    """
    with get_db_context(db) as db:
        fields = form_data.model_dump(exclude={"access_grants"})
        fields.update(
            {
                "type": form_data.type or None,
                "name": form_data.name.lower(),
                "id": str(uuid.uuid4()),
                "user_id": user_id,
                "created_at": int(time.time_ns()),
                "updated_at": int(time.time_ns()),
                "access_grants": [],
            }
        )
        validated = ChannelModel(**fields)
        channel_row = Channel(**validated.model_dump(exclude={"access_grants"}))

        if form_data.type in ("group", "dm"):
            member_ids = self._collect_unique_user_ids(
                invited_by=user_id,
                user_ids=form_data.user_ids,
                group_ids=form_data.group_ids,
            )
            db.add_all(
                self._create_membership_models(
                    channel_id=channel_row.id,
                    invited_by=user_id,
                    user_ids=member_ids,
                )
            )

        db.add(channel_row)
        db.commit()

        AccessGrants.set_access_grants(
            "channel", channel_row.id, form_data.access_grants, db=db
        )
        return self._to_channel_model(channel_row, db=db)
def get_channels(self, db: Optional[Session] = None) -> list[ChannelModel]:
    """Return every row of the channel table (no filtering) as ``ChannelModel``s."""
    with get_db_context(db) as db:
        converted = []
        for row in db.query(Channel).all():
            converted.append(self._to_channel_model(row, db=db))
        return converted
def _has_permission(self, db, query, filter: dict, permission: str = "read"):
    """Pass ``query`` through ``AccessGrants.has_permission_filter`` for channels.

    ``filter`` is forwarded unchanged; callers in this file pass a dict with
    ``user_id`` and ``group_ids`` keys. Returns whatever the helper returns.
    """
    acl_kwargs = {
        "db": db,
        "query": query,
        "DocumentModel": Channel,
        "filter": filter,
        "resource_type": "channel",
        "permission": permission,
    }
    return AccessGrants.has_permission_filter(**acl_kwargs)
def get_channels_by_user_id(
    self, user_id: str, db: Optional[Session] = None
) -> list[ChannelModel]:
    """List the non-deleted, non-archived channels visible to ``user_id``.

    "group" / "dm" channels are included when the user has an active
    membership row; every other channel type is included when it passes the
    access-grant permission filter. Membership channels come first in the
    returned list.
    """
    with get_db_context(db) as db:
        group_ids = [g.id for g in Groups.get_groups_by_member_id(user_id, db=db)]

        # Part 1: group / dm channels, joined through channel_member.
        member_query = db.query(Channel).join(
            ChannelMember, Channel.id == ChannelMember.channel_id
        )
        member_query = member_query.filter(
            Channel.deleted_at.is_(None),
            Channel.archived_at.is_(None),
            Channel.type.in_(["group", "dm"]),
            ChannelMember.user_id == user_id,
            ChannelMember.is_active.is_(True),
        )
        via_membership = member_query.all()

        # Part 2: everything that is not a group / dm channel, gated by ACL.
        is_standard_type = or_(
            Channel.type.is_(None),  # True NULL/None
            Channel.type == "",  # Empty string
            and_(Channel.type != "group", Channel.type != "dm"),
        )
        acl_query = db.query(Channel).filter(
            Channel.deleted_at.is_(None),
            Channel.archived_at.is_(None),
            is_standard_type,
        )
        acl_query = self._has_permission(
            db, acl_query, {"user_id": user_id, "group_ids": group_ids}
        )
        via_acl = acl_query.all()

        return [
            self._to_channel_model(row, db=db) for row in via_membership + via_acl
        ]
def get_dm_channel_by_user_ids(
    self, user_ids: list[str], db: Optional[Session] = None
) -> Optional[ChannelModel]:
    """Find a "dm" channel whose member set is exactly ``user_ids``.

    Duplicates in ``user_ids`` are ignored. Returns the first matching
    channel, or ``None`` when there is none.
    """
    with get_db_context(db) as db:
        # De-duplicate so the counts below compare against distinct users.
        participants = list(set(user_ids))
        expected = len(participants)

        # Number of a channel's member rows whose user is one of the participants.
        matching_members = func.sum(
            case(
                (ChannelMember.user_id.in_(participants), 1),
                else_=0,
            )
        )

        candidate_ids = (
            db.query(ChannelMember.channel_id)
            .group_by(ChannelMember.channel_id)
            # The channel has exactly as many member rows as participants ...
            .having(func.count(ChannelMember.user_id) == expected)
            # ... and every one of those rows belongs to a participant.
            .having(matching_members == expected)
            .subquery()
        )

        dm_channel = (
            db.query(Channel)
            .filter(Channel.id.in_(candidate_ids), Channel.type == "dm")
            .first()
        )
        if not dm_channel:
            return None
        return self._to_channel_model(dm_channel, db=db)
def add_members_to_channel(
    self,
    channel_id: str,
    invited_by: str,
    user_ids: Optional[list[str]] = None,
    group_ids: Optional[list[str]] = None,
    db: Optional[Session] = None,
) -> list[ChannelMemberModel]:
    """Add users (and group members) to a channel, skipping existing members.

    The candidate set is the inviter, ``user_ids`` and the members of every
    group in ``group_ids``. Anyone who already has a membership row for the
    channel is left untouched. Returns the newly created memberships, or an
    empty list when there was nobody to add.
    """
    with get_db_context(db) as db:
        candidates = self._collect_unique_user_ids(invited_by, user_ids, group_ids)

        member_rows = (
            db.query(ChannelMember.user_id)
            .filter(ChannelMember.channel_id == channel_id)
            .all()
        )
        already_members = {row.user_id for row in member_rows}

        to_add = candidates - already_members
        if not to_add:
            return []  # Nothing to add

        created = self._create_membership_models(channel_id, invited_by, to_add)
        db.add_all(created)
        db.commit()

        return [ChannelMemberModel.model_validate(row) for row in created]
def remove_members_from_channel(
    self,
    channel_id: str,
    user_ids: list[str],
    db: Optional[Session] = None,
) -> int:
    """Delete the membership rows of ``user_ids`` in ``channel_id``.

    Returns the number of rows deleted.
    """
    with get_db_context(db) as db:
        targets = db.query(ChannelMember).filter(
            ChannelMember.channel_id == channel_id,
            ChannelMember.user_id.in_(user_ids),
        )
        # Bulk delete; in-session objects are not synchronized.
        deleted_count = targets.delete(synchronize_session=False)
        db.commit()
        return deleted_count
def is_user_channel_manager(
    self, channel_id: str, user_id: str, db: Optional[Session] = None
) -> bool:
    """Tell whether ``user_id`` manages the channel.

    True when the user is recorded as the channel's ``user_id`` (its creator)
    or has a membership row with role "manager".
    """
    with get_db_context(db) as db:
        channel_row = db.query(Channel).filter(Channel.id == channel_id).first()
        owns_channel = bool(channel_row) and channel_row.user_id == user_id
        if owns_channel:
            return True

        manager_row = (
            db.query(ChannelMember)
            .filter(
                ChannelMember.channel_id == channel_id,
                ChannelMember.user_id == user_id,
                ChannelMember.role == "manager",
            )
            .first()
        )
        return manager_row is not None
def join_channel(
    self, channel_id: str, user_id: str, db: Optional[Session] = None
) -> Optional[ChannelMemberModel]:
    """Make ``user_id`` a member of ``channel_id`` and return the membership.

    If a membership row already exists it is returned unchanged (whatever its
    status); otherwise a new active "joined" row is inserted.
    """
    with get_db_context(db) as db:
        current = (
            db.query(ChannelMember)
            .filter(
                ChannelMember.channel_id == channel_id,
                ChannelMember.user_id == user_id,
            )
            .first()
        )
        if current:
            return ChannelMemberModel.model_validate(current)

        # No row yet: build the model first, then persist its ORM counterpart.
        joined = ChannelMemberModel(
            id=str(uuid.uuid4()),
            channel_id=channel_id,
            user_id=user_id,
            status="joined",
            is_active=True,
            is_channel_muted=False,
            is_channel_pinned=False,
            joined_at=int(time.time_ns()),
            left_at=None,
            last_read_at=int(time.time_ns()),
            created_at=int(time.time_ns()),
            updated_at=int(time.time_ns()),
        )
        db.add(ChannelMember(**joined.model_dump()))
        db.commit()
        return joined
def leave_channel(
    self, channel_id: str, user_id: str, db: Optional[Session] = None
) -> bool:
    """Mark the user's membership in the channel as left.

    The row is kept: its status becomes "left", it is deactivated, and
    ``left_at`` / ``updated_at`` are stamped. Returns ``False`` when the user
    has no membership row, ``True`` otherwise.
    """
    with get_db_context(db) as db:
        member_row = (
            db.query(ChannelMember)
            .filter(ChannelMember.channel_id == channel_id)
            .filter(ChannelMember.user_id == user_id)
            .first()
        )
        if member_row is None:
            return False

        member_row.status = "left"
        member_row.is_active = False
        member_row.left_at = int(time.time_ns())
        member_row.updated_at = int(time.time_ns())
        db.commit()
        return True
def get_member_by_channel_and_user_id(
    self, channel_id: str, user_id: str, db: Optional[Session] = None
) -> Optional[ChannelMemberModel]:
    """Return the user's membership row for the channel, or ``None`` if absent."""
    with get_db_context(db) as db:
        lookup = db.query(ChannelMember).filter(
            ChannelMember.channel_id == channel_id,
            ChannelMember.user_id == user_id,
        )
        found = lookup.first()
        if not found:
            return None
        return ChannelMemberModel.model_validate(found)
def get_members_by_channel_id(
    self, channel_id: str, db: Optional[Session] = None
) -> list[ChannelMemberModel]:
    """Return every membership row of the channel (no status filter is applied)."""
    with get_db_context(db) as db:
        rows = (
            db.query(ChannelMember)
            .filter(ChannelMember.channel_id == channel_id)
            .all()
        )
        members = []
        for row in rows:
            members.append(ChannelMemberModel.model_validate(row))
        return members
def pin_channel(
    self,
    channel_id: str,
    user_id: str,
    is_pinned: bool,
    db: Optional[Session] = None,
) -> bool:
    """Set the pinned flag on the user's membership of the channel.

    Returns ``False`` when the user has no membership row, ``True`` after the
    flag and ``updated_at`` have been saved.
    """
    with get_db_context(db) as db:
        lookup = db.query(ChannelMember).filter(
            ChannelMember.channel_id == channel_id,
            ChannelMember.user_id == user_id,
        )
        member_row = lookup.first()
        if member_row is None:
            return False

        member_row.is_channel_pinned = is_pinned
        member_row.updated_at = int(time.time_ns())
        db.commit()
        return True
def update_member_last_read_at(
    self, channel_id: str, user_id: str, db: Optional[Session] = None
) -> bool:
    """Stamp the user's membership with the current time as ``last_read_at``.

    Returns ``False`` when the user has no membership row for the channel.
    """
    with get_db_context(db) as db:
        member_row = (
            db.query(ChannelMember)
            .filter(ChannelMember.channel_id == channel_id)
            .filter(ChannelMember.user_id == user_id)
            .first()
        )
        if member_row is None:
            return False

        member_row.last_read_at = int(time.time_ns())
        member_row.updated_at = int(time.time_ns())
        db.commit()
        return True
def update_member_active_status(
    self,
    channel_id: str,
    user_id: str,
    is_active: bool,
    db: Optional[Session] = None,
) -> bool:
    """Set ``is_active`` on the user's membership of the channel.

    Only the flag and ``updated_at`` are touched. Returns ``False`` when the
    membership row does not exist.
    """
    with get_db_context(db) as db:
        same_channel = ChannelMember.channel_id == channel_id
        same_user = ChannelMember.user_id == user_id
        member_row = db.query(ChannelMember).filter(same_channel, same_user).first()
        if member_row is None:
            return False

        member_row.is_active = is_active
        member_row.updated_at = int(time.time_ns())
        db.commit()
        return True
def is_user_channel_member(
    self, channel_id: str, user_id: str, db: Optional[Session] = None
) -> bool:
    """Tell whether any membership row exists for the user in the channel.

    The row's ``is_active`` / ``status`` values are not consulted.
    """
    with get_db_context(db) as db:
        lookup = db.query(ChannelMember).filter(
            ChannelMember.channel_id == channel_id,
            ChannelMember.user_id == user_id,
        )
        if lookup.first() is None:
            return False
        return True
def get_channel_by_id(
    self, id: str, db: Optional[Session] = None
) -> Optional[ChannelModel]:
    """Fetch a channel by primary key.

    Returns ``None`` when the row does not exist and also when any exception
    is raised during the lookup (errors are deliberately swallowed here).
    """
    try:
        with get_db_context(db) as db:
            row = db.query(Channel).filter(Channel.id == id).first()
            if not row:
                return None
            return self._to_channel_model(row, db=db)
    except Exception:
        return None
def get_channels_by_file_id(
    self, file_id: str, db: Optional[Session] = None
) -> list[ChannelModel]:
    """Return every channel that has ``file_id`` attached.

    No permission, archive or soft-delete filtering is applied here.
    Returns an empty list when the file is not attached to any channel.
    """
    with get_db_context(db) as db:
        channel_files = (
            db.query(ChannelFile).filter(ChannelFile.file_id == file_id).all()
        )
        channel_ids = [cf.channel_id for cf in channel_files]

        # Same guard as get_channels_by_file_id_and_user_id: skip the second
        # query instead of sending an empty IN () clause to the database.
        if not channel_ids:
            return []

        channels = db.query(Channel).filter(Channel.id.in_(channel_ids)).all()
        return [self._to_channel_model(channel, db=db) for channel in channels]
def get_channels_by_file_id_and_user_id(
    self, file_id: str, user_id: str, db: Optional[Session] = None
) -> list[ChannelModel]:
    """Return the channels holding ``file_id`` that ``user_id`` may read.

    Deleted and archived channels are ignored. For "group" / "dm" channels
    the user needs an active membership; for all other channels the
    access-grant "read" permission filter decides.
    """
    with get_db_context(db) as db:
        # Which channels reference the file at all?
        links = db.query(ChannelFile).filter(ChannelFile.file_id == file_id).all()
        linked_ids = [link.channel_id for link in links]
        if not linked_ids:
            return []

        # Keep only channels that are neither soft-deleted nor archived.
        live_channels = (
            db.query(Channel)
            .filter(
                Channel.id.in_(linked_ids),
                Channel.deleted_at.is_(None),
                Channel.archived_at.is_(None),
            )
            .all()
        )
        if not live_channels:
            return []

        # Group membership is looked up once and reused for every ACL check.
        group_ids = [g.id for g in Groups.get_groups_by_member_id(user_id, db=db)]

        visible = []
        for candidate in live_channels:
            if candidate.type in ("group", "dm"):
                # Membership-gated channel: never falls through to the ACL check.
                active_member = (
                    db.query(ChannelMember)
                    .filter(
                        ChannelMember.channel_id == candidate.id,
                        ChannelMember.user_id == user_id,
                        ChannelMember.is_active.is_(True),
                    )
                    .first()
                )
                if active_member:
                    visible.append(self._to_channel_model(candidate, db=db))
            else:
                # Standard channel: re-query it through the permission filter.
                acl_query = self._has_permission(
                    db,
                    db.query(Channel).filter(Channel.id == candidate.id),
                    {"user_id": user_id, "group_ids": group_ids},
                    permission="read",
                )
                permitted = acl_query.first()
                if permitted:
                    visible.append(self._to_channel_model(permitted, db=db))

        return visible
def get_channel_by_id_and_user_id(
    self, id: str, user_id: str, db: Optional[Session] = None
) -> Optional[ChannelModel]:
    """Return channel ``id`` if ``user_id`` is allowed to read it, else ``None``.

    Deleted or archived channels are never returned. "group" / "dm" channels
    require an active membership row; other channels go through the
    access-grant "read" permission filter.
    """
    with get_db_context(db) as db:
        channel_row: Channel = (
            db.query(Channel)
            .filter(
                Channel.id == id,
                Channel.deleted_at.is_(None),
                Channel.archived_at.is_(None),
            )
            .first()
        )
        if not channel_row:
            return None

        if channel_row.type in ("group", "dm"):
            # Membership decides; there is no ACL fallback for these types.
            active_member = (
                db.query(ChannelMember)
                .filter(
                    ChannelMember.channel_id == id,
                    ChannelMember.user_id == user_id,
                    ChannelMember.is_active.is_(True),
                )
                .first()
            )
            if not active_member:
                return None
            return self._to_channel_model(channel_row, db=db)

        # Any other channel type: apply the ACL rules to a fresh query by id.
        acl_query = db.query(Channel).filter(Channel.id == id)
        group_ids = [g.id for g in Groups.get_groups_by_member_id(user_id, db=db)]
        acl_query = self._has_permission(
            db,
            acl_query,
            {"user_id": user_id, "group_ids": group_ids},
            permission="read",
        )
        permitted = acl_query.first()
        if not permitted:
            return None
        return self._to_channel_model(permitted, db=db)
def update_channel_by_id(
    self, id: str, form_data: ChannelForm, db: Optional[Session] = None
) -> Optional[ChannelModel]:
    """Overwrite a channel's editable fields from ``form_data``.

    ``name``, ``description``, ``is_private``, ``data`` and ``meta`` are
    always replaced with the form's values. Access grants are only replaced
    when the form supplies them (not ``None``). Returns the updated channel,
    or ``None`` when no channel has this id.
    """
    with get_db_context(db) as db:
        row = db.query(Channel).filter(Channel.id == id).first()
        if row is None:
            return None

        for attr in ("name", "description", "is_private", "data", "meta"):
            setattr(row, attr, getattr(form_data, attr))

        grants = form_data.access_grants
        if grants is not None:
            AccessGrants.set_access_grants("channel", id, grants, db=db)

        row.updated_at = int(time.time_ns())
        db.commit()
        return self._to_channel_model(row, db=db)
def add_file_to_channel_by_id(
    self, channel_id: str, file_id: str, user_id: str, db: Optional[Session] = None
) -> Optional[ChannelFileModel]:
    """Attach ``file_id`` to ``channel_id`` on behalf of ``user_id``.

    Returns the stored link, or ``None`` when the insert fails (for example
    when the ``(channel_id, file_id)`` unique constraint is violated).

    Timestamps on this table are written with ``time.time()`` (whole
    seconds), which is what the other channel_file writers in this file use.
    """
    with get_db_context(db) as db:
        # One clock reading so created_at and updated_at always agree.
        now = int(time.time())
        channel_file = ChannelFileModel(
            id=str(uuid.uuid4()),
            channel_id=channel_id,
            file_id=file_id,
            user_id=user_id,
            created_at=now,
            updated_at=now,
        )
        try:
            result = ChannelFile(**channel_file.model_dump())
            db.add(result)
            db.commit()
            db.refresh(result)
            return ChannelFileModel.model_validate(result)
        except Exception:
            # The error is swallowed inside the session context, so the
            # session must be reset here; otherwise a caller-supplied session
            # stays in a failed-transaction state and its next use raises.
            db.rollback()
            return None
def set_file_message_id_in_channel_by_id(
    self,
    channel_id: str,
    file_id: str,
    message_id: str,
    db: Optional[Session] = None,
) -> bool:
    """Record which message a channel file belongs to.

    Returns ``True`` once the link row is updated; ``False`` when the
    ``(channel_id, file_id)`` link does not exist or any exception occurs.
    """
    try:
        with get_db_context(db) as db:
            link = (
                db.query(ChannelFile)
                .filter_by(channel_id=channel_id, file_id=file_id)
                .first()
            )
            if link is None:
                return False

            link.message_id = message_id
            link.updated_at = int(time.time())
            db.commit()
            return True
    except Exception:
        return False
def remove_file_from_channel_by_id(
    self, channel_id: str, file_id: str, db: Optional[Session] = None
) -> bool:
    """Delete the link between ``file_id`` and ``channel_id``.

    Returns ``True`` when the delete statement ran (even if it matched no
    row) and ``False`` when any exception was raised.
    """
    try:
        with get_db_context(db) as db:
            link_query = db.query(ChannelFile).filter_by(
                channel_id=channel_id, file_id=file_id
            )
            link_query.delete()
            db.commit()
            return True
    except Exception:
        return False
def delete_channel_by_id(self, id: str, db: Optional[Session] = None) -> bool:
    """Revoke the channel's access grants, then delete its row.

    Always returns ``True``, whether or not a channel row matched ``id``.
    """
    with get_db_context(db) as db:
        AccessGrants.revoke_all_access("channel", id, db=db)

        channel_query = db.query(Channel).filter(Channel.id == id)
        channel_query.delete()
        db.commit()
        return True
####################
# Webhook Methods
####################
def insert_webhook(
    self,
    channel_id: str,
    user_id: str,
    form_data: ChannelWebhookForm,
    db: Optional[Session] = None,
) -> Optional[ChannelWebhookModel]:
    """Create a webhook for ``channel_id`` and return it.

    A fresh id and a random URL-safe token are generated for the webhook;
    ``last_used_at`` starts out unset.
    """
    with get_db_context(db) as db:
        fields = {
            "id": str(uuid.uuid4()),
            "channel_id": channel_id,
            "user_id": user_id,
            "name": form_data.name,
            "profile_image_url": form_data.profile_image_url,
            "token": secrets.token_urlsafe(32),
            "last_used_at": None,
            "created_at": int(time.time_ns()),
            "updated_at": int(time.time_ns()),
        }
        created = ChannelWebhookModel(**fields)

        row = ChannelWebhook(**created.model_dump())
        db.add(row)
        db.commit()
        return created
def get_webhooks_by_channel_id(
    self, channel_id: str, db: Optional[Session] = None
) -> list[ChannelWebhookModel]:
    """Return all webhooks registered for ``channel_id``."""
    with get_db_context(db) as db:
        rows = (
            db.query(ChannelWebhook)
            .filter(ChannelWebhook.channel_id == channel_id)
            .all()
        )
        found = []
        for row in rows:
            found.append(ChannelWebhookModel.model_validate(row))
        return found
def get_webhook_by_id(
    self, webhook_id: str, db: Optional[Session] = None
) -> Optional[ChannelWebhookModel]:
    """Return the webhook with id ``webhook_id``, or ``None`` if it is unknown."""
    with get_db_context(db) as db:
        by_id = db.query(ChannelWebhook).filter(ChannelWebhook.id == webhook_id)
        row = by_id.first()
        if not row:
            return None
        return ChannelWebhookModel.model_validate(row)
def get_webhook_by_id_and_token(
    self, webhook_id: str, token: str, db: Optional[Session] = None
) -> Optional[ChannelWebhookModel]:
    """Return the webhook ``webhook_id`` only if ``token`` matches its secret.

    The row is loaded by id and the token is then compared in Python with
    ``secrets.compare_digest``. This keeps the check exact (independent of
    the database collation) and constant-time with respect to the supplied
    token. Returns ``None`` when the webhook is unknown, the token is not a
    string, or the token does not match.
    """
    with get_db_context(db) as db:
        webhook = (
            db.query(ChannelWebhook).filter(ChannelWebhook.id == webhook_id).first()
        )
        if webhook is None or not isinstance(token, str):
            return None

        # compare_digest only accepts ASCII str, so compare the encoded bytes;
        # surrogatepass keeps malformed input from raising during encoding.
        stored = webhook.token.encode("utf-8", "surrogatepass")
        supplied = token.encode("utf-8", "surrogatepass")
        if not secrets.compare_digest(stored, supplied):
            return None

        return ChannelWebhookModel.model_validate(webhook)
def update_webhook_by_id(
    self,
    webhook_id: str,
    form_data: ChannelWebhookForm,
    db: Optional[Session] = None,
) -> Optional[ChannelWebhookModel]:
    """Replace a webhook's name and profile image with the form's values.

    Returns the updated webhook, or ``None`` when ``webhook_id`` is unknown.
    The token is not changed.
    """
    with get_db_context(db) as db:
        by_id = db.query(ChannelWebhook).filter(ChannelWebhook.id == webhook_id)
        row = by_id.first()
        if row is None:
            return None

        row.name = form_data.name
        row.profile_image_url = form_data.profile_image_url
        row.updated_at = int(time.time_ns())
        db.commit()
        return ChannelWebhookModel.model_validate(row)
def update_webhook_last_used_at(
    self, webhook_id: str, db: Optional[Session] = None
) -> bool:
    """Stamp the webhook's ``last_used_at`` with the current time.

    Returns ``False`` when ``webhook_id`` is unknown. ``updated_at`` is left
    as it was.
    """
    with get_db_context(db) as db:
        by_id = db.query(ChannelWebhook).filter(ChannelWebhook.id == webhook_id)
        row = by_id.first()
        if row is None:
            return False

        row.last_used_at = int(time.time_ns())
        db.commit()
        return True
def delete_webhook_by_id(
    self, webhook_id: str, db: Optional[Session] = None
) -> bool:
    """Delete the webhook with id ``webhook_id``.

    Returns ``True`` only when at least one row was actually removed.
    """
    with get_db_context(db) as db:
        by_id = db.query(ChannelWebhook).filter(ChannelWebhook.id == webhook_id)
        removed = by_id.delete()
        db.commit()
        return removed > 0
# Module-level ChannelTable instance, created once at import time.
Channels = ChannelTable()