sys_dept_service.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. from typing import Optional
  2. from sqlalchemy import select
  3. from core.enums import DeleteTypeEnum
  4. from domain.dtos import SysDeptDto, SysDeptCreateDto, SysDeptUpdateDto
  5. from domain.models import SysDeptModel
  6. from domain.services.base_services import CurdServiceBase
  7. class SysDeptService(
  8. CurdServiceBase[SysDeptModel, SysDeptDto, SysDeptCreateDto, SysDeptUpdateDto]
  9. ):
  10. def __init__(self, db_name: Optional[str] = None):
  11. super().__init__(
  12. SysDeptModel,
  13. SysDeptDto,
  14. SysDeptCreateDto,
  15. SysDeptUpdateDto,
  16. db_name=db_name,
  17. )
  18. async def get_dept_children(self, dept_id: int):
  19. async with self._get_async_db() as db:
  20. return (
  21. (
  22. await db.execute(
  23. select(SysDeptModel)
  24. .where(
  25. SysDeptModel.parent_id == dept_id,
  26. SysDeptModel.is_del == DeleteTypeEnum.NORMAL.value,
  27. )
  28. .distinct()
  29. )
  30. )
  31. .scalars()
  32. .all()
  33. )
  34. async def get_dept_all_children(self, dept_id: int):
  35. async with self._get_async_db() as db:
  36. # 创建递归CTE查询
  37. cte = (
  38. select(SysDeptModel)
  39. .where(
  40. SysDeptModel.id == dept_id,
  41. SysDeptModel.is_del == DeleteTypeEnum.NORMAL.value,
  42. )
  43. .cte(recursive=True)
  44. )
  45. # 递归部分:查找所有子部门
  46. cte = cte.union_all(
  47. select(SysDeptModel)
  48. .join(cte, SysDeptModel.parent_id == cte.c.id)
  49. .where(SysDeptModel.is_del == DeleteTypeEnum.NORMAL.value)
  50. )
  51. dept_list = (await db.execute(select(cte).distinct())).scalars().all()
  52. return dept_list