# sys_oper_log_service.py
from typing import Optional, List, Dict, Any

from sqlalchemy import select, func, or_

from core.exceptions import ServiceWarning
from domain.dtos import SysOperLogDto, PageDto, PageResultDto
from domain.models import SysOperLogModel
from domain.services.base_services import ServiceBase


  7. class SysOperLogService(ServiceBase):
  8. def __init__(self, db_name: Optional[str] = None):
  9. super().__init__(db_name)
  10. async def get_page_list(
  11. self,
  12. page_dto: PageDto,
  13. ) -> PageResultDto[SysOperLogDto]:
  14. """
  15. 获取操作日志列表
  16. :param page_dto: 分页查询DTO
  17. :return: 操作日志列表
  18. """
  19. try:
  20. async with await self._get_async_db() as db:
  21. query = select(SysOperLogModel)
  22. query = self._apply_search(query, page_dto.search)
  23. query = self._apply_filter(
  24. query, page_dto.filters, page_dto.filter_conditions
  25. )
  26. total = await db.scalar(select(func.count()).select_from(query))
  27. query = self._apply_order(query, page_dto.order)
  28. query = query.offset(page_dto.offset).limit(page_dto.limit)
  29. result = await db.execute(query)
  30. models = result.scalars().all()
  31. return PageResultDto(
  32. total=total,
  33. rows=self._apply_map_get_dto_list(models),
  34. page_size=page_dto.page_size,
  35. )
  36. except Exception as e:
  37. raise ServiceWarning(f"获取操作日志列表失败: {str(e)}")
  38. @staticmethod
  39. def _apply_search(query, search: str):
  40. """
  41. 根据关键字构建查询对象
  42. :param query: 查询对象
  43. :param search: 关键字
  44. :return: 查询对象
  45. """
  46. if search:
  47. query = query.where(
  48. or_(
  49. SysOperLogModel.oper_name.like(f"%{search}%"),
  50. SysOperLogModel.oper_location.like(f"%{search}%"),
  51. SysOperLogModel.oper_ip.like(f"%{search}%"),
  52. SysOperLogModel.oper_url.like(f"%{search}%"),
  53. )
  54. )
  55. return query
  56. def _apply_filter(
  57. self,
  58. query,
  59. filters: Optional[Dict[str, Any]] = None,
  60. filter_conditions: Optional[List[Dict[str, Any]]] = None,
  61. ):
  62. """
  63. 根据过滤条件构建查询对象
  64. :param query: 查询对象
  65. :param filters: 过滤条件
  66. :return: 查询对象
  67. """
  68. return self._apply_filter_base(
  69. SysOperLogModel, query, filters, filter_conditions
  70. )
  71. def _apply_order(self, query, order: str):
  72. """
  73. 根据排序字段构建查询对象
  74. :param query: 查询对象
  75. :param order: 排序字段
  76. :return: 查询对象
  77. """
  78. if not order:
  79. order = "oper_time desc"
  80. return self._apply_order_base(SysOperLogModel, query, order)
  81. def _apply_map_get_dto_list(self, models) -> List[SysOperLogDto]:
  82. """
  83. 将模型对象转换为DTO对象
  84. :param models: 模型对象列表
  85. :return: DTO对象
  86. """
  87. return [self._apply_map_get_dto(model) for model in models]
  88. @staticmethod
  89. def _apply_map_get_dto(model) -> SysOperLogDto:
  90. """
  91. 将模型对象转换为DTO对象
  92. :param model: 模型对象
  93. :return: DTO对象
  94. """
  95. return SysOperLogDto.from_model(model)