feat: add streaming learning assistant and knowledge base scaffolding

This commit is contained in:
刘金宝
2026-06-10 09:32:36 +08:00
parent f0cdc454b3
commit 89258ab448
31 changed files with 2021 additions and 330 deletions
+177
View File
@@ -0,0 +1,177 @@
import hashlib
from pathlib import Path
from fastapi import UploadFile
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.context import UserContext
from app.core.exceptions import AppError
from app.integrations.milvus_adapter import MilvusVectorStore
from app.integrations.pdf_parser import PdfParser
from app.repositories.knowledge_base_repository import KnowledgeBaseRepository
from app.schemas.knowledge_admin import KnowledgeDocumentUploadResponse
from app.services.document_chunk_service import DocumentChunkService
from app.services.embedding_service import EmbeddingService
from app.services.knowledge_space_service import KnowledgeSpaceService
class DocumentIngestionService:
"""知识入库服务:处理 PDF 上传、解析、分片、向量化和 Milvus 写入。"""
def __init__(
self,
db: Session,
*,
parser: PdfParser | None = None,
chunker: DocumentChunkService | None = None,
embedding_service: EmbeddingService | None = None,
vector_store: MilvusVectorStore | None = None,
) -> None:
self.db = db
self.repo = KnowledgeBaseRepository(db)
self.space_service = KnowledgeSpaceService(self.repo)
self.parser = parser or PdfParser()
self.chunker = chunker or DocumentChunkService()
self.embedding_service = embedding_service or EmbeddingService()
self.vector_store = vector_store or MilvusVectorStore()
async def upload_pdf(
self,
ctx: UserContext,
file: UploadFile,
*,
document_title: str | None,
document_category: str,
version: str,
) -> KnowledgeDocumentUploadResponse:
"""文档上传:内容管理员上传 PDF 后创建知识文档并触发构建任务。"""
self.space_service.ensure_content_admin(ctx)
space = self.space_service.get_or_create_space(ctx)
content = await file.read()
self._validate_pdf(file, content)
file_sha256 = hashlib.sha256(content).hexdigest()
existing = self.repo.get_document_by_hash(space.institution_id, file_sha256)
if existing:
task = self.repo.create_ingestion_task(existing) if existing.status == "failed" else None
return KnowledgeDocumentUploadResponse(
document_id=existing.id,
task_id=task.id if task else None,
duplicate=True,
status=existing.status,
parse_status=existing.parse_status,
embedding_status=existing.embedding_status,
chunk_count=existing.chunk_count,
collection_name=space.collection_name,
)
storage_path = self._save_file(space.institution_id, file.filename or "knowledge.pdf", content)
document = self.repo.create_document(
institution_id=space.institution_id,
uploaded_by=ctx.user_id,
file_name=file.filename or storage_path.name,
file_sha256=file_sha256,
file_size=len(content),
file_path=str(storage_path),
document_title=document_title or Path(file.filename or "").stem or None,
document_category=document_category,
version=version,
)
task = self.repo.create_ingestion_task(document)
if settings.knowledge_ingestion_sync:
await self.ingest_document(document.id, task.id)
else:
self._enqueue_async_task(document.id, task.id)
return KnowledgeDocumentUploadResponse(
document_id=document.id,
task_id=task.id,
duplicate=False,
status=document.status,
parse_status=document.parse_status,
embedding_status=document.embedding_status,
chunk_count=document.chunk_count,
collection_name=space.collection_name,
)
async def ingest_document(self, document_id: int, task_id: int | None = None) -> None:
"""知识构建:把已上传 PDF 转换为 MySQL 分片和 Milvus 向量。"""
document = self.repo.get_document(document_id)
if not document:
raise AppError("KNOWLEDGE_DOCUMENT_NOT_FOUND", "knowledge document not found", 404)
task = self.repo.get_ingestion_task(task_id) if task_id else None
try:
if task:
self.repo.update_task(task, status="running", progress=5, current_step="parse_pdf")
document.status = "processing"
document.parse_status = "running"
self.db.flush()
pages = self.parser.parse(document.file_path)
space = self.repo.get_or_create_space(document.institution_id, None)
drafts = self.chunker.build_chunks(pages)
chunks = self.chunker.to_models(
institution_id=document.institution_id,
document_id=document.id,
collection_name=space.collection_name,
embedding_model=settings.embedding_model,
drafts=drafts,
)
if task:
self.repo.update_task(task, progress=35, current_step="embed_chunks")
document.parse_status = "success"
document.embedding_status = "running"
self.db.flush()
vectors, _embedding_latency_ms = await self.embedding_service.embed_texts([chunk.chunk_text for chunk in chunks])
if task:
self.repo.update_task(task, progress=75, current_step="write_vectors")
self.vector_store.upsert_vectors(
space.collection_name,
[(chunk.chunk_uid, vector) for chunk, vector in zip(chunks, vectors)],
)
self.repo.replace_chunks(document, chunks)
document.status = "ready"
document.embedding_status = "success"
if task:
self.repo.update_task(task, status="success", progress=100, current_step="completed")
except Exception as exc:
document.status = "failed"
document.error_message = str(exc)[:2000]
document.parse_status = document.parse_status if document.parse_status == "success" else "failed"
document.embedding_status = "failed"
if task:
self.repo.update_task(task, status="failed", progress=100, current_step="failed", error_message=str(exc)[:2000])
if isinstance(exc, AppError):
raise
raise AppError("KNOWLEDGE_INGESTION_FAILED", "knowledge document ingestion failed", 500) from exc
def _validate_pdf(self, file: UploadFile, content: bytes) -> None:
"""上传校验:限制文件类型和大小,只允许 PDF 文档进入知识库。"""
if not content:
raise AppError("UPLOAD_FILE_EMPTY", "uploaded file is empty", 422)
max_bytes = settings.knowledge_max_upload_mb * 1024 * 1024
if len(content) > max_bytes:
raise AppError("UPLOAD_FILE_TOO_LARGE", f"uploaded file exceeds {settings.knowledge_max_upload_mb}MB", 413)
filename = (file.filename or "").lower()
if not filename.endswith(".pdf") and file.content_type not in {"application/pdf", "application/octet-stream"}:
raise AppError("UPLOAD_FILE_TYPE_INVALID", "only pdf file is supported", 422)
if not content.startswith(b"%PDF"):
raise AppError("UPLOAD_FILE_NOT_PDF", "uploaded file is not a valid pdf", 422)
def _save_file(self, institution_id: int, filename: str, content: bytes) -> Path:
"""文件保存:按机构隔离保存原始 PDF,供后续重建知识库。"""
safe_name = "".join(ch if ch.isalnum() or ch in {".", "_", "-"} else "_" for ch in filename)
storage_dir = Path(settings.knowledge_storage_dir) / "raw" / str(institution_id)
storage_dir.mkdir(parents=True, exist_ok=True)
target = storage_dir / f"{hashlib.sha256(content).hexdigest()[:16]}_{safe_name}"
target.write_bytes(content)
return target
def _enqueue_async_task(self, document_id: int, task_id: int) -> None:
"""异步投递:生产环境通过 Celery worker 执行 PDF 知识库构建。"""
try:
from app.tasks.knowledge_ingestion_tasks import ingest_knowledge_document
ingest_knowledge_document.delay(document_id, task_id)
except Exception as exc: # pragma: no cover - Celery 未运行时保留任务 queued 状态
raise AppError("KNOWLEDGE_TASK_ENQUEUE_FAILED", "knowledge ingestion task enqueue failed", 500) from exc