log.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. from typing import List, Optional, Dict, Any
  2. from datetime import datetime
  3. from sqlalchemy import and_, desc
  4. import tools.db_helper as db_helper
  5. from core.models import LogModel
  6. class LogStore:
  7. def __init__(self):
  8. # self._database= None
  9. self._database= 'Iwb_RailwayCosting'
  10. def query_logs_paginated(
  11. self,
  12. page: int = 1,
  13. page_size: int = 10,
  14. username: Optional[str] = None,
  15. operation_type: Optional[str] = None,
  16. operation_module: Optional[str] = None,
  17. operation_result: Optional[int] = None,
  18. start_time: Optional[datetime] = None,
  19. end_time: Optional[datetime] = None
  20. ) -> Dict[str, Any]:
  21. """
  22. 分页查询日志记录
  23. :param page: 页码
  24. :param page_size: 每页记录数
  25. :param username: 用户名(支持模糊查询)
  26. :param operation_type: 操作类型
  27. :param operation_module: 操作模块
  28. :param operation_result: 操作结果
  29. :param start_time: 开始时间
  30. :param end_time: 结束时间
  31. :return: 包含总记录数和日志列表的字典
  32. """
  33. with db_helper.sqlserver_query_session(self._database) as db_session:
  34. pass
  35. query = db_session.query(LogModel)
  36. # 构建查询条件
  37. conditions = []
  38. if username:
  39. conditions.append(LogModel.username.like(f'%{username}%'))
  40. if operation_type:
  41. conditions.append(LogModel.operation_type == operation_type)
  42. if operation_module:
  43. conditions.append(LogModel.operation_module == operation_module)
  44. if operation_result is not None:
  45. conditions.append(LogModel.operation_result == operation_result)
  46. if start_time:
  47. conditions.append(LogModel.created_at >= start_time)
  48. if end_time:
  49. conditions.append(LogModel.created_at < end_time)
  50. if conditions:
  51. query = query.filter(and_(*conditions))
  52. # 计算总记录数
  53. total = query.count()
  54. # 分页并按创建时间倒序排序
  55. logs = query.order_by(LogModel.created_at.desc())\
  56. .offset((page - 1) * page_size)\
  57. .limit(page_size)\
  58. .all()
  59. return {
  60. 'total': total,
  61. 'data': logs
  62. }
  63. def insert_log(
  64. self,
  65. username: str,
  66. operation_type: str,
  67. operation_desc: Optional[str] = None,
  68. operation_result: Optional[int] = None,
  69. operation_module: Optional[str] = None,
  70. operation_data: Optional[str] = None,
  71. data_changes: Optional[str] = None,
  72. operation_ip: Optional[str] = None
  73. ) -> LogModel:
  74. """
  75. 插入单条日志记录
  76. :param username: 用户名
  77. :param operation_type: 操作类型
  78. :param operation_desc: 操作描述
  79. :param operation_result: 操作结果
  80. :param operation_module: 操作模块
  81. :param operation_data: 操作数据
  82. :param data_changes: 数据变更记录
  83. :param operation_ip: 操作IP
  84. :return: 创建的日志记录
  85. """
  86. log = LogModel(
  87. username=username,
  88. operation_type=operation_type,
  89. operation_desc=operation_desc,
  90. operation_result=operation_result,
  91. operation_module=operation_module,
  92. operation_data=operation_data,
  93. data_changes=data_changes,
  94. operation_ip=operation_ip
  95. )
  96. with db_helper.sqlserver_session(self._database) as db_session:
  97. db_session.add(log)
  98. return log
  99. def batch_insert_logs(self, logs: List[Dict[str, Any]]) -> List[LogModel]:
  100. """
  101. 批量插入日志记录
  102. :param logs: 日志记录列表
  103. :return: 创建的日志记录列表
  104. """
  105. log_models = [LogModel(**log) for log in logs]
  106. with db_helper.sqlserver_session(self._database) as db_session:
  107. db_session.add_all(log_models)
  108. return log_models