123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
import os
import threading
from typing import Optional

import data_process
import data_send
import utils
from models.project_data import ProjectModel, SubProjectModel, SubProjectItemModel
from stores.project_store import ProjectStore
class ProjectService:
    """Service layer for projects, sub-projects and sub-project items.

    Persistence is delegated to a ``ProjectStore``. Processing and sending
    are delegated to the ``data_process`` and ``data_send`` modules and run
    on background threads started by the ``start_*`` / ``process_*`` methods.

    Most mutating methods report their outcome as ``(ok, message)`` where
    ``message`` is an empty string on success.
    """

    # Upload extensions accepted by ``_process_file_upload`` (lower-case, no dot).
    _ALLOWED_UPLOAD_EXTENSIONS = frozenset({'xlsx', 'xls', 'csv'})

    # Sub-project status codes during which a new background task must not be
    # started, mapped to the message returned to the caller.
    # NOTE(review): the original checks tested 32 twice (the second test was
    # unreachable); 23 is assumed to be the intended "uploading" code, following
    # the 21/31 and 22/32 pattern - confirm against the status definitions.
    _BUSY_STATUS_MESSAGES = {
        21: '正在采集数据中',
        31: '正在采集数据中',
        22: '正在分析处理中',
        32: '正在分析处理中',
        23: '正在上传数据中',
        33: '正在上传数据中',
    }

    def __init__(self):
        """Create the backing store and obtain the shared logger."""
        self._project_store = ProjectStore()
        self._logger = utils.get_logger()

    # ------------------------------------------------------------------
    # Projects
    # ------------------------------------------------------------------
    def get_all_project_paginated(
        self, page: int, per_page: int, keyword: Optional[str] = None
    ) -> tuple[list[ProjectModel], int]:
        """Return one page of projects and the total count reported by the store."""
        project_list, total_count = self._project_store.query_project_all_paginated(
            page, per_page, keyword
        )
        return project_list, total_count

    def get_project_by_id(self, project_id: int):
        """Return whatever the store holds for ``project_id`` (falsy when missing)."""
        return self._project_store.query_project_by_id(project_id)

    def add_project(self, project: ProjectModel) -> tuple[bool, str]:
        """Insert ``project``, defaulting ``create_by`` to ``'admin'`` when unset.

        Any exception raised by the store is logged and reported as a failure
        tuple instead of propagating.
        """
        try:
            if project.create_by is None:
                project.create_by = 'admin'
            self._project_store.insert_project(project)
            return True, ''
        except Exception as e:
            self._logger.error(f"添加项目失败:{e}")
            return False, '添加项目失败'

    def update_project(self, project: ProjectModel) -> tuple[bool, str]:
        """Persist the caller's ``project`` if a project with that id exists."""
        # Keep the stored row in its own name: rebinding ``project`` here would
        # write the unchanged stored data back and silently drop the update.
        existing = self._project_store.query_project_by_id(project.id)
        if not existing:
            return False, '项目不存在'
        self._project_store.update_project(project)
        return True, ''

    def delete_project(self, project_id: int) -> tuple[bool, str]:
        """Delete the project with ``project_id`` if it exists."""
        if not self._project_store.query_project_by_id(project_id):
            return False, '项目不存在'
        self._project_store.delete_project(project_id)
        return True, ''

    # ------------------------------------------------------------------
    # Sub-projects
    # ------------------------------------------------------------------
    def get_all_sub_project_paginated(
        self,
        project_id: int,
        page: int,
        per_page: int,
        keyword: Optional[str] = None,
        status: Optional[int] = None,
    ) -> tuple[list[SubProjectModel], int]:
        """Return one page of sub-projects of ``project_id`` and the total count."""
        sub_project_list, total_count = self._project_store.query_sub_project_all_paginated(
            project_id, page, per_page, keyword, status
        )
        return sub_project_list, total_count

    def get_all_sub_project(self) -> list[SubProjectModel]:
        """Return every sub-project known to the store."""
        return self._project_store.query_sub_project_all()

    def get_sub_project_by_id(self, sub_project_id: int) -> SubProjectModel:
        """Return the store's record for ``sub_project_id`` (falsy when missing)."""
        return self._project_store.query_sub_project(sub_project_id)

    def save_sub_project(
        self, sub_project: SubProjectModel, project_files: list, delete_old: bool
    ) -> tuple[bool, str]:
        """Create or update a sub-project and store its uploaded data files.

        Args:
            sub_project: Submitted values. A truthy ``id`` selects the update
                path; otherwise a new record is inserted.
            project_files: Uploaded file objects (each exposing ``filename``
                and ``save(path)``); may be empty.
            delete_old: When true, existing files are replaced, so at least
                one uploaded file is required.

        Returns:
            ``(True, '')`` on success, otherwise ``(False, message)``.
        """
        existing = None
        if sub_project.id:
            existing = self._project_store.query_sub_project(sub_project.id)
            if not existing:
                return False, '子项目不存在'

        # Reject the request before writing anything, so an invalid call does
        # not leave behind a new or status-reset record.
        if delete_old and not project_files:
            return False, "请上传项目数据文件"

        if existing is not None:
            sub_data = existing
            sub_data.sub_project_name = sub_project.sub_project_name
            sub_data.standard_version = sub_project.standard_version
            sub_data.work_catalog = sub_project.work_catalog
            sub_data.work_content = sub_project.work_content
            # Saving resets the status to 0 (meaning of 0 is defined elsewhere).
            sub_data.status = 0
            self.update_sub_project(sub_data)
        else:
            sub_data = SubProjectModel(
                project_id=sub_project.project_id,
                sub_project_name=sub_project.sub_project_name,
                standard_version=sub_project.standard_version,
                work_catalog=sub_project.work_catalog,
                work_content=sub_project.work_content,
                file_paths="",
                status=0,
            )
            sub_data.id = self.add_sub_project(sub_data)

        try:
            if project_files:
                file_paths = self._process_file_upload(sub_data, project_files, delete_old)
                self.update_sub_project_file_paths(sub_data.id, file_paths)
            return True, ''
        except ValueError as ve:
            return False, str(ve)
        except Exception as e:
            self._logger.error(f"保存子项目文件失败:{e}")
            return False, f'服务器错误: {str(e)}'

    @classmethod
    def _safe_upload_name(cls, filename) -> Optional[str]:
        """Return a directory-free file name, or ``None`` if it must be skipped.

        The client controls ``filename``, so any directory part (with either
        slash style) is stripped before the name is joined to a server path.
        Names that are empty, have no stem, or do not end in an allowed
        extension are rejected.
        """
        if not filename:
            return None
        name = os.path.basename(filename.replace('\\', '/'))
        stem, ext = os.path.splitext(name)
        if not stem or ext[1:].lower() not in cls._ALLOWED_UPLOAD_EXTENSIONS:
            return None
        return name

    def _process_file_upload(
        self, sub_project: SubProjectModel, files: list, delete_old: bool
    ) -> str:
        """Save accepted uploads into the sub-project directory.

        The directory is ``<file.source_path config>/<sub_project.get_path()>``.
        With ``delete_old`` the regular files already in that directory are
        removed and the path list starts empty; otherwise new paths are
        appended to ``sub_project.file_paths``.

        Returns:
            The comma-joined list of stored file paths (forward slashes).
        """
        base_path = utils.get_config_value('file.source_path', './temp_files')
        sub_project_dir = os.path.join(base_path, str(sub_project.get_path()))
        os.makedirs(sub_project_dir, exist_ok=True)
        self._logger.info(f"保存处理文件,项目ID:{sub_project.project_id},子项目ID:{sub_project.id}")

        if delete_old:
            for entry in os.listdir(sub_project_dir):
                stale_path = os.path.join(sub_project_dir, entry)
                if os.path.isfile(stale_path):
                    os.remove(stale_path)
            file_paths = []
        elif sub_project.file_paths:
            file_paths = sub_project.file_paths.split(',')
        else:
            file_paths = []

        for file in files:
            safe_name = self._safe_upload_name(file.filename)
            if safe_name is None:
                continue
            file_path = os.path.join(sub_project_dir, safe_name).replace('\\', '/')
            file.save(file_path)
            # Re-uploading the same name overwrites the file; list it once.
            if file_path not in file_paths:
                file_paths.append(file_path)
        return ','.join(file_paths)

    def add_sub_project(self, sub_project: SubProjectModel):
        """Insert ``sub_project`` and return the store's result (used as the new id)."""
        return self._project_store.insert_sub_project(sub_project)

    def update_sub_project(self, sub_project: SubProjectModel) -> None:
        """Write ``sub_project`` back to the store."""
        self._project_store.update_sub_project(sub_project)

    def update_sub_project_file_paths(self, sub_project_id: int, paths: str) -> None:
        """Store the comma-joined ``paths`` string for ``sub_project_id``."""
        self._project_store.update_sub_project_file_path(sub_project_id, paths)

    @classmethod
    def _busy_message(cls, status) -> Optional[str]:
        """Return the "already running" message for ``status``, or ``None`` if idle."""
        return cls._BUSY_STATUS_MESSAGES.get(status)

    def _launch_sub_project_task(self, sub_project_id: int, target) -> tuple[bool, str]:
        """Start ``target(sub_project)`` on a new thread unless missing or busy."""
        data = self._project_store.query_sub_project(sub_project_id)
        if not data:
            return False, '项目不存在'
        busy = self._busy_message(data.status)
        if busy:
            return False, busy
        threading.Thread(target=target, args=(data,)).start()
        return True, ''

    def start_sub_project_task(self, sub_project_id: int) -> tuple[bool, str]:
        """Start processing followed by sending for a sub-project in the background."""
        return self._launch_sub_project_task(sub_project_id, self._process_and_send_sub_project)

    def _process_and_send_sub_project(self, sub_project: SubProjectModel) -> None:
        """Thread body: process the sub-project, then send it if processing succeeded."""
        if not data_process.process_project(sub_project):
            self._logger.error(f"分析处理失败:{sub_project.sub_project_name}")
            return
        # Only record status 5 after a successful send, matching
        # ``_send_sub_project_data`` (meaning of 5 is defined elsewhere).
        if data_send.send_project_data(sub_project):
            self._project_store.update_sub_project_status(sub_project.id, 5)
        else:
            self._logger.error(f"上传失败:{sub_project.sub_project_name}")

    def process_sub_project(self, sub_project_id: int) -> tuple[bool, str]:
        """Start processing (without sending) for a sub-project in the background."""
        return self._launch_sub_project_task(sub_project_id, self._process_sub_project)

    def _process_sub_project(self, sub_project: SubProjectModel) -> tuple[bool, str]:
        """Thread body: run ``data_process.process_data`` and log a failure."""
        if data_process.process_data(sub_project):
            return True, ''
        self._logger.error(f"分析处理失败:{sub_project.sub_project_name}")
        return False, '分析处理失败'

    def start_send_sub_project(self, sub_project_id: int) -> tuple[bool, str]:
        """Start sending a sub-project's data in the background."""
        return self._launch_sub_project_task(sub_project_id, self._send_sub_project_data)

    def _send_sub_project_data(self, project: SubProjectModel) -> tuple[bool, str]:
        """Thread body: send the sub-project and set status 5 on success."""
        if data_send.send_project_data(project):
            self._project_store.update_sub_project_status(project.id, 5)
            return True, ''
        self._logger.error(f"上传失败:{project.sub_project_name}")
        return False, '上传数据失败'

    def delete_sub_project(self, sub_project_id: int) -> tuple[bool, str]:
        """Delete the sub-project with ``sub_project_id`` if it exists."""
        if not self._project_store.query_sub_project(sub_project_id):
            return False, '项目不存在'
        self._project_store.delete_sub_project(sub_project_id)
        return True, ''

    # ------------------------------------------------------------------
    # Sub-project items
    # ------------------------------------------------------------------
    def get_sub_project_item_list_by_sub_project_paginated(
        self,
        project_id: int,
        page: int,
        per_page: int,
        keyword: Optional[str] = None,
        process_status: Optional[int] = None,
        send_status: Optional[int] = None,
    ) -> tuple[list[SubProjectItemModel], int]:
        """Return one page of items for ``project_id`` with optional filters."""
        return self._project_store.query_sub_project_items_by_project_paginated(
            project_id, page, per_page, keyword, process_status, send_status
        )

    def get_sub_project_item_list_by_sub_project_id(
        self, project_id: int
    ) -> list[SubProjectItemModel]:
        """Return all items the store reports for ``project_id``."""
        return self._project_store.query_sub_project_items_by_project(project_id)

    def get_sub_project_item_by_id(self, item_id: int):
        """Return the store's record for ``item_id`` (falsy when missing)."""
        return self._project_store.query_sub_project_item_by_id(item_id)

    def add_sub_project_item(self, item: SubProjectItemModel):
        """Insert a new item under ``item.sub_project_id``.

        ``project_id`` and ``standard_version`` are copied from the parent
        sub-project; the remaining fields come from ``item``.

        Returns:
            Whatever ``ProjectStore.insert_sub_project_item`` returns.

        Raises:
            ValueError: If the parent sub-project does not exist.
        """
        parent = self._project_store.query_sub_project(item.sub_project_id)
        if not parent:
            raise ValueError('子项目不存在')
        project_item = SubProjectItemModel(
            project_id=parent.project_id,
            sub_project_id=item.sub_project_id,
            device_name=item.device_name,
            device_model=item.device_model,
            device_unit=item.device_unit,
            standard_version=parent.standard_version,
            standard_no=item.standard_no,
        )
        return self._project_store.insert_sub_project_item(project_item)

    def update_sub_project_item(self, item: SubProjectItemModel) -> bool:
        """Copy the editable fields of ``item`` onto the stored item and save it.

        Returns ``False`` when no item with ``item.id`` exists. ``float()``
        raises if ``item.device_count`` is ``None`` or not numeric.
        """
        project_item = self._project_store.query_sub_project_item_by_id(item.id)
        if not project_item:
            return False
        project_item.device_name = item.device_name
        project_item.device_model = item.device_model
        project_item.device_unit = item.device_unit
        project_item.device_count = float(item.device_count)
        project_item.standard_no = item.standard_no
        self._project_store.update_sub_project_item(project_item)
        return True

    def delete_sub_project_item(self, item_id: int) -> bool:
        """Delete the item with ``item_id``; return ``False`` if it does not exist."""
        project_item = self._project_store.query_sub_project_item_by_id(item_id)
        if not project_item:
            return False
        self._project_store.delete_sub_project_item_by_id(project_item.id)
        return True

    def _launch_item_task(self, item_id: int, target) -> tuple[bool, str]:
        """Start ``target(item)`` on a new thread unless the item is missing."""
        project_item = self._project_store.query_sub_project_item_by_id(item_id)
        if not project_item:
            return False, '项目不存在'
        threading.Thread(target=target, args=(project_item,)).start()
        return True, ""

    def start_process_item(self, item_id: int) -> tuple[bool, str]:
        """Start processing followed by sending for a single item in the background."""
        return self._launch_item_task(item_id, self._process_sub_project_item)

    def _process_sub_project_item(self, sub_project_item: SubProjectItemModel):
        """Thread body: process the item and, on success, send it."""
        if data_process.process_item(sub_project_item):
            return data_send.send_item_data(sub_project_item), ""
        self._logger.error(f"分析处理失败:{sub_project_item.device_name}")
        return False, '分析处理失败'

    def start_send_item(self, item_id: int) -> tuple[bool, str]:
        """Start sending a single item's data in the background."""
        return self._launch_item_task(item_id, self._send_sub_project_item)

    def _send_sub_project_item(self, sub_project_item: SubProjectItemModel) -> tuple[bool, str]:
        """Thread body: send the item and log a failure."""
        if data_send.send_item_data(sub_project_item):
            return True, ''
        self._logger.error(f"上传失败:{sub_project_item.device_name}")
        return False, '上传数据失败'
|