|
@@ -0,0 +1,431 @@
|
|
|
+from typing import Optional
|
|
|
+
|
|
|
+import tools.utils as utils, core.configs as configs, os, threading
|
|
|
+from core.log.log_record import LogRecordHelper
|
|
|
+from core.enum import OperationModule, OperationType, TaskStatusEnum, SendStatusEnum
|
|
|
+from core.dtos import ProjectTaskDto
|
|
|
+from core.models import ProjectTaskModel
|
|
|
+from stores import ProjectTaskStore
|
|
|
+import executor
|
|
|
+
|
|
|
+
|
|
|
+class ProjectTaskService:
|
|
|
+ def __init__(self):
|
|
|
+ self.store = ProjectTaskStore()
|
|
|
+ self._logger = utils.get_logger()
|
|
|
+ self._task_locks = {}
|
|
|
+ self._lock = threading.Lock()
|
|
|
+
|
|
|
+ def _get_task_lock(self, task_id: int) -> threading.Lock:
|
|
|
+ with self._lock:
|
|
|
+ if task_id not in self._task_locks:
|
|
|
+ self._task_locks[task_id] = threading.Lock()
|
|
|
+ return self._task_locks[task_id]
|
|
|
+
|
|
|
+ def start_run_task(self, task_id: int):
|
|
|
+ task_lock = self._get_task_lock(task_id)
|
|
|
+ with task_lock:
|
|
|
+ task = self.store.get_task_dto(task_id)
|
|
|
+ if not task:
|
|
|
+ return "没有查询到任务"
|
|
|
+ if not task.file_path or task.file_path.strip() == "":
|
|
|
+ return "没有上传文件"
|
|
|
+ if task.process_status == TaskStatusEnum.PROCESSING:
|
|
|
+ return "正在运行中"
|
|
|
+
|
|
|
+ try:
|
|
|
+ thread = threading.Thread(target=self._run_task, args=(task,))
|
|
|
+ thread.daemon = True
|
|
|
+ thread.start()
|
|
|
+ LogRecordHelper.log_success(
|
|
|
+ OperationType.PROCESS,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"开始运行:{task.task_name}",
|
|
|
+ task.id,
|
|
|
+ )
|
|
|
+ if executor.project_is_running(task.project_id):
|
|
|
+ self.update_process_status(task_id, TaskStatusEnum.WAIT.value, "")
|
|
|
+ return "0"
|
|
|
+ else:
|
|
|
+ self.update_process_status(
|
|
|
+ task_id, TaskStatusEnum.PROCESSING.value, ""
|
|
|
+ )
|
|
|
+ return None
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"启动任务失败: {str(e)}")
|
|
|
+ self.update_process_status(
|
|
|
+ task_id, TaskStatusEnum.FAILURE.value, str(e)
|
|
|
+ )
|
|
|
+ LogRecordHelper.log_fail(
|
|
|
+ OperationType.PROCESS,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"开始运行失败:{task.task_name}",
|
|
|
+ task.id,
|
|
|
+ )
|
|
|
+ raise
|
|
|
+
|
|
|
+ def _run_task(self, task: ProjectTaskDto):
|
|
|
+ task_lock = self._get_task_lock(task.id)
|
|
|
+ with task_lock:
|
|
|
+ try:
|
|
|
+ executor.run_task(task)
|
|
|
+ # if not msg:
|
|
|
+ # self.start_send_task(task.id)
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"运行项目任务失败: {str(e)}")
|
|
|
+ self.update_process_status(
|
|
|
+ task.id, TaskStatusEnum.FAILURE.value, str(e)
|
|
|
+ )
|
|
|
+ raise
|
|
|
+
|
|
|
+ def start_send_task(self, task_id: int):
|
|
|
+ task_lock = self._get_task_lock(task_id)
|
|
|
+ with task_lock:
|
|
|
+ task = self.store.get_task_dto(task_id)
|
|
|
+ if not task:
|
|
|
+ return "没有查询到任务"
|
|
|
+ if task.process_status != TaskStatusEnum.SUCCESS.value:
|
|
|
+ return "还未处理完成"
|
|
|
+ if task.send_status == SendStatusEnum.PROCESSING.value:
|
|
|
+ return "正在发送中"
|
|
|
+
|
|
|
+ try:
|
|
|
+ thread = threading.Thread(target=self._send_task, args=(task,))
|
|
|
+ thread.daemon = True
|
|
|
+ thread.start()
|
|
|
+ LogRecordHelper.log_success(
|
|
|
+ OperationType.SEND,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"开始发送:{task.task_name}",
|
|
|
+ task.id,
|
|
|
+ )
|
|
|
+ return None
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"启动发送任务失败: {str(e)}")
|
|
|
+ self.update_send_status(task_id, TaskStatusEnum.FAILURE.value, str(e))
|
|
|
+ LogRecordHelper.log_fail(
|
|
|
+ OperationType.SEND,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"开始发送失败:{task.task_name}",
|
|
|
+ task.id,
|
|
|
+ )
|
|
|
+ raise
|
|
|
+
|
|
|
+ def _send_task(self, task: ProjectTaskDto):
|
|
|
+ task_lock = self._get_task_lock(task.id)
|
|
|
+ with task_lock:
|
|
|
+ try:
|
|
|
+ executor.send_task(task)
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"发送项目任务失败: {str(e)}")
|
|
|
+ self.update_send_status(task.id, TaskStatusEnum.FAILURE.value, str(e))
|
|
|
+ raise
|
|
|
+
|
|
|
+ def get_tasks_paginated(
|
|
|
+ self,
|
|
|
+ project_id: str,
|
|
|
+ item_code: str,
|
|
|
+ page: int = 1,
|
|
|
+ page_size: int = 10,
|
|
|
+ keyword: Optional[str] = None,
|
|
|
+ process_status: Optional[int] = None,
|
|
|
+ send_status: Optional[int] = None,
|
|
|
+ ):
|
|
|
+ """获取项目任务列表
|
|
|
+
|
|
|
+ Args:
|
|
|
+ project_id: 项目编号
|
|
|
+ item_code: 条目编号
|
|
|
+ page: 页码
|
|
|
+ page_size: 每页数量
|
|
|
+ keyword: 关键字
|
|
|
+ process_status: 处理状态
|
|
|
+ send_status: 发送状态
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ dict: 包含总数和任务列表的字典
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ data = self.store.get_tasks_paginated(
|
|
|
+ project_id=project_id,
|
|
|
+ item_code=item_code,
|
|
|
+ page=page,
|
|
|
+ page_size=page_size,
|
|
|
+ keyword=keyword,
|
|
|
+ process_status=process_status,
|
|
|
+ send_status=send_status,
|
|
|
+ )
|
|
|
+ return [
|
|
|
+ ProjectTaskDto.from_model(task).to_dict()
|
|
|
+ for task in data.get("data", [])
|
|
|
+ ], data.get("total", 0)
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"获取项目任务列表失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ def get_task(self, task_id: int) -> Optional[ProjectTaskModel]:
|
|
|
+ """获取单个项目任务
|
|
|
+
|
|
|
+ Args:
|
|
|
+ task_id: 任务ID
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Optional[ProjectTaskModel]: 项目任务
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ task = self.store.get_task(task_id)
|
|
|
+ return task
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"获取项目任务失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ def get_task_dto(self, task_id: int):
|
|
|
+ try:
|
|
|
+ task = self.store.get_task_dto(task_id)
|
|
|
+ return task
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"获取项目任务失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ def save_task(self, task_id: int, task_dto: ProjectTaskDto, files: list):
|
|
|
+ log_data = ""
|
|
|
+ if task_id == 0:
|
|
|
+ task = self.store.create_task(task_dto)
|
|
|
+ else:
|
|
|
+ task = self.get_task_dto(task_id)
|
|
|
+ if task.process_status != TaskStatusEnum.NEW.value:
|
|
|
+ raise Exception("项目任务提交处理过,不能再修改。")
|
|
|
+ log_data = utils.to_str(task.to_dict())
|
|
|
+ if task is None:
|
|
|
+ LogRecordHelper.log_fail(
|
|
|
+ OperationType.UPDATE,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"修改任务失败 错误:任务[{task_id}]不存在",
|
|
|
+ )
|
|
|
+ raise Exception("项目任务不存在")
|
|
|
+ task_dto.id = task_id
|
|
|
+ task = self.store.update_task(task_dto)
|
|
|
+ try:
|
|
|
+ if task:
|
|
|
+ paths = self._process_file_upload(task, files)
|
|
|
+ if paths != task.file_path:
|
|
|
+ task = self.store.update_task_files(task.id, paths)
|
|
|
+ if task_id == 0:
|
|
|
+ LogRecordHelper.log_success(
|
|
|
+ OperationType.CREATE,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"创建任务成功 任务:{task.task_name}",
|
|
|
+ utils.to_str(task.to_dict()),
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ LogRecordHelper.log_success(
|
|
|
+ OperationType.UPDATE,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"修改任务成功",
|
|
|
+ log_data,
|
|
|
+ utils.to_str(task.to_dict()),
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ if task_id == 0:
|
|
|
+ LogRecordHelper.log_fail(
|
|
|
+ OperationType.CREATE,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"创建任务失败",
|
|
|
+ utils.to_str(task_dto.to_dict()),
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ LogRecordHelper.log_fail(
|
|
|
+ OperationType.UPDATE,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"修改任务失败",
|
|
|
+ log_data,
|
|
|
+ )
|
|
|
+ return task
|
|
|
+ except ValueError as ve:
|
|
|
+ LogRecordHelper.log_fail(
|
|
|
+ OperationType.UPDATE,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"{'修改' if task_id == 0 else '添加'}任务失败 错误:{task_dto.task_name} {str(ve)}",
|
|
|
+ utils.to_str(task_dto.to_dict()),
|
|
|
+ )
|
|
|
+ raise ve
|
|
|
+ except Exception as e:
|
|
|
+ LogRecordHelper.log_fail(
|
|
|
+ OperationType.UPDATE,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"{'修改' if task_id == 0 else '添加'}任务失败 错误:{task_dto.task_name} {str(e)}",
|
|
|
+ utils.to_str(task_dto.to_dict()),
|
|
|
+ )
|
|
|
+ raise e
|
|
|
+
|
|
|
+ def _process_file_upload(self, task: ProjectTaskDto, files: list) -> str:
|
|
|
+ """处理文件上传流程"""
|
|
|
+
|
|
|
+ self._logger.info(f"保存处理文件,项目ID:{task.project_id},任务ID:{task.id}")
|
|
|
+ delete_old = task.process_status == 0
|
|
|
+ if delete_old and task.file_path:
|
|
|
+ delete_paths = []
|
|
|
+ for file_path in task.file_path.split(","):
|
|
|
+ if os.path.isfile(file_path):
|
|
|
+ delete_dir = os.path.dirname(file_path).replace(
|
|
|
+ "upload_files/", "delete_files/"
|
|
|
+ )
|
|
|
+ os.makedirs(delete_dir, exist_ok=True)
|
|
|
+ # 处理文件名冲突
|
|
|
+ base_name = os.path.basename(file_path)
|
|
|
+ target_path = os.path.join(delete_dir, base_name)
|
|
|
+ counter = 1
|
|
|
+ while os.path.exists(target_path):
|
|
|
+ name, ext = os.path.splitext(base_name)
|
|
|
+ target_path = os.path.join(delete_dir, f"{name}_{counter}{ext}")
|
|
|
+ counter += 1
|
|
|
+ os.rename(file_path, target_path)
|
|
|
+ delete_paths.append(target_path)
|
|
|
+ # 删除空目录
|
|
|
+ original_dir = os.path.dirname(file_path)
|
|
|
+ if not os.listdir(original_dir):
|
|
|
+ os.rmdir(original_dir)
|
|
|
+ if len(delete_paths) > 0:
|
|
|
+ LogRecordHelper.log_success(
|
|
|
+ OperationType.DELETE,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"删除任务文件:{task.task_name}",
|
|
|
+ utils.to_str(delete_paths),
|
|
|
+ )
|
|
|
+
|
|
|
+ # file_paths = [] if delete_old or not task.file_path else task.file_path.split(',')
|
|
|
+
|
|
|
+ file_paths = []
|
|
|
+ if files and len(files) > 0:
|
|
|
+ task_dir = os.path.join(
|
|
|
+ configs.app.source_path, f"upload_files/{task.get_path()}"
|
|
|
+ )
|
|
|
+ os.makedirs(task_dir, exist_ok=True)
|
|
|
+ for file in files:
|
|
|
+ if not file.filename:
|
|
|
+ continue
|
|
|
+ allowed_ext = {"xlsx", "xls", "csv"}
|
|
|
+ ext = file.filename.rsplit(".", 1)[-1].lower()
|
|
|
+ if ext not in allowed_ext:
|
|
|
+ continue
|
|
|
+ file_path = os.path.join(task_dir, file.filename)
|
|
|
+ file_path = file_path.replace("\\", "/")
|
|
|
+ file.save(file_path)
|
|
|
+ file_paths.append(file_path)
|
|
|
+ return ",".join(file_paths)
|
|
|
+
|
|
|
+ def _create_task(self, task_dto: ProjectTaskDto) -> ProjectTaskDto:
|
|
|
+ """创建项目任务
|
|
|
+
|
|
|
+ Args:
|
|
|
+ task_dto: 任务DTO对象
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ ProjectTaskDto: 创建后的任务DTO对象
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 业务验证
|
|
|
+ if not task_dto.project_id or not task_dto.budget_id:
|
|
|
+ raise ValueError("项目编号和概算序号不能为空")
|
|
|
+ if not task_dto.task_name:
|
|
|
+ raise ValueError("任务名称不能为空")
|
|
|
+
|
|
|
+ return self.store.create_task(task_dto)
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"创建项目任务失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ def _update_task(self, task_dto: ProjectTaskDto) -> Optional[ProjectTaskDto]:
|
|
|
+ """更新项目任务
|
|
|
+
|
|
|
+ Args:
|
|
|
+ task_dto: 任务DTO对象
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Optional[ProjectTaskDto]: 更新后的任务DTO对象
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 业务验证
|
|
|
+ if not task_dto.id:
|
|
|
+ raise ValueError("任务ID不能为空")
|
|
|
+ if not task_dto.task_name:
|
|
|
+ raise ValueError("任务名称不能为空")
|
|
|
+
|
|
|
+ return self.store.update_task(task_dto)
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"更新项目任务失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ def delete_task(self, task_id: int) -> bool:
|
|
|
+ """删除项目任务
|
|
|
+
|
|
|
+ Args:
|
|
|
+ task_id: 任务ID
|
|
|
+ Returns:
|
|
|
+ bool: 删除是否成功
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ task = self.store.get_task(task_id)
|
|
|
+ if task.process_status == TaskStatusEnum.PROCESSING.value:
|
|
|
+ raise Exception("任务正在进行中,请先取消任务")
|
|
|
+ return self.store.delete_task(task_id)
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"删除项目任务失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ def update_process_status(self, task_id: int, status: int, err: str = None) -> bool:
|
|
|
+ """更新处理状态
|
|
|
+
|
|
|
+ Args:
|
|
|
+ task_id: 任务ID
|
|
|
+ status: 状态值
|
|
|
+ err: 错误信息
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ bool: 更新是否成功
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ return self.store.update_process_status(task_id, status, err)
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"更新项目任务处理状态失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ def update_send_status(self, task_id: int, status: int, err: str = None) -> bool:
|
|
|
+ """更新发送状态
|
|
|
+
|
|
|
+ Args:
|
|
|
+ task_id: 任务ID
|
|
|
+ status: 状态值
|
|
|
+ err: 错误信息
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ bool: 更新是否成功
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ return self.store.update_send_status(task_id, status, err)
|
|
|
+ except Exception as e:
|
|
|
+ self._logger.error(f"更新项目任务发送状态失败: {str(e)}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ def cancel_run_task(self, task_id: int):
|
|
|
+ task = self.store.get_task_dto(task_id)
|
|
|
+ if task:
|
|
|
+ msg = executor.cancel_task(task)
|
|
|
+ if not msg:
|
|
|
+ self.store.update_task_status(task.id, TaskStatusEnum.CANCELED.value)
|
|
|
+ LogRecordHelper.log_success(
|
|
|
+ OperationType.PROCESS,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"取消运行:{task.task_name}",
|
|
|
+ task.id,
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ LogRecordHelper.log_fail(
|
|
|
+ OperationType.PROCESS,
|
|
|
+ OperationModule.TASK,
|
|
|
+ f"取消运行失败:{task.task_name}。 {msg}",
|
|
|
+ task.id,
|
|
|
+ )
|
|
|
+ return msg
|
|
|
+ else:
|
|
|
+ return "没有查询到任务"
|