| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- from typing import Optional
- from sqlalchemy import select
- from core.enums import DeleteTypeEnum
- from domain.dtos import SysDeptDto, SysDeptCreateDto, SysDeptUpdateDto
- from domain.models import SysDeptModel
- from domain.services.base_services import CurdServiceBase
class SysDeptService(
    CurdServiceBase[SysDeptModel, SysDeptDto, SysDeptCreateDto, SysDeptUpdateDto]
):
    """CRUD service for ``SysDeptModel`` with department-hierarchy lookups.

    The generic CRUD behaviour comes from ``CurdServiceBase``; this class only
    adds queries that walk the ``parent_id`` relationship between departments.
    """

    def __init__(self, db_name: Optional[str] = None):
        """Bind the base service to the department model and its DTO types.

        Args:
            db_name: Forwarded unchanged to ``CurdServiceBase.__init__``.
        """
        super().__init__(
            SysDeptModel,
            SysDeptDto,
            SysDeptCreateDto,
            SysDeptUpdateDto,
            db_name=db_name,
        )

    async def get_dept_children(self, dept_id: int):
        """Return the departments whose ``parent_id`` equals ``dept_id``.

        Only rows whose ``is_del`` equals ``DeleteTypeEnum.NORMAL.value`` are
        included, so this yields the direct children that pass that filter.

        Args:
            dept_id: Identifier of the parent department.

        Returns:
            The ``.scalars().all()`` result of the query, i.e. a sequence of
            ``SysDeptModel`` rows.
        """
        async with self._get_async_db() as db:
            children_stmt = (
                select(SysDeptModel)
                .where(
                    SysDeptModel.parent_id == dept_id,
                    SysDeptModel.is_del == DeleteTypeEnum.NORMAL.value,
                )
                .distinct()
            )
            result = await db.execute(children_stmt)
            return result.scalars().all()

    async def get_dept_all_children(self, dept_id: int):
        """Walk the department tree downwards from ``dept_id`` with a recursive CTE.

        The anchor row is the department ``dept_id`` itself, so it is part of
        the result set alongside every descendant reached through
        ``parent_id``. Rows whose ``is_del`` differs from
        ``DeleteTypeEnum.NORMAL.value`` are excluded at every level, which also
        stops the walk below them.

        Args:
            dept_id: Identifier of the department at the root of the walk.

        Returns:
            The ``.scalars().all()`` result of selecting from the CTE.
        """
        async with self._get_async_db() as db:
            # Anchor member of the recursive CTE: the starting department.
            anchor = (
                select(SysDeptModel)
                .where(
                    SysDeptModel.id == dept_id,
                    SysDeptModel.is_del == DeleteTypeEnum.NORMAL.value,
                )
                .cte(recursive=True)
            )

            # Recursive member: departments whose parent is already in the CTE.
            descendants = (
                select(SysDeptModel)
                .join(anchor, SysDeptModel.parent_id == anchor.c.id)
                .where(SysDeptModel.is_del == DeleteTypeEnum.NORMAL.value)
            )
            dept_tree = anchor.union_all(descendants)

            # NOTE(review): select(cte) selects the CTE's columns, not the
            # mapped entity, so .scalars() presumably yields only the first
            # column of each row rather than SysDeptModel instances — confirm
            # against callers before changing.
            result = await db.execute(select(dept_tree).distinct())
            return result.scalars().all()
|