jwt_util.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. from datetime import datetime, timezone, timedelta
  2. from typing import Dict, Any, Optional
  3. import jwt
  4. from fastapi import Request
  5. from core.settings import jwt_settings
  6. class JwtUtil:
  7. """
  8. JWT工具类
  9. """
  10. @staticmethod
  11. def generate_token(user_id: int, username: str) -> str:
  12. payload = {
  13. "user_id": user_id,
  14. "username": username,
  15. "exp": datetime.now(timezone.utc)
  16. + timedelta(minutes=jwt_settings.expire_minutes),
  17. }
  18. return jwt.encode(
  19. payload, jwt_settings.secret_key, algorithm=jwt_settings.algorithm
  20. )
  21. @staticmethod
  22. def verify_token(token: str) -> Dict[str, Any]:
  23. return jwt.decode(
  24. token, jwt_settings.secret_key, algorithm=[jwt_settings.algorithm]
  25. )
  26. @staticmethod
  27. def get_token_from_request(request: Request) -> Optional[str]:
  28. # 从Authorization头中获取
  29. auth_header = request.headers.get("Authorization")
  30. if auth_header and auth_header.startswith("Bearer "):
  31. return auth_header.split(" ")[1]
  32. # 从查询参数中获取
  33. token = request.query_params.get("token")
  34. if token:
  35. return token
  36. return None
  37. @classmethod
  38. def get_current_user_from_token(cls, request: Request) -> Optional[Dict[str, Any]]:
  39. token = cls.get_token_from_request(request)
  40. if not token:
  41. return None
  42. try:
  43. return cls.verify_token(token)
  44. except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
  45. return None