|
- import requests, tools.utils as utils, core.configs as configs
- from core.dtos import (
- ProjectDto,
- ProjectTaskDto,
- ProjectQuotaDto,
- ChapterDto,
- TotalBudgetInfoDto,
- ExcelParseResultDataDto,
- )
- from core.enum import TaskStatusEnum
- from stores import (
- ProjectStore,
- ProjectTaskStore,
- ProjectQuotaStore,
- ChapterStore,
- BudgetStore,
- )
- from core.dtos import (
- ExcelParseDto,
- ExcelParseZgsDto,
- ExcelParseItemDto,
- ExcelParseFileDto,
- ExcelParseResultDto,
- )
- class TaskProcessor:
- def __init__(self):
- self._logger = utils.get_logger()
- self._project_store = ProjectStore()
- self._budget_store = BudgetStore()
- self._chapter_store = ChapterStore()
- self._task_store = ProjectTaskStore()
- self._quota_store = ProjectQuotaStore()
- self._task_submit_url = "/task_submit"
- self._task_status_url = "/task_status"
- self._task_cancel_url = "/cancel_task"
- def submit_task(self, task: ProjectTaskDto):
- try:
- self._logger.info(f"开始运行任务:{task.task_name}")
- self._task_store.update_task_status(
- task.id, TaskStatusEnum.PROCESSING.value
- )
- if not task.file_path:
- raise Exception("任务文件不存在")
- files, msg = self._read_files(task.file_path)
- if not files or len(files) == 0:
- raise Exception(msg)
- project = self._project_store.get(task.project_id)
- if not project:
- raise Exception("项目不存在")
- budget_models = self._budget_store.get_budget_info(task.project_id)
- budgets = [
- TotalBudgetInfoDto.from_model(budget) for budget in budget_models
- ]
- chapter = self._chapter_store.get_chapter_item_by_item_code(
- task.project_id, task.item_code
- )
- # parents = self._chapter_store.get_all_parents_chapter_items(
- # task.project_id, task.item_code
- # )
- # children = self._chapter_store.get_all_children_chapter_items(
- # task.project_id, task.item_code
- # )
- # data, msg = self._build_api_body(
- # task, project, budgets, parents, children, files
- # )
- data, msg = self._build_api_body(task, project, budgets, chapter, files)
- if not data:
- raise Exception(msg)
- res = self._call_api(self._task_submit_url, data)
- if res.result == -1:
- self._task_store.update_task_status(
- task.id, TaskStatusEnum.FAILURE.value, res.reason
- )
- return res.reason
- if res.data and len(res.data) > 0:
- self._insert_data(task, project, res.data)
- if res.result == 1:
- self._logger.debug(f"运行任务:{task.task_name}完成")
- self._task_store.update_task_status(
- task.id, TaskStatusEnum.SUCCESS.value
- )
- else:
- self._logger.debug(f"运行任务:{task.task_name}请求中,等待结果")
- self.query_task(task, project)
- return None
- except Exception as e:
- msg = f"任务运行失败,原因:{e}"
- self._logger.error(f"运行任务:{task.task_name}, {msg}")
- self._task_store.update_task_status(
- task.id, TaskStatusEnum.FAILURE.value, msg
- )
- return msg
- def cancel_task(self, task: ProjectTaskDto):
- try:
- self._logger.info(f"开始取消运行任务:{task.id}")
- if task.process_status == TaskStatusEnum.PROCESSING.value:
- res = self._call_api(self._task_cancel_url, {"task_id": task.id})
- if res.result == -1:
- self._task_store.update_task_status(
- task.id, TaskStatusEnum.FAILURE.value, res.reason
- )
- return res.reason
- project = self._project_store.get(task.project_id)
- if not project:
- self._logger.error(f"取消运行任务:{task.id}失败,原因:项目不存在")
- return "项目不存在"
- if res.data and len(res.data) > 0:
- self._insert_data(task, project, res.data)
- if res.result == 0:
- self._logger.info(f"取消运行任务:{task.id}成功")
- # self._task_store.update_task_status(task.id, TaskStatusEnum.CANCELED.value)
- return None
- elif res.result == 1:
- self._logger.error(f"取消运行任务失败:{task.id}已运行完成")
- self._task_store.update_task_status(
- task.id, TaskStatusEnum.SUCCESS.value
- )
- return "取消失败,任务已运行完成"
- else:
- self._logger.error(f"取消运行任务:{task.id}失败,原因:任务状态错误")
- return "任务状态错误"
- except Exception as e:
- msg = f"取消运行任务失败,原因:{e}"
- self._logger.error(msg)
- return msg
- def query_task(self, task: ProjectTaskDto, project: ProjectDto = None):
- try:
- import time
- while True:
- time.sleep(30)
- res = self._call_api(self._task_status_url, {"task_id": task.id})
- if res.result == -1:
- self._task_store.update_task_status(
- task.id, TaskStatusEnum.FAILURE.value, res.reason
- )
- return res.reason
- if res.data and len(res.data) > 0:
- if not project:
- project = self._project_store.get(task.project_id)
- self._insert_data(task, project, res.data)
- if res.result == 1:
- self._logger.debug(f"运行任务:{task.task_name}完成")
- self._task_store.update_task_status(
- task.id, TaskStatusEnum.SUCCESS.value
- )
- break
- else:
- self._logger.debug(f"运行任务:{task.task_name}请求中,等待结果")
- self.query_task(task, project)
- except Exception as e:
- msg = f"任务状态查询失败,原因:{e}"
- self._logger.error(msg)
- raise Exception(msg)
- def _read_files(self, paths: str) -> (list[ExcelParseFileDto], str):
- try:
- files = []
- self._logger.debug(f"开始读取文件:{paths}")
- path_list = paths.split(",")
- for path in path_list:
- file = utils.encode_file(path)
- files.append(ExcelParseFileDto(file_id=path, content=file))
- self._logger.debug(f"读取文件完成:{paths}")
- return files, ""
- except Exception as e:
- msg = f"读取文件失败,原因:{e}"
- self._logger.error(msg)
- return None, msg
- def _build_api_body(
- self,
- task: ProjectTaskDto,
- project: ProjectDto,
- budgets: list[TotalBudgetInfoDto],
- chapter: ChapterDto,
- # parents: list[ChapterDto],
- # children: list[ChapterDto],
- files: list[ExcelParseFileDto],
- ):
- try:
- budgets_data = [ExcelParseZgsDto.from_dto(budget) for budget in budgets]
- # parents_data = [ExcelParseItemDto.from_dto(parent) for parent in parents]
- # children_data = [ExcelParseItemDto.from_dto(child) for child in children]
- chapter_data = ExcelParseItemDto.from_dto(chapter)
- data = ExcelParseDto(
- task_id=task.id or 0,
- version=configs.app.version or "2020",
- project_id=task.project_id,
- project_name=project.project_name,
- project_stage=project.design_stage,
- selected_zgs_id=task.budget_id or 0,
- zgs_list=budgets_data,
- selected_chapter=chapter_data,
- # hierarchy=parents_data,
- # components=children_data,
- files=files,
- )
- return data, ""
- except Exception as e:
- msg = f"构建API BODY,原因:{e}"
- self._logger.error(msg)
- return None, msg
- def _call_api(self, api_url, data) -> ExcelParseResultDto:
- try:
- url = f"{configs.app.task_api_url}{api_url}"
- if isinstance(data, ExcelParseDto):
- data = data.to_dict()
- self._logger.debug(f"调用接口:{url},data:{data}")
- response = requests.post(
- url, headers={"Content-Type": "application/json"}, json=data
- )
- self._logger.debug(f"调用接口返回[{response.status_code}]:{response.text}")
- if response.status_code == 200:
- result = response.json()
- result_dto = ExcelParseResultDto.from_dict(result)
- self._logger.debug(f"调用接口成功")
- return result_dto
- else:
- self._logger.error("调用接口失败")
- raise Exception(response.text)
- except Exception as e:
- msg = f"调用接口失败,原因:{e}"
- self._logger.error(msg)
- raise Exception(msg)
- def _insert_data(
- self,
- task: ProjectTaskDto,
- project: ProjectDto,
- data: list[ExcelParseResultDataDto],
- ):
- try:
- self._logger.debug(f"开始插入数据:{task.task_name}")
- for item in data:
- self._logger.debug(f"数据:{item.to_dict()}")
- quota_dto = (
- self._quota_store.get_quota_by_quota_input(
- project.project_id, task.budget_id, item.target_id
- )
- if item.target_id > 0
- else None
- )
- if quota_dto:
- self._logger.debug(
- f"更新数据[{item.target_id}]:{item.item_id}/{item.item_code} {item.dinge_code}"
- )
- quota_dto.item_code = item.item_code
- quota_dto.item_id = item.item_id
- quota_dto.quota_id = item.target_id if item.target_id > 0 else 0
- quota_dto.quota_code = item.dinge_code
- quota_dto.quota_adjustment = item.dinge_adjust
- quota_dto.entry_name = item.entry_name
- quota_dto.units = item.units
- quota_dto.amount = item.amount
- quota_dto.ex_file = item.ex_file_id
- quota_dto.ex_cell = item.ex_cell
- quota_dto.ex_row = item.ex_row
- quota_dto.ex_unit = item.ex_unit
- quota_dto.ex_amount = item.ex_amount
- quota_dto.send_error = None
- self._quota_store.update_quota(quota_dto)
- else:
- self._logger.debug(
- f"新增数据[{item.target_id}]:{item.item_id}/{item.item_code} {item.dinge_code}"
- )
- quota_dto = ProjectQuotaDto(
- task_id=task.id,
- budget_id=item.zgs_id,
- budget_code=item.zgs_code,
- project_id=project.project_id,
- item_code=item.item_code,
- item_id=item.item_id,
- quota_id=item.target_id if item.target_id > 0 else 0,
- quota_code=item.dinge_code,
- quota_adjustment=item.dinge_adjust,
- entry_name=item.entry_name,
- units=item.units,
- amount=item.amount,
- ex_file=item.ex_file_id,
- ex_cell=item.ex_cell,
- ex_row=item.ex_row,
- ex_unit=item.ex_unit,
- ex_amount=item.ex_amount,
- created_by=task.created_by,
- )
- self._quota_store.create_quota(quota_dto)
- self._logger.debug(f"插入数据完成:{task.task_name}")
- return True
- except Exception as e:
- msg = f"插入数据失败,原因:{e}"
- self._logger.error(msg)
- return False, msg
|