123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- from typing import List, Optional, Dict, Any
- from datetime import datetime
- from sqlalchemy import and_, desc
- import tools.db_helper as db_helper
- from core.models import LogModel
class LogStore:
    """Data-access helper for operation-log records mapped by ``LogModel``.

    Each method opens its own session through ``db_helper`` against the
    database name stored in ``self._database``.
    """

    def __init__(self, database: str = 'Iwb_RailwayCosting'):
        """
        Create a store bound to a single database.

        :param database: Database name handed to the ``db_helper`` session
            factories. Defaults to the name that used to be hard-coded here.
        """
        self._database = database

    def query_logs_paginated(
        self,
        page: int = 1,
        page_size: int = 10,
        username: Optional[str] = None,
        operation_type: Optional[str] = None,
        operation_module: Optional[str] = None,
        operation_result: Optional[int] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """
        Query log records one page at a time, newest first.

        Filters left as ``None`` (or, for the string filters, empty) are not
        applied; all supplied filters are combined with AND.

        :param page: 1-based page number; values below 1 are treated as 1.
        :param page_size: Records per page; values below 1 are treated as 1.
        :param username: User name, matched as a substring (``LIKE %name%``).
        :param operation_type: Exact operation type to match.
        :param operation_module: Exact operation module to match.
        :param operation_result: Exact operation result to match (0 is a
            valid filter value, so only ``None`` disables it).
        :param start_time: Inclusive lower bound on ``created_at``.
        :param end_time: Exclusive upper bound on ``created_at``.
        :return: Dict with ``'total'`` (count of all matching rows) and
            ``'data'`` (the ``LogModel`` rows of the requested page).
        """
        # SQL Server's OFFSET/FETCH rejects a negative offset and a fetch
        # count below 1, so out-of-range paging arguments are clamped rather
        # than sent to the database.
        page = max(page, 1)
        page_size = max(page_size, 1)

        # Column expressions do not need a session, so build them up front.
        conditions = []
        if username:
            conditions.append(LogModel.username.like(f'%{username}%'))
        if operation_type:
            conditions.append(LogModel.operation_type == operation_type)
        if operation_module:
            conditions.append(LogModel.operation_module == operation_module)
        if operation_result is not None:
            conditions.append(LogModel.operation_result == operation_result)
        if start_time is not None:
            conditions.append(LogModel.created_at >= start_time)
        if end_time is not None:
            conditions.append(LogModel.created_at < end_time)

        # All query execution stays inside the session scope.
        with db_helper.sqlserver_query_session(self._database) as db_session:
            query = db_session.query(LogModel)
            if conditions:
                query = query.filter(and_(*conditions))

            # Count before paging so 'total' covers every matching row.
            total = query.count()

            logs = (
                query.order_by(LogModel.created_at.desc())
                .offset((page - 1) * page_size)
                .limit(page_size)
                .all()
            )
            return {
                'total': total,
                'data': logs
            }

    def insert_log(
        self,
        username: str,
        operation_type: str,
        operation_desc: Optional[str] = None,
        operation_result: Optional[int] = None,
        operation_module: Optional[str] = None,
        operation_data: Optional[str] = None,
        data_changes: Optional[str] = None,
        operation_ip: Optional[str] = None
    ) -> "LogModel":
        """
        Insert a single log record.

        The record is added to a session from ``db_helper.sqlserver_session``;
        committing is presumably handled by that context manager on exit
        (confirm against ``db_helper``).

        :param username: User name.
        :param operation_type: Operation type.
        :param operation_desc: Operation description.
        :param operation_result: Operation result.
        :param operation_module: Operation module.
        :param operation_data: Operation data.
        :param data_changes: Record of the data changes.
        :param operation_ip: IP address the operation came from.
        :return: The ``LogModel`` instance that was added.
        """
        log = LogModel(
            username=username,
            operation_type=operation_type,
            operation_desc=operation_desc,
            operation_result=operation_result,
            operation_module=operation_module,
            operation_data=operation_data,
            data_changes=data_changes,
            operation_ip=operation_ip
        )
        with db_helper.sqlserver_session(self._database) as db_session:
            db_session.add(log)
        return log

    def batch_insert_logs(self, logs: List[Dict[str, Any]]) -> List["LogModel"]:
        """
        Insert several log records in one session.

        :param logs: List of dicts; each dict is expanded into keyword
            arguments for ``LogModel``.
        :return: The ``LogModel`` instances that were added, in input order.
            An empty input returns an empty list without opening a session.
        """
        # Nothing to write: skip the database round trip entirely.
        if not logs:
            return []

        log_models = [LogModel(**log) for log in logs]
        with db_helper.sqlserver_session(self._database) as db_session:
            db_session.add_all(log_models)
        return log_models
|