task_processor.py 12 KB


  1. import requests, tools.utils as utils, core.configs as configs
  2. from core.dtos import (
  3. ProjectDto,
  4. ProjectTaskDto,
  5. ProjectQuotaDto,
  6. ChapterDto,
  7. TotalBudgetInfoDto,
  8. ExcelParseResultDataDto,
  9. )
  10. from core.enum import TaskStatusEnum
  11. from stores import (
  12. ProjectStore,
  13. ProjectTaskStore,
  14. ProjectQuotaStore,
  15. ChapterStore,
  16. BudgetStore,
  17. )
  18. from core.dtos import (
  19. ExcelParseDto,
  20. ExcelParseZgsDto,
  21. ExcelParseItemDto,
  22. ExcelParseFileDto,
  23. ExcelParseResultDto,
  24. )
  25. class TaskProcessor:
  26. def __init__(self):
  27. self._logger = utils.get_logger()
  28. self._project_store = ProjectStore()
  29. self._budget_store = BudgetStore()
  30. self._chapter_store = ChapterStore()
  31. self._task_store = ProjectTaskStore()
  32. self._quota_store = ProjectQuotaStore()
  33. self._task_submit_url = "/task_submit"
  34. self._task_status_url = "/task_status"
  35. self._task_cancel_url = "/cancel_task"
  36. def submit_task(self, task: ProjectTaskDto):
  37. try:
  38. self._logger.info(f"开始运行任务:{task.task_name}")
  39. self._task_store.update_task_status(
  40. task.id, TaskStatusEnum.PROCESSING.value
  41. )
  42. if not task.file_path:
  43. raise Exception("任务文件不存在")
  44. files, msg = self._read_files(task.file_path)
  45. if not files or len(files) == 0:
  46. raise Exception(msg)
  47. project = self._project_store.get(task.project_id)
  48. if not project:
  49. raise Exception("项目不存在")
  50. budget_models = self._budget_store.get_budget_info(task.project_id)
  51. budgets = [
  52. TotalBudgetInfoDto.from_model(budget) for budget in budget_models
  53. ]
  54. chapter = self._chapter_store.get_chapter_item_by_item_code(
  55. task.project_id, task.item_code
  56. )
  57. # parents = self._chapter_store.get_all_parents_chapter_items(
  58. # task.project_id, task.item_code
  59. # )
  60. # children = self._chapter_store.get_all_children_chapter_items(
  61. # task.project_id, task.item_code
  62. # )
  63. # data, msg = self._build_api_body(
  64. # task, project, budgets, parents, children, files
  65. # )
  66. data, msg = self._build_api_body(task, project, budgets, chapter, files)
  67. if not data:
  68. raise Exception(msg)
  69. res = self._call_api(self._task_submit_url, data)
  70. if res.result == -1:
  71. self._task_store.update_task_status(
  72. task.id, TaskStatusEnum.FAILURE.value, res.reason
  73. )
  74. return res.reason
  75. if res.data and len(res.data) > 0:
  76. self._insert_data(task, project, res.data)
  77. if res.result == 1:
  78. self._logger.debug(f"运行任务:{task.task_name}完成")
  79. self._task_store.update_task_status(
  80. task.id, TaskStatusEnum.SUCCESS.value
  81. )
  82. else:
  83. self._logger.debug(f"运行任务:{task.task_name}请求中,等待结果")
  84. self.query_task(task, project)
  85. return None
  86. except Exception as e:
  87. msg = f"任务运行失败,原因:{e}"
  88. self._logger.error(f"运行任务:{task.task_name}, {msg}")
  89. self._task_store.update_task_status(
  90. task.id, TaskStatusEnum.FAILURE.value, msg
  91. )
  92. return msg
  93. def cancel_task(self, task: ProjectTaskDto):
  94. try:
  95. self._logger.info(f"开始取消运行任务:{task.id}")
  96. if task.process_status == TaskStatusEnum.PROCESSING.value:
  97. res = self._call_api(self._task_cancel_url, {"task_id": task.id})
  98. if res.result == -1:
  99. self._task_store.update_task_status(
  100. task.id, TaskStatusEnum.FAILURE.value, res.reason
  101. )
  102. return res.reason
  103. project = self._project_store.get(task.project_id)
  104. if not project:
  105. self._logger.error(f"取消运行任务:{task.id}失败,原因:项目不存在")
  106. return "项目不存在"
  107. if res.data and len(res.data) > 0:
  108. self._insert_data(task, project, res.data)
  109. if res.result == 0:
  110. self._logger.info(f"取消运行任务:{task.id}成功")
  111. # self._task_store.update_task_status(task.id, TaskStatusEnum.CANCELED.value)
  112. return None
  113. elif res.result == 1:
  114. self._logger.error(f"取消运行任务失败:{task.id}已运行完成")
  115. self._task_store.update_task_status(
  116. task.id, TaskStatusEnum.SUCCESS.value
  117. )
  118. return "取消失败,任务已运行完成"
  119. else:
  120. self._logger.error(f"取消运行任务:{task.id}失败,原因:任务状态错误")
  121. return "任务状态错误"
  122. except Exception as e:
  123. msg = f"取消运行任务失败,原因:{e}"
  124. self._logger.error(msg)
  125. return msg
  126. def query_task(self, task: ProjectTaskDto, project: ProjectDto = None):
  127. try:
  128. import time
  129. while True:
  130. time.sleep(30)
  131. res = self._call_api(self._task_status_url, {"task_id": task.id})
  132. if res.result == -1:
  133. self._task_store.update_task_status(
  134. task.id, TaskStatusEnum.FAILURE.value, res.reason
  135. )
  136. return res.reason
  137. if res.data and len(res.data) > 0:
  138. if not project:
  139. project = self._project_store.get(task.project_id)
  140. self._insert_data(task, project, res.data)
  141. if res.result == 1:
  142. self._logger.debug(f"运行任务:{task.task_name}完成")
  143. self._task_store.update_task_status(
  144. task.id, TaskStatusEnum.SUCCESS.value
  145. )
  146. break
  147. else:
  148. self._logger.debug(f"运行任务:{task.task_name}请求中,等待结果")
  149. self.query_task(task, project)
  150. except Exception as e:
  151. msg = f"任务状态查询失败,原因:{e}"
  152. self._logger.error(msg)
  153. raise Exception(msg)
  154. def _read_files(self, paths: str) -> (list[ExcelParseFileDto], str):
  155. try:
  156. files = []
  157. self._logger.debug(f"开始读取文件:{paths}")
  158. path_list = paths.split(",")
  159. for path in path_list:
  160. file = utils.encode_file(path)
  161. files.append(ExcelParseFileDto(file_id=path, content=file))
  162. self._logger.debug(f"读取文件完成:{paths}")
  163. return files, ""
  164. except Exception as e:
  165. msg = f"读取文件失败,原因:{e}"
  166. self._logger.error(msg)
  167. return None, msg
  168. def _build_api_body(
  169. self,
  170. task: ProjectTaskDto,
  171. project: ProjectDto,
  172. budgets: list[TotalBudgetInfoDto],
  173. chapter: ChapterDto,
  174. # parents: list[ChapterDto],
  175. # children: list[ChapterDto],
  176. files: list[ExcelParseFileDto],
  177. ):
  178. try:
  179. budgets_data = [ExcelParseZgsDto.from_dto(budget) for budget in budgets]
  180. # parents_data = [ExcelParseItemDto.from_dto(parent) for parent in parents]
  181. # children_data = [ExcelParseItemDto.from_dto(child) for child in children]
  182. chapter_data = ExcelParseItemDto.from_dto(chapter)
  183. data = ExcelParseDto(
  184. task_id=task.id or 0,
  185. version=configs.app.version or "2020",
  186. project_id=task.project_id,
  187. project_name=project.project_name,
  188. project_stage=project.design_stage,
  189. selected_zgs_id=task.budget_id or 0,
  190. zgs_list=budgets_data,
  191. selected_chapter=chapter_data,
  192. # hierarchy=parents_data,
  193. # components=children_data,
  194. files=files,
  195. )
  196. return data, ""
  197. except Exception as e:
  198. msg = f"构建API BODY,原因:{e}"
  199. self._logger.error(msg)
  200. return None, msg
  201. def _call_api(self, api_url, data) -> ExcelParseResultDto:
  202. try:
  203. url = f"{configs.app.task_api_url}{api_url}"
  204. if isinstance(data, ExcelParseDto):
  205. data = data.to_dict()
  206. self._logger.debug(f"调用接口:{url},data:{data}")
  207. response = requests.post(
  208. url, headers={"Content-Type": "application/json"}, json=data
  209. )
  210. self._logger.debug(f"调用接口返回[{response.status_code}]:{response.text}")
  211. if response.status_code == 200:
  212. result = response.json()
  213. result_dto = ExcelParseResultDto.from_dict(result)
  214. self._logger.debug(f"调用接口成功")
  215. return result_dto
  216. else:
  217. self._logger.error("调用接口失败")
  218. raise Exception(response.text)
  219. except Exception as e:
  220. msg = f"调用接口失败,原因:{e}"
  221. self._logger.error(msg)
  222. raise Exception(msg)
  223. def _insert_data(
  224. self,
  225. task: ProjectTaskDto,
  226. project: ProjectDto,
  227. data: list[ExcelParseResultDataDto],
  228. ):
  229. try:
  230. self._logger.debug(f"开始插入数据:{task.task_name}")
  231. for item in data:
  232. self._logger.debug(f"数据:{item.to_dict()}")
  233. quota_dto = (
  234. self._quota_store.get_quota_by_quota_input(
  235. project.project_id, task.budget_id, item.target_id
  236. )
  237. if item.target_id > 0
  238. else None
  239. )
  240. if quota_dto:
  241. self._logger.debug(
  242. f"更新数据[{item.target_id}]:{item.item_id}/{item.item_code} {item.dinge_code}"
  243. )
  244. quota_dto.item_code = item.item_code
  245. quota_dto.item_id = item.item_id
  246. quota_dto.quota_id = item.target_id if item.target_id > 0 else 0
  247. quota_dto.quota_code = item.dinge_code
  248. quota_dto.quota_adjustment = item.dinge_adjust
  249. quota_dto.entry_name = item.entry_name
  250. quota_dto.units = item.units
  251. quota_dto.amount = item.amount
  252. quota_dto.ex_file = item.ex_file_id
  253. quota_dto.ex_cell = item.ex_cell
  254. quota_dto.ex_row = item.ex_row
  255. quota_dto.ex_unit = item.ex_unit
  256. quota_dto.ex_amount = item.ex_amount
  257. quota_dto.send_error = None
  258. self._quota_store.update_quota(quota_dto)
  259. else:
  260. self._logger.debug(
  261. f"新增数据[{item.target_id}]:{item.item_id}/{item.item_code} {item.dinge_code}"
  262. )
  263. quota_dto = ProjectQuotaDto(
  264. task_id=task.id,
  265. budget_id=item.zgs_id,
  266. budget_code=item.zgs_code,
  267. project_id=project.project_id,
  268. item_code=item.item_code,
  269. item_id=item.item_id,
  270. quota_id=item.target_id if item.target_id > 0 else 0,
  271. quota_code=item.dinge_code,
  272. quota_adjustment=item.dinge_adjust,
  273. entry_name=item.entry_name,
  274. units=item.units,
  275. amount=item.amount,
  276. ex_file=item.ex_file_id,
  277. ex_cell=item.ex_cell,
  278. ex_row=item.ex_row,
  279. ex_unit=item.ex_unit,
  280. ex_amount=item.ex_amount,
  281. created_by=task.created_by,
  282. )
  283. self._quota_store.create_quota(quota_dto)
  284. self._logger.debug(f"插入数据完成:{task.task_name}")
  285. return True
  286. except Exception as e:
  287. msg = f"插入数据失败,原因:{e}"
  288. self._logger.error(msg)
  289. return False, msg