# fastapi/app/services/document_ingestion_service.py
# (source-viewer header: 178 lines, 8.4 KiB, Python)

import hashlib
from pathlib import Path
from fastapi import UploadFile
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.context import UserContext
from app.core.exceptions import AppError
from app.integrations.milvus_adapter import MilvusVectorStore
from app.integrations.pdf_parser import PdfParser
from app.repositories.knowledge_base_repository import KnowledgeBaseRepository
from app.schemas.knowledge_admin import KnowledgeDocumentUploadResponse
from app.services.document_chunk_service import DocumentChunkService
from app.services.embedding_service import EmbeddingService
from app.services.knowledge_space_service import KnowledgeSpaceService
class DocumentIngestionService:
    """Knowledge ingestion service: PDF upload, parsing, chunking, embedding and Milvus writes.

    Document and task rows go through ``KnowledgeBaseRepository``. Each pipeline stage
    (parser, chunker, embedding service, vector store) is an injectable collaborator
    that falls back to a default instance when not supplied.
    """

    def __init__(
        self,
        db: Session,
        *,
        parser: PdfParser | None = None,
        chunker: DocumentChunkService | None = None,
        embedding_service: EmbeddingService | None = None,
        vector_store: MilvusVectorStore | None = None,
    ) -> None:
        """Bind the service to a DB session and its pipeline collaborators.

        Args:
            db: SQLAlchemy session shared with the repository. This class itself only
                calls ``flush()``; NOTE(review): commits presumably happen in the caller
                or the repository — confirm.
            parser: PDF parser; defaults to ``PdfParser()``.
            chunker: Chunking service; defaults to ``DocumentChunkService()``.
            embedding_service: Embedding client; defaults to ``EmbeddingService()``.
            vector_store: Milvus adapter; defaults to ``MilvusVectorStore()``.
        """
        self.db = db
        self.repo = KnowledgeBaseRepository(db)
        self.space_service = KnowledgeSpaceService(self.repo)
        self.parser = parser or PdfParser()
        self.chunker = chunker or DocumentChunkService()
        self.embedding_service = embedding_service or EmbeddingService()
        self.vector_store = vector_store or MilvusVectorStore()

    async def upload_pdf(
        self,
        ctx: UserContext,
        file: UploadFile,
        *,
        document_title: str | None,
        document_category: str,
        version: str,
    ) -> KnowledgeDocumentUploadResponse:
        """Register a PDF uploaded by a content admin and trigger its ingestion.

        Uploads are de-duplicated per institution by the SHA-256 of the file content.
        Re-uploading a document whose status is ``"failed"`` creates a new ingestion
        task. That task is dispatched the same way as a fresh upload, so the retry
        actually runs.

        Args:
            ctx: Current user; checked with ``ensure_content_admin``.
            file: Uploaded file.
            document_title: Explicit title; falls back to the uploaded file name's stem.
            document_category: Category stored on the new document.
            version: Version label stored on the new document.

        Returns:
            Upload summary. ``duplicate`` is True when an existing document was reused.
            ``task_id`` is None when no ingestion task was created.

        Raises:
            AppError: on permission or validation failures, on enqueue failures, and,
                when ``settings.knowledge_ingestion_sync`` is enabled, on ingestion
                failures.
        """
        self.space_service.ensure_content_admin(ctx)
        space = self.space_service.get_or_create_space(ctx)
        # Read at most one byte past the size limit. That is enough for _validate_pdf
        # to reject an oversized upload without pulling the whole body into memory.
        max_bytes = int(settings.knowledge_max_upload_mb * 1024 * 1024)
        content = await file.read(max_bytes + 1)
        self._validate_pdf(file, content)
        file_sha256 = hashlib.sha256(content).hexdigest()

        existing = self.repo.get_document_by_hash(space.institution_id, file_sha256)
        if existing:
            retry_task = None
            if existing.status == "failed":
                # Re-uploading a failed document is the retry path. The new task must be
                # dispatched like any other, otherwise nothing processes it.
                retry_task = self.repo.create_ingestion_task(existing)
                await self._dispatch_ingestion(existing.id, retry_task.id)
            return KnowledgeDocumentUploadResponse(
                document_id=existing.id,
                task_id=retry_task.id if retry_task else None,
                duplicate=True,
                status=existing.status,
                parse_status=existing.parse_status,
                embedding_status=existing.embedding_status,
                chunk_count=existing.chunk_count,
                collection_name=space.collection_name,
            )

        storage_path = self._save_file(
            space.institution_id,
            file.filename or "knowledge.pdf",
            content,
            file_sha256=file_sha256,
        )
        document = self.repo.create_document(
            institution_id=space.institution_id,
            uploaded_by=ctx.user_id,
            file_name=file.filename or storage_path.name,
            file_sha256=file_sha256,
            file_size=len(content),
            file_path=str(storage_path),
            document_title=document_title or Path(file.filename or "").stem or None,
            document_category=document_category,
            version=version,
        )
        task = self.repo.create_ingestion_task(document)
        await self._dispatch_ingestion(document.id, task.id)
        return KnowledgeDocumentUploadResponse(
            document_id=document.id,
            task_id=task.id,
            duplicate=False,
            status=document.status,
            parse_status=document.parse_status,
            embedding_status=document.embedding_status,
            chunk_count=document.chunk_count,
            collection_name=space.collection_name,
        )

    async def _dispatch_ingestion(self, document_id: int, task_id: int) -> None:
        """Run ingestion inline when sync mode is configured, otherwise enqueue it to Celery."""
        if settings.knowledge_ingestion_sync:
            await self.ingest_document(document_id, task_id)
        else:
            self._enqueue_async_task(document_id, task_id)

    async def ingest_document(self, document_id: int, task_id: int | None = None) -> None:
        """Turn an uploaded PDF into MySQL chunk rows and Milvus vectors.

        The steps are:

        1. Parse the stored PDF.
        2. Build chunk models.
        3. Embed the chunk texts.
        4. Upsert the vectors into the space's Milvus collection.
        5. Replace the document's chunk rows.

        Document statuses, and the optional task's status and progress, are updated
        along the way.

        Args:
            document_id: Document to ingest.
            task_id: Optional ingestion task whose progress is tracked.

        Raises:
            AppError: ``KNOWLEDGE_DOCUMENT_NOT_FOUND`` (404) when the document is missing.
                An ``AppError`` raised by any stage is re-raised unchanged. Every other
                failure is wrapped as ``KNOWLEDGE_INGESTION_FAILED`` (500) after the
                document and task are marked failed.
        """
        document = self.repo.get_document(document_id)
        if not document:
            raise AppError("KNOWLEDGE_DOCUMENT_NOT_FOUND", "knowledge document not found", 404)
        task = self.repo.get_ingestion_task(task_id) if task_id else None
        try:
            if task:
                self.repo.update_task(task, status="running", progress=5, current_step="parse_pdf")
            document.status = "processing"
            document.parse_status = "running"
            self.db.flush()
            # NOTE(review): parser.parse and vector_store.upsert_vectors are called without
            # await. If they block, sync-mode uploads stall the event loop while they run.
            pages = self.parser.parse(document.file_path)
            space = self.repo.get_or_create_space(document.institution_id, None)
            drafts = self.chunker.build_chunks(pages)
            chunks = self.chunker.to_models(
                institution_id=document.institution_id,
                document_id=document.id,
                collection_name=space.collection_name,
                embedding_model=settings.embedding_model,
                drafts=drafts,
            )
            if not chunks:
                # Nothing to embed or search (presumably a PDF without a text layer).
                # Fail loudly instead of reporting an empty document as "ready".
                raise ValueError("pdf produced no text chunks")
            if task:
                self.repo.update_task(task, progress=35, current_step="embed_chunks")
            document.parse_status = "success"
            document.embedding_status = "running"
            self.db.flush()
            vectors, _embedding_latency_ms = await self.embedding_service.embed_texts(
                [chunk.chunk_text for chunk in chunks]
            )
            if len(vectors) != len(chunks):
                # zip() below would silently drop unmatched chunks, leaving MySQL
                # chunk rows without a corresponding Milvus vector.
                raise ValueError(
                    f"embedding count mismatch: {len(vectors)} vectors for {len(chunks)} chunks"
                )
            if task:
                self.repo.update_task(task, progress=75, current_step="write_vectors")
            self.vector_store.upsert_vectors(
                space.collection_name,
                [(chunk.chunk_uid, vector) for chunk, vector in zip(chunks, vectors)],
            )
            self.repo.replace_chunks(document, chunks)
            # NOTE(review): error_message left over from an earlier failed attempt is not
            # cleared here — confirm whether the column is nullable before resetting it.
            document.status = "ready"
            document.embedding_status = "success"
            if task:
                self.repo.update_task(task, status="success", progress=100, current_step="completed")
        except Exception as exc:
            # NOTE(review): these status changes are not explicitly flushed here. They
            # persist only if the caller commits the session after the error — confirm.
            document.status = "failed"
            document.error_message = str(exc)[:2000]
            # Keep "success" if parsing already finished, so the failing stage stays visible.
            document.parse_status = document.parse_status if document.parse_status == "success" else "failed"
            document.embedding_status = "failed"
            if task:
                self.repo.update_task(task, status="failed", progress=100, current_step="failed", error_message=str(exc)[:2000])
            if isinstance(exc, AppError):
                raise
            raise AppError("KNOWLEDGE_INGESTION_FAILED", "knowledge document ingestion failed", 500) from exc

    def _validate_pdf(self, file: UploadFile, content: bytes) -> None:
        """Reject empty, oversized or non-PDF uploads.

        The type check is lenient: either a ``.pdf`` extension or a PDF/generic-binary
        content type is enough. The ``%PDF`` magic-byte check that follows is the
        decisive test.

        Raises:
            AppError: ``UPLOAD_FILE_EMPTY`` (422), ``UPLOAD_FILE_TOO_LARGE`` (413),
                ``UPLOAD_FILE_TYPE_INVALID`` (422) or ``UPLOAD_FILE_NOT_PDF`` (422).
        """
        if not content:
            raise AppError("UPLOAD_FILE_EMPTY", "uploaded file is empty", 422)
        max_bytes = settings.knowledge_max_upload_mb * 1024 * 1024
        if len(content) > max_bytes:
            raise AppError("UPLOAD_FILE_TOO_LARGE", f"uploaded file exceeds {settings.knowledge_max_upload_mb}MB", 413)
        filename = (file.filename or "").lower()
        if not filename.endswith(".pdf") and file.content_type not in {"application/pdf", "application/octet-stream"}:
            raise AppError("UPLOAD_FILE_TYPE_INVALID", "only pdf file is supported", 422)
        if not content.startswith(b"%PDF"):
            raise AppError("UPLOAD_FILE_NOT_PDF", "uploaded file is not a valid pdf", 422)

    @staticmethod
    def _safe_filename(filename: str, max_bytes: int = 200) -> str:
        """Sanitize a file name and bound its UTF-8 length.

        Every character other than an alphanumeric, ``.``, ``_`` or ``-`` becomes ``_``,
        so path separators cannot leave the storage directory. A result longer than
        ``max_bytes`` once UTF-8 encoded is truncated. Short extensions such as ``.pdf``
        are kept, so the stored name still fits common 255-byte name limits after the
        hash prefix is added.
        """
        safe_name = "".join(ch if ch.isalnum() or ch in {".", "_", "-"} else "_" for ch in filename)
        if len(safe_name.encode("utf-8")) <= max_bytes:
            return safe_name
        suffix = Path(safe_name).suffix
        if len(suffix.encode("utf-8")) > 16:
            suffix = ""  # not a plausible extension; truncate the name as a whole
        stem = safe_name[: len(safe_name) - len(suffix)]
        budget = max(max_bytes - len(suffix.encode("utf-8")), 0)
        # Byte-level cut, dropping any multi-byte character split at the boundary.
        stem = stem.encode("utf-8")[:budget].decode("utf-8", errors="ignore")
        return stem + suffix

    def _save_file(
        self,
        institution_id: int,
        filename: str,
        content: bytes,
        file_sha256: str | None = None,
    ) -> Path:
        """Store the raw PDF in a per-institution directory for later knowledge rebuilds.

        The stored name is ``<first 16 hex chars of SHA-256>_<sanitized filename>``.

        Args:
            institution_id: Owning institution; selects the storage sub-directory.
            filename: Original upload name; sanitized and length-bounded.
            content: File bytes to write.
            file_sha256: Precomputed hex SHA-256 of ``content``; computed here when omitted.

        Returns:
            Path of the written file.
        """
        digest = file_sha256 or hashlib.sha256(content).hexdigest()
        storage_dir = Path(settings.knowledge_storage_dir) / "raw" / str(institution_id)
        storage_dir.mkdir(parents=True, exist_ok=True)
        target = storage_dir / f"{digest[:16]}_{self._safe_filename(filename)}"
        target.write_bytes(content)
        return target

    def _enqueue_async_task(self, document_id: int, task_id: int) -> None:
        """Hand PDF ingestion to the Celery worker.

        NOTE(review): this enqueues right after the rows are created, and this class
        never commits. If the request transaction has not committed yet, the worker
        may not see the new document/task rows — confirm the commit ordering in the
        caller.

        Raises:
            AppError: ``KNOWLEDGE_TASK_ENQUEUE_FAILED`` (500) if the task cannot be enqueued.
        """
        try:
            from app.tasks.knowledge_ingestion_tasks import ingest_knowledge_document
            ingest_knowledge_document.delay(document_id, task_id)
        except Exception as exc:  # pragma: no cover - Celery unavailable; the task row is not advanced past its initial state
            raise AppError("KNOWLEDGE_TASK_ENQUEUE_FAILED", "knowledge ingestion task enqueue failed", 500) from exc