add training configuration APIs

This commit is contained in:
刘金宝
2026-06-05 12:57:02 +08:00
parent 7f1803f9fa
commit 41a2851120
14 changed files with 1541 additions and 10 deletions
+35
View File
@@ -11,6 +11,9 @@ from app.services.runtime_memory import runtime_memory
class OrderService:
"""检查检验服务:提供可申请项目和数据库固定结果返回。"""
PHYSICAL_TYPES = {"physical", "physical_exam", "inspection", "palpation", "percussion", "auscultation"}
PHYSICAL_KEYWORDS = ("体格", "体征", "查体", "听诊", "叩诊", "触诊")
def __init__(self, db: Session) -> None:
self.db = db
self.case_repo = CaseRepository(db)
@@ -20,6 +23,30 @@ class OrderService:
"""检查项目列表:按会话病例返回可申请项目,不返回结果。"""
session = self._get_session(session_id, user_id)
items = self.case_repo.get_exam_items(session.case_id)
return self._items_response(items)
def list_physical_exam_items(self, session_id: int, user_id: str) -> OrderItemsResponse:
"""体格检查列表:从当前病例检查项中筛选体格检查类项目。"""
session = self._get_session(session_id, user_id)
items = [item for item in self.case_repo.get_exam_items(session.case_id) if self._is_physical_item(item)]
return self._items_response(items)
def list_auxiliary_exam_items(self, session_id: int, user_id: str) -> OrderItemsResponse:
"""辅助检查列表:从当前病例检查项中筛选非体格检查类项目。"""
session = self._get_session(session_id, user_id)
items = [item for item in self.case_repo.get_exam_items(session.case_id) if not self._is_physical_item(item)]
return self._items_response(items)
def create_physical_exam_order(self, session_id: int, user_id: str, item_code: str) -> CreateOrderResponse:
"""体格检查结果:复用检查申请逻辑,结果仍只来自数据库。"""
return self.create_order(session_id, user_id, item_code)
def create_auxiliary_exam_order(self, session_id: int, user_id: str, item_code: str) -> CreateOrderResponse:
"""辅助检查结果:复用检查申请逻辑,结果仍只来自数据库。"""
return self.create_order(session_id, user_id, item_code)
def _items_response(self, items) -> OrderItemsResponse:
"""检查列表响应:把 ORM 检查项转换成前端列表结构。"""
return OrderItemsResponse(
items=[
OrderItemResponse(item_code=item.item_code, item_name=item.item_name, item_type=item.item_type)
@@ -27,6 +54,14 @@ class OrderService:
]
)
def _is_physical_item(self, item) -> bool:
"""检查分类:按 item_type 和 category 识别体格检查,其他归入辅助检查。"""
item_type = (item.item_type or "").lower()
category = item.category or ""
if item_type in self.PHYSICAL_TYPES:
return True
return any(keyword in category or keyword in item.item_name for keyword in self.PHYSICAL_KEYWORDS)
def create_order(self, session_id: int, user_id: str, item_code: str) -> CreateOrderResponse:
"""检查申请:从数据库读取检查结果并写入当前会话记录。"""
session = self._get_session(session_id, user_id)
+59 -1
View File
@@ -28,6 +28,7 @@ from app.schemas.session import (
)
from app.services.audit_service import AuditService
from app.services.runtime_memory import runtime_memory
from app.services.training_config_service import TrainingConfigService
logger = logging.getLogger(__name__)
@@ -48,6 +49,7 @@ class SessionService:
if not case:
raise AppError("CASE_NOT_FOUND", "case not found or inactive", 404)
patient_config = TrainingConfigService(self.db).normalize_patient_config(payload.patient_config)
session_code = f"sess_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex[:8]}"
memory_key = f"mem:{session_code}"
session = self.session_repo.create_session(
@@ -64,7 +66,7 @@ class SessionService:
status="inquiry",
started_at=datetime.utcnow(),
memory_key=memory_key,
metadata_={"source": "demo"},
metadata_={"source": "demo", "patient_config": patient_config},
)
)
patient_opening = case.patient_opening or "家长:医生,孩子这几天不舒服,想请您看看。"
@@ -75,6 +77,7 @@ class SessionService:
session_code=session.session_code,
status=session.status,
patient_opening=patient_opening,
patient_config=patient_config,
)
async def chat(self, ctx: UserContext, session_id: int, message: str) -> ChatResponse:
@@ -225,6 +228,61 @@ class SessionService:
self.audit.log(ctx, "session.hints", "training_session", str(session.id), session.id)
return HintResponse(**result)
async def stream_hints(self, ctx: UserContext, session_id: int, payload: HintRequest) -> AsyncIterator[str]:
"""流式练习提示:把结构化提示压缩成一句话并用 SSE 返回给前端。"""
started_at = time.perf_counter()
try:
hint_result = await self.generate_hints(ctx, session_id, payload)
sentence = self._build_hint_sentence(hint_result)
except AppError as exc:
error_message = exc.message
error_code = exc.code
async def app_error_generator() -> AsyncIterator[str]:
yield self._sse_error(error_message, error_code)
return app_error_generator()
except Exception:
logger.exception("hint_stream.failed session_id=%s", session_id)
async def error_generator() -> AsyncIterator[str]:
yield self._sse_error("练习提示生成失败,请稍后重试", "HINT_STREAM_FAILED")
return error_generator()
async def event_generator() -> AsyncIterator[str]:
if not sentence:
yield self._sse_error("当前没有生成有效提示,请继续问诊后再试", "HINT_EMPTY")
return
for chunk in self._chunk_text(sentence, size=12):
payload_text = json.dumps({"delta": chunk}, ensure_ascii=False)
yield f"event: hint_delta\ndata: {payload_text}\n\n"
await asyncio.sleep(0)
done_payload = json.dumps({"latency_ms": int((time.perf_counter() - started_at) * 1000)}, ensure_ascii=False)
yield f"event: hint_done\ndata: {done_payload}\n\n"
return event_generator()
def _build_hint_sentence(self, hint_result: HintResponse) -> str:
"""提示压缩:从结构化提示中提炼适合前端流式展示的一句话。"""
parts: list[str] = []
if hint_result.missing_dimensions:
parts.append(f"当前可补充{ ''.join(hint_result.missing_dimensions[:3]) }")
if hint_result.next_questions:
parts.append(f"下一步可问:{hint_result.next_questions[0]}")
elif hint_result.hints:
parts.append(hint_result.hints[0])
if hint_result.recommended_orders:
order = hint_result.recommended_orders[0]
item_code = order.get("item_code") or order.get("item_name") or "关键检查"
reason = order.get("reason") or "用于完善病情判断"
parts.append(f"可考虑申请{item_code}{reason}")
return "".join(parts) + ("" if parts else "")
def _chunk_text(self, text: str, size: int) -> list[str]:
"""文本切片:把一句练习提示拆成短片段,便于前端按 SSE 渐进展示。"""
return [text[index : index + size] for index in range(0, len(text), size)]
def complete_inquiry(self, ctx: UserContext, session_id: int) -> SessionStatusResponse:
"""完成问诊:校验至少一轮医生问诊后进入诊断阶段。"""
session = self._get_session(session_id, ctx.user_id)
+104
View File
@@ -0,0 +1,104 @@
from sqlalchemy.orm import Session
from app.core.exceptions import AppError
from app.repositories.case_repository import CaseRepository
from app.schemas.training_config import (
ConfigOption,
PatientConfig,
TrainingConfigOptionsResponse,
TrainingConfigRecommendedResponse,
)
class TrainingConfigService:
"""训练配置服务:提供训练页病人初始化配置,不写数据库。"""
def __init__(self, db: Session) -> None:
self.db = db
self.case_repo = CaseRepository(db)
def get_recommended(self, case_id: int) -> TrainingConfigRecommendedResponse:
"""推荐配置:根据病例返回训练页默认病人初始化配置。"""
self._ensure_case(case_id)
recommended = self.default_patient_config()
return TrainingConfigRecommendedResponse(
case_id=case_id,
recommended=recommended,
recommended_labels=self.config_labels(recommended),
options=self.config_options(),
)
def get_options(self, case_id: int) -> TrainingConfigOptionsResponse:
"""配置选项:返回训练页自定义配置的全部可选项。"""
self._ensure_case(case_id)
recommended = self.default_patient_config()
return TrainingConfigOptionsResponse(
case_id=case_id,
recommended=recommended,
recommended_labels=self.config_labels(recommended),
options=self.config_options(),
)
def default_patient_config(self) -> PatientConfig:
"""默认配置:按当前产品原型初始化病人信息。"""
return PatientConfig(
visit_environment="outpatient",
age_group="youth",
education_level="higher",
personality="calm",
)
def normalize_patient_config(self, config: PatientConfig | None) -> dict:
"""配置归一:校验并补齐前端传入的病人初始化配置。"""
selected = config or self.default_patient_config()
values = selected.model_dump()
allowed = {key: {item.value for item in items} for key, items in self.config_options().items()}
for key, value in values.items():
if value not in allowed.get(key, set()):
raise AppError("TRAINING_CONFIG_INVALID", f"invalid patient config field: {key}", 400)
return {
"values": values,
"labels": self.config_labels(selected),
}
def config_labels(self, config: PatientConfig) -> dict[str, str]:
"""配置标签:把配置值转换为前端和提示词可读的中文标签。"""
option_map = {
key: {item.value: item.label for item in items}
for key, items in self.config_options().items()
}
values = config.model_dump()
return {key: option_map.get(key, {}).get(value, value) for key, value in values.items()}
def config_options(self) -> dict[str, list[ConfigOption]]:
"""配置选项:训练页可选病人初始化配置。"""
return {
"visit_environment": [
ConfigOption(value="outpatient", label="门诊", description="适合常规问诊训练"),
ConfigOption(value="emergency", label="急诊", description="病情紧急、沟通节奏更快"),
ConfigOption(value="ward", label="病房", description="适合住院病情追踪和处置沟通"),
],
"age_group": [
ConfigOption(value="child", label="儿童", description="由家属代述为主"),
ConfigOption(value="youth", label="青年", description="表达清楚,能配合问诊"),
ConfigOption(value="middle_aged", label="中年", description="关注工作、家庭和慢病背景"),
ConfigOption(value="elderly", label="老年", description="表达较慢,需关注基础病和用药史"),
],
"education_level": [
ConfigOption(value="primary_or_below", label="小学及以下", description="医学术语理解弱"),
ConfigOption(value="secondary", label="中等教育", description="能理解常见健康解释"),
ConfigOption(value="higher", label="高等教育", description="理解能力强,能描述细节"),
],
"personality": [
ConfigOption(value="calm", label="平和", description="情绪稳定,按问题回答"),
ConfigOption(value="anxious", label="焦虑", description="更担心病情和治疗风险"),
ConfigOption(value="impatient", label="急躁", description="希望快速获得结论"),
ConfigOption(value="cooperative", label="配合", description="愿意补充细节"),
ConfigOption(value="suspicious", label="多疑", description="会追问检查和用药依据"),
],
}
def _ensure_case(self, case_id: int) -> None:
"""病例校验:确认配置请求对应已发布病例。"""
if not self.case_repo.get_active_case(case_id):
raise AppError("CASE_NOT_FOUND", "case not found or inactive", 404)