# Files
# medical_training/apps/cms/training.py
# T
#
# 249 lines
# 11 KiB
# Python
# Raw Normal View History
#
# 2026-06-13 01:44:31 +08:00
"""CMS 训练记录查询 + 带教能力画像 / 排行榜(只读,fastapi 属主表 training_record)。
- CMS-TRN-1 超管训练记录列表:`GET /api/cms/training-records/`(全平台,`IsSuperAdmin`)。
- CMS-TEA-3 带教教学工具训练记录:`GET /api/cms/students/training-records/`
(名下学生,由 `CmsStudentViewSet` 的 action 复用 `build_training_records`)。
- CMS-TEA-4 带教能力画像:`GET /api/cms/students/{id}/competency/`(复用移动端智能分析 5 维口径)。
- CMS-TEA-5 带教排行榜:`GET /api/cms/students/ranking/`(名下学生多维度排名)。
得分按 `score_type` 归一(five_point×20);雷达/得分率走 `ai_feedback_structured.dimension_scores`
(带 max_score)。全程只读,不写训练表。
"""
from collections import defaultdict
from datetime import datetime, timedelta
from django.db.models import Q, Avg, Sum
from django.utils import timezone
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.pagination import PageNumberPagination
from rest_framework.response import Response
from apps.training.models import TrainingRecord
from apps.user.models import User
from apps.case.models import CaseBase
from apps.user.stats import (
STANDARD_DIMS, DIMENSION_MAP, _dimension_scores, _diagnosis_accuracy,
_norm_total, _NORM_TOTAL_EXPR,
)
from apps.cms.permissions import IsSuperAdmin
# Dimensions the ranking can be sorted by: normalized score, number of
# trainings, number of completed trainings, plus every standard competency
# dimension (ranked by score rate).
RANK_BASE_DIMS = ('score', 'train_count', 'completed')
RANK_DIMS = (*RANK_BASE_DIMS, *STANDARD_DIMS)
def _is_number(s):
try:
float(s)
return True
except (TypeError, ValueError):
return False
def _parse_date(s):
    """Parse a 'YYYY-MM-DD' or 'YYYY/MM/DD' string.

    Returns the aware datetime produced by ``timezone.make_aware`` for 00:00
    of that day, or ``None`` when the input is empty or matches neither format.
    """
    text = (s or '').strip()
    parsed = None
    for pattern in ('%Y-%m-%d', '%Y/%m/%d'):
        try:
            parsed = timezone.make_aware(datetime.strptime(text, pattern))
        except ValueError:
            # Not this format; try the next one.
            continue
        break
    return parsed
# ── 训练记录列表(CMS-TRN-1 / CMS-TEA-3 共用)──────────────────────────────────
def _filter_records(base, request):
    """Narrow a training-record queryset using the request's query parameters.

    Supported parameters (all optional; blank or malformed values are ignored):

    - ``search``: case-insensitive substring match on case title, user real
      name or user phone.
    - ``user_id`` / ``case_id`` / ``institution``: ids made of digits only.
    - ``status`` / ``training_mode`` / ``case_type``: exact match.
    - ``start_date`` / ``end_date``: dates understood by ``_parse_date``; both
      bounds are inclusive and are compared against ``created_at``.
    - ``min_score`` / ``max_score``: inclusive bounds on the normalized total
      score (``_NORM_TOTAL_EXPR``), which is annotated as ``_norm``.

    Returns the filtered queryset.
    """
    p = request.query_params

    def text(name):
        """Return the stripped parameter value, or '' when absent."""
        return (p.get(name) or '').strip()

    def as_id(name):
        """Return the parameter as an int, or None when it is not a usable id."""
        raw = text(name)
        if not raw.isdigit():
            return None
        try:
            return int(raw)
        except ValueError:
            # str.isdigit() is also true for characters such as '²' that
            # int() refuses; treat them like any other malformed id instead
            # of letting the request crash.
            return None

    search = text('search')
    if search:
        base = base.filter(
            Q(case__title__icontains=search)
            | Q(user__real_name__icontains=search)
            | Q(user__phone__icontains=search)
        )

    for key in ('user_id', 'case_id'):
        ident = as_id(key)
        if ident is not None:
            base = base.filter(**{key: ident})

    for field in ('status', 'training_mode', 'case_type'):
        wanted = text(field)
        if wanted:
            base = base.filter(**{field: wanted})

    institution_id = as_id('institution')
    if institution_id is not None:
        base = base.filter(user__institution_id=institution_id)

    d0 = _parse_date(p.get('start_date'))
    d1 = _parse_date(p.get('end_date'))
    if d0:
        base = base.filter(created_at__gte=d0)
    if d1:
        # end_date is inclusive: accept everything before the next midnight.
        base = base.filter(created_at__lt=d1 + timedelta(days=1))

    # Range filter on the normalized score.
    min_s, max_s = text('min_score'), text('max_score')
    has_min, has_max = _is_number(min_s), _is_number(max_s)
    if has_min or has_max:
        base = base.annotate(_norm=_NORM_TOTAL_EXPR)
        if has_min:
            base = base.filter(_norm__gte=float(min_s))
        if has_max:
            base = base.filter(_norm__lte=float(max_s))
    return base
def _serialize_record(r, users, cases):
"""users/cases 为按 id 预取的字典(避免对 fastapi 表的非空 FK 做 INNER JOIN
丢掉宿主系统外部用户/孤立病例的记录)。"""
user = users.get(r.user_id)
case = cases.get(r.case_id)
dept = case.department if (case and case.department_id) else None
return {
'record_id': r.id,
'user_id': r.user_id,
'user_name': user.real_name if user else '',
'institution': user.institution_id if user else None,
'institution_name': (user.institution.name if (user and user.institution_id) else ''),
'case_id': r.case_id,
'case_title': case.title if case else '',
'department': dept.name if dept else '',
'training_mode': r.training_mode,
'case_type': r.case_type,
'status': r.status,
'score': float(r.total_score) if r.total_score is not None else None,
'score_type': r.score_type,
'score_normalized': (round(_norm_total(r.total_score, r.score_type), 1)
if r.total_score is not None else None),
'evaluation_level': r.evaluation_level,
'trained_at': r.end_time or r.created_at,
'duration_seconds': r.duration_seconds,
}
def build_training_records(base, request):
    """Filter, summarize and paginate ``base``; return the paginated Response.

    Super admins pass ``TrainingRecord.objects.all()``; teachers pass a
    queryset already limited to their own students.

    The response is the regular paginated payload plus a ``summary`` entry
    holding the total count, the completed count and the average normalized
    score of completed records (``None`` when there is none).

    Warning: ``select_related`` is deliberately not used. ``user`` / ``case``
    are non-null foreign keys in the fastapi-owned table, so an INNER JOIN
    would drop records that belong to host-system external users or to cases
    missing locally, making ``count`` and ``results`` disagree. Related rows
    are fetched in bulk by id instead.
    """
    filtered = _filter_records(base, request)
    completed = filtered.filter(status='completed')

    mean_score = completed.aggregate(a=Avg(_NORM_TOTAL_EXPR))['a']
    summary = {
        'total': filtered.count(),
        'completed': completed.count(),
        'avg_score': None if mean_score is None else round(float(mean_score), 1),
    }

    paginator = PageNumberPagination()
    page = paginator.paginate_queryset(filtered.order_by('-end_time', '-id'), request)

    # Bulk-load only the users / cases referenced by the current page.
    user_ids = {rec.user_id for rec in page}
    case_ids = {rec.case_id for rec in page}
    user_rows = User.all_objects.filter(id__in=user_ids).select_related('institution')
    case_rows = CaseBase.all_objects.filter(id__in=case_ids).select_related('department')
    users = {row.id: row for row in user_rows}
    cases = {row.id: row for row in case_rows}

    payload = [_serialize_record(rec, users, cases) for rec in page]
    response = paginator.get_paginated_response(payload)
    response.data['summary'] = summary
    return response
@api_view(['GET'])
@permission_classes([IsAuthenticated, IsSuperAdmin])
def training_records(request):
    """CMS-TRN-1: read-only, platform-wide training record list for super admins."""
    every_record = TrainingRecord.objects.all()
    return build_training_records(every_record, request)
# ── 能力画像(CMS-TEA-4)──────────────────────────────────────────────────────
def _radar_buckets(records):
"""各标准维度得分率列表。返回 {标准维度: [得分率,...]}。"""
buckets = defaultdict(list)
for r in records:
for dim, score, mx in _dimension_scores(r):
std = DIMENSION_MAP.get(dim)
if std and score is not None and mx and float(mx) > 0:
buckets[std].append(float(score) / float(mx) * 100)
return buckets
def student_competency(student):
    """Build the competency profile of a single student.

    Only the student's completed training records are considered. The result
    holds the completed count, total training hours, average normalized score
    (0 when unavailable), diagnosis accuracy, one radar entry per standard
    dimension (0 when the dimension has no data), the weakest dimension and a
    short generated comment.
    """
    completed = TrainingRecord.objects.filter(user_id=student.id, status='completed')
    records = list(
        completed.only('total_score', 'score_type', 'duration_seconds', 'ai_feedback_structured')
    )
    totals = completed.aggregate(total=Sum('duration_seconds'), avg=Avg(_NORM_TOTAL_EXPR))
    mean_score = totals['avg']
    rates = _radar_buckets(records)

    radar = []
    for dim in STANDARD_DIMS:
        samples = rates[dim]
        radar.append({
            'dimension': dim,
            'score': round(sum(samples) / len(samples)) if samples else 0,
        })

    # Dimensions that actually have data, weakest first.
    scored = [entry for entry in radar if rates[entry['dimension']]]
    scored.sort(key=lambda entry: entry['score'])
    weak = [scored[0]['dimension']] if scored else []

    if not scored:
        comment = ''
    elif len(scored) >= 2 and scored[0]['score'] != scored[-1]['score']:
        strongest = scored[-1]['dimension']
        weakest = scored[0]['dimension']
        comment = (f"该学生{strongest}表现突出,但{weakest}"
                   f"仍有提升空间,建议加强相关训练。")
    else:
        comment = f"各维度表现较为均衡,可针对{weak[0]}做进一步强化。"

    return {
        'student_id': student.id,
        'real_name': student.real_name,
        'completed_cases': completed.count(),
        'total_hours': round((totals['total'] or 0) / 3600, 1),
        'avg_score': 0 if mean_score is None else round(float(mean_score), 1),
        'diagnosis_accuracy': _diagnosis_accuracy(records),
        'radar': radar,
        'weak_dimensions': weak,
        'comment': comment,
    }
# ── 排行榜(CMS-TEA-5)────────────────────────────────────────────────────────
def build_ranking(students, dimension):
    """Rank ``students`` by the requested dimension.

    ``dimension`` is one of ``score`` (default), ``train_count``, ``completed``
    or any entry of ``STANDARD_DIMS`` (average score rate of that dimension);
    anything else falls back to ``score``. Scores and dimension rates are
    taken from completed records only. Students without data get
    ``value=None`` and are placed at the end; ties keep the input order.
    """
    dimension = (dimension or 'score').strip()
    if dimension not in RANK_DIMS:
        dimension = 'score'

    info = {}
    for student in students:
        info[student.id] = (
            student.real_name,
            student.department.name if student.department_id else '',
        )
    ids = list(info)

    metrics = {}
    for sid in ids:
        metrics[sid] = {'train_count': 0, 'completed': 0, 'score_sum': 0.0, 'score_n': 0,
                        'dim': defaultdict(list)}

    records = TrainingRecord.objects.filter(user_id__in=ids).only(
        'user_id', 'status', 'total_score', 'score_type', 'ai_feedback_structured')
    for rec in records:
        stats = metrics[rec.user_id]
        stats['train_count'] += 1
        if rec.status != 'completed':
            continue
        stats['completed'] += 1
        normalized = _norm_total(rec.total_score, rec.score_type)
        if normalized is not None:
            stats['score_sum'] += normalized
            stats['score_n'] += 1
        for name, points, ceiling in _dimension_scores(rec):
            standard = DIMENSION_MAP.get(name)
            if standard and points is not None and ceiling and float(ceiling) > 0:
                stats['dim'][standard].append(float(points) / float(ceiling) * 100)

    def value_of(sid):
        """Return the ranking value of one student, or None without data."""
        stats = metrics[sid]
        if dimension in ('train_count', 'completed'):
            return stats[dimension]
        if dimension in STANDARD_DIMS:
            samples = stats['dim'].get(dimension)
            if not samples:
                return None
            return round(sum(samples) / len(samples), 1)
        if not stats['score_n']:
            return None
        return round(stats['score_sum'] / stats['score_n'], 1)

    rows = []
    for sid in ids:
        real_name, department = info[sid]
        rows.append({'student_id': sid, 'real_name': real_name, 'department': department,
                     'value': value_of(sid), 'train_count': metrics[sid]['train_count']})

    def sort_key(row):
        # Rows without a value go last; the rest are ordered high to low.
        return (row['value'] is None, -(row['value'] or 0))

    rows.sort(key=sort_key)
    for position, row in enumerate(rows, start=1):
        row['rank'] = position
    return {'dimension': dimension, 'count': len(rows), 'ranking': rows}