from __future__ import annotations

import hashlib
import time
from dataclasses import dataclass
from typing import Any

import httpx
from fastapi import Request

from app.core.config import settings
from app.core.exceptions import AppError


@dataclass(frozen=True)
class AuthenticatedUser:
    """Externally authenticated user.

    Carries the normalized user information parsed from the Django user
    center response. Every field except ``user_id`` is optional.
    """

    user_id: str
    source: str = "django_user_center"
    username: str | None = None
    display_name: str | None = None
    tenant_id: str | None = None
    role: str | None = None
    phone: str | None = None
    avatar: str | None = None
    gender: int | None = None
    institution_id: int | None = None
    institution_name: str | None = None
    department_id: int | None = None
    department_name: str | None = None
    title_name: str | None = None
    major: str | None = None
    training_stage: str | None = None
    learning_target: str | None = None
    current_level: str | None = None
    status: int | None = None
    profile: dict[str, Any] | None = None


class ExternalAuthService:
    """External auth service.

    Calls the Django ``/api/user/users/me/`` endpoint configured in
    ``settings.auth_user_me_url`` and normalizes the current login user into
    an :class:`AuthenticatedUser`.
    """

    # Class-level cache shared by every instance in this process:
    # cache key -> (monotonic expiry timestamp, parsed user).
    _cache: dict[str, tuple[float, AuthenticatedUser]] = {}

    # Whitelist of response fields copied verbatim into ``AuthenticatedUser.profile``.
    _PROFILE_KEYS: tuple[str, ...] = (
        "id",
        "username",
        "real_name",
        "phone",
        "avatar",
        "gender",
        "role_type",
        "institution",
        "institution_id",
        "institution_name",
        "department",
        "department_id",
        "department_name",
        "title_name",
        "major",
        "training_stage",
        "learning_target",
        "competency_profile",
        "weak_dimensions",
        "strong_dimensions",
        "ai_preference",
        "total_training_count",
        "total_case_count",
        "current_level",
        "status",
        "last_login",
        "last_login_time",
        "is_superuser",
        "is_staff",
        "is_active",
        "date_joined",
        "created_at",
        "updated_at",
    )

    async def authenticate(self, request: Request) -> AuthenticatedUser:
        """Authenticate the caller by forwarding its credential to the user center.

        Args:
            request: Incoming request whose ``Authorization`` header is forwarded.

        Returns:
            The normalized user, served from the short-lived cache when possible.

        Raises:
            AppError: ``AUTH_CONFIG_MISSING`` (500) when no user-center URL is
                configured; ``AUTH_CREDENTIAL_REQUIRED`` (401) when the request
                carries no usable credential; ``AUTH_USER_CENTER_UNAVAILABLE``
                (503) on timeout, transport error or an upstream 5xx response;
                ``AUTH_USER_INVALID`` (401) on any other non-200 response;
                ``AUTH_USER_PARSE_FAILED`` (502) when the body cannot be parsed;
                ``AUTH_USER_DISABLED`` (403) when the user status is 0.
        """
        if not settings.auth_user_me_url:
            raise AppError("AUTH_CONFIG_MISSING", "auth user me url is not configured", 500)

        outbound_headers = self._build_forward_headers(request)
        if not outbound_headers:
            raise AppError("AUTH_CREDENTIAL_REQUIRED", "Authorization header is required", 401)

        cache_key = self._cache_key(outbound_headers)
        cached = self._read_cache(cache_key)
        if cached is not None:
            return cached

        try:
            async with httpx.AsyncClient(
                timeout=settings.auth_timeout_seconds,
                follow_redirects=False,
            ) as client:
                response = await client.get(settings.auth_user_me_url, headers=outbound_headers)
        except httpx.TimeoutException as exc:
            raise AppError("AUTH_USER_CENTER_UNAVAILABLE", "auth user center request timeout", 503) from exc
        except httpx.HTTPError as exc:
            raise AppError("AUTH_USER_CENTER_UNAVAILABLE", "auth user center unavailable", 503) from exc

        # An upstream server error says nothing about the caller's credential;
        # report it as an outage rather than telling the client its login is invalid.
        if response.status_code >= 500:
            raise AppError("AUTH_USER_CENTER_UNAVAILABLE", "auth user center unavailable", 503)
        if response.status_code != 200:
            raise AppError("AUTH_USER_INVALID", "current login user is invalid", 401)

        try:
            payload = response.json()
        except ValueError as exc:
            raise AppError("AUTH_USER_PARSE_FAILED", "auth user center returned invalid json", 502) from exc

        user = self._parse_user(payload)
        self._write_cache(cache_key, user)
        return user

    def _build_forward_headers(self, request: Request) -> dict[str, str]:
        """Build the headers forwarded to the user center.

        Only the ``Authorization`` header is passed through, so unrelated
        internal headers never reach the user center. A value without any
        space is treated as a bare token and prefixed with ``Bearer ``.

        Returns:
            ``{"Authorization": ...}``, or an empty dict when the header is
            missing or contains only whitespace.
        """
        authorization = (request.headers.get("Authorization") or "").strip()
        if not authorization:
            # A blank header is not a credential; let the caller answer 401
            # instead of forwarding an empty Authorization value upstream.
            return {}
        if " " not in authorization:
            authorization = f"Bearer {authorization}"
        return {"Authorization": authorization}

    def _parse_user(self, payload: dict[str, Any]) -> AuthenticatedUser:
        """Convert the user-center JSON payload into an :class:`AuthenticatedUser`.

        The user object is read from ``payload["data"]`` when that is a dict,
        otherwise from ``payload`` itself.

        Raises:
            AppError: ``AUTH_USER_PARSE_FAILED`` (502) when the payload is not a
                JSON object or carries no user id; ``AUTH_USER_DISABLED`` (403)
                when ``status`` converts to 0.
        """
        if not isinstance(payload, dict):
            # Valid JSON that is not an object (list, string, number, null)
            # must fail as a parse error, not as an AttributeError.
            raise AppError("AUTH_USER_PARSE_FAILED", "auth user payload is not an object", 502)

        nested = payload.get("data")
        data = nested if isinstance(nested, dict) else payload

        user_id = self._first_present(data, ["id", "user_id", "uid", "pk", "uuid"])
        if user_id is None:
            raise AppError("AUTH_USER_PARSE_FAILED", "auth user id is missing", 502)

        status = self._to_int(data.get("status"))
        if status == 0:
            raise AppError("AUTH_USER_DISABLED", "current user is disabled", 403)

        username = self._first_present(data, ["username", "account", "mobile", "phone"])
        display_name = self._first_present(data, ["display_name", "name", "nickname", "real_name"])
        role = self._first_present(data, ["role_type", "role", "user_role"])
        institution_id = self._to_int(data.get("institution_id") or data.get("institution"))
        # The tenant id is the institution id rendered as a string.
        tenant_id = str(institution_id) if institution_id is not None else None

        return AuthenticatedUser(
            user_id=str(user_id),
            username=str(username) if username is not None else None,
            display_name=str(display_name) if display_name is not None else None,
            role=str(role) if role is not None else None,
            tenant_id=tenant_id,
            phone=self._to_str(data.get("phone")),
            avatar=self._to_str(data.get("avatar")),
            gender=self._to_int(data.get("gender")),
            institution_id=institution_id,
            institution_name=self._to_str(data.get("institution_name")),
            department_id=self._to_int(data.get("department_id") or data.get("department")),
            department_name=self._to_str(data.get("department_name")),
            title_name=self._to_str(data.get("title_name")),
            major=self._to_str(data.get("major")),
            training_stage=self._to_str(data.get("training_stage")),
            learning_target=self._to_str(data.get("learning_target")),
            current_level=self._to_str(data.get("current_level")),
            status=status,
            profile=self._build_profile(data),
        )

    def _build_profile(self, data: dict[str, Any]) -> dict[str, Any]:
        """Return the whitelisted subset of ``data`` used as the user profile.

        Only keys listed in ``_PROFILE_KEYS`` and present in ``data`` are
        copied; values are kept as received.
        """
        return {key: data[key] for key in self._PROFILE_KEYS if key in data}

    @staticmethod
    def _first_present(data: dict[str, Any], keys: list[str]) -> Any:
        """Return the first value among ``keys`` that is neither ``None`` nor ``""``.

        Keys are tried in the given order; ``None`` is returned when none match.
        """
        for key in keys:
            value = data.get(key)
            if value is not None and value != "":
                return value
        return None

    @staticmethod
    def _to_int(value: Any) -> int | None:
        """Convert ``value`` to ``int``; return ``None`` for empty or unconvertible values."""
        if value is None or value == "":
            return None
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _to_str(value: Any) -> str | None:
        """Convert ``value`` to ``str``; return ``None`` for ``None`` or ``""``."""
        if value is None or value == "":
            return None
        return str(value)

    @staticmethod
    def _cache_key(headers: dict[str, str]) -> str:
        """Derive the cache key from the forwarded headers.

        The key is the SHA-256 hex digest of the sorted ``key:value`` pairs, so
        the plaintext token is never stored in the cache.
        """
        raw = "|".join(f"{key}:{value}" for key, value in sorted(headers.items()))
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def _read_cache(self, cache_key: str) -> AuthenticatedUser | None:
        """Return the cached user for ``cache_key``, or ``None`` if absent or expired.

        An expired entry is removed as a side effect.
        """
        item = self._cache.get(cache_key)
        if item is None:
            return None
        expires_at, user = item
        if expires_at <= time.monotonic():
            self._cache.pop(cache_key, None)
            return None
        return user

    def _write_cache(self, cache_key: str, user: AuthenticatedUser) -> None:
        """Store ``user`` under ``cache_key`` for ``settings.auth_cache_ttl_seconds``.

        Nothing is written when the configured TTL is zero or negative. Only
        the parsed user is stored, never the token itself.
        """
        ttl = max(settings.auth_cache_ttl_seconds, 0)
        if ttl <= 0:
            return

        # Monotonic time keeps the TTL immune to wall-clock adjustments.
        now = time.monotonic()

        # Entries for tokens that are never presented again would otherwise
        # stay in the process-wide cache forever; sweep them on each write.
        # Writes only follow an upstream round trip, so the scan is cheap.
        expired = [key for key, (expires_at, _) in self._cache.items() if expires_at <= now]
        for key in expired:
            self._cache.pop(key, None)

        self._cache[cache_key] = (now + ttl, user)