import hashlib
from pathlib import Path

from fastapi import UploadFile
from sqlalchemy.orm import Session

from app.core.config import settings
from app.core.context import UserContext
from app.core.exceptions import AppError
from app.integrations.milvus_adapter import MilvusVectorStore
from app.integrations.pdf_parser import PdfParser
from app.repositories.knowledge_base_repository import KnowledgeBaseRepository
from app.schemas.knowledge_admin import KnowledgeDocumentUploadResponse
from app.services.document_chunk_service import DocumentChunkService
from app.services.embedding_service import EmbeddingService
from app.services.knowledge_space_service import KnowledgeSpaceService


class DocumentIngestionService:
    """Knowledge ingestion service: PDF upload, parsing, chunking, embedding and Milvus writes."""

    # UTF-8 byte budget for the sanitized original file name embedded in the
    # stored file name. The stored name also carries a 16-char hash prefix and
    # "_", so this keeps the total below the typical 255-byte file-name limit
    # even for multi-byte (e.g. CJK) names.
    _MAX_SAFE_NAME_BYTES = 200

    def __init__(
        self,
        db: Session,
        *,
        parser: PdfParser | None = None,
        chunker: DocumentChunkService | None = None,
        embedding_service: EmbeddingService | None = None,
        vector_store: MilvusVectorStore | None = None,
    ) -> None:
        """Wire the service to a DB session; collaborators default to fresh instances.

        Args:
            db: SQLAlchemy session used by the repository and for flushes.
            parser: PDF parser; a new ``PdfParser`` when omitted.
            chunker: Chunk builder; a new ``DocumentChunkService`` when omitted.
            embedding_service: Embedding client; a new ``EmbeddingService`` when omitted.
            vector_store: Milvus adapter; a new ``MilvusVectorStore`` when omitted.
        """
        self.db = db
        self.repo = KnowledgeBaseRepository(db)
        self.space_service = KnowledgeSpaceService(self.repo)
        self.parser = parser or PdfParser()
        self.chunker = chunker or DocumentChunkService()
        self.embedding_service = embedding_service or EmbeddingService()
        self.vector_store = vector_store or MilvusVectorStore()

    async def upload_pdf(
        self,
        ctx: UserContext,
        file: UploadFile,
        *,
        document_title: str | None,
        document_category: str,
        version: str,
    ) -> KnowledgeDocumentUploadResponse:
        """Upload a PDF as a knowledge document and trigger its build task.

        Only content admins may upload. Files are de-duplicated per institution
        by SHA-256. A duplicate whose previous ingestion failed gets a new task,
        and that task is dispatched. Any other duplicate is returned as-is.

        Args:
            ctx: Calling user's context; must be a content admin.
            file: The uploaded file.
            document_title: Explicit title; defaults to the file name stem.
            document_category: Category stored on the document.
            version: Version label stored on the document.

        Returns:
            Upload response describing the (new or existing) document.

        Raises:
            AppError: on permission/validation failures, on enqueue failure, or
                (in sync mode) when ingestion fails.
        """
        self.space_service.ensure_content_admin(ctx)
        space = self.space_service.get_or_create_space(ctx)
        content = await self._read_upload(file)
        self._validate_pdf(file, content)
        file_sha256 = hashlib.sha256(content).hexdigest()

        existing = self.repo.get_document_by_hash(space.institution_id, file_sha256)
        if existing:
            task = None
            if existing.status == "failed":
                # Re-uploading a failed document retries it. The new task must
                # actually be dispatched; otherwise nothing in this service ever
                # runs it and the returned task_id never progresses.
                task = self.repo.create_ingestion_task(existing)
                await self._dispatch_ingestion(existing.id, task.id)
            return self._build_response(
                existing,
                task_id=task.id if task else None,
                duplicate=True,
                collection_name=space.collection_name,
            )

        storage_path = self._save_file(
            space.institution_id,
            file.filename or "knowledge.pdf",
            content,
            file_sha256=file_sha256,
        )
        document = self.repo.create_document(
            institution_id=space.institution_id,
            uploaded_by=ctx.user_id,
            file_name=file.filename or storage_path.name,
            file_sha256=file_sha256,
            file_size=len(content),
            file_path=str(storage_path),
            document_title=document_title or Path(file.filename or "").stem or None,
            document_category=document_category,
            version=version,
        )
        task = self.repo.create_ingestion_task(document)
        await self._dispatch_ingestion(document.id, task.id)
        return self._build_response(
            document,
            task_id=task.id,
            duplicate=False,
            collection_name=space.collection_name,
        )

    async def ingest_document(self, document_id: int, task_id: int | None = None) -> None:
        """Build the knowledge index for an already uploaded PDF.

        Parses the stored PDF, splits it into chunks, embeds the chunk texts,
        upserts the vectors into the space's Milvus collection, and replaces the
        document's chunk rows through the repository. Document and task status
        fields are updated as each step starts or completes.

        Args:
            document_id: ID of the knowledge document to ingest.
            task_id: Optional ingestion task whose progress is tracked.

        Raises:
            AppError: ``KNOWLEDGE_DOCUMENT_NOT_FOUND`` (404) if the document does
                not exist. An ``AppError`` raised by any step is re-raised as-is.
                Any other exception becomes ``KNOWLEDGE_INGESTION_FAILED`` (500).
                In both failure cases the document and task are marked failed
                first.
        """
        document = self.repo.get_document(document_id)
        if not document:
            raise AppError("KNOWLEDGE_DOCUMENT_NOT_FOUND", "knowledge document not found", 404)
        task = self.repo.get_ingestion_task(task_id) if task_id else None
        try:
            if task:
                self.repo.update_task(task, status="running", progress=5, current_step="parse_pdf")
            document.status = "processing"
            document.parse_status = "running"
            self.db.flush()

            pages = self.parser.parse(document.file_path)
            space = self.repo.get_or_create_space(document.institution_id, None)
            drafts = self.chunker.build_chunks(pages)
            chunks = self.chunker.to_models(
                institution_id=document.institution_id,
                document_id=document.id,
                collection_name=space.collection_name,
                embedding_model=settings.embedding_model,
                drafts=drafts,
            )

            if task:
                self.repo.update_task(task, progress=35, current_step="embed_chunks")
            document.parse_status = "success"
            document.embedding_status = "running"
            self.db.flush()

            vectors, _embedding_latency_ms = await self.embedding_service.embed_texts(
                [chunk.chunk_text for chunk in chunks]
            )
            if len(vectors) != len(chunks):
                # zip() below would silently drop unmatched chunks from the
                # vector upsert while replace_chunks still stores all of them.
                raise ValueError(
                    f"embedding count mismatch: expected {len(chunks)} vectors, got {len(vectors)}"
                )

            if task:
                self.repo.update_task(task, progress=75, current_step="write_vectors")
            # NOTE(review): vectors are written to Milvus before chunk rows are
            # replaced; if replace_chunks fails, the Milvus upsert is not undone here.
            self.vector_store.upsert_vectors(
                space.collection_name,
                [(chunk.chunk_uid, vector) for chunk, vector in zip(chunks, vectors)],
            )
            self.repo.replace_chunks(document, chunks)
            document.status = "ready"
            document.embedding_status = "success"
            if task:
                self.repo.update_task(task, status="success", progress=100, current_step="completed")
        except Exception as exc:
            document.status = "failed"
            document.error_message = str(exc)[:2000]
            # Keep a successful parse result; otherwise the parse step is what failed.
            document.parse_status = document.parse_status if document.parse_status == "success" else "failed"
            document.embedding_status = "failed"
            if task:
                self.repo.update_task(
                    task,
                    status="failed",
                    progress=100,
                    current_step="failed",
                    error_message=str(exc)[:2000],
                )
            # NOTE(review): these failure updates are not flushed here; whether
            # they persist presumably depends on how the caller handles the
            # transaction after the raise below — confirm.
            if isinstance(exc, AppError):
                raise
            raise AppError("KNOWLEDGE_INGESTION_FAILED", "knowledge document ingestion failed", 500) from exc

    async def _dispatch_ingestion(self, document_id: int, task_id: int) -> None:
        """Run ingestion inline or hand it to the async worker, per ``knowledge_ingestion_sync``."""
        if settings.knowledge_ingestion_sync:
            await self.ingest_document(document_id, task_id)
        else:
            # NOTE(review): the job is enqueued before this request's session is
            # committed (no commit is visible here); confirm the worker cannot
            # start before the document row is visible to it.
            self._enqueue_async_task(document_id, task_id)

    @staticmethod
    def _build_response(
        document,
        *,
        task_id: int | None,
        duplicate: bool,
        collection_name: str,
    ) -> KnowledgeDocumentUploadResponse:
        """Build the upload response from a document's current status fields."""
        return KnowledgeDocumentUploadResponse(
            document_id=document.id,
            task_id=task_id,
            duplicate=duplicate,
            status=document.status,
            parse_status=document.parse_status,
            embedding_status=document.embedding_status,
            chunk_count=document.chunk_count,
            collection_name=collection_name,
        )

    async def _read_upload(self, file: UploadFile) -> bytes:
        """Read the upload body, stopping one byte past the configured size limit.

        Reading at most ``limit + 1`` bytes is enough for ``_validate_pdf`` to
        reject oversized files (413) without buffering the whole body in memory.
        """
        max_bytes = settings.knowledge_max_upload_mb * 1024 * 1024
        return await file.read(max(max_bytes, 0) + 1)

    def _validate_pdf(self, file: UploadFile, content: bytes) -> None:
        """Validate an upload: non-empty, within the size limit, and actually a PDF.

        Raises:
            AppError: ``UPLOAD_FILE_EMPTY`` (422), ``UPLOAD_FILE_TOO_LARGE`` (413),
                ``UPLOAD_FILE_TYPE_INVALID`` (422) or ``UPLOAD_FILE_NOT_PDF`` (422).
        """
        if not content:
            raise AppError("UPLOAD_FILE_EMPTY", "uploaded file is empty", 422)
        max_bytes = settings.knowledge_max_upload_mb * 1024 * 1024
        if len(content) > max_bytes:
            raise AppError("UPLOAD_FILE_TOO_LARGE", f"uploaded file exceeds {settings.knowledge_max_upload_mb}MB", 413)
        filename = (file.filename or "").lower()
        # Lenient type check: rejected only when BOTH the extension and the
        # declared content type look wrong. The magic-byte check below is the
        # authoritative gate.
        if not filename.endswith(".pdf") and file.content_type not in {"application/pdf", "application/octet-stream"}:
            raise AppError("UPLOAD_FILE_TYPE_INVALID", "only pdf file is supported", 422)
        if not content.startswith(b"%PDF"):
            raise AppError("UPLOAD_FILE_NOT_PDF", "uploaded file is not a valid pdf", 422)

    def _save_file(
        self,
        institution_id: int,
        filename: str,
        content: bytes,
        file_sha256: str | None = None,
    ) -> Path:
        """Store the original PDF in a per-institution directory for later rebuilds.

        Args:
            institution_id: Owning institution; selects the storage sub-directory.
            filename: Original file name; sanitized and length-bounded before use.
            content: Raw file bytes to write.
            file_sha256: Precomputed SHA-256 hex digest of ``content``; computed
                here when omitted.

        Returns:
            Path of the written file: ``<storage>/raw/<institution_id>/<sha16>_<name>``.
        """
        digest = file_sha256 or hashlib.sha256(content).hexdigest()
        safe_name = self._safe_file_name(filename, self._MAX_SAFE_NAME_BYTES)
        storage_dir = Path(settings.knowledge_storage_dir) / "raw" / str(institution_id)
        storage_dir.mkdir(parents=True, exist_ok=True)
        target = storage_dir / f"{digest[:16]}_{safe_name}"
        target.write_bytes(content)
        return target

    @staticmethod
    def _safe_file_name(filename: str, max_bytes: int) -> str:
        """Replace unsafe characters with "_" and cap the UTF-8 length, keeping a short suffix."""
        safe_name = "".join(ch if ch.isalnum() or ch in {".", "_", "-"} else "_" for ch in filename)
        if len(safe_name.encode("utf-8")) <= max_bytes:
            return safe_name
        suffix = Path(safe_name).suffix
        if len(suffix.encode("utf-8")) > 16:
            # An unusually long "suffix" is not a real extension; truncate it with the rest.
            suffix = ""
        stem = safe_name[: len(safe_name) - len(suffix)] if suffix else safe_name
        budget = max(max_bytes - len(suffix.encode("utf-8")), 0)
        # errors="ignore" drops a multi-byte character cut in half by the byte slice.
        stem = stem.encode("utf-8")[:budget].decode("utf-8", errors="ignore")
        return stem + suffix

    def _enqueue_async_task(self, document_id: int, task_id: int) -> None:
        """Hand the PDF knowledge build to the Celery worker.

        Raises:
            AppError: ``KNOWLEDGE_TASK_ENQUEUE_FAILED`` (500) if importing or
                enqueueing the Celery task fails.
        """
        try:
            from app.tasks.knowledge_ingestion_tasks import ingest_knowledge_document

            ingest_knowledge_document.delay(document_id, task_id)
        except Exception as exc:  # pragma: no cover - task keeps its queued status when Celery is not running
            raise AppError("KNOWLEDGE_TASK_ENQUEUE_FAILED", "knowledge ingestion task enqueue failed", 500) from exc