from functools import lru_cache  # noqa: F401  (kept at file level; see get_all_dict_data)
from typing import Optional, List

from sqlalchemy import select, or_, and_
from sqlalchemy.ext.asyncio import AsyncSession

from core.enums import StatusTypeEnum, DeleteTypeEnum, MasterTypeEnum
from core.exceptions import ServiceWarning
from domain.dtos import SysLoginLogDto, SysOperLogDto, SysDictDataDto
from domain.models import (
    SysUserModel,
    SysDeptModel,
    SysRoleModel,
    SysUserRoleModel,
    SysPostModel,
    SysUserPostModel,
    SysMenuModel,
    SysLoginLogModel,
    SysOperLogModel,
    SysRoleDeptModel,
    SysPermissionModel,
    SysDictDataModel,
)
from utils import DBUtil, logger


class CommonService:
    """Shared read/write helpers for users, departments, dictionaries and logs.

    Every public method opens its own database session through
    :meth:`_get_async_db` and converts any failure into ``ServiceWarning``.
    """

    def __init__(self, db_name: Optional[str] = None):
        """
        :param db_name: name forwarded to ``DBUtil`` when a session is
            requested; ``None`` is passed through unchanged.
        """
        self._db_name = db_name

    async def _get_async_db(self) -> AsyncSession:
        """Obtain an async database session for ``self._db_name``.

        :return: the object produced by entering ``DBUtil()(db_name=...)``.
        :raises Exception: whatever ``DBUtil`` raised, after it was logged.
        """
        try:
            # NOTE(review): the session is returned *after* this ``async with``
            # has already exited, and callers enter it a second time with
            # ``async with await self._get_async_db()``. This presumably relies
            # on the session being reusable after close - confirm against
            # DBUtil before restructuring.
            async with DBUtil()(db_name=self._db_name) as conn:
                return conn
        except Exception as e:
            logger.error(f"数据库连接失败: {str(e)}")
            raise

    async def get_user_detail_by_id(self, id: int):
        """Load a user together with its department, roles, posts and menus.

        :param id: primary key of the user to load.
        :return: ``None`` when no enabled, non-deleted user has this id;
            otherwise a dict with the keys ``user``, ``dept``, ``roles``,
            ``posts`` and ``menus``.
        :raises ServiceWarning: when any of the queries fails.
        """
        try:
            async with await self._get_async_db() as db:
                user_stmt = (
                    select(SysUserModel)
                    .where(
                        SysUserModel.status == StatusTypeEnum.NORMAL.key,
                        SysUserModel.is_del == DeleteTypeEnum.NORMAL.key,
                        SysUserModel.id == id,
                    )
                    .distinct()
                )
                user = (await db.execute(user_stmt)).scalars().first()
                if not user:
                    return None

                # Permission rows are matched against string values below.
                user_id_str = str(user.id)

                dept_stmt = (
                    select(SysDeptModel)
                    .where(
                        SysDeptModel.status == StatusTypeEnum.NORMAL.key,
                        SysDeptModel.is_del == DeleteTypeEnum.NORMAL.key,
                        SysDeptModel.id == user.dept_id,
                    )
                    .distinct()
                )
                dept = (await db.execute(dept_stmt)).scalars().first()

                roles_stmt = (
                    select(SysRoleModel)
                    .join(
                        SysUserRoleModel,
                        SysUserRoleModel.role_id == SysRoleModel.id,
                    )
                    .where(
                        SysUserRoleModel.user_id == user.id,
                        SysRoleModel.status == StatusTypeEnum.NORMAL.key,
                        SysRoleModel.is_del == DeleteTypeEnum.NORMAL.key,
                    )
                    .distinct()
                )
                roles = (await db.execute(roles_stmt)).scalars().all()

                posts_stmt = (
                    select(SysPostModel)
                    .join(
                        SysUserPostModel,
                        SysUserPostModel.post_id == SysPostModel.id,
                    )
                    .where(
                        SysUserPostModel.user_id == user.id,
                        SysPostModel.status == StatusTypeEnum.NORMAL.key,
                    )
                    .distinct()
                )
                posts = (await db.execute(posts_stmt)).scalars().all()

                role_id_list = [str(item.id) for item in roles]
                if "1" in role_id_list:
                    # Role id 1 skips the permission join and receives every
                    # enabled menu.
                    menus_stmt = (
                        select(SysMenuModel)
                        .where(SysMenuModel.status == StatusTypeEnum.NORMAL.key)
                        .distinct()
                    )
                else:
                    # A permission row applies when it is granted to one of
                    # the user's roles OR directly to the user. Each branch
                    # needs and_(): a bare tuple is not a SQL conjunction.
                    granted = or_(
                        and_(
                            SysPermissionModel.master == MasterTypeEnum.ROLE.key,
                            SysPermissionModel.master_value.in_(role_id_list),
                        ),
                        and_(
                            SysPermissionModel.master == MasterTypeEnum.USER.key,
                            SysPermissionModel.master_value == user_id_str,
                        ),
                    )
                    # The role table is intentionally not referenced here:
                    # filtering on SysRoleModel without joining it produced
                    # an implicit cross join, and the role restriction is
                    # already expressed through ``granted``.
                    menus_stmt = (
                        select(SysMenuModel)
                        .join(
                            SysPermissionModel,
                            SysPermissionModel.name == SysMenuModel.perms,
                        )
                        .where(
                            granted,
                            SysMenuModel.status == StatusTypeEnum.NORMAL.key,
                        )
                        .distinct()
                    )
                menus = (await db.execute(menus_stmt)).scalars().all()

                return {
                    "user": user,
                    "dept": dept,
                    "roles": roles,
                    "posts": posts,
                    "menus": menus,
                }
        except Exception as e:
            logger.error(f"查询用户详细信息失败: {str(e)}")
            raise ServiceWarning(message=f"查询用户详细信息失败: {str(e)}") from e

    async def get_user_role_dept_ids(self, user_id: int) -> list[int]:
        """Collect the department ids attached to the user's enabled roles.

        :param user_id: primary key of the user.
        :return: distinct ``dept_id`` values from the role/department links.
        :raises ServiceWarning: when the query fails.
        """
        try:
            # _get_async_db is a coroutine function, so it must be awaited
            # before the result can be used as an async context manager.
            async with await self._get_async_db() as db:
                stmt = (
                    select(SysRoleDeptModel.dept_id)
                    .join(
                        SysRoleModel,
                        SysRoleDeptModel.role_id == SysRoleModel.id,
                    )
                    .join(
                        SysUserRoleModel,
                        SysRoleDeptModel.role_id == SysUserRoleModel.role_id,
                    )
                    .where(
                        SysUserRoleModel.user_id == user_id,
                        SysRoleModel.status == StatusTypeEnum.NORMAL.key,
                    )
                    .distinct()
                )
                return list((await db.execute(stmt)).scalars().all())
        except Exception as e:
            raise ServiceWarning(f"查询用户角色部门id失败:{str(e)}") from e

    async def get_dept_all_children_ids(self, dept_id: int):
        """Return the id of a department and of all its descendants.

        :param dept_id: id of the department at the root of the subtree.
        :return: distinct department ids reachable from ``dept_id`` through
            ``parent_id`` links, including ``dept_id`` itself when it is not
            deleted.
        :raises ServiceWarning: when the query fails.
        """
        try:
            # _get_async_db is a coroutine function, so it must be awaited
            # before the result can be used as an async context manager.
            async with await self._get_async_db() as db:
                # Anchor member of the recursive CTE: the starting department.
                # ``.key`` matches how every other is_del filter in this
                # class compares the column.
                subtree = (
                    select(SysDeptModel.id)
                    .where(
                        SysDeptModel.id == dept_id,
                        SysDeptModel.is_del == DeleteTypeEnum.NORMAL.key,
                    )
                    .cte(recursive=True)
                )
                # Recursive member: departments whose parent is already in
                # the CTE.
                subtree = subtree.union_all(
                    select(SysDeptModel.id)
                    .join(subtree, SysDeptModel.parent_id == subtree.c.id)
                    .where(SysDeptModel.is_del == DeleteTypeEnum.NORMAL.key)
                )
                result = await db.execute(select(subtree).distinct())
                return result.scalars().all()
        except Exception as e:
            raise ServiceWarning(f"查询部门所有子部门id失败:{str(e)}") from e

    async def get_all_dict_data(self) -> List[SysDictDataDto]:
        """Fetch every enabled dictionary entry as DTOs.

        This method is deliberately not wrapped in ``functools.lru_cache``:
        on an ``async def`` the cache would store the coroutine object, and
        awaiting it a second time raises ``RuntimeError``.

        :return: one ``SysDictDataDto`` per enabled ``SysDictDataModel`` row.
        :raises ServiceWarning: when the query or the conversion fails.
        """
        try:
            async with await self._get_async_db() as db:
                query = select(SysDictDataModel).where(
                    SysDictDataModel.status == StatusTypeEnum.NORMAL.key,
                )
                models = (await db.execute(query)).scalars().all()
                return [SysDictDataDto.from_model(model) for model in models]
        except Exception as e:
            raise ServiceWarning(f"查询字典数据失败: {str(e)}") from e

    async def _insert_log(self, model_cls, dto):
        """Persist ``dto`` as a new ``model_cls`` row and return the row."""
        async with await self._get_async_db() as db:
            db_obj = model_cls.from_dict(dto.to_dict())
            db.add(db_obj)
            await db.commit()
            # Reload the row after the commit before handing it back.
            await db.refresh(db_obj)
            return db_obj

    async def create_login_log(self, dto: SysLoginLogDto):
        """Store a login log entry.

        :param dto: login log data to persist.
        :return: the refreshed ``SysLoginLogModel`` instance.
        :raises ServiceWarning: when the insert fails.
        """
        try:
            return await self._insert_log(SysLoginLogModel, dto)
        except Exception as e:
            raise ServiceWarning(f"创建登录日志失败:{str(e)}") from e

    async def create_oper_log(self, dto: SysOperLogDto):
        """Store an operation log entry.

        :param dto: operation log data to persist.
        :return: the refreshed ``SysOperLogModel`` instance.
        :raises ServiceWarning: when the insert fails.
        """
        try:
            return await self._insert_log(SysOperLogModel, dto)
        except Exception as e:
            raise ServiceWarning(f"创建操作日志失败:{str(e)}") from e