| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- from datetime import datetime, timezone, timedelta
- from typing import Dict, Any, Optional
- import jwt
- from fastapi import Request
- from core.settings import jwt_settings
- class JwtUtil:
- """
- JWT工具类
- """
- @staticmethod
- def generate_token(user_id: int, username: str) -> str:
- payload = {
- "user_id": user_id,
- "username": username,
- "exp": datetime.now(timezone.utc)
- + timedelta(minutes=jwt_settings.expire_minutes),
- }
- return jwt.encode(
- payload, jwt_settings.secret_key, algorithm=jwt_settings.algorithm
- )
- @staticmethod
- def verify_token(token: str) -> Dict[str, Any]:
- return jwt.decode(
- token, jwt_settings.secret_key, algorithm=[jwt_settings.algorithm]
- )
- @staticmethod
- def get_token_from_request(request: Request) -> Optional[str]:
- # 从Authorization头中获取
- auth_header = request.headers.get("Authorization")
- if auth_header and auth_header.startswith("Bearer "):
- return auth_header.split(" ")[1]
- # 从查询参数中获取
- token = request.query_params.get("token")
- if token:
- return token
- return None
- @classmethod
- def get_current_user_from_token(cls, request: Request) -> Optional[Dict[str, Any]]:
- token = cls.get_token_from_request(request)
- if not token:
- return None
- try:
- return cls.verify_token(token)
- except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
- return None
|