log.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. from typing import List, Optional, Dict, Any
  2. from datetime import datetime
  3. from sqlalchemy import and_, desc
  4. import tools.db_helper as db_helper
  5. from core.models import LogModel
  6. class LogStore:
  7. def __init__(self):
  8. # self._database= None
  9. self._database= 'Iwb_RailwayCosting'
  10. def query_logs_paginated(
  11. self,
  12. page: int = 1,
  13. page_size: int = 10,
  14. username: Optional[str] = None,
  15. operation_type: Optional[str] = None,
  16. operation_module: Optional[str] = None,
  17. operation_result: Optional[int] = None,
  18. start_time: Optional[datetime] = None,
  19. end_time: Optional[datetime] = None
  20. ) -> Dict[str, Any]:
  21. """
  22. 分页查询日志记录
  23. :param page: 页码
  24. :param page_size: 每页记录数
  25. :param username: 用户名(支持模糊查询)
  26. :param operation_type: 操作类型
  27. :param operation_module: 操作模块
  28. :param operation_result: 操作结果
  29. :param start_time: 开始时间
  30. :param end_time: 结束时间
  31. :return: 包含总记录数和日志列表的字典
  32. """
  33. with db_helper.sqlserver_query_session(self._database) as db_session:
  34. query = db_session.query(LogModel)
  35. # 构建查询条件
  36. conditions = []
  37. if username:
  38. conditions.append(LogModel.username.like(f'%{username}%'))
  39. if operation_type:
  40. conditions.append(LogModel.operation_type == operation_type)
  41. if operation_module:
  42. conditions.append(LogModel.operation_module == operation_module)
  43. if operation_result is not None:
  44. conditions.append(LogModel.operation_result == operation_result)
  45. if start_time:
  46. conditions.append(LogModel.created_at >= start_time)
  47. if end_time:
  48. conditions.append(LogModel.created_at < end_time)
  49. if conditions:
  50. query = query.filter(and_(*conditions))
  51. # 计算总记录数
  52. total = query.count()
  53. # 分页并按创建时间倒序排序
  54. logs = query.order_by(LogModel.created_at.desc())\
  55. .offset((page - 1) * page_size)\
  56. .limit(page_size)\
  57. .all()
  58. return {
  59. 'total': total,
  60. 'data': logs
  61. }
  62. def insert_log(
  63. self,
  64. username: str,
  65. operation_type: str,
  66. operation_desc: Optional[str] = None,
  67. operation_result: Optional[int] = None,
  68. operation_module: Optional[str] = None,
  69. operation_data: Optional[str] = None,
  70. data_changes: Optional[str] = None,
  71. operation_ip: Optional[str] = None
  72. ) -> LogModel:
  73. """
  74. 插入单条日志记录
  75. :param username: 用户名
  76. :param operation_type: 操作类型
  77. :param operation_desc: 操作描述
  78. :param operation_result: 操作结果
  79. :param operation_module: 操作模块
  80. :param operation_data: 操作数据
  81. :param data_changes: 数据变更记录
  82. :param operation_ip: 操作IP
  83. :return: 创建的日志记录
  84. """
  85. log = LogModel(
  86. username=username,
  87. operation_type=operation_type,
  88. operation_desc=operation_desc,
  89. operation_result=operation_result,
  90. operation_module=operation_module,
  91. operation_data=operation_data,
  92. data_changes=data_changes,
  93. operation_ip=operation_ip
  94. )
  95. with db_helper.sqlserver_session(self._database) as db_session:
  96. db_session.add(log)
  97. return log
  98. def batch_insert_logs(self, logs: List[Dict[str, Any]]) -> List[LogModel]:
  99. """
  100. 批量插入日志记录
  101. :param logs: 日志记录列表
  102. :return: 创建的日志记录列表
  103. """
  104. log_models = [LogModel(**log) for log in logs]
  105. with db_helper.sqlserver_session(self._database) as db_session:
  106. db_session.add_all(log_models)
  107. return log_models