import html import json import uuid from datetime import datetime from pathlib import Path from types import SimpleNamespace from typing import Any from sqlalchemy import select from sqlalchemy.orm import Session, selectinload from app.core.config import settings from app.core.exceptions import AppError from app.models.training import TrainingSession from app.models.training_record import TrainingRecord from app.repositories.evaluation_repository import EvaluationRepository class PdfExportService: """PDF 导出服务:把 training_record 渲染为包含评分细则、检查证据和改进计划的报告。""" def __init__(self, db: Session) -> None: self.db = db self.repo = EvaluationRepository(db) def export(self, evaluation_id: int, user_id: str) -> SimpleNamespace: """报告导出:校验 training_record 归属后生成 PDF,并回写 PDF 路径。""" record = self.repo.get_owned_record(evaluation_id, user_id) if not record: raise AppError("EVALUATION_NOT_FOUND", "evaluation not found or not owned by current user", 404) session = self._get_session(record.session_id) if record.session_id else None output_dir = Path(settings.report_storage_dir) output_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S") file_name = f"training_record_{record.id}_{record.score_type}_{timestamp}_{uuid.uuid4().hex[:6]}.pdf" file_path = output_dir / file_name score_details = self.repo.list_score_details(record.id) self._write_pdf(file_path, record, session, score_details) record.pdf_file_path = str(file_path) recommendation = dict(record.recommendation_result or {}) recommendation["pdf_file_path"] = str(file_path) record.recommendation_result = recommendation self.repo.flush() return SimpleNamespace(id=record.id, file_path=str(file_path)) def _get_session(self, session_id: int) -> TrainingSession | None: """会话读取:加载报告展示需要的病例、检查申请和诊断治疗提交。""" stmt = ( select(TrainingSession) .options( selectinload(TrainingSession.case), selectinload(TrainingSession.orders), selectinload(TrainingSession.submission), ) .where(TrainingSession.id == session_id) ) return self.db.scalar(stmt) def _write_pdf(self, file_path: Path, record: TrainingRecord, session: TrainingSession | None, score_details: list | None = None) -> None: """PDF 写入:使用 reportlab 生成 Acrobat 可正常打开的标准 PDF。""" try: from reportlab.lib import colors from reportlab.lib.enums import TA_CENTER, TA_LEFT from reportlab.lib.pagesizes import A4 from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet from reportlab.lib.units import mm from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase.cidfonts import UnicodeCIDFont from reportlab.pdfbase.ttfonts import TTFont from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle font_name = self._register_report_font(pdfmetrics, TTFont, UnicodeCIDFont) base_styles = getSampleStyleSheet() styles = { "title": ParagraphStyle( "McaTitle", parent=base_styles["Title"], fontName=font_name, fontSize=20, leading=28, alignment=TA_CENTER, textColor=colors.HexColor("#17365D"), spaceAfter=12, ), "h2": ParagraphStyle( "McaHeading", parent=base_styles["Heading2"], fontName=font_name, fontSize=13, leading=18, alignment=TA_LEFT, textColor=colors.HexColor("#1F4E79"), spaceBefore=8, spaceAfter=6, ), "body": ParagraphStyle( "McaBody", parent=base_styles["BodyText"], fontName=font_name, fontSize=10, leading=15, textColor=colors.HexColor("#1F2937"), wordWrap="CJK", ), "small": ParagraphStyle( "McaSmall", parent=base_styles["BodyText"], fontName=font_name, fontSize=8.8, leading=12.5, textColor=colors.HexColor("#4B5563"), wordWrap="CJK", ), "thead": ParagraphStyle("McaThead", parent=base_styles["BodyText"], fontName=font_name, fontSize=9, leading=12, textColor=colors.white, alignment=TA_CENTER), "td": ParagraphStyle("McaTd", parent=base_styles["BodyText"], fontName=font_name, fontSize=8.5, leading=12, textColor=colors.HexColor("#111827"), wordWrap="CJK"), } context = { "Paragraph": Paragraph, "Spacer": Spacer, "Table": Table, "TableStyle": TableStyle, "colors": colors, "styles": styles, "mm": mm, } doc = SimpleDocTemplate( str(file_path), pagesize=A4, leftMargin=16 * mm, rightMargin=16 * mm, topMargin=14 * mm, bottomMargin=14 * mm, title="医疗问诊 Agent 训练评价报告", ) doc.build(self._build_story(record, session, context, score_details or [])) except ModuleNotFoundError: self._write_minimal_pdf(file_path, record) except Exception as exc: if file_path.exists(): file_path.unlink(missing_ok=True) raise AppError("PDF_EXPORT_FAILED", "PDF report export failed", 500) from exc def _write_minimal_pdf(self, file_path: Path, record: TrainingRecord) -> None: """兜底 PDF:缺少 reportlab 时生成标准可打开的极简 PDF,生产环境仍使用 reportlab 模板。""" lines = [ "Medical Consultation Agent Evaluation Report", f"Record ID: {record.id}", f"User ID: {record.external_user_id}", f"Score: {float(record.total_score or 0):g} ({record.score_type})", f"Level: {record.evaluation_level}", "This fallback PDF is generated because reportlab is not installed.", ] text_ops = [] y = 780 for line in lines: safe = line.encode("latin-1", "replace").decode("latin-1").replace("\\", "\\\\").replace("(", "\\(").replace(")", "\\)") text_ops.append(f"BT /F1 12 Tf 50 {y} Td ({safe}) Tj ET") y -= 22 stream = "\n".join(text_ops).encode("latin-1") if len(stream) < 900: stream += b"\n% " + (b"fallback-pdf-padding " * 40) objects = [ b"<< /Type /Catalog /Pages 2 0 R >>", b"<< /Type /Pages /Kids [3 0 R] /Count 1 >>", b"<< /Type /Page /Parent 2 0 R /MediaBox [0 0 595 842] /Resources << /Font << /F1 4 0 R >> >> /Contents 5 0 R >>", b"<< /Type /Font /Subtype /Type1 /BaseFont /Helvetica >>", b"<< /Length " + str(len(stream)).encode("ascii") + b" >>\nstream\n" + stream + b"\nendstream", ] content = bytearray(b"%PDF-1.4\n%\xe2\xe3\xcf\xd3\n") offsets = [0] for index, obj in enumerate(objects, start=1): offsets.append(len(content)) content.extend(f"{index} 0 obj\n".encode("ascii")) content.extend(obj) content.extend(b"\nendobj\n") xref_offset = len(content) content.extend(f"xref\n0 {len(objects) + 1}\n".encode("ascii")) content.extend(b"0000000000 65535 f \n") for offset in offsets[1:]: content.extend(f"{offset:010d} 00000 n \n".encode("ascii")) content.extend( f"trailer\n<< /Size {len(objects) + 1} /Root 1 0 R >>\nstartxref\n{xref_offset}\n%%EOF\n".encode("ascii") ) file_path.write_bytes(bytes(content)) def _build_story(self, record: TrainingRecord, session: TrainingSession | None, context: dict[str, Any], score_details: list | None = None) -> list: """报告模板:按基本信息、病例、提交、检查、评分细则和改进计划组织内容。""" Paragraph = context["Paragraph"] Spacer = context["Spacer"] styles = context["styles"] mm = context["mm"] case = session.case if session else None submission = session.submission if session else None orders = sorted(session.orders, key=lambda item: item.ordered_at) if session else [] structured = record.ai_feedback_structured or {} story = [Paragraph("医疗问诊 Agent 训练评价报告", styles["title"])] story.append(Paragraph("本报告仅用于医学教学训练评价,不替代真实临床诊疗。", styles["small"])) story.append(Spacer(1, 4 * mm)) self._append_kv_section( story, "一、报告基本信息", [ ("训练记录", record.id), ("用户 ID", record.external_user_id), ("病例", case.title if case else f"case_id={record.case_id}"), ("训练模式", self._mode_label(record.training_mode)), ("训练类别", record.case_type), ("评分类型", self._score_type_label(record.score_type)), ("总分", self._score_text(record)), ("评价等级", record.evaluation_level), ("生成时间", self._format_datetime(record.created_at)), ("模型", structured.get("llm_model") or "未记录"), ("模型耗时", self._latency_text(structured.get("latency_metrics"))), ], context, ) self._append_case_section(story, case, context) self._append_submission_section(story, submission, context) self._append_order_section(story, orders, context) self._append_dimension_section(story, score_details or structured.get("dimension_scores") or [], context) self._append_error_section(story, structured.get("errors") or record.wrong_points or [], context) self._append_list_section(story, "七、改进计划", structured.get("improvement_plan") or (record.recommendation_result or {}).get("improvement_plan") or [], context) self._append_list_section(story, "八、证据摘要", structured.get("evidence_summary") or [], context) self._append_guideline_section(story, structured.get("guideline_refs") or [], context) story.append(Spacer(1, 4 * mm)) story.append(Paragraph(f"综合评价:{self._safe_text(structured.get('overall_comment') or record.feedback)}", styles["body"])) return story def _append_case_section(self, story: list, case: Any, context: dict[str, Any]) -> None: """病例信息分节:展示病例主诉、关键症状、关键检查和标准诊断。""" if not case: return patient_gender = {"male": "男", "female": "女"}.get(case.patient_gender or "", case.patient_gender or "未记录") self._append_kv_section( story, "二、病例与考核要点", [ ("患者", f"{case.patient_age or '未记录'} 岁,{patient_gender}"), ("主诉", case.chief_complaint), ("关键症状", self._join(case.key_symptoms)), ("关键检查", self._join(case.key_exams)), ("考核要点", self._join(case.key_points)), ("标准诊断", case.diagnosis_primary), ("诊断依据", case.diagnosis_basis), ], context, ) def _append_submission_section(self, story: list, submission: Any, context: dict[str, Any]) -> None: """学生提交分节:展示学生最终诊断、治疗、沟通和随访内容。""" self._append_kv_section( story, "三、学生诊断与治疗提交", [ ("主要诊断", getattr(submission, "primary_diagnosis", None)), ("鉴别诊断", self._join(getattr(submission, "differential_diagnoses", None))), ("诊断依据", getattr(submission, "diagnosis_basis", None)), ("治疗原则", getattr(submission, "treatment_principle", None)), ("治疗措施", getattr(submission, "treatment_measures", None)), ("风险预案", getattr(submission, "risk_plan", None)), ("医患沟通", getattr(submission, "communication", None)), ("随访安排", getattr(submission, "follow_up", None)), ], context, ) def _append_order_section(self, story: list, orders: list, context: dict[str, Any]) -> None: """检查结果分节:展示用户申请过的检查/检验项目和数据库固定结果。""" self._append_title(story, "四、检查/检验申请与结果", context) if not orders: story.append(context["Paragraph"]("本次训练未申请检查/检验。", context["styles"]["body"])) return rows = [["项目", "类型", "结果", "关键", "异常"]] for order in orders: rows.append([order.item_name, order.item_type, order.result_text, "是" if order.is_key else "否", "是" if order.is_abnormal else "否"]) self._append_table(story, rows, [72, 52, 285, 38, 38], context) def _append_dimension_section(self, story: list, dimensions: list, context: dict[str, Any]) -> None: """评分细则分节:逐项展示得分、评价、证据、扣分原因和改进动作。""" self._append_title(story, "五、维度评分细则", context) if not dimensions: story.append(context["Paragraph"]("暂无维度评分。", context["styles"]["body"])) return rows = [["维度", "得分", "扣分原因", "置信度", "评语"]] for item in dimensions: payload = self._score_detail_payload(item) rows.append([ payload["dimension"], payload["score_text"], payload["deducted_reason"], payload["ai_confidence"], payload["comment"], ]) self._append_table(story, rows, [70, 45, 160, 45, 165], context) for index, item in enumerate(dimensions, start=1): payload = self._score_detail_payload(item) title = f"{index}. {payload['dimension']}:{payload['score_text']}" story.append(context["Paragraph"](self._safe_text(title), context["styles"]["body"])) self._append_list_section(story, "证据", payload["evidence"], context, compact=True) self._append_list_section(story, "扣分原因", [payload["deducted_reason"]] if payload["deducted_reason"] else [], context, compact=True) if payload["comment"]: story.append(context["Paragraph"](f"评语:{self._safe_text(payload['comment'])}", context["styles"]["small"])) def _score_detail_payload(self, item: Any) -> dict[str, Any]: """评分明细展示:兼容 training_score_detail ORM 和旧 dimension_scores 字典。""" if isinstance(item, dict): score = item.get("score", 0) max_score = item.get("max_score") return { "dimension": item.get("dimension", "未命名维度"), "score_text": f"{score} / {max_score}" if max_score is not None else str(score), "deducted_reason": item.get("deducted_reason") or ";".join(str(value) for value in item.get("deductions", []) if value), "ai_confidence": item.get("ai_confidence") or "未记录", "comment": item.get("comment") or item.get("improvement") or "", "evidence": item.get("evidence_message_ids") or item.get("evidence") or [], } score = getattr(item, "score", None) confidence = getattr(item, "ai_confidence", None) return { "dimension": getattr(item, "dimension", "未命名维度"), "score_text": f"{float(score):g}" if score is not None else "未记录", "deducted_reason": getattr(item, "deducted_reason", "") or "", "ai_confidence": f"{float(confidence):g}" if confidence is not None else "未记录", "comment": getattr(item, "comment", "") or "", "evidence": getattr(item, "evidence_message_ids", None) or [], } def _append_error_section(self, story: list, errors: list, context: dict[str, Any]) -> None: """扣分问题分节:展示模型识别出的关键问题和严重程度。""" self._append_title(story, "六、主要问题与扣分原因", context) if not errors: story.append(context["Paragraph"]("暂无明显扣分问题。", context["styles"]["body"])) return rows = [["问题", "维度", "严重度", "说明"]] for index, item in enumerate(errors, start=1): if isinstance(item, dict): rows.append( [ item.get("title") or f"问题 {index}", item.get("related_dimension") or "综合表现", item.get("severity") or "medium", item.get("description") or item.get("comment") or "", ] ) else: rows.append([f"问题 {index}", "综合表现", "medium", str(item)]) self._append_table(story, rows, [78, 70, 50, 287], context) def _append_guideline_section(self, story: list, refs: list, context: dict[str, Any]) -> None: """参考依据分节:展示评分时使用的指南或知识库片段。""" self._append_title(story, "九、参考指南与资料", context) if not refs: story.append(context["Paragraph"]("本次未命中外部评分指南,使用病例内置标准和评分规则。", context["styles"]["body"])) return rows = [["来源", "标题/内容", "相关性"]] for item in refs: if isinstance(item, dict): rows.append([item.get("source") or "知识库", item.get("title") or item.get("content") or "", item.get("score") or "已引用"]) else: rows.append(["知识库", str(item), "已引用"]) self._append_table(story, rows, [82, 320, 83], context) def _append_kv_section(self, story: list, title: str, rows: list[tuple[str, Any]], context: dict[str, Any]) -> None: """键值分节:以两列表格展示基本信息、病例信息和学生提交内容。""" self._append_title(story, title, context) table_rows = [["字段", "内容"]] table_rows.extend([[key, self._safe_text(value)] for key, value in rows]) self._append_table(story, table_rows, [95, 390], context) def _append_title(self, story: list, title: str, context: dict[str, Any]) -> None: """章节标题:统一 PDF 分节标题样式。""" story.append(context["Spacer"](1, 3 * context["mm"])) story.append(context["Paragraph"](self._safe_text(title), context["styles"]["h2"])) def _append_table(self, story: list, rows: list[list[Any]], col_widths: list[int], context: dict[str, Any]) -> None: """表格渲染:将文本转为可自动换行的 Paragraph 并应用统一表格样式。""" Paragraph = context["Paragraph"] Table = context["Table"] TableStyle = context["TableStyle"] colors = context["colors"] styles = context["styles"] rendered = [] for row_index, row in enumerate(rows): style = styles["thead"] if row_index == 0 else styles["td"] rendered.append([Paragraph(self._safe_text(cell), style) for cell in row]) table = Table(rendered, colWidths=col_widths, repeatRows=1, hAlign="LEFT") table.setStyle( TableStyle( [ ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#2F75B5")), ("GRID", (0, 0), (-1, -1), 0.4, colors.HexColor("#D9E2F3")), ("VALIGN", (0, 0), (-1, -1), "TOP"), ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#F5F9FF")]), ("LEFTPADDING", (0, 0), (-1, -1), 5), ("RIGHTPADDING", (0, 0), (-1, -1), 5), ("TOPPADDING", (0, 0), (-1, -1), 5), ("BOTTOMPADDING", (0, 0), (-1, -1), 5), ] ) ) story.append(table) def _append_list_section(self, story: list, title: str, items: Any, context: dict[str, Any], compact: bool = False) -> None: """列表分节:展示证据摘要、扣分原因和改进计划。""" if not compact: self._append_title(story, title, context) elif title: story.append(context["Paragraph"](self._safe_text(title), context["styles"]["small"])) normalized = self._ensure_list(items) if not normalized: story.append(context["Paragraph"]("无", context["styles"]["small" if compact else "body"])) return for item in normalized: story.append(context["Paragraph"](f"- {self._safe_text(item)}", context["styles"]["small" if compact else "body"])) def _register_report_font(self, pdfmetrics: Any, TTFont: Any, UnicodeCIDFont: Any) -> str: """字体注册:优先嵌入 Windows 中文字体,提升 Adobe Acrobat DC 兼容性。""" font_name = "MCA-SimHei" try: pdfmetrics.getFont(font_name) return font_name except KeyError: pass for font_path in (Path("C:/Windows/Fonts/simhei.ttf"), Path("C:/Windows/Fonts/msyh.ttc"), Path("C:/Windows/Fonts/simsun.ttc")): if not font_path.exists(): continue try: pdfmetrics.registerFont(TTFont(font_name, str(font_path))) return font_name except Exception: continue pdfmetrics.registerFont(UnicodeCIDFont("STSong-Light")) return "STSong-Light" def _score_text(self, record: TrainingRecord) -> str: """分数展示:根据百分制或五分制输出总分文本。""" max_score = "100" if record.score_type == "percentage" else "5" return f"{float(record.total_score or 0):g} / {max_score}" def _latency_text(self, latency_metrics: dict | None) -> str: """模型耗时展示:读取评价生成阶段记录的 LLM 延迟。""" if not latency_metrics: return "未记录" value = latency_metrics.get("scoring_latency_ms") return f"{value} ms" if value is not None else self._safe_text(latency_metrics) def _score_type_label(self, score_type: str) -> str: """评分类型标签:转换内部枚举为中文显示。""" return {"percentage": "百分制", "five_point": "五分制"}.get(score_type, score_type) def _mode_label(self, mode: str) -> str: """训练模式标签:转换内部枚举为中文显示。""" return {"practice": "练习模式", "teaching": "教学互动模式", "novice": "练习模式"}.get(mode, mode) def _format_datetime(self, value: datetime | None) -> str: """时间格式化:统一报告中的时间展示。""" return value.strftime("%Y-%m-%d %H:%M:%S") if value else "未记录" def _join(self, value: Any) -> str: """数组文本化:把列表、字典或空值转换为报告中的短文本。""" if value is None: return "未记录" if isinstance(value, list): return ";".join(self._safe_text(item) for item in value) if value else "未记录" if isinstance(value, dict): return json.dumps(value, ensure_ascii=False) return str(value) def _ensure_list(self, value: Any) -> list[str]: """列表规整:把字符串、字典或数组统一为字符串数组。""" if value is None: return [] raw_items = value if isinstance(value, list) else [value] items = [] for raw in raw_items: if raw is None: continue if isinstance(raw, dict): text = raw.get("description") or raw.get("content") or raw.get("text") or json.dumps(raw, ensure_ascii=False) else: text = str(raw) text = text.strip() if text: items.append(text) return items def _safe_text(self, value: Any) -> str: """安全文本:将任意对象转换为可放入 Paragraph 的转义文本。""" if value is None: return "未记录" if isinstance(value, (dict, list)): text = json.dumps(value, ensure_ascii=False) else: text = str(value) return html.escape(text).replace("\n", "
")