216 lines
8.7 KiB
Python
216 lines
8.7 KiB
Python
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from fastapi import Request
|
|
|
|
from app.core.config import settings
|
|
from app.core.exceptions import AppError
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AuthenticatedUser:
|
|
"""外部认证用户:承载从 Django 用户中心解析出的标准用户信息。"""
|
|
|
|
user_id: str
|
|
source: str = "django_user_center"
|
|
username: str | None = None
|
|
display_name: str | None = None
|
|
tenant_id: str | None = None
|
|
role: str | None = None
|
|
phone: str | None = None
|
|
avatar: str | None = None
|
|
gender: int | None = None
|
|
institution_id: int | None = None
|
|
institution_name: str | None = None
|
|
department_id: int | None = None
|
|
department_name: str | None = None
|
|
title_name: str | None = None
|
|
major: str | None = None
|
|
training_stage: str | None = None
|
|
learning_target: str | None = None
|
|
current_level: str | None = None
|
|
status: int | None = None
|
|
profile: dict[str, Any] | None = None
|
|
|
|
|
|
class ExternalAuthService:
|
|
"""外部认证服务:调用 Django `/api/user/users/me/` 并标准化当前登录用户。"""
|
|
|
|
_cache: dict[str, tuple[float, AuthenticatedUser]] = {}
|
|
|
|
async def authenticate(self, request: Request) -> AuthenticatedUser:
|
|
"""用户鉴权:转发前端鉴权凭证到 Django 用户中心,成功后返回标准用户信息。"""
|
|
if not settings.auth_user_me_url:
|
|
raise AppError("AUTH_CONFIG_MISSING", "auth user me url is not configured", 500)
|
|
|
|
outbound_headers = self._build_forward_headers(request)
|
|
if not outbound_headers:
|
|
raise AppError("AUTH_CREDENTIAL_REQUIRED", "Authorization header is required", 401)
|
|
|
|
cache_key = self._cache_key(outbound_headers)
|
|
cached = self._read_cache(cache_key)
|
|
if cached:
|
|
return cached
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=settings.auth_timeout_seconds, follow_redirects=False) as client:
|
|
response = await client.get(settings.auth_user_me_url, headers=outbound_headers)
|
|
except httpx.TimeoutException as exc:
|
|
raise AppError("AUTH_USER_CENTER_UNAVAILABLE", "auth user center request timeout", 503) from exc
|
|
except httpx.HTTPError as exc:
|
|
raise AppError("AUTH_USER_CENTER_UNAVAILABLE", "auth user center unavailable", 503) from exc
|
|
|
|
if response.status_code != 200:
|
|
raise AppError("AUTH_USER_INVALID", "current login user is invalid", 401)
|
|
|
|
try:
|
|
payload = response.json()
|
|
except ValueError as exc:
|
|
raise AppError("AUTH_USER_PARSE_FAILED", "auth user center returned invalid json", 502) from exc
|
|
|
|
user = self._parse_user(payload)
|
|
self._write_cache(cache_key, user)
|
|
return user
|
|
|
|
def _build_forward_headers(self, request: Request) -> dict[str, str]:
|
|
"""认证转发:只透传 Bearer token,避免把无关内部头转发给用户中心。"""
|
|
headers: dict[str, str] = {}
|
|
authorization = request.headers.get("Authorization")
|
|
if authorization:
|
|
authorization = authorization.strip()
|
|
if authorization and " " not in authorization:
|
|
authorization = f"Bearer {authorization}"
|
|
headers["Authorization"] = authorization
|
|
return headers
|
|
|
|
def _parse_user(self, payload: dict[str, Any]) -> AuthenticatedUser:
|
|
"""用户解析:兼容常见 Django 用户接口结构,并提取稳定 user_id。"""
|
|
data = payload.get("data") if isinstance(payload.get("data"), dict) else payload
|
|
user_id = self._first_present(data, ["id", "user_id", "uid", "pk", "uuid"])
|
|
if user_id is None:
|
|
raise AppError("AUTH_USER_PARSE_FAILED", "auth user id is missing", 502)
|
|
status = self._to_int(data.get("status"))
|
|
if status == 0:
|
|
raise AppError("AUTH_USER_DISABLED", "current user is disabled", 403)
|
|
username = self._first_present(data, ["username", "account", "mobile", "phone"])
|
|
display_name = self._first_present(data, ["display_name", "name", "nickname", "real_name"])
|
|
role = self._first_present(data, ["role_type", "role", "user_role"])
|
|
institution_id = self._to_int(data.get("institution_id") or data.get("institution"))
|
|
tenant_id = str(institution_id) if institution_id is not None else None
|
|
profile = self._build_profile(data)
|
|
return AuthenticatedUser(
|
|
user_id=str(user_id),
|
|
username=str(username) if username is not None else None,
|
|
display_name=str(display_name) if display_name is not None else None,
|
|
role=str(role) if role is not None else None,
|
|
tenant_id=str(tenant_id) if tenant_id is not None else None,
|
|
phone=self._to_str(data.get("phone")),
|
|
avatar=self._to_str(data.get("avatar")),
|
|
gender=self._to_int(data.get("gender")),
|
|
institution_id=institution_id,
|
|
institution_name=self._to_str(data.get("institution_name")),
|
|
department_id=self._to_int(data.get("department_id") or data.get("department")),
|
|
department_name=self._to_str(data.get("department_name")),
|
|
title_name=self._to_str(data.get("title_name")),
|
|
major=self._to_str(data.get("major")),
|
|
training_stage=self._to_str(data.get("training_stage")),
|
|
learning_target=self._to_str(data.get("learning_target")),
|
|
current_level=self._to_str(data.get("current_level")),
|
|
status=status,
|
|
profile=profile,
|
|
)
|
|
|
|
def _build_profile(self, data: dict[str, Any]) -> dict[str, Any]:
|
|
"""用户画像:按 Django `/me` 字段白名单保留学习画像,供前端和后续 Agent 个性化使用。"""
|
|
keys = [
|
|
"id",
|
|
"username",
|
|
"real_name",
|
|
"phone",
|
|
"avatar",
|
|
"gender",
|
|
"role_type",
|
|
"institution",
|
|
"institution_id",
|
|
"institution_name",
|
|
"department",
|
|
"department_id",
|
|
"department_name",
|
|
"title_name",
|
|
"major",
|
|
"training_stage",
|
|
"learning_target",
|
|
"competency_profile",
|
|
"weak_dimensions",
|
|
"strong_dimensions",
|
|
"ai_preference",
|
|
"total_training_count",
|
|
"total_case_count",
|
|
"current_level",
|
|
"status",
|
|
"last_login",
|
|
"last_login_time",
|
|
"is_superuser",
|
|
"is_staff",
|
|
"is_active",
|
|
"date_joined",
|
|
"created_at",
|
|
"updated_at",
|
|
]
|
|
return {key: data.get(key) for key in keys if key in data}
|
|
|
|
@staticmethod
|
|
def _first_present(data: dict[str, Any], keys: list[str]) -> Any:
|
|
"""用户解析:按优先级读取第一个非空字段。"""
|
|
for key in keys:
|
|
value = data.get(key)
|
|
if value is not None and value != "":
|
|
return value
|
|
return None
|
|
|
|
@staticmethod
|
|
def _to_int(value: Any) -> int | None:
|
|
"""用户解析:把 Django 返回的数字字段稳定转成 int,空值保持 None。"""
|
|
if value is None or value == "":
|
|
return None
|
|
try:
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
@staticmethod
|
|
def _to_str(value: Any) -> str | None:
|
|
"""用户解析:把可展示字段转成字符串,空值保持 None。"""
|
|
if value is None or value == "":
|
|
return None
|
|
return str(value)
|
|
|
|
@staticmethod
|
|
def _cache_key(headers: dict[str, str]) -> str:
|
|
"""认证缓存:基于鉴权凭证生成不可逆缓存键,避免保存明文 token。"""
|
|
raw = "|".join(f"{key}:{value}" for key, value in sorted(headers.items()))
|
|
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
|
|
|
|
def _read_cache(self, cache_key: str) -> AuthenticatedUser | None:
|
|
"""认证缓存:读取进程内短期用户缓存,减少频繁请求 Django 用户中心。"""
|
|
item = self._cache.get(cache_key)
|
|
if not item:
|
|
return None
|
|
expires_at, user = item
|
|
if expires_at <= time.time():
|
|
self._cache.pop(cache_key, None)
|
|
return None
|
|
return user
|
|
|
|
def _write_cache(self, cache_key: str, user: AuthenticatedUser) -> None:
|
|
"""认证缓存:写入短期用户缓存,缓存只保存标准用户信息,不保存 token 明文。"""
|
|
ttl = max(settings.auth_cache_ttl_seconds, 0)
|
|
if ttl <= 0:
|
|
return
|
|
self._cache[cache_key] = (time.time() + ttl, user)
|