from datetime import datetime, timezone, timedelta
from typing import Dict, Any, Optional

import jwt
from fastapi import Request

from core.settings import jwt_settings


class JwtUtil:
    """JWT utility class.

    Groups helpers for issuing tokens, verifying them, and pulling a token
    out of an incoming request. Signing key, algorithm and lifetime are all
    read from ``jwt_settings``.
    """

    @staticmethod
    def generate_token(user_id: int, username: str) -> str:
        """Create a signed JWT for the given user.

        The payload carries ``user_id``, ``username`` and an ``exp`` claim set
        to the current UTC time plus ``jwt_settings.expire_minutes`` minutes.

        Args:
            user_id: Stored under the ``user_id`` claim.
            username: Stored under the ``username`` claim.

        Returns:
            The encoded token, signed with ``jwt_settings.secret_key`` using
            ``jwt_settings.algorithm``.
        """
        expires_at = datetime.now(timezone.utc) + timedelta(
            minutes=jwt_settings.expire_minutes
        )
        payload = {
            "user_id": user_id,
            "username": username,
            "exp": expires_at,
        }
        return jwt.encode(
            payload, jwt_settings.secret_key, algorithm=jwt_settings.algorithm
        )

    @staticmethod
    def verify_token(token: str) -> Dict[str, Any]:
        """Decode and validate a JWT, returning its payload.

        Args:
            token: The encoded JWT string.

        Returns:
            The decoded payload.

        Raises:
            jwt.ExpiredSignatureError: If the ``exp`` claim is in the past.
            jwt.InvalidTokenError: If the token is otherwise invalid.
        """
        # The keyword must be ``algorithms`` (plural): it is the allow-list
        # PyJWT checks the token header against. The singular spelling is not
        # a decode() parameter, so the allow-list was never actually applied.
        return jwt.decode(
            token, jwt_settings.secret_key, algorithms=[jwt_settings.algorithm]
        )

    @staticmethod
    def get_token_from_request(request: Request) -> Optional[str]:
        """Extract a token from the request, if one is present.

        The ``Authorization: Bearer <token>`` header is checked first; if it
        yields no credential, the ``token`` query parameter is used.

        Args:
            request: The incoming request.

        Returns:
            The token string, or ``None`` when neither location provides one.
        """
        # Read from the Authorization header.
        auth_header = request.headers.get("Authorization")
        if auth_header and auth_header.startswith("Bearer "):
            # Split on any whitespace so an empty credential ("Bearer ") or a
            # doubled space does not produce an empty-string token; in that
            # case fall through to the query parameter instead.
            parts = auth_header.split()
            if len(parts) >= 2:
                return parts[1]

        # Read from the query parameters.
        token = request.query_params.get("token")
        if token:
            return token

        return None

    @classmethod
    def get_current_user_from_token(cls, request: Request) -> Optional[Dict[str, Any]]:
        """Return the decoded token payload for the request's caller.

        Args:
            request: The incoming request.

        Returns:
            The decoded payload, or ``None`` when the request carries no
            token or the token is expired or invalid.
        """
        token = cls.get_token_from_request(request)
        if not token:
            return None

        try:
            return cls.verify_token(token)
        except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
            # Expired or invalid tokens are reported as "no user" rather
            # than propagated to the caller.
            return None