# project_quota.py
import threading
from typing import Optional

import tools.utils as utils
from core.dtos import ProjectQuotaDto
from core.enum import SendStatusEnum
from core.models import ProjectQuotaModel
from stores import ProjectQuotaStore
import executor
  8. class ProjectQuotaService:
  9. def __init__(self):
  10. self.store = ProjectQuotaStore()
  11. self._logger = utils.get_logger()
  12. def get_quotas_paginated(self, budget_id: int, project_id: str, item_code: str, page: int = 1, page_size: int = 10,
  13. keyword: Optional[str] = None, send_status: Optional[int] = None):
  14. """获取项目定额列表
  15. Args:
  16. budget_id: 概算序号
  17. project_id: 项目编号
  18. item_code: 条目编号
  19. page: 页码
  20. page_size: 每页数量
  21. keyword: 关键字
  22. send_status: 发送状态
  23. Returns:
  24. dict: 包含总数和定额列表的字典
  25. """
  26. try:
  27. data = self.store.get_quotas_paginated(
  28. budget_id=budget_id,
  29. project_id=project_id,
  30. item_code=item_code,
  31. page=page,
  32. page_size=page_size,
  33. keyword=keyword,
  34. send_status=send_status
  35. )
  36. return [ProjectQuotaDto.from_model(quota).to_dict() for quota in data.get('data',[])],data.get('total',0)
  37. except Exception as e:
  38. # 记录错误日志
  39. print(f"获取项目定额列表失败: {str(e)}")
  40. raise
  41. def get_quotas_by_task_paginated(self, task_id: int, budget_id, project_id, item_code, page: int = 1, page_size: int = 10,keyword: Optional[str]=None,send_status: Optional[int] = None):
  42. try:
  43. data = self.store.get_quotas_by_task_paginated(
  44. task_id=task_id,
  45. budget_id=budget_id,
  46. project_id=project_id,
  47. item_code=item_code,
  48. page=page,
  49. page_size=page_size,
  50. send_status=send_status,
  51. keyword=keyword
  52. )
  53. return [ProjectQuotaDto.from_model(quota).to_dict() for quota in data.get('data',[])],data.get('total',0)
  54. except Exception as e:
  55. self._logger.error(f"获取任务项目定额列表失败: {str(e)}")
  56. raise
  57. def get_quota_dto(self, quota_id: int):
  58. try:
  59. return self.store.get_quota_dto(quota_id)
  60. except Exception as e:
  61. self._logger.error(f"获取定额条目DTO失败: {str(e)}")
  62. raise
  63. def get_quota(self, quota_id: int) -> Optional[ProjectQuotaModel]:
  64. """获取单个项目定额
  65. Args:
  66. quota_id: 定额ID
  67. Returns:
  68. Optional[ProjectQuotaDto]: 项目定额DTO对象
  69. """
  70. try:
  71. return self.store.get_quota(quota_id)
  72. except Exception as e:
  73. self._logger.error(f"获取定额条目失败: {str(e)}")
  74. raise
  75. def save_quota(self, quota_dto: ProjectQuotaDto) -> Optional[ProjectQuotaDto]:
  76. """保存定额"""
  77. try:
  78. # 业务验证
  79. if quota_dto.id == 0:
  80. quota_dto = self.create_quota(quota_dto)
  81. else:
  82. quota_dto = self.update_quota(quota_dto)
  83. # self.update_process_status(quota_dto.id,4)
  84. if quota_dto.send_status != SendStatusEnum.NEW.value:
  85. self.update_send_status(quota_dto.id, SendStatusEnum.CHANGE.value)
  86. return quota_dto
  87. except Exception as e:
  88. self._logger.error(f"保存定额条目失败: {str(e)}")
  89. raise
  90. def create_quota(self, quota_dto: ProjectQuotaDto) -> ProjectQuotaDto:
  91. """创建项目定额
  92. Args:
  93. quota_dto: 定额DTO对象
  94. Returns:
  95. ProjectQuotaDto: 创建后的定额DTO对象
  96. """
  97. try:
  98. # 业务验证
  99. if not quota_dto.project_id or not quota_dto.budget_id:
  100. raise ValueError("项目编号和概算序号不能为空")
  101. return self.store.create_quota(quota_dto)
  102. except Exception as e:
  103. self._logger.error(f"创建项目定额失败: {str(e)}")
  104. raise
  105. def update_quota(self, quota_dto: ProjectQuotaDto) -> Optional[ProjectQuotaDto]:
  106. """更新项目定额
  107. Args:
  108. quota_dto: 定额DTO对象
  109. Returns:
  110. Optional[ProjectQuotaDto]: 更新后的定额DTO对象
  111. """
  112. try:
  113. # 业务验证
  114. if not quota_dto.id:
  115. raise ValueError("定额ID不能为空")
  116. return self.store.update_quota(quota_dto)
  117. except Exception as e:
  118. self._logger.error(f"更新项目定额失败: {str(e)}")
  119. raise
  120. def delete_quota(self, quota_id: int) -> bool:
  121. """删除项目定额
  122. Args:
  123. quota_id: 定额ID
  124. Returns:
  125. bool: 删除是否成功
  126. """
  127. try:
  128. return self.store.delete_quota(quota_id)
  129. except Exception as e:
  130. self._logger.error(f"删除项目定额失败: {str(e)}")
  131. raise
  132. def update_send_status(self, quota_id: int, status: int, err: str = None) -> bool:
  133. """更新发送状态
  134. Args:
  135. quota_id: 定额ID
  136. status: 状态值
  137. err: 错误信息
  138. Returns:
  139. bool: 更新是否成功
  140. """
  141. try:
  142. return self.store.update_send_status(quota_id, status, err)
  143. except Exception as e:
  144. self._logger.error(f"更新项目定额发送状态失败: {str(e)}")
  145. raise
  146. # def start_process(self, quota_id: int) -> Optional[str]:
  147. # """启动处理"""
  148. # quota = self.get_quota_dto(quota_id)
  149. # if quota:
  150. # self.update_process_status(quota_id, 1)
  151. # thread = threading.Thread(target=self._process_quota, args=(quota,))
  152. # thread.start()
  153. # else:
  154. # return "定额条目没有查询到"
  155. #
  156. # def _process_quota(self, quota: ProjectQuotaDto):
  157. # try:
  158. # msg = executor.process_quota(quota)
  159. # if not msg:
  160. # self.start_send(quota.id)
  161. # except Exception as e:
  162. # self._logger.error(f"处理定额条目失败: {str(e)}")
  163. # self.update_process_status(quota.id, 3, str(e))
  164. # raise
  165. def start_send_by_ids(self, ids: str,is_cover: bool = False):
  166. try:
  167. id_list= ids.split(',')
  168. err =""
  169. for _id in id_list:
  170. msg = self.start_send(int(_id),is_cover)
  171. if msg:
  172. err += f"{msg}[{_id}],"
  173. return err
  174. except Exception as e:
  175. self._logger.error(f"批量启动定额条目发送失败: {str(e)}")
  176. raise
  177. def start_send(self, _id: int, is_cover: bool = False) -> Optional[str]:
  178. """启动发送"""
  179. quota = self.get_quota_dto(_id)
  180. if quota:
  181. self.update_send_status(_id, 1)
  182. if not is_cover:
  183. quota.quota_id = 0
  184. thread = threading.Thread(target=self._send_quota, args=(quota,))
  185. thread.start()
  186. return None
  187. else:
  188. return "定额条目没有查询到"
  189. def _send_quota(self, quota: ProjectQuotaDto):
  190. try:
  191. executor.send_quota(quota)
  192. except Exception as e:
  193. self._logger.error(f"发送定额条目失败: {str(e)}")
  194. self.update_send_status(quota.id, 3, str(e))
  195. raise