Files
medical_training/apps/user/stats.py
T

225 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""移动端个人中心 - 训练统计 / 智能分析(读 fastapi 只读训练表)。
- 4.3 临床核心能力指标 GET /api/user/competency-metrics/
- 4.4 训练记录(统计信息) GET /api/user/training-records/
- 4.5 智能分析(关联评价表)GET /api/user/analysis/
数据源:training_recordmanaged=False 只读,user_id = 当前用户.id)。
各能力维度得分 + 满分来自 training_record.ai_feedback_structured.dimension_scores
(与 training_score_detail 同源,但 json 里带 max_score,便于算「得分率」)。
"""
import json
from datetime import timedelta
from django.db.models import Sum, Avg, Case, When, F, DecimalField
from django.utils import timezone
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.pagination import PageNumberPagination
from rest_framework.response import Response
from drf_spectacular.utils import extend_schema
from apps.training.models import TrainingRecord
# 临床核心能力标准 6 维(A 组)
STANDARD_DIMS = ['病史采集', '查体能力', '检查决策', '诊断能力', '治疗决策', '医患沟通']
# 评分维度(fastapi 实际打分维度,名称随病例评分规则而变)→ 标准 6 维 的归并映射。
# ⚠️ 数据中维度名不统一(已观察到两套来源、且随病例不同),此映射为按实测维度的最佳归并,
# 最终口径需与评分/内容团队对齐。未在映射内的维度会被忽略。
DIMENSION_MAP = {
# 病史采集
'信息获取': '病史采集', '病史采集': '病史采集', '问诊': '病史采集',
# 查体能力(注:当前 ai_feedback 数据未单列查体维度,多由内容侧 rubric 决定)
'查体能力': '查体能力', '体格检查': '查体能力', '查体': '查体能力',
# 检查决策
'检查决策': '检查决策', '检查利用': '检查决策', '检查理解': '检查决策',
'辅助检查': '检查决策', '检验决策': '检查决策',
# 诊断能力
'诊断推理': '诊断能力', '诊断能力': '诊断能力', '鉴别诊断': '诊断能力',
'分析推理': '诊断能力', '临床推理': '诊断能力', '临床整合': '诊断能力',
'知识掌握': '诊断能力', '知识运用': '诊断能力',
# 治疗决策
'治疗决策': '治疗决策', '处置决策': '治疗决策', '治疗': '治疗决策',
# 医患沟通
'沟通技巧': '医患沟通', '医患沟通': '医患沟通',
'沟通人文': '医患沟通', '人文沟通': '医患沟通', '沟通': '医患沟通',
}
# 诊断相关维度(诊断准确率口径:这些维度的平均得分率)
DIAGNOSIS_DIMS = {'诊断推理', '诊断能力', '鉴别诊断', '分析推理', '临床推理', '临床整合'}
def _completed_qs(user):
return TrainingRecord.objects.filter(user_id=user.id, status='completed')
# total_score 归一到百分制:五分制(0-5) ×20,其余按原值(percentage)。
# 用于 avg_score / current_score / 趋势 / 较上周,避免百分制与五分制混平均失真。
# (诊断准确率/雷达用各维度 score/max_score 比值自归一,不走这里。)
def _norm_total(score, score_type):
if score is None:
return None
return float(score) * 20 if score_type == 'five_point' else float(score)
# DB 侧等价表达式,供 aggregate(Avg(...)) 使用
_NORM_TOTAL_EXPR = Case(
When(score_type='five_point', then=F('total_score') * 20),
default=F('total_score'),
output_field=DecimalField(max_digits=7, decimal_places=2),
)
def _week_start(offset_weeks=0):
now = timezone.localtime()
monday = now - timedelta(days=now.weekday(), weeks=-offset_weeks)
return monday.replace(hour=0, minute=0, second=0, microsecond=0)
def _dimension_scores(rec):
"""从 ai_feedback_structured.dimension_scores 取 [(dimension, score, max_score)]。"""
fb = rec.ai_feedback_structured
if isinstance(fb, str):
try:
fb = json.loads(fb)
except Exception:
fb = {}
if not isinstance(fb, dict):
return []
out = []
for d in (fb.get('dimension_scores') or []):
if isinstance(d, dict) and d.get('dimension'):
out.append((d['dimension'], d.get('score'), d.get('max_score')))
return out
def _diagnosis_accuracy(records):
"""诊断相关维度的平均得分率(%)。无数据返回 None。"""
rates = []
for rec in records:
for dim, score, mx in _dimension_scores(rec):
if dim in DIAGNOSIS_DIMS and score is not None and mx and float(mx) > 0:
rates.append(float(score) / float(mx))
return round(sum(rates) / len(rates) * 100) if rates else None
def _hours(seconds):
return round((seconds or 0) / 3600, 1)
# ── 4.3 临床核心能力指标 ─────────────────────────────────────────────────────
@extend_schema(summary='临床核心能力指标', tags=['个人中心'])
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def competency_metrics(request):
qs = _completed_qs(request.user)
agg = qs.aggregate(total=Sum('duration_seconds'), avg=Avg(_NORM_TOTAL_EXPR))
avg = agg['avg']
return Response({
'completed_cases': qs.count(),
'completed_cases_week': qs.filter(end_time__gte=_week_start()).count(),
'total_hours': _hours(agg['total']),
'avg_score': round(float(avg), 1) if avg is not None else 0,
'diagnosis_accuracy': _diagnosis_accuracy(qs.only('ai_feedback_structured')),
})
# ── 4.4 训练记录(统计信息)─────────────────────────────────────────────────
@extend_schema(summary='训练记录(统计信息)', tags=['个人中心'])
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def training_records(request):
base = _completed_qs(request.user)
search = (request.query_params.get('search') or '').strip()
if search:
from django.db.models import Q
base = base.filter(Q(case__title__icontains=search) | Q(case__department__name__icontains=search))
agg = base.aggregate(total=Sum('duration_seconds'))
summary = {
'total_cases': base.count(),
'total_hours': _hours(agg['total']),
'avg_accuracy': _diagnosis_accuracy(base.only('ai_feedback_structured')),
}
page_qs = base.select_related('case', 'case__department').order_by('-end_time', '-id')
paginator = PageNumberPagination()
page = paginator.paginate_queryset(page_qs, request)
results = []
for r in page:
case = r.case if r.case_id else None # 孤立外键(本地无对应病例)时为 None
dept = case.department if case else None # 病例无科室或科室缺失时为 None
results.append({
'record_id': r.id,
'case_id': r.case_id,
'case_title': case.title if case else '',
'department': dept.name if dept else '',
'trained_at': r.end_time or r.created_at,
'score': float(r.total_score) if r.total_score is not None else None,
'score_type': r.score_type,
'evaluation_level': r.evaluation_level,
'training_mode': r.training_mode,
'case_type': r.case_type,
})
resp = paginator.get_paginated_response(results)
resp.data['summary'] = summary
return resp
# ── 4.5 智能分析(关联评价表)─────────────────────────────────────────────────
@extend_schema(summary='智能分析(关联评价表)', tags=['个人中心'])
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def analysis(request):
recs = list(_completed_qs(request.user).order_by('end_time', 'id'))
# 趋势 / 当前评分(按 total_score,按 score_type 归一到百分制)
scored = [(r.end_time or r.created_at, _norm_total(r.total_score, r.score_type))
for r in recs if r.total_score is not None]
recent = scored[-7:]
recent_trend = [{'label': (dt.strftime('%m-%d') if dt else ''), 'score': round(s)} for dt, s in recent]
current_score = round(sum(s for _, s in recent) / len(recent), 1) if recent else 0
# 较上周提升%
ws, ws_prev = _week_start(), _week_start(offset_weeks=-1)
this_week = [s for dt, s in scored if dt and dt >= ws]
last_week = [s for dt, s in scored if dt and ws_prev <= dt < ws]
score_delta_pct = None
if this_week and last_week:
a, b = sum(this_week) / len(this_week), sum(last_week) / len(last_week)
if b:
score_delta_pct = round((a - b) / b * 100)
# 雷达:维度得分率归一到 0-100,按 A 组归并求均
from collections import defaultdict
buckets = defaultdict(list)
for r in recs:
for dim, score, mx in _dimension_scores(r):
std = DIMENSION_MAP.get(dim)
if std and score is not None and mx and float(mx) > 0:
buckets[std].append(float(score) / float(mx) * 100)
radar = [{'dimension': d, 'score': (round(sum(buckets[d]) / len(buckets[d])) if buckets[d] else 0)}
for d in STANDARD_DIMS]
# 仅在「有数据」的维度里排序取强/弱,避免无数据维度(0 分)被误判为薄弱项
ordered = sorted((x for x in radar if buckets[x['dimension']]), key=lambda x: x['score'])
weak_dimensions = [ordered[0]['dimension']] if ordered else []
comment = ''
if len(ordered) >= 2 and ordered[0]['score'] != ordered[-1]['score']:
# 强弱维度不同才做对比,避免「最强==最弱」时出现自相矛盾文案
comment = f"您的{ordered[-1]['dimension']}表现突出,但{ordered[0]['dimension']}仍有提升空间,建议增加相关模拟训练。"
elif ordered:
comment = f"各维度表现较为均衡,可针对{weak_dimensions[0]}做进一步强化。"
return Response({
'current_score': current_score,
'score_delta_pct': score_delta_pct,
'recent_trend': recent_trend,
'radar': radar,
'weak_dimensions': weak_dimensions,
'comment': comment,
})