prepare fastapi root layout for server deployment
This commit is contained in:
@@ -0,0 +1,499 @@
|
||||
import html
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session, selectinload
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.exceptions import AppError
|
||||
from app.models.training import TrainingSession
|
||||
from app.models.training_record import TrainingRecord
|
||||
from app.repositories.evaluation_repository import EvaluationRepository
|
||||
|
||||
|
||||
class PdfExportService:
|
||||
"""PDF 导出服务:把 training_record 渲染为包含评分细则、检查证据和改进计划的报告。"""
|
||||
|
||||
def __init__(self, db: Session) -> None:
|
||||
self.db = db
|
||||
self.repo = EvaluationRepository(db)
|
||||
|
||||
def export(self, evaluation_id: int, user_id: str) -> SimpleNamespace:
|
||||
"""报告导出:校验 training_record 归属后生成 PDF,并回写 PDF 路径。"""
|
||||
record = self.repo.get_owned_record(evaluation_id, user_id)
|
||||
if not record:
|
||||
raise AppError("EVALUATION_NOT_FOUND", "evaluation not found or not owned by current user", 404)
|
||||
|
||||
session = self._get_session(record.session_id) if record.session_id else None
|
||||
output_dir = Path(settings.report_storage_dir)
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
|
||||
file_name = f"training_record_{record.id}_{record.score_type}_{timestamp}_{uuid.uuid4().hex[:6]}.pdf"
|
||||
file_path = output_dir / file_name
|
||||
score_details = self.repo.list_score_details(record.id)
|
||||
self._write_pdf(file_path, record, session, score_details)
|
||||
|
||||
record.pdf_file_path = str(file_path)
|
||||
recommendation = dict(record.recommendation_result or {})
|
||||
recommendation["pdf_file_path"] = str(file_path)
|
||||
record.recommendation_result = recommendation
|
||||
self.repo.flush()
|
||||
return SimpleNamespace(id=record.id, file_path=str(file_path))
|
||||
|
||||
def _get_session(self, session_id: int) -> TrainingSession | None:
|
||||
"""会话读取:加载报告展示需要的病例、检查申请和诊断治疗提交。"""
|
||||
stmt = (
|
||||
select(TrainingSession)
|
||||
.options(
|
||||
selectinload(TrainingSession.case),
|
||||
selectinload(TrainingSession.orders),
|
||||
selectinload(TrainingSession.submission),
|
||||
)
|
||||
.where(TrainingSession.id == session_id)
|
||||
)
|
||||
return self.db.scalar(stmt)
|
||||
|
||||
def _write_pdf(self, file_path: Path, record: TrainingRecord, session: TrainingSession | None, score_details: list | None = None) -> None:
|
||||
"""PDF 写入:使用 reportlab 生成 Acrobat 可正常打开的标准 PDF。"""
|
||||
try:
|
||||
from reportlab.lib import colors
|
||||
from reportlab.lib.enums import TA_CENTER, TA_LEFT
|
||||
from reportlab.lib.pagesizes import A4
|
||||
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
|
||||
from reportlab.lib.units import mm
|
||||
from reportlab.pdfbase import pdfmetrics
|
||||
from reportlab.pdfbase.cidfonts import UnicodeCIDFont
|
||||
from reportlab.pdfbase.ttfonts import TTFont
|
||||
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle
|
||||
|
||||
font_name = self._register_report_font(pdfmetrics, TTFont, UnicodeCIDFont)
|
||||
base_styles = getSampleStyleSheet()
|
||||
styles = {
|
||||
"title": ParagraphStyle(
|
||||
"McaTitle",
|
||||
parent=base_styles["Title"],
|
||||
fontName=font_name,
|
||||
fontSize=20,
|
||||
leading=28,
|
||||
alignment=TA_CENTER,
|
||||
textColor=colors.HexColor("#17365D"),
|
||||
spaceAfter=12,
|
||||
),
|
||||
"h2": ParagraphStyle(
|
||||
"McaHeading",
|
||||
parent=base_styles["Heading2"],
|
||||
fontName=font_name,
|
||||
fontSize=13,
|
||||
leading=18,
|
||||
alignment=TA_LEFT,
|
||||
textColor=colors.HexColor("#1F4E79"),
|
||||
spaceBefore=8,
|
||||
spaceAfter=6,
|
||||
),
|
||||
"body": ParagraphStyle(
|
||||
"McaBody",
|
||||
parent=base_styles["BodyText"],
|
||||
fontName=font_name,
|
||||
fontSize=10,
|
||||
leading=15,
|
||||
textColor=colors.HexColor("#1F2937"),
|
||||
wordWrap="CJK",
|
||||
),
|
||||
"small": ParagraphStyle(
|
||||
"McaSmall",
|
||||
parent=base_styles["BodyText"],
|
||||
fontName=font_name,
|
||||
fontSize=8.8,
|
||||
leading=12.5,
|
||||
textColor=colors.HexColor("#4B5563"),
|
||||
wordWrap="CJK",
|
||||
),
|
||||
"thead": ParagraphStyle("McaThead", parent=base_styles["BodyText"], fontName=font_name, fontSize=9, leading=12, textColor=colors.white, alignment=TA_CENTER),
|
||||
"td": ParagraphStyle("McaTd", parent=base_styles["BodyText"], fontName=font_name, fontSize=8.5, leading=12, textColor=colors.HexColor("#111827"), wordWrap="CJK"),
|
||||
}
|
||||
context = {
|
||||
"Paragraph": Paragraph,
|
||||
"Spacer": Spacer,
|
||||
"Table": Table,
|
||||
"TableStyle": TableStyle,
|
||||
"colors": colors,
|
||||
"styles": styles,
|
||||
"mm": mm,
|
||||
}
|
||||
doc = SimpleDocTemplate(
|
||||
str(file_path),
|
||||
pagesize=A4,
|
||||
leftMargin=16 * mm,
|
||||
rightMargin=16 * mm,
|
||||
topMargin=14 * mm,
|
||||
bottomMargin=14 * mm,
|
||||
title="医疗问诊 Agent 训练评价报告",
|
||||
)
|
||||
doc.build(self._build_story(record, session, context, score_details or []))
|
||||
except ModuleNotFoundError:
|
||||
self._write_minimal_pdf(file_path, record)
|
||||
except Exception as exc:
|
||||
if file_path.exists():
|
||||
file_path.unlink(missing_ok=True)
|
||||
raise AppError("PDF_EXPORT_FAILED", "PDF report export failed", 500) from exc
|
||||
|
||||
def _write_minimal_pdf(self, file_path: Path, record: TrainingRecord) -> None:
|
||||
"""兜底 PDF:缺少 reportlab 时生成标准可打开的极简 PDF,生产环境仍使用 reportlab 模板。"""
|
||||
lines = [
|
||||
"Medical Consultation Agent Evaluation Report",
|
||||
f"Record ID: {record.id}",
|
||||
f"User ID: {record.external_user_id}",
|
||||
f"Score: {float(record.total_score or 0):g} ({record.score_type})",
|
||||
f"Level: {record.evaluation_level}",
|
||||
"This fallback PDF is generated because reportlab is not installed.",
|
||||
]
|
||||
text_ops = []
|
||||
y = 780
|
||||
for line in lines:
|
||||
safe = line.encode("latin-1", "replace").decode("latin-1").replace("\\", "\\\\").replace("(", "\\(").replace(")", "\\)")
|
||||
text_ops.append(f"BT /F1 12 Tf 50 {y} Td ({safe}) Tj ET")
|
||||
y -= 22
|
||||
stream = "\n".join(text_ops).encode("latin-1")
|
||||
if len(stream) < 900:
|
||||
stream += b"\n% " + (b"fallback-pdf-padding " * 40)
|
||||
objects = [
|
||||
b"<< /Type /Catalog /Pages 2 0 R >>",
|
||||
b"<< /Type /Pages /Kids [3 0 R] /Count 1 >>",
|
||||
b"<< /Type /Page /Parent 2 0 R /MediaBox [0 0 595 842] /Resources << /Font << /F1 4 0 R >> >> /Contents 5 0 R >>",
|
||||
b"<< /Type /Font /Subtype /Type1 /BaseFont /Helvetica >>",
|
||||
b"<< /Length " + str(len(stream)).encode("ascii") + b" >>\nstream\n" + stream + b"\nendstream",
|
||||
]
|
||||
content = bytearray(b"%PDF-1.4\n%\xe2\xe3\xcf\xd3\n")
|
||||
offsets = [0]
|
||||
for index, obj in enumerate(objects, start=1):
|
||||
offsets.append(len(content))
|
||||
content.extend(f"{index} 0 obj\n".encode("ascii"))
|
||||
content.extend(obj)
|
||||
content.extend(b"\nendobj\n")
|
||||
xref_offset = len(content)
|
||||
content.extend(f"xref\n0 {len(objects) + 1}\n".encode("ascii"))
|
||||
content.extend(b"0000000000 65535 f \n")
|
||||
for offset in offsets[1:]:
|
||||
content.extend(f"{offset:010d} 00000 n \n".encode("ascii"))
|
||||
content.extend(
|
||||
f"trailer\n<< /Size {len(objects) + 1} /Root 1 0 R >>\nstartxref\n{xref_offset}\n%%EOF\n".encode("ascii")
|
||||
)
|
||||
file_path.write_bytes(bytes(content))
|
||||
|
||||
def _build_story(self, record: TrainingRecord, session: TrainingSession | None, context: dict[str, Any], score_details: list | None = None) -> list:
|
||||
"""报告模板:按基本信息、病例、提交、检查、评分细则和改进计划组织内容。"""
|
||||
Paragraph = context["Paragraph"]
|
||||
Spacer = context["Spacer"]
|
||||
styles = context["styles"]
|
||||
mm = context["mm"]
|
||||
case = session.case if session else None
|
||||
submission = session.submission if session else None
|
||||
orders = sorted(session.orders, key=lambda item: item.ordered_at) if session else []
|
||||
structured = record.ai_feedback_structured or {}
|
||||
|
||||
story = [Paragraph("医疗问诊 Agent 训练评价报告", styles["title"])]
|
||||
story.append(Paragraph("本报告仅用于医学教学训练评价,不替代真实临床诊疗。", styles["small"]))
|
||||
story.append(Spacer(1, 4 * mm))
|
||||
|
||||
self._append_kv_section(
|
||||
story,
|
||||
"一、报告基本信息",
|
||||
[
|
||||
("训练记录", record.id),
|
||||
("用户 ID", record.external_user_id),
|
||||
("病例", case.title if case else f"case_id={record.case_id}"),
|
||||
("训练模式", self._mode_label(record.training_mode)),
|
||||
("训练类别", record.case_type),
|
||||
("评分类型", self._score_type_label(record.score_type)),
|
||||
("总分", self._score_text(record)),
|
||||
("评价等级", record.evaluation_level),
|
||||
("生成时间", self._format_datetime(record.created_at)),
|
||||
("模型", structured.get("llm_model") or "未记录"),
|
||||
("模型耗时", self._latency_text(structured.get("latency_metrics"))),
|
||||
],
|
||||
context,
|
||||
)
|
||||
self._append_case_section(story, case, context)
|
||||
self._append_submission_section(story, submission, context)
|
||||
self._append_order_section(story, orders, context)
|
||||
self._append_dimension_section(story, score_details or structured.get("dimension_scores") or [], context)
|
||||
self._append_error_section(story, structured.get("errors") or record.wrong_points or [], context)
|
||||
self._append_list_section(story, "七、改进计划", structured.get("improvement_plan") or (record.recommendation_result or {}).get("improvement_plan") or [], context)
|
||||
self._append_list_section(story, "八、证据摘要", structured.get("evidence_summary") or [], context)
|
||||
self._append_guideline_section(story, structured.get("guideline_refs") or [], context)
|
||||
story.append(Spacer(1, 4 * mm))
|
||||
story.append(Paragraph(f"综合评价:{self._safe_text(structured.get('overall_comment') or record.feedback)}", styles["body"]))
|
||||
return story
|
||||
|
||||
def _append_case_section(self, story: list, case: Any, context: dict[str, Any]) -> None:
|
||||
"""病例信息分节:展示病例主诉、关键症状、关键检查和标准诊断。"""
|
||||
if not case:
|
||||
return
|
||||
patient_gender = {"male": "男", "female": "女"}.get(case.patient_gender or "", case.patient_gender or "未记录")
|
||||
self._append_kv_section(
|
||||
story,
|
||||
"二、病例与考核要点",
|
||||
[
|
||||
("患者", f"{case.patient_age or '未记录'} 岁,{patient_gender}"),
|
||||
("主诉", case.chief_complaint),
|
||||
("关键症状", self._join(case.key_symptoms)),
|
||||
("关键检查", self._join(case.key_exams)),
|
||||
("考核要点", self._join(case.key_points)),
|
||||
("标准诊断", case.diagnosis_primary),
|
||||
("诊断依据", case.diagnosis_basis),
|
||||
],
|
||||
context,
|
||||
)
|
||||
|
||||
def _append_submission_section(self, story: list, submission: Any, context: dict[str, Any]) -> None:
|
||||
"""学生提交分节:展示学生最终诊断、治疗、沟通和随访内容。"""
|
||||
self._append_kv_section(
|
||||
story,
|
||||
"三、学生诊断与治疗提交",
|
||||
[
|
||||
("主要诊断", getattr(submission, "primary_diagnosis", None)),
|
||||
("鉴别诊断", self._join(getattr(submission, "differential_diagnoses", None))),
|
||||
("诊断依据", getattr(submission, "diagnosis_basis", None)),
|
||||
("治疗原则", getattr(submission, "treatment_principle", None)),
|
||||
("治疗措施", getattr(submission, "treatment_measures", None)),
|
||||
("风险预案", getattr(submission, "risk_plan", None)),
|
||||
("医患沟通", getattr(submission, "communication", None)),
|
||||
("随访安排", getattr(submission, "follow_up", None)),
|
||||
],
|
||||
context,
|
||||
)
|
||||
|
||||
def _append_order_section(self, story: list, orders: list, context: dict[str, Any]) -> None:
|
||||
"""检查结果分节:展示用户申请过的检查/检验项目和数据库固定结果。"""
|
||||
self._append_title(story, "四、检查/检验申请与结果", context)
|
||||
if not orders:
|
||||
story.append(context["Paragraph"]("本次训练未申请检查/检验。", context["styles"]["body"]))
|
||||
return
|
||||
rows = [["项目", "类型", "结果", "关键", "异常"]]
|
||||
for order in orders:
|
||||
rows.append([order.item_name, order.item_type, order.result_text, "是" if order.is_key else "否", "是" if order.is_abnormal else "否"])
|
||||
self._append_table(story, rows, [72, 52, 285, 38, 38], context)
|
||||
|
||||
def _append_dimension_section(self, story: list, dimensions: list, context: dict[str, Any]) -> None:
|
||||
"""评分细则分节:逐项展示得分、评价、证据、扣分原因和改进动作。"""
|
||||
self._append_title(story, "五、维度评分细则", context)
|
||||
if not dimensions:
|
||||
story.append(context["Paragraph"]("暂无维度评分。", context["styles"]["body"]))
|
||||
return
|
||||
rows = [["维度", "得分", "扣分原因", "置信度", "评语"]]
|
||||
for item in dimensions:
|
||||
payload = self._score_detail_payload(item)
|
||||
rows.append([
|
||||
payload["dimension"],
|
||||
payload["score_text"],
|
||||
payload["deducted_reason"],
|
||||
payload["ai_confidence"],
|
||||
payload["comment"],
|
||||
])
|
||||
self._append_table(story, rows, [70, 45, 160, 45, 165], context)
|
||||
for index, item in enumerate(dimensions, start=1):
|
||||
payload = self._score_detail_payload(item)
|
||||
title = f"{index}. {payload['dimension']}:{payload['score_text']}"
|
||||
story.append(context["Paragraph"](self._safe_text(title), context["styles"]["body"]))
|
||||
self._append_list_section(story, "证据", payload["evidence"], context, compact=True)
|
||||
self._append_list_section(story, "扣分原因", [payload["deducted_reason"]] if payload["deducted_reason"] else [], context, compact=True)
|
||||
if payload["comment"]:
|
||||
story.append(context["Paragraph"](f"评语:{self._safe_text(payload['comment'])}", context["styles"]["small"]))
|
||||
|
||||
def _score_detail_payload(self, item: Any) -> dict[str, Any]:
|
||||
"""评分明细展示:兼容 training_score_detail ORM 和旧 dimension_scores 字典。"""
|
||||
if isinstance(item, dict):
|
||||
score = item.get("score", 0)
|
||||
max_score = item.get("max_score")
|
||||
return {
|
||||
"dimension": item.get("dimension", "未命名维度"),
|
||||
"score_text": f"{score} / {max_score}" if max_score is not None else str(score),
|
||||
"deducted_reason": item.get("deducted_reason") or ";".join(str(value) for value in item.get("deductions", []) if value),
|
||||
"ai_confidence": item.get("ai_confidence") or "未记录",
|
||||
"comment": item.get("comment") or item.get("improvement") or "",
|
||||
"evidence": item.get("evidence_message_ids") or item.get("evidence") or [],
|
||||
}
|
||||
score = getattr(item, "score", None)
|
||||
confidence = getattr(item, "ai_confidence", None)
|
||||
return {
|
||||
"dimension": getattr(item, "dimension", "未命名维度"),
|
||||
"score_text": f"{float(score):g}" if score is not None else "未记录",
|
||||
"deducted_reason": getattr(item, "deducted_reason", "") or "",
|
||||
"ai_confidence": f"{float(confidence):g}" if confidence is not None else "未记录",
|
||||
"comment": getattr(item, "comment", "") or "",
|
||||
"evidence": getattr(item, "evidence_message_ids", None) or [],
|
||||
}
|
||||
|
||||
def _append_error_section(self, story: list, errors: list, context: dict[str, Any]) -> None:
|
||||
"""扣分问题分节:展示模型识别出的关键问题和严重程度。"""
|
||||
self._append_title(story, "六、主要问题与扣分原因", context)
|
||||
if not errors:
|
||||
story.append(context["Paragraph"]("暂无明显扣分问题。", context["styles"]["body"]))
|
||||
return
|
||||
rows = [["问题", "维度", "严重度", "说明"]]
|
||||
for index, item in enumerate(errors, start=1):
|
||||
if isinstance(item, dict):
|
||||
rows.append(
|
||||
[
|
||||
item.get("title") or f"问题 {index}",
|
||||
item.get("related_dimension") or "综合表现",
|
||||
item.get("severity") or "medium",
|
||||
item.get("description") or item.get("comment") or "",
|
||||
]
|
||||
)
|
||||
else:
|
||||
rows.append([f"问题 {index}", "综合表现", "medium", str(item)])
|
||||
self._append_table(story, rows, [78, 70, 50, 287], context)
|
||||
|
||||
def _append_guideline_section(self, story: list, refs: list, context: dict[str, Any]) -> None:
|
||||
"""参考依据分节:展示评分时使用的指南或知识库片段。"""
|
||||
self._append_title(story, "九、参考指南与资料", context)
|
||||
if not refs:
|
||||
story.append(context["Paragraph"]("本次未命中外部评分指南,使用病例内置标准和评分规则。", context["styles"]["body"]))
|
||||
return
|
||||
rows = [["来源", "标题/内容", "相关性"]]
|
||||
for item in refs:
|
||||
if isinstance(item, dict):
|
||||
rows.append([item.get("source") or "知识库", item.get("title") or item.get("content") or "", item.get("score") or "已引用"])
|
||||
else:
|
||||
rows.append(["知识库", str(item), "已引用"])
|
||||
self._append_table(story, rows, [82, 320, 83], context)
|
||||
|
||||
def _append_kv_section(self, story: list, title: str, rows: list[tuple[str, Any]], context: dict[str, Any]) -> None:
|
||||
"""键值分节:以两列表格展示基本信息、病例信息和学生提交内容。"""
|
||||
self._append_title(story, title, context)
|
||||
table_rows = [["字段", "内容"]]
|
||||
table_rows.extend([[key, self._safe_text(value)] for key, value in rows])
|
||||
self._append_table(story, table_rows, [95, 390], context)
|
||||
|
||||
def _append_title(self, story: list, title: str, context: dict[str, Any]) -> None:
|
||||
"""章节标题:统一 PDF 分节标题样式。"""
|
||||
story.append(context["Spacer"](1, 3 * context["mm"]))
|
||||
story.append(context["Paragraph"](self._safe_text(title), context["styles"]["h2"]))
|
||||
|
||||
def _append_table(self, story: list, rows: list[list[Any]], col_widths: list[int], context: dict[str, Any]) -> None:
|
||||
"""表格渲染:将文本转为可自动换行的 Paragraph 并应用统一表格样式。"""
|
||||
Paragraph = context["Paragraph"]
|
||||
Table = context["Table"]
|
||||
TableStyle = context["TableStyle"]
|
||||
colors = context["colors"]
|
||||
styles = context["styles"]
|
||||
rendered = []
|
||||
for row_index, row in enumerate(rows):
|
||||
style = styles["thead"] if row_index == 0 else styles["td"]
|
||||
rendered.append([Paragraph(self._safe_text(cell), style) for cell in row])
|
||||
table = Table(rendered, colWidths=col_widths, repeatRows=1, hAlign="LEFT")
|
||||
table.setStyle(
|
||||
TableStyle(
|
||||
[
|
||||
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#2F75B5")),
|
||||
("GRID", (0, 0), (-1, -1), 0.4, colors.HexColor("#D9E2F3")),
|
||||
("VALIGN", (0, 0), (-1, -1), "TOP"),
|
||||
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.white, colors.HexColor("#F5F9FF")]),
|
||||
("LEFTPADDING", (0, 0), (-1, -1), 5),
|
||||
("RIGHTPADDING", (0, 0), (-1, -1), 5),
|
||||
("TOPPADDING", (0, 0), (-1, -1), 5),
|
||||
("BOTTOMPADDING", (0, 0), (-1, -1), 5),
|
||||
]
|
||||
)
|
||||
)
|
||||
story.append(table)
|
||||
|
||||
def _append_list_section(self, story: list, title: str, items: Any, context: dict[str, Any], compact: bool = False) -> None:
|
||||
"""列表分节:展示证据摘要、扣分原因和改进计划。"""
|
||||
if not compact:
|
||||
self._append_title(story, title, context)
|
||||
elif title:
|
||||
story.append(context["Paragraph"](self._safe_text(title), context["styles"]["small"]))
|
||||
normalized = self._ensure_list(items)
|
||||
if not normalized:
|
||||
story.append(context["Paragraph"]("无", context["styles"]["small" if compact else "body"]))
|
||||
return
|
||||
for item in normalized:
|
||||
story.append(context["Paragraph"](f"- {self._safe_text(item)}", context["styles"]["small" if compact else "body"]))
|
||||
|
||||
def _register_report_font(self, pdfmetrics: Any, TTFont: Any, UnicodeCIDFont: Any) -> str:
|
||||
"""字体注册:优先嵌入 Windows 中文字体,提升 Adobe Acrobat DC 兼容性。"""
|
||||
font_name = "MCA-SimHei"
|
||||
try:
|
||||
pdfmetrics.getFont(font_name)
|
||||
return font_name
|
||||
except KeyError:
|
||||
pass
|
||||
for font_path in (Path("C:/Windows/Fonts/simhei.ttf"), Path("C:/Windows/Fonts/msyh.ttc"), Path("C:/Windows/Fonts/simsun.ttc")):
|
||||
if not font_path.exists():
|
||||
continue
|
||||
try:
|
||||
pdfmetrics.registerFont(TTFont(font_name, str(font_path)))
|
||||
return font_name
|
||||
except Exception:
|
||||
continue
|
||||
pdfmetrics.registerFont(UnicodeCIDFont("STSong-Light"))
|
||||
return "STSong-Light"
|
||||
|
||||
def _score_text(self, record: TrainingRecord) -> str:
|
||||
"""分数展示:根据百分制或五分制输出总分文本。"""
|
||||
max_score = "100" if record.score_type == "percentage" else "5"
|
||||
return f"{float(record.total_score or 0):g} / {max_score}"
|
||||
|
||||
def _latency_text(self, latency_metrics: dict | None) -> str:
|
||||
"""模型耗时展示:读取评价生成阶段记录的 LLM 延迟。"""
|
||||
if not latency_metrics:
|
||||
return "未记录"
|
||||
value = latency_metrics.get("scoring_latency_ms")
|
||||
return f"{value} ms" if value is not None else self._safe_text(latency_metrics)
|
||||
|
||||
def _score_type_label(self, score_type: str) -> str:
|
||||
"""评分类型标签:转换内部枚举为中文显示。"""
|
||||
return {"percentage": "百分制", "five_point": "五分制"}.get(score_type, score_type)
|
||||
|
||||
def _mode_label(self, mode: str) -> str:
|
||||
"""训练模式标签:转换内部枚举为中文显示。"""
|
||||
return {"practice": "练习模式", "teaching": "教学互动模式", "novice": "练习模式"}.get(mode, mode)
|
||||
|
||||
def _format_datetime(self, value: datetime | None) -> str:
|
||||
"""时间格式化:统一报告中的时间展示。"""
|
||||
return value.strftime("%Y-%m-%d %H:%M:%S") if value else "未记录"
|
||||
|
||||
def _join(self, value: Any) -> str:
|
||||
"""数组文本化:把列表、字典或空值转换为报告中的短文本。"""
|
||||
if value is None:
|
||||
return "未记录"
|
||||
if isinstance(value, list):
|
||||
return ";".join(self._safe_text(item) for item in value) if value else "未记录"
|
||||
if isinstance(value, dict):
|
||||
return json.dumps(value, ensure_ascii=False)
|
||||
return str(value)
|
||||
|
||||
def _ensure_list(self, value: Any) -> list[str]:
|
||||
"""列表规整:把字符串、字典或数组统一为字符串数组。"""
|
||||
if value is None:
|
||||
return []
|
||||
raw_items = value if isinstance(value, list) else [value]
|
||||
items = []
|
||||
for raw in raw_items:
|
||||
if raw is None:
|
||||
continue
|
||||
if isinstance(raw, dict):
|
||||
text = raw.get("description") or raw.get("content") or raw.get("text") or json.dumps(raw, ensure_ascii=False)
|
||||
else:
|
||||
text = str(raw)
|
||||
text = text.strip()
|
||||
if text:
|
||||
items.append(text)
|
||||
return items
|
||||
|
||||
def _safe_text(self, value: Any) -> str:
|
||||
"""安全文本:将任意对象转换为可放入 Paragraph 的转义文本。"""
|
||||
if value is None:
|
||||
return "未记录"
|
||||
if isinstance(value, (dict, list)):
|
||||
text = json.dumps(value, ensure_ascii=False)
|
||||
else:
|
||||
text = str(value)
|
||||
return html.escape(text).replace("\n", "<br/>")
|
||||
Reference in New Issue
Block a user