32 lines
1.2 KiB
Python
32 lines
1.2 KiB
Python
|
|
import time
|
||
|
|
|
||
|
|
from django.core.cache import cache
|
||
|
|
|
||
|
|
_BLACKLIST_PREFIX = 'jwt_blacklist:'
|
||
|
|
_INVALID_BEFORE_PREFIX = 'jwt_user_invalid_before:'
|
||
|
|
_USER_TOKEN_TTL = 7 * 24 * 3600 # 覆盖 refresh 最长生命周期
|
||
|
|
|
||
|
|
|
||
|
|
def revoke_token(jti: str, exp_ts: float) -> None:
|
||
|
|
"""将单个 token 加入黑名单。U7 退出 / U8 旋转旧 refresh 时调用。"""
|
||
|
|
ttl = max(int(exp_ts - time.time()), 1)
|
||
|
|
cache.set(f'{_BLACKLIST_PREFIX}{jti}', '1', timeout=ttl)
|
||
|
|
|
||
|
|
|
||
|
|
def invalidate_user_tokens(user_id: int) -> None:
|
||
|
|
"""用户级失效截止:写入当前时间戳+1,此前所有 token 立即失效。U5/U6 改密后调用。
|
||
|
|
|
||
|
|
+1 确保同一秒内签发的旧 token 被拒绝,而改密后新登录签发的 token(iat >= now+1)被放行。
|
||
|
|
"""
|
||
|
|
cache.set(f'{_INVALID_BEFORE_PREFIX}{user_id}', int(time.time()) + 1, timeout=_USER_TOKEN_TTL)
|
||
|
|
|
||
|
|
|
||
|
|
def is_token_revoked(jti: str) -> bool:
|
||
|
|
return bool(cache.get(f'{_BLACKLIST_PREFIX}{jti}'))
|
||
|
|
|
||
|
|
|
||
|
|
def get_user_invalid_before(user_id: int):
|
||
|
|
"""返回用户级失效截止时间戳(unix seconds),不存在则返回 None。"""
|
||
|
|
val = cache.get(f'{_INVALID_BEFORE_PREFIX}{user_id}')
|
||
|
|
return int(val) if val is not None else None
|