from typing import Optional

import tools.utils as utils
from core.dtos import ProjectQuotaDto
from core.models import ProjectQuotaModel
from stores import ProjectQuotaStore, ChapterStore
from core.log.log_record import LogRecordHelper
from core.enum import OperationModule, OperationType, SendStatusEnum
import executor


class ProjectQuotaService:
    """Service layer for project quota entries.

    Wraps ``ProjectQuotaStore`` / ``ChapterStore`` calls with error logging
    (``self._logger``) and operation audit records (``LogRecordHelper``).
    """

    def __init__(self):
        self.store = ProjectQuotaStore()
        self.chapter_store = ChapterStore()
        self._logger = utils.get_logger()

    @staticmethod
    def _to_page(data):
        """Convert a store page result into ``(list_of_quota_dicts, total)``.

        Reads the ``"data"`` and ``"total"`` keys of ``data``; missing keys
        fall back to an empty list and ``0``.
        """
        rows = [
            ProjectQuotaDto.from_model(quota).to_dict()
            for quota in data.get("data", [])
        ]
        return rows, data.get("total", 0)

    def get_quotas_paginated(
        self,
        budget_id: int,
        project_id: str,
        item_code: str,
        page: int = 1,
        page_size: int = 10,
        keyword: Optional[str] = None,
        send_status: Optional[int] = None,
    ):
        """Return one page of project quotas.

        Args:
            budget_id: Budget sequence number.
            project_id: Project number.
            item_code: Item code.
            page: Page number.
            page_size: Number of rows per page.
            keyword: Optional search keyword.
            send_status: Optional send-status filter.

        Returns:
            tuple: ``(quotas, total)`` where ``quotas`` is a list of quota
            dicts and ``total`` is the total row count reported by the store.

        Raises:
            Exception: Any store/conversion error is logged and re-raised.
        """
        try:
            data = self.store.get_quotas_paginated(
                budget_id=budget_id,
                project_id=project_id,
                item_code=item_code,
                page=page,
                page_size=page_size,
                keyword=keyword,
                send_status=send_status,
            )
            return self._to_page(data)
        except Exception as e:
            # Use the service logger (not print) like every other method here.
            self._logger.error(f"获取项目定额列表失败: {e}")
            raise

    def get_quotas_by_task_paginated(
        self,
        task_id: int,
        budget_id,
        project_id,
        item_code,
        page: int = 1,
        page_size: int = 10,
        keyword: Optional[str] = None,
        send_status: Optional[int] = None,
    ):
        """Return one page of project quotas belonging to a task.

        Takes the same filters as ``get_quotas_paginated`` plus ``task_id``.

        Returns:
            tuple: ``(quotas, total)`` - list of quota dicts and total count.

        Raises:
            Exception: Any store/conversion error is logged and re-raised.
        """
        try:
            data = self.store.get_quotas_by_task_paginated(
                task_id=task_id,
                budget_id=budget_id,
                project_id=project_id,
                item_code=item_code,
                page=page,
                page_size=page_size,
                send_status=send_status,
                keyword=keyword,
            )
            return self._to_page(data)
        except Exception as e:
            self._logger.error(f"获取任务项目定额列表失败: {e}")
            raise

    def get_quota_dto(self, quota_id: int):
        """Fetch a quota through ``store.get_quota_dto``.

        Callers in this class treat a falsy result as "not found".

        Raises:
            Exception: Any store error is logged and re-raised.
        """
        try:
            return self.store.get_quota_dto(quota_id)
        except Exception as e:
            self._logger.error(f"获取定额条目DTO失败: {e}")
            raise

    def get_quota(self, quota_id: int) -> Optional[ProjectQuotaModel]:
        """Fetch a single project quota model.

        Args:
            quota_id: Quota ID.

        Returns:
            Optional[ProjectQuotaModel]: Whatever ``store.get_quota`` returns.

        Raises:
            Exception: Any store error is logged and re-raised.
        """
        try:
            return self.store.get_quota(quota_id)
        except Exception as e:
            self._logger.error(f"获取定额条目失败: {e}")
            raise

    def save_quota(self, quota_dto: ProjectQuotaDto) -> Optional[ProjectQuotaDto]:
        """Create the quota when ``quota_dto.id == 0``, otherwise update it.

        After saving, a quota whose send status is not ``NEW`` is flagged as
        ``CHANGE`` in the store.

        Returns:
            Optional[ProjectQuotaDto]: The saved DTO, or ``None`` when the
            update returned nothing.

        Raises:
            Exception: Any error is logged and re-raised.
        """
        try:
            if quota_dto.id == 0:
                saved = self.create_quota(quota_dto)
            else:
                saved = self.update_quota(quota_dto)
            # update_quota is declared Optional; guard before dereferencing.
            # NOTE(review): the status check is applied to both branches -
            # confirm this matches the intended nesting.
            if saved is not None and saved.send_status != SendStatusEnum.NEW.value:
                self.update_send_status(saved.id, SendStatusEnum.CHANGE.value)
            return saved
        except Exception as e:
            self._logger.error(f"保存定额条目失败: {e}")
            raise

    def create_quota(self, quota_dto: ProjectQuotaDto) -> ProjectQuotaDto:
        """Create a project quota.

        Args:
            quota_dto: Quota DTO; ``project_id`` and ``budget_id`` are required.

        Returns:
            ProjectQuotaDto: The result of ``store.create_quota``.

        Raises:
            ValueError: If ``project_id`` or ``budget_id`` is empty.
            Exception: Any store error; a failure audit record is written
                and the error is re-raised.
        """
        try:
            if not quota_dto.project_id or not quota_dto.budget_id:
                raise ValueError("项目编号和概算序号不能为空")
            # Snapshot the payload before the store call so the audit record
            # reflects what was submitted.
            log_data = utils.to_str(quota_dto.to_dict())
            created = self.store.create_quota(quota_dto)
            # Record success only after the store call actually succeeded.
            LogRecordHelper.log_success(
                OperationType.CREATE,
                OperationModule.QUOTA,
                f"新增定额条目: {quota_dto.entry_name}",
                log_data,
            )
            return created
        except Exception as e:
            self._logger.error(f"创建项目定额失败: {e}")
            LogRecordHelper.log_fail(
                OperationType.CREATE,
                OperationModule.QUOTA,
                f"新增定额条目失败: {e}",
                utils.to_str(quota_dto.to_dict()),
            )
            raise

    def update_quota(self, quota_dto: ProjectQuotaDto) -> Optional[ProjectQuotaDto]:
        """Update a project quota.

        The stored record's identity and import-related fields (project,
        budget, item, ``quota_id``, ``ex_*`` and ``note``) are copied onto
        ``quota_dto`` before saving, so callers cannot overwrite them here.

        Args:
            quota_dto: Quota DTO; ``id`` is required.

        Returns:
            Optional[ProjectQuotaDto]: The result of ``store.update_quota``.

        Raises:
            ValueError: If ``id`` is empty or no stored quota is found.
            Exception: Any store error; a failure audit record is written
                and the error is re-raised.
        """
        try:
            if not quota_dto.id:
                raise ValueError("定额ID不能为空")
            quota = self.get_quota_dto(quota_dto.id)
            # Check existence before touching the record; previously a missing
            # quota surfaced as an AttributeError on ``None``.
            if not quota:
                raise ValueError("未查询到定额条目")
            old_data = utils.to_str(quota.to_dict())

            quota_dto.id = quota.id
            quota_dto.project_id = quota.project_id
            quota_dto.budget_id = quota.budget_id
            quota_dto.item_id = quota.item_id
            quota_dto.item_code = quota.item_code
            quota_dto.quota_id = quota.quota_id
            quota_dto.ex_file = quota.ex_file
            quota_dto.ex_row = quota.ex_row
            quota_dto.ex_cell = quota.ex_cell
            quota_dto.ex_unit = quota.ex_unit
            quota_dto.ex_amount = quota.ex_amount
            quota_dto.note = quota.note
            # self.update_process_status(quota_dto.id,4)

            new_data = utils.to_str(quota_dto.to_dict())
            updated = self.store.update_quota(quota_dto)
            # Record success only after the store call actually succeeded.
            LogRecordHelper.log_success(
                OperationType.UPDATE,
                OperationModule.QUOTA,
                f"修改定额条目: {quota_dto.entry_name}",
                old_data,
                new_data,
            )
            return updated
        except Exception as e:
            self._logger.error(f"更新项目定额失败: {e}")
            LogRecordHelper.log_fail(
                OperationType.UPDATE,
                OperationModule.QUOTA,
                f"修改定额条目失败: {e}",
                utils.to_str(quota_dto.to_dict()),
            )
            raise

    def edit_by_key(self, id: int, key: str, value: str):
        """Set a single attribute ``key`` of quota ``id`` to ``value``.

        ``is_change`` is set to 1 unless it is currently 0 and the edited
        key is ``entry_name``.

        Returns:
            None

        Raises:
            ValueError: If the quota is not found.
            Exception: Any store error; a failure audit record is written
                and the error is re-raised.
        """
        try:
            # dict_data = {"id": id, key: value}
            # quota_dto = ProjectQuotaDto(**dict_data)
            quota_dto = self.get_quota_dto(id)
            if not quota_dto:
                raise ValueError("未查询到定额条目")
            old_data = utils.to_str(quota_dto.to_dict())
            setattr(quota_dto, key, value)
            # Renaming an unchanged entry keeps it unchanged; every other
            # edit marks the entry as changed.
            if not (quota_dto.is_change == 0 and key == "entry_name"):
                quota_dto.is_change = 1
            # self.update_process_status(quota_dto.id,4)
            self.store.update_quota(quota_dto)
            # Record success only after the store call actually succeeded.
            LogRecordHelper.log_success(
                OperationType.UPDATE,
                OperationModule.QUOTA,
                f"修改定额条目[{key}]: {value}",
                old_data,
                utils.to_str({"id": id, key: value}),
            )
            return None
        except Exception as e:
            self._logger.error(f"更新项目定额失败[{key}]: {value}, {e}")
            LogRecordHelper.log_fail(
                OperationType.UPDATE,
                OperationModule.QUOTA,
                f"修改定额条目失败[{key}]: {value}, {e}",
                utils.to_str({"id": id, key: value}),
            )
            raise

    def edit_quota_code_batch(self, ids: str, quota_code: str):
        """Set ``quota_code`` on every quota listed in ``ids``.

        Items are processed independently: a failing item is reported in the
        returned message and does not stop the remaining items.

        Args:
            ids: Comma-separated quota IDs.
            quota_code: Quota code to assign.

        Returns:
            Optional[str]: ``None`` when every item succeeded, otherwise a
            message describing the failures. Errors are returned, not raised.
        """
        try:
            if not ids:
                return "请选择要变更的章节"
            errors = []
            for id_str in ids.split(","):
                try:
                    quota_dto = self.get_quota_dto(int(id_str))
                    if not quota_dto:
                        errors.append(f"{id_str}不存在;")
                        continue
                    quota_dto.quota_code = quota_code
                    quota_dto.is_change = 1
                    self.store.update_quota(quota_dto)
                except Exception as e:
                    errors.append(f"{id_str}: {e};")
            msg = "".join(errors)
            log_data = utils.to_str({"ids": ids, "quota_code": quota_code})
            # Write exactly one audit record for the batch, matching its
            # outcome (same pattern as save_change_chapter).
            if msg:
                LogRecordHelper.log_fail(
                    OperationType.UPDATE,
                    OperationModule.QUOTA,
                    f"批量修改定额失败: {msg}",
                    log_data,
                )
                return msg
            LogRecordHelper.log_success(
                OperationType.UPDATE,
                OperationModule.QUOTA,
                f"批量修改定额: {ids}",
                log_data,
            )
            return None
        except Exception as e:
            msg = f"批量修改定额失败: {e}"
            self._logger.error(msg)
            LogRecordHelper.log_fail(
                OperationType.UPDATE,
                OperationModule.QUOTA,
                f"批量修改定额失败: {e}",
                utils.to_str({"ids": ids, "quota_code": quota_code}),
            )
            return msg

    def delete_quota(self, quota_id: int) -> bool:
        """Delete a project quota.

        Args:
            quota_id: Quota ID.

        Returns:
            bool: The value returned by ``store.delete_quota``.

        Raises:
            Exception: Any store error; a failure audit record is written
                and the error is re-raised.
        """
        try:
            flag = self.store.delete_quota(quota_id)
            LogRecordHelper.log_success(
                OperationType.DELETE,
                OperationModule.QUOTA,
                f"删除定额条目: {quota_id}",
                f"{quota_id}",
            )
            return flag
        except Exception as e:
            self._logger.error(f"删除项目定额失败: {e}")
            LogRecordHelper.log_fail(
                OperationType.DELETE,
                OperationModule.QUOTA,
                f"删除定额条目失败: {quota_id}",
                f"{quota_id}",
            )
            raise

    def save_change_chapter(self, project_id, ids, new_id):
        """Move the quotas listed in ``ids`` to chapter ``new_id``.

        Args:
            project_id: Project the target chapter belongs to.
            ids: Comma-separated quota IDs.
            new_id: ID of the target chapter.

        Returns:
            Optional[str]: ``None`` on full success, otherwise an error
            message. Errors are returned, not raised.
        """
        try:
            if not ids:
                return "请选择要变更的章节"
            chapter = self.chapter_store.get_chapter_dto(project_id, new_id)
            if not chapter:
                return "章节不存在"
            failures = []
            for id_str in ids.split(","):
                try:
                    self.store.update_quota_chapter(
                        int(id_str), chapter.item_id, chapter.item_code
                    )
                except Exception as e:
                    # Keep going; failures are reported together at the end.
                    failures.append(str(e) + ";")
            err = "".join(failures)
            if err:
                LogRecordHelper.log_fail(
                    OperationType.UPDATE,
                    OperationModule.QUOTA,
                    f"修改定额条目章节失败: {err}",
                    ids,
                )
                return err
            LogRecordHelper.log_success(
                OperationType.UPDATE,
                OperationModule.QUOTA,
                "修改定额条目章节",
                ids,
            )
            return None
        except Exception as e:
            msg = f"章节变更失败: {e}"
            self._logger.error(msg)
            LogRecordHelper.log_fail(
                OperationType.UPDATE,
                OperationModule.QUOTA,
                f"修改定额条目章节失败: {e}",
                ids,
            )
            return msg

    def update_send_status(
        self, quota_id: int, status: int, err: Optional[str] = None
    ) -> bool:
        """Update the send status of a quota.

        Args:
            quota_id: Quota ID.
            status: Status value.
            err: Optional error message stored with the status.

        Returns:
            bool: The value returned by ``store.update_send_status``.

        Raises:
            Exception: Any store error is logged and re-raised.
        """
        try:
            return self.store.update_send_status(quota_id, status, err)
        except Exception as e:
            self._logger.error(f"更新项目定额发送状态失败: {e}")
            raise

    # Disabled legacy code, kept for reference.
    # def start_process(self, quota_id: int) -> Optional[str]:
    #     """Start processing."""
    #     quota = self.get_quota_dto(quota_id)
    #     if quota:
    #         self.update_process_status(quota_id, 1)
    #         thread = threading.Thread(target=self._process_quota, args=(quota,))
    #         thread.start()
    #     else:
    #         return "定额条目没有查询到"
    #
    # def _process_quota(self, quota: ProjectQuotaDto):
    #     try:
    #         msg = executor.process_quota(quota)
    #         if not msg:
    #             self.start_send(quota.id)
    #     except Exception as e:
    #         self._logger.error(f"处理定额条目失败: {str(e)}")
    #         self.update_process_status(quota.id, 3, str(e))
    #         raise

    def start_send_by_ids(self, ids: str, is_cover: bool = False):
        """Send every quota listed in ``ids`` and write one batch audit record.

        Args:
            ids: Comma-separated quota IDs.
            is_cover: Forwarded to ``start_send``.

        Returns:
            str: Concatenated per-item failure messages; empty string when
            every item was sent.

        Raises:
            Exception: Any unexpected error (including a non-numeric ID) is
                logged, audited as a failure and re-raised.
        """
        try:
            failures = []
            for _id in ids.split(","):
                # Per-item audit logging is disabled; the batch is logged once.
                msg = self.start_send(int(_id), is_cover, False)
                if msg:
                    failures.append(f"{msg}[{_id}],")
            err = "".join(failures)
            # Exactly one audit record per batch: failure OR success.
            if err:
                LogRecordHelper.log_fail(
                    OperationType.SEND,
                    OperationModule.QUOTA,
                    f"批量推送定额条目,部分失败: {err}",
                    ids,
                )
            else:
                LogRecordHelper.log_success(
                    OperationType.SEND,
                    OperationModule.QUOTA,
                    "批量推送定额条目",
                    ids,
                )
            return err
        except Exception as e:
            LogRecordHelper.log_fail(
                OperationType.SEND,
                OperationModule.QUOTA,
                f"批量推送定额条目失败: {e}",
                ids,
            )
            self._logger.error(f"批量启动定额条目发送失败: {e}")
            raise

    def start_send(
        self, _id: int, is_cover: bool = False, is_log: bool = True
    ) -> Optional[str]:
        """Send a single quota.

        Args:
            _id: Quota ID.
            is_cover: When False, ``quota.quota_id`` is reset to 0 before
                sending.
            is_log: Whether to write an audit record for this send.

        Returns:
            Optional[str]: ``None`` on success, otherwise an error message.
        """
        quota = self.get_quota_dto(_id)
        if not quota:
            return "定额条目没有查询到"

        # NOTE(review): the status is set to SUCCESS before the send is
        # attempted; _send_quota overwrites it on failure. Order kept as-is -
        # confirm this is intentional.
        self.update_send_status(_id, SendStatusEnum.SUCCESS.value)
        if not is_cover:
            quota.quota_id = 0
        # thread = threading.Thread(target=self._send_quota, args=(quota,))
        # thread.start()

        sent = self._send_quota(quota)
        if is_log:
            detail = f"ID:{_id},是否覆盖:{is_cover}"
            # Audit the real outcome instead of logging success up front.
            if sent:
                LogRecordHelper.log_success(
                    OperationType.SEND,
                    OperationModule.QUOTA,
                    "推送定额条目",
                    detail,
                )
            else:
                LogRecordHelper.log_fail(
                    OperationType.SEND,
                    OperationModule.QUOTA,
                    "推送定额条目失败",
                    detail,
                )
        return None if sent else f"{_id}发送失败;"

    def _send_quota(self, quota: ProjectQuotaDto):
        """Send ``quota`` via ``executor.send_quota``.

        Returns:
            bool: True on success. On failure the error is logged, the send
            status is set to 3 with the error text, and False is returned.
        """
        try:
            executor.send_quota(quota)
            return True
        except Exception as e:
            self._logger.error(f"发送定额条目失败: {e}")
            # NOTE(review): 3 is presumably the "failed" send status - consider
            # replacing it with the matching SendStatusEnum member.
            self.update_send_status(quota.id, 3, str(e))
            return False