Files
fastapi/app/services/knowledge_space_service.py
T

40 lines
2.0 KiB
Python
Raw Normal View History

from app.core.context import UserContext
from app.core.exceptions import AppError
from app.models.knowledge_base import KbKnowledgeSpace
from app.repositories.knowledge_base_repository import KnowledgeBaseRepository
class KnowledgeSpaceService:
    """Knowledge space service: locate the knowledge base collection by the user's institution."""

    # Roles allowed to upload and build the institution knowledge base.
    # Compared against the lower-cased ``ctx.role``; built once instead of on
    # every permission check.
    _CONTENT_ADMIN_ROLES = frozenset(
        {"content_admin", "institution_admin", "admin", "super_admin"}
    )

    def __init__(self, repo: KnowledgeBaseRepository) -> None:
        """Keep a reference to the repository used for knowledge space access."""
        self.repo = repo

    def require_institution_id(self, ctx: UserContext) -> int:
        """Institution check: knowledge base features must be bound to the
        institution_id returned by the Django user center.

        Returns:
            ``ctx.institution_id`` converted to ``int``.

        Raises:
            AppError: ``INSTITUTION_REQUIRED`` (403) when ``ctx.institution_id``
                is ``None`` or cannot be converted to an integer.
        """
        raw_institution_id = ctx.institution_id
        if raw_institution_id is None:
            raise AppError("INSTITUTION_REQUIRED", "institution_id is required for knowledge base", 403)
        try:
            return int(raw_institution_id)
        except (TypeError, ValueError, OverflowError) as err:
            # A value that is present but not integer-like is treated the same
            # as a missing one, so callers get the structured AppError rather
            # than an unhandled conversion error.
            raise AppError(
                "INSTITUTION_REQUIRED", "institution_id is required for knowledge base", 403
            ) from err

    def get_or_create_space(self, ctx: UserContext) -> KbKnowledgeSpace:
        """Get knowledge space: automatically create the institution's
        knowledge space when a content admin uploads documents.

        The space name comes from ``ctx.profile["institution_name"]``; when the
        profile or that entry is missing/empty, ``institution_<id>`` is used.

        Returns:
            Whatever ``repo.get_or_create_space(institution_id, name)`` returns.

        Raises:
            AppError: propagated from ``require_institution_id``.
        """
        institution_id = self.require_institution_id(ctx)
        profile = ctx.profile or {}
        institution_name = profile.get("institution_name") or f"institution_{institution_id}"
        return self.repo.get_or_create_space(institution_id, institution_name)

    def get_active_space(self, ctx: UserContext) -> KbKnowledgeSpace:
        """Read knowledge space: the AI learning assistant reads the
        institution's currently available knowledge space when answering.

        Returns:
            The space returned by ``repo.get_space(institution_id)``.

        Raises:
            AppError: propagated from ``require_institution_id``, or
                ``KNOWLEDGE_SPACE_NOT_FOUND`` (404) when the repository returns
                a falsy result for the institution.
        """
        institution_id = self.require_institution_id(ctx)
        space = self.repo.get_space(institution_id)
        if not space:
            raise AppError("KNOWLEDGE_SPACE_NOT_FOUND", "knowledge space not initialized for institution", 404)
        return space

    def ensure_content_admin(self, ctx: UserContext) -> None:
        """Permission check: only content admins or system admins may upload
        and build the institution knowledge base.

        The role comparison is case-insensitive; a missing role is treated as
        an empty string and therefore rejected.

        Raises:
            AppError: ``KNOWLEDGE_ADMIN_FORBIDDEN`` (403) when ``ctx.role`` is
                not one of the allowed roles.
        """
        role = (ctx.role or "").lower()
        if role not in self._CONTENT_ADMIN_ROLES:
            raise AppError("KNOWLEDGE_ADMIN_FORBIDDEN", "only content admin can upload knowledge documents", 403)