from __future__ import annotations import hashlib import time from dataclasses import dataclass from typing import Any import httpx from fastapi import Request from app.core.config import settings from app.core.exceptions import AppError @dataclass(frozen=True) class AuthenticatedUser: """外部认证用户:承载从 Django 用户中心解析出的标准用户信息。""" user_id: str source: str = "django_user_center" username: str | None = None display_name: str | None = None tenant_id: str | None = None role: str | None = None class ExternalAuthService: """外部认证服务:调用 Django `/api/user/users/me/` 并标准化当前登录用户。""" _cache: dict[str, tuple[float, AuthenticatedUser]] = {} async def authenticate(self, request: Request) -> AuthenticatedUser: """用户鉴权:转发前端鉴权凭证到 Django 用户中心,成功后返回标准用户信息。""" if not settings.auth_user_me_url: raise AppError("AUTH_CONFIG_MISSING", "auth user me url is not configured", 500) outbound_headers = self._build_forward_headers(request) if not outbound_headers: raise AppError("AUTH_CREDENTIAL_REQUIRED", "Authorization or Cookie is required", 401) cache_key = self._cache_key(outbound_headers) cached = self._read_cache(cache_key) if cached: return cached try: async with httpx.AsyncClient(timeout=settings.auth_timeout_seconds, follow_redirects=False) as client: response = await client.get(settings.auth_user_me_url, headers=outbound_headers) except httpx.TimeoutException as exc: raise AppError("AUTH_USER_CENTER_UNAVAILABLE", "auth user center request timeout", 503) from exc except httpx.HTTPError as exc: raise AppError("AUTH_USER_CENTER_UNAVAILABLE", "auth user center unavailable", 503) from exc if response.status_code != 200: raise AppError("AUTH_USER_INVALID", "current login user is invalid", 401) try: payload = response.json() except ValueError as exc: raise AppError("AUTH_USER_PARSE_FAILED", "auth user center returned invalid json", 502) from exc user = self._parse_user(payload) self._write_cache(cache_key, user) return user def _build_forward_headers(self, request: Request) -> dict[str, str]: """认证转发:只透传鉴权必要 Header,避免把无关内部头转发给用户中心。""" headers: dict[str, str] = {} authorization = request.headers.get("Authorization") cookie = request.headers.get("Cookie") if authorization: headers["Authorization"] = authorization if cookie: headers["Cookie"] = cookie if request.headers.get("X-CSRFToken"): headers["X-CSRFToken"] = request.headers["X-CSRFToken"] return headers def _parse_user(self, payload: dict[str, Any]) -> AuthenticatedUser: """用户解析:兼容常见 Django 用户接口结构,并提取稳定 user_id。""" data = payload.get("data") if isinstance(payload.get("data"), dict) else payload user_id = self._first_present(data, ["id", "user_id", "uid", "pk", "uuid"]) if user_id is None: raise AppError("AUTH_USER_PARSE_FAILED", "auth user id is missing", 502) username = self._first_present(data, ["username", "account", "mobile", "phone"]) display_name = self._first_present(data, ["display_name", "name", "nickname", "real_name"]) role = self._first_present(data, ["role", "user_role"]) tenant_id = self._first_present(data, ["tenant_id", "org_id", "organization_id"]) return AuthenticatedUser( user_id=str(user_id), username=str(username) if username is not None else None, display_name=str(display_name) if display_name is not None else None, role=str(role) if role is not None else None, tenant_id=str(tenant_id) if tenant_id is not None else None, ) @staticmethod def _first_present(data: dict[str, Any], keys: list[str]) -> Any: """用户解析:按优先级读取第一个非空字段。""" for key in keys: value = data.get(key) if value is not None and value != "": return value return None @staticmethod def _cache_key(headers: dict[str, str]) -> str: """认证缓存:基于鉴权凭证生成不可逆缓存键,避免保存明文 token。""" raw = "|".join(f"{key}:{value}" for key, value in sorted(headers.items())) return hashlib.sha256(raw.encode("utf-8")).hexdigest() def _read_cache(self, cache_key: str) -> AuthenticatedUser | None: """认证缓存:读取进程内短期用户缓存,减少频繁请求 Django 用户中心。""" item = self._cache.get(cache_key) if not item: return None expires_at, user = item if expires_at <= time.time(): self._cache.pop(cache_key, None) return None return user def _write_cache(self, cache_key: str, user: AuthenticatedUser) -> None: """认证缓存:写入短期用户缓存,缓存只保存标准用户信息,不保存 token 明文。""" ttl = max(settings.auth_cache_ttl_seconds, 0) if ttl <= 0: return self._cache[cache_key] = (time.time() + ttl, user)