|
@@ -1,11 +1,12 @@
|
|
|
from time import sleep
|
|
|
+import threading
|
|
|
+from typing import Optional
|
|
|
|
|
|
-import requests,tools.utils as utils, core.configs as configs
|
|
|
-from core.dtos import ProjectDto,ProjectTaskDto, ProjectQuotaDto, ChapterDto, TotalBudgetInfoDto, ExcelParseResultDataDto
|
|
|
+import tools.utils as utils, core.configs as configs
|
|
|
+from core.dtos import ProjectTaskDto
|
|
|
from core.enum import TaskStatusEnum
|
|
|
-from stores import ProjectStore,ProjectTaskStore, ProjectQuotaStore, ChapterStore, BudgetStore
|
|
|
-from core.dtos import ExcelParseDto,ExcelParseZgsDto,ExcelParseItemDto,ExcelParseFileDto,ExcelParseResultDto
|
|
|
-import threading
|
|
|
+from executor.task_processor import TaskProcessor
|
|
|
+from stores import ProjectStore,ProjectTaskStore
|
|
|
|
|
|
class TaskRunner:
|
|
|
_is_running = {}
|
|
@@ -15,50 +16,98 @@ class TaskRunner:
|
|
|
_task_sleep_interval = configs.app.task_interval
|
|
|
_logger = utils.get_logger()
|
|
|
_project_store = ProjectStore()
|
|
|
- _budget_store = BudgetStore()
|
|
|
- _chapter_store = ChapterStore()
|
|
|
_task_store = ProjectTaskStore()
|
|
|
- _quota_store = ProjectQuotaStore()
|
|
|
- _task_submit_url = "/task_submit"
|
|
|
- _task_status_url = "/task_status"
|
|
|
- _task_cancel_url = "/cancel_task"
|
|
|
_lock = threading.Lock()
|
|
|
+ _task_processor = None
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _get_task_processor(cls):
|
|
|
+ if cls._task_processor is None:
|
|
|
+ cls._task_processor = TaskProcessor()
|
|
|
+ return cls._task_processor
|
|
|
|
|
|
@classmethod
|
|
|
- def run(cls, task:ProjectTaskDto=None):
|
|
|
+ def _ensure_wait_list(cls, project_id: str) -> None:
|
|
|
+ """确保项目的等待队列已初始化"""
|
|
|
+ if project_id not in cls._task_wait_list:
|
|
|
+ cls._task_wait_list[project_id] = []
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _add_task_to_wait_list(cls, task: ProjectTaskDto, log_message: str = None) -> None:
|
|
|
+ """将任务添加到等待队列并更新状态"""
|
|
|
+ project_id = task.project_id
|
|
|
+ cls._ensure_wait_list(project_id)
|
|
|
+ if log_message:
|
|
|
+ cls._logger.info(log_message)
|
|
|
+ cls._task_wait_list[project_id].append(task)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def run(cls, task: ProjectTaskDto = None):
|
|
|
if task:
|
|
|
project_id = task.project_id
|
|
|
- if project_id not in cls._task_wait_list:
|
|
|
- cls._task_wait_list[project_id] = []
|
|
|
- cls._logger.info(f"添加到待运行队列:{task.task_name}")
|
|
|
- cls._task_wait_list[project_id].append(task)
|
|
|
- # cls._task_store.update_task_status(task.id,TaskStatusEnum.WAIT.value)
|
|
|
-
|
|
|
- if task and task.project_id in cls._is_running and cls._is_running[task.project_id]:
|
|
|
- return
|
|
|
-
|
|
|
- # 如果有新任务但并发数已满,直接返回等待
|
|
|
- if task and len(cls._running_projects) >= cls._max_concurrent_projects and task.project_id not in cls._running_projects:
|
|
|
+ # 检查项目是否已有任务在运行
|
|
|
+ if project_id in cls._is_running and cls._is_running[project_id]:
|
|
|
+ cls._add_task_to_wait_list(
|
|
|
+ task,
|
|
|
+ f"项目[{project_id}]已有任务在运行,任务[{task.task_name}]将等待"
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ # 如果项目没有任务在运行,但并发项目数已满且该项目不在运行集合中
|
|
|
+ if len(cls._running_projects) >= cls._max_concurrent_projects and project_id not in cls._running_projects:
|
|
|
+ cls._add_task_to_wait_list(
|
|
|
+ task,
|
|
|
+ f"项目运行队列[{cls._max_concurrent_projects}]已满,任务[{task.task_name}]将等待"
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ # 可以执行的新任务
|
|
|
+ cls._add_task_to_wait_list(task, f"添加到待运行队列:{task.task_name}")
|
|
|
+ cls._execute_project_tasks(project_id)
|
|
|
return
|
|
|
+
|
|
|
+ # 无新任务时,尝试执行等待队列中的任务
|
|
|
if not task:
|
|
|
- if cls._task_wait_list and len(cls._task_wait_list) > 0:
|
|
|
- for project_id in cls._task_wait_list.keys():
|
|
|
- cls._execute_project_tasks(project_id)
|
|
|
+ if cls._task_wait_list and len(cls._task_wait_list) > 0:
|
|
|
+ # 遍历等待队列中的项目
|
|
|
+ for project_id in list(cls._task_wait_list.keys()):
|
|
|
+ # 只有当项目没有任务在运行且并发项目数未满时才执行
|
|
|
+ if (project_id not in cls._is_running or not cls._is_running[project_id]) and len(cls._running_projects) < cls._max_concurrent_projects:
|
|
|
+ cls._execute_project_tasks(project_id)
|
|
|
else:
|
|
|
- cls._logger.info(f"项目运行队列[{cls._max_concurrent_projects}]已满,等待{cls._task_sleep_interval}秒后同步数据运行")
|
|
|
+ cls._logger.info(f"等待队列为空,等待{cls._task_sleep_interval}秒后同步数据运行")
|
|
|
cls._sync_wait_list()
|
|
|
- sleep(cls._task_sleep_interval)
|
|
|
- return
|
|
|
- else:
|
|
|
- cls._execute_project_tasks(task.project_id)
|
|
|
-
|
|
|
-
|
|
|
+ @classmethod
|
|
|
+ def _update_project_running_status(cls, project_id: str, is_running: bool) -> None:
|
|
|
+ """更新项目运行状态"""
|
|
|
+ with cls._lock:
|
|
|
+ cls._is_running[project_id] = is_running
|
|
|
+ if is_running:
|
|
|
+ cls._running_projects.add(project_id)
|
|
|
+ elif project_id in cls._running_projects:
|
|
|
+ cls._running_projects.remove(project_id)
|
|
|
+
|
|
|
+ # 如果项目没有等待任务,从等待列表中移除
|
|
|
+ if not is_running and project_id in cls._task_wait_list and len(cls._task_wait_list[project_id]) == 0:
|
|
|
+ del cls._task_wait_list[project_id]
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _can_execute_project(cls, project_id: str) -> bool:
|
|
|
+ """检查项目是否可以执行"""
|
|
|
+ # 如果项目已在运行中,直接返回False
|
|
|
+ if project_id in cls._is_running and cls._is_running[project_id]:
|
|
|
+ return False
|
|
|
+ # 如果项目不在运行集合中且并发数已满,直接返回False
|
|
|
+ if project_id not in cls._running_projects and len(cls._running_projects) >= cls._max_concurrent_projects:
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
@classmethod
|
|
|
def _execute_project_tasks(cls, project_id: str):
|
|
|
try:
|
|
|
- # 如果项目不在运行集合中且并发数已满,直接返回等待
|
|
|
+ # 检查项目是否可以执行
|
|
|
with cls._lock:
|
|
|
- if project_id not in cls._running_projects and len(cls._running_projects) >= cls._max_concurrent_projects:
|
|
|
+ if not cls._can_execute_project(project_id):
|
|
|
return
|
|
|
cls._is_running[project_id] = True
|
|
|
cls._running_projects.add(project_id)
|
|
@@ -68,17 +117,12 @@ class TaskRunner:
|
|
|
while project_id in cls._task_wait_list and len(cls._task_wait_list[project_id]) > 0:
|
|
|
current_task = cls._task_wait_list[project_id].pop(0)
|
|
|
try:
|
|
|
- cls._submit_task(current_task)
|
|
|
- except Exception as e:
|
|
|
- cls._logger.error(f"运行任务失败:{current_task.task_name}, {str(e)}")
|
|
|
+ task_processor = cls._get_task_processor()
|
|
|
+ task_processor.submit_task(current_task)
|
|
|
+ except Exception as ex:
|
|
|
+ cls._logger.error(f"运行任务失败:{current_task.task_name}, {str(ex)}")
|
|
|
finally:
|
|
|
- with cls._lock:
|
|
|
- if project_id in cls._is_running:
|
|
|
- cls._is_running[project_id] = False
|
|
|
- if project_id in cls._running_projects:
|
|
|
- cls._running_projects.remove(project_id)
|
|
|
- if project_id in cls._task_wait_list and len(cls._task_wait_list[project_id]) == 0:
|
|
|
- del cls._task_wait_list[project_id]
|
|
|
+ cls._update_project_running_status(project_id, False)
|
|
|
cls._sync_wait_list()
|
|
|
|
|
|
# 创建新线程执行任务
|
|
@@ -87,60 +131,42 @@ class TaskRunner:
|
|
|
thread.start()
|
|
|
except Exception as e:
|
|
|
cls._logger.error(f"执行项目任务失败:{project_id}, {str(e)}")
|
|
|
- with cls._lock:
|
|
|
- if project_id in cls._is_running:
|
|
|
- cls._is_running[project_id] = False
|
|
|
- if project_id in cls._running_projects:
|
|
|
- cls._running_projects.remove(project_id)
|
|
|
- if project_id in cls._task_wait_list and len(cls._task_wait_list[project_id]) == 0:
|
|
|
- del cls._task_wait_list[project_id]
|
|
|
+ cls._update_project_running_status(project_id, False)
|
|
|
cls._sync_wait_list()
|
|
|
|
|
|
@classmethod
|
|
|
- def _sync_wait_list(cls,project_id:str=None):
|
|
|
+ def _sync_wait_list(cls, project_id: str = None) -> Optional[str]:
|
|
|
try:
|
|
|
cls._logger.info(f"开始同步待运行队列")
|
|
|
tasks = cls._task_store.get_wait_tasks(project_id)
|
|
|
for task in tasks:
|
|
|
- if task.project_id not in cls._task_wait_list:
|
|
|
- cls._task_wait_list[task.project_id] = []
|
|
|
+ cls._ensure_wait_list(task.project_id)
|
|
|
if task not in cls._task_wait_list[task.project_id]:
|
|
|
cls._task_wait_list[task.project_id].append(task)
|
|
|
+
|
|
|
total_tasks = sum(len(tasks) for tasks in cls._task_wait_list.values())
|
|
|
- cls._logger.info(f"同步待运行队列完成,同步了{total_tasks}条数据")
|
|
|
if total_tasks > 0:
|
|
|
+ cls._logger.info(f"同步待运行队列,同步{total_tasks}条数据")
|
|
|
cls.run()
|
|
|
- return None
|
|
|
+ else:
|
|
|
+ cls._logger.info(f"同步待运行队列,无新增数据,等待{cls._task_sleep_interval}秒后同步数据运行")
|
|
|
+ sleep(cls._task_sleep_interval)
|
|
|
+ cls._sync_wait_list()
|
|
|
except Exception as e:
|
|
|
msg = f"同步待运行队列失败,原因:{e}"
|
|
|
cls._logger.error(msg)
|
|
|
- return msg
|
|
|
+
|
|
|
|
|
|
@classmethod
|
|
|
def cancel(cls, task:ProjectTaskDto):
|
|
|
try:
|
|
|
cls._logger.info(f"开始取消运行任务:{task.id}")
|
|
|
if task.process_status == TaskStatusEnum.PROCESSING.value:
|
|
|
- res = cls._call_api(cls._task_cancel_url,{"task_id":task.id})
|
|
|
- if res.result==-1:
|
|
|
- cls._task_store.update_task_status(task.id,TaskStatusEnum.FAILURE.value, res.reason)
|
|
|
- return res.reason
|
|
|
- project = cls._project_store.get(task.project_id)
|
|
|
- if not project:
|
|
|
- cls._logger.error(f"取消运行任务:{task.id}失败,原因:项目不存在")
|
|
|
- return "项目不存在"
|
|
|
- if res.data and len(res.data)>0:
|
|
|
- cls._insert_data(task,project,res.data)
|
|
|
- if res.result == 0 :
|
|
|
- cls._logger.info(f"取消运行任务:{task.id}成功")
|
|
|
- cls._task_store.update_task_status(task.id, TaskStatusEnum.CANCELED.value)
|
|
|
- elif res.result == 1:
|
|
|
- cls._logger.info(f"取消运行任务失败:{task.id}已运行完成")
|
|
|
- cls._task_store.update_task_status(task.id, TaskStatusEnum.SUCCESS.value)
|
|
|
- return f"取消失败,任务已运行完成"
|
|
|
+ task_processor = cls._get_task_processor()
|
|
|
+ result = task_processor.cancel_task(task)
|
|
|
+ return result
|
|
|
elif task.process_status == TaskStatusEnum.WAIT.value:
|
|
|
cls._task_wait_list[task.project_id].remove(task)
|
|
|
- cls._task_store.update_task_status(task.id, TaskStatusEnum.CANCELED.value)
|
|
|
cls._logger.info(f"取消运行任务:{task.id}成功")
|
|
|
else:
|
|
|
cls._logger.info(f"取消运行任务:{task.id}失败,原因:任务状态错误")
|
|
@@ -151,169 +177,11 @@ class TaskRunner:
|
|
|
cls._logger.error(msg)
|
|
|
return msg
|
|
|
|
|
|
- @classmethod
|
|
|
- def _submit_task(cls, task:ProjectTaskDto):
|
|
|
- try:
|
|
|
- cls._logger.info(f"开始运行任务:{task.task_name}")
|
|
|
- cls._task_store.update_task_status(task.id, TaskStatusEnum.PROCESSING.value)
|
|
|
- if not task.file_path:
|
|
|
- raise Exception("任务文件不存在")
|
|
|
- files, msg = cls._read_files(task.file_path)
|
|
|
- if not files or len(files)==0:
|
|
|
- raise Exception(msg)
|
|
|
- project = cls._project_store.get(task.project_id)
|
|
|
- if not project:
|
|
|
- raise Exception("项目不存在")
|
|
|
- budget_models = cls._budget_store.get_budget_info(task.project_id)
|
|
|
- budgets = [TotalBudgetInfoDto.from_model(budget) for budget in budget_models]
|
|
|
- parents = cls._chapter_store.get_all_parents_chapter_items(task.project_id, task.item_code)
|
|
|
- children = cls._chapter_store.get_all_children_chapter_items(task.project_id, task.item_code)
|
|
|
- data,msg = cls._build_api_body(task,project,budgets,parents,children,files)
|
|
|
- if not data:
|
|
|
- raise Exception(msg)
|
|
|
- res = cls._call_api(cls._task_submit_url,data)
|
|
|
-
|
|
|
- if res.result==-1:
|
|
|
- cls._task_store.update_task_status(task.id,TaskStatusEnum.FAILURE.value, res.reason)
|
|
|
- return res.reason
|
|
|
-
|
|
|
- if res.data and len(res.data)>0:
|
|
|
- cls._insert_data(task,project,res.data)
|
|
|
- if res.result == 1 :
|
|
|
- cls._logger.info(f"运行任务:{task.task_name}完成")
|
|
|
- cls._task_store.update_task_status(task.id,TaskStatusEnum.SUCCESS.value)
|
|
|
- else:
|
|
|
- cls._logger.info(f"运行任务:{task.task_name}请求中,等待结果")
|
|
|
- cls._query_task(task,project)
|
|
|
- return None
|
|
|
- except Exception as e:
|
|
|
- msg = f"任务运行失败,原因:{e}"
|
|
|
- cls._logger.error(f"运行任务:{task.task_name}, {msg}")
|
|
|
- cls._task_store.update_task_status(task.id,TaskStatusEnum.FAILURE.value, msg)
|
|
|
- return msg
|
|
|
-
|
|
|
- @classmethod
|
|
|
- def _query_task(cls, task:ProjectTaskDto,project:ProjectDto):
|
|
|
- try:
|
|
|
- import time
|
|
|
- while True:
|
|
|
- time.sleep(30)
|
|
|
- res = cls._call_api(cls._task_status_url,{"task_id":task.id})
|
|
|
- if res.result==-1:
|
|
|
- cls._task_store.update_task_status(task.id,TaskStatusEnum.FAILURE.value, res.reason)
|
|
|
- return res.reason
|
|
|
- if res.data and len(res.data) > 0:
|
|
|
- cls._insert_data(task, project, res.data)
|
|
|
- if res.result == 1:
|
|
|
- cls._logger.info(f"运行任务:{task.task_name}完成")
|
|
|
- cls._task_store.update_task_status(task.id, TaskStatusEnum.SUCCESS.value)
|
|
|
- break
|
|
|
- else:
|
|
|
- cls._logger.info(f"运行任务:{task.task_name}请求中,等待结果")
|
|
|
- cls._query_task(task, project)
|
|
|
- except Exception as e:
|
|
|
- msg = f"任务状态查询失败,原因:{e}"
|
|
|
- cls._logger.error(msg)
|
|
|
- raise Exception(msg)
|
|
|
-
|
|
|
- @classmethod
|
|
|
- def _read_files(cls,paths:str)->(list[ExcelParseFileDto],str):
|
|
|
- try:
|
|
|
- files=[]
|
|
|
- cls._logger.debug(f"开始读取文件:{paths}")
|
|
|
- path_list= paths.split(",")
|
|
|
- for path in path_list:
|
|
|
- file = utils.encode_file(path)
|
|
|
- files.append(ExcelParseFileDto(file_id=path, content=file))
|
|
|
- cls._logger.debug(f"读取文件完成:{paths}")
|
|
|
- return files, ''
|
|
|
- except Exception as e:
|
|
|
- msg = f"读取文件失败,原因:{e}"
|
|
|
- cls._logger.error(msg)
|
|
|
- return None,msg
|
|
|
-
|
|
|
- @classmethod
|
|
|
- def _build_api_body(cls,task:ProjectTaskDto,project:ProjectDto,budgets:list[TotalBudgetInfoDto],parents:list[ChapterDto],children:list[ChapterDto],files:list[ExcelParseFileDto]):
|
|
|
- try:
|
|
|
- budgets_data = [ExcelParseZgsDto.from_dto(budget) for budget in budgets]
|
|
|
- parents_data = [ExcelParseItemDto.from_dto(parent) for parent in parents]
|
|
|
- children_data = [ExcelParseItemDto.from_dto(child) for child in children]
|
|
|
- data = ExcelParseDto(
|
|
|
- task_id=task.id or 0,
|
|
|
- version=configs.app.version or "2020",
|
|
|
- project_id=task.project_id,
|
|
|
- project_name=project.project_name,
|
|
|
- project_stage=project.design_stage,
|
|
|
- selected_zgs_id=task.budget_id or 0,
|
|
|
- zgs_list=budgets_data,
|
|
|
- hierarchy=parents_data,
|
|
|
- components=children_data,
|
|
|
- file_excel=files
|
|
|
- )
|
|
|
- return data, ""
|
|
|
- except Exception as e:
|
|
|
- msg = f"构建API BODY,原因:{e}"
|
|
|
- cls._logger.error(msg)
|
|
|
- return None,msg
|
|
|
-
|
|
|
- @classmethod
|
|
|
- def _call_api(cls,api_url,data)->ExcelParseResultDto:
|
|
|
- try:
|
|
|
- url = f"{configs.app.task_api_url}{api_url}"
|
|
|
- if isinstance(data, ExcelParseDto):
|
|
|
- data = data.to_dict()
|
|
|
- cls._logger.debug(f"调用接口:{url},data:{data}")
|
|
|
- response = requests.post(url, headers={"Content-Type": "application/json"}, json=data)
|
|
|
- cls._logger.debug(f"调用接口返回[{response.status_code}]:{response.text}")
|
|
|
- if response.status_code == 200:
|
|
|
- result = response.json()
|
|
|
- result_dto = ExcelParseResultDto.from_dict(result)
|
|
|
- cls._logger.debug(f"调用接口成功")
|
|
|
- return result_dto
|
|
|
- else:
|
|
|
- cls._logger.error("调用接口失败")
|
|
|
- raise Exception(response.text)
|
|
|
- except Exception as e:
|
|
|
- msg = f"调用接口失败,原因:{e}"
|
|
|
- cls._logger.error(msg)
|
|
|
- raise Exception(msg)
|
|
|
-
|
|
|
- @classmethod
|
|
|
- def _insert_data(cls,task:ProjectTaskDto,project:ProjectDto,data:list[ExcelParseResultDataDto]):
|
|
|
- try:
|
|
|
- cls._logger.debug(f"开始插入数据:{task.task_name}")
|
|
|
|
|
|
- for item in data:
|
|
|
- cls._logger.debug(f"数据:{item.to_dict()}")
|
|
|
- quota = ProjectQuotaDto(
|
|
|
- task_id=task.id,
|
|
|
- budget_id=item.zgs_id,
|
|
|
- budget_code=item.zgs_code,
|
|
|
- project_id=project.project_id,
|
|
|
- item_code=item.item_code,
|
|
|
- item_id=item.item_id,
|
|
|
- quota_code=item.dinge_code,
|
|
|
- entry_name=item.entry_name,
|
|
|
- units=item.units,
|
|
|
- amount=item.amount,
|
|
|
- ex_file=item.ex_file_id,
|
|
|
- ex_cell=item.ex_cell,
|
|
|
- ex_row=item.ex_row,
|
|
|
- ex_unit=item.ex_unit,
|
|
|
- ex_amount=item.ex_amount,
|
|
|
- created_by=task.created_by,
|
|
|
- )
|
|
|
- cls._quota_store.create_quota(quota)
|
|
|
- cls._logger.debug(f"插入数据完成:{task.task_name}")
|
|
|
- return True
|
|
|
- except Exception as e:
|
|
|
- msg = f"插入数据失败,原因:{e}"
|
|
|
- cls._logger.error(msg)
|
|
|
- return False,msg
|
|
|
|
|
|
@classmethod
|
|
|
def get_project_running_state(cls, project_id: str) -> bool:
|
|
|
- return project_id in cls._is_running and cls._is_running[project_id] or len(cls._running_projects) >= cls._max_concurrent_projects
|
|
|
+ return project_id in cls._is_running and cls._is_running[project_id]
|
|
|
|
|
|
|
|
|
|