from redis import asyncio as aioredis from core.settings import redis_settings class RedisUtil: """ Redis相关方法 """ redis = None @classmethod async def create_redis_pool(cls) -> aioredis.Redis: """ 应用启动时初始化redis连接 :return: Redis连接对象 """ if not redis_settings.enable: return from utils import logger logger.info('开始连接redis...') redis = await aioredis.from_url( url=f'redis://{redis_settings.host}', port=redis_settings.port, username=redis_settings.username, password=redis_settings.password, db=redis_settings.database, encoding='utf-8', decode_responses=True, ) try: connection = await redis.ping() if connection: logger.info('redis连接成功') else: logger.error('redis连接失败') cls.redis = redis except AuthenticationError as e: logger.error(f'redis用户名或密码错误,详细错误信息:{e}') except TimeoutError as e: logger.error(f'redis连接超时,详细错误信息:{e}') except RedisError as e: logger.error(f'redis连接错误,详细错误信息:{e}') @classmethod async def get_redis_pool(cls) -> aioredis.Redis: """ 获取redis连接对象 :return: Redis连接对象 """ if cls.redis is None: await cls.create_redis_pool() return cls.redis @classmethod async def close_redis_pool(cls): """ 应用关闭时关闭redis连接 :return: """ from utils import logger if cls.redis is None: return try: await cls.redis.close() logger.info('关闭redis连接成功') except RuntimeError as e: logger.debug('尝试关闭Redis连接时事件循环已关闭,可能正常退出:%s', e)