import threading,utils,os from models.project_data import ProjectModel,SubProjectModel, SubProjectItemModel from stores.project_store import ProjectStore import data_process,data_send class ProjectService: def __init__(self): self._project_store = ProjectStore() self._logger = utils.get_logger() def get_all_project_paginated(self, page: int, per_page: int, keyword: str = None) ->(list[ProjectModel], int): sub_project_list, total_count = self._project_store.query_project_all_paginated(page, per_page, keyword) return sub_project_list, total_count def get_project_by_id(self ,project_id:int): project = self._project_store.query_project_by_id(project_id) return project def add_project(self, project:ProjectModel): try: project.create_by = 'admin' if project.create_by is None else project.create_by self._project_store.insert_project(project) return True, '' except Exception as e: self._logger.error(f"添加项目失败:{e}") return False, '添加项目失败' def update_project(self, project:ProjectModel): project = self._project_store.query_project_by_id(project.id) if project: self._project_store.update_project(project) return True, '' else: return False, '项目不存在' def delete_project(self, project_id:int): project = self._project_store.query_project_by_id(project_id) if project: self._project_store.delete_project(project_id) return True,'' else: return False, '项目不存在' def get_all_sub_project_paginated(self, project_id: int, page: int, per_page: int, keyword: str = None, status: int = None) ->(list[SubProjectModel], int): sub_project_list, total_count = self._project_store.query_sub_project_all_paginated(project_id, page, per_page, keyword, status) return sub_project_list, total_count def get_all_sub_project(self) ->list[SubProjectModel]: sub_project_list = self._project_store.query_sub_project_all() return sub_project_list def get_sub_project_by_id(self, sub_project_id:int) ->SubProjectModel: project = self._project_store.query_sub_project(sub_project_id) return project def save_sub_project(self, sub_project:SubProjectModel, project_files: list, delete_old: bool): if sub_project.id: sub_data = self._project_store.query_sub_project(sub_project.id) if not sub_data: return False, '子项目不存在' sub_data.sub_project_name = sub_project.sub_project_name sub_data.standard_version = sub_project.standard_version sub_data.work_catalog = sub_project.work_catalog sub_data.work_content = sub_project.work_content sub_data.status = 0 self.update_sub_project(sub_data) else: sub_data = SubProjectModel( project_id=sub_project.project_id, sub_project_name=sub_project.sub_project_name, standard_version=sub_project.standard_version, work_catalog=sub_project.work_catalog, work_content=sub_project.work_content, file_paths="", status=0 ) new_id = self.add_sub_project(sub_data) sub_data.id = new_id try: # 处理文件上传逻辑 if project_files: file_paths = self._process_file_upload(sub_data, project_files, delete_old) self.update_sub_project_file_paths(sub_data.id, file_paths) elif delete_old: return False,"请上传项目数据文件" return True, '' except ValueError as ve: return False,str(ve) except Exception as e: return False,f'服务器错误: {str(e)}' def _process_file_upload(self,sub_project: SubProjectModel, files: list, delete_old: bool) -> str: """处理文件上传流程""" base_path = utils.get_config_value('file.source_path', './temp_files') sub_project_dir = os.path.join(base_path, f'{sub_project.get_path()}') os.makedirs(sub_project_dir, exist_ok=True) self._logger.info(f"保存处理文件,项目ID:{sub_project.project_id},子项目ID:{sub_project.id}") if delete_old: if os.path.exists(sub_project_dir): for filename in os.listdir(sub_project_dir): file_path = os.path.join(sub_project_dir, filename) if os.path.isfile(file_path): os.remove(file_path) file_paths =[] if delete_old or not sub_project.file_paths else sub_project.file_paths.split(',') for file in files: if not file.filename: continue allowed_ext = {'xlsx', 'xls', 'csv'} ext = file.filename.rsplit('.', 1)[-1].lower() if ext not in allowed_ext: continue file_path = os.path.join(sub_project_dir, file.filename) file_path = file_path.replace('\\', '/') file.save(file_path) file_paths.append(file_path) return ','.join(file_paths) def add_sub_project(self, sub_project:SubProjectModel): return self._project_store.insert_sub_project(sub_project) def update_sub_project(self, sub_project:SubProjectModel): self._project_store.update_sub_project(sub_project) def update_sub_project_file_paths(self, sub_project_id: int, paths: str): self._project_store.update_sub_project_file_path(sub_project_id, paths) def start_sub_project_task(self, sub_project_id: int) -> (bool, str): data = self._project_store.query_sub_project(sub_project_id) if data: if data.status == 21 or data.status == 31: return False, '正在采集数据中' if data.status == 22 or data.status == 32: return False, '正在分析处理中' if data.status == 32 or data.status == 33: return False, '正在上传数据中' thread = threading.Thread(target = self._process_and_send_sub_project, args=(data,)) thread.start() return True, '' else: return False, '项目不存在' def _process_and_send_sub_project(self, sub_project: SubProjectModel) : # 启动分析处理 if data_process.process_project(sub_project): # 更新远程数据 data_send.send_project_data(sub_project) self._project_store.update_sub_project_status(sub_project.id, 5) def process_sub_project(self, sub_project_id:int): data = self._project_store.query_sub_project(sub_project_id) if data: if data.status == 21 or data.status == 31: return False, '正在采集数据中' if data.status == 22 or data.status == 32: return False, '正在分析处理中' if data.status == 32 or data.status == 33: return False, '正在上传数据中' thread = threading.Thread(target=self._process_sub_project, args=(data,)) thread.start() return True, '' else: return False, '项目不存在' def _process_sub_project(self, sub_project: SubProjectModel): if data_process.process_data(sub_project): return True, '' else: self._logger.error(f"分析处理失败:{sub_project.sub_project_name}") return False, '分析处理失败' def start_send_sub_project(self, sub_project_id: int) -> (bool, str): data = self._project_store.query_sub_project(sub_project_id) if data: if data.status == 21 or data.status == 31: return False, '正在采集数据中' if data.status == 22 or data.status == 32: return False, '正在分析处理中' if data.status == 32 or data.status == 33: return False, '正在上传数据中' thread = threading.Thread(target=self._send_sub_project_data, args=(data,)) thread.start() def _send_sub_project_data(self, project: SubProjectModel): if data_send.send_project_data(project): self._project_store.update_sub_project_status(project.id, 5) return True, '' else: return False, '上传数据失败' def delete_sub_project(self, sub_project_id:int): project = self._project_store.query_sub_project(sub_project_id) if project: self._project_store.delete_sub_project(sub_project_id) return True,'' else: return False, '项目不存在' def get_sub_project_item_list_by_sub_project_paginated(self, project_id: int, page: int, per_page: int, keyword: str = None, process_status:int = None, send_status:int=None) -> (list[SubProjectItemModel], int): return self._project_store.query_sub_project_items_by_project_paginated(project_id, page, per_page, keyword, process_status, send_status) def get_sub_project_item_list_by_sub_project_id(self, project_id:int) ->list[SubProjectItemModel]: return self._project_store.query_sub_project_items_by_project(project_id) def get_sub_project_item_by_id(self, item_id: int): return self._project_store.query_sub_project_item_by_id(item_id) def add_sub_project_item(self, item:SubProjectItemModel): project = self._project_store.query_sub_project(item.sub_project_id) project_item = SubProjectItemModel( project_id=project.project_id, sub_project_id=item.sub_project_id, device_name=item.device_name, device_model=item.device_model, device_unit=item.device_unit, standard_version=project.standard_version, standard_no=item.standard_no, ) return self._project_store.insert_sub_project_item(project_item) def update_sub_project_item(self, item:SubProjectItemModel): project_item = self._project_store.query_sub_project_item_by_id(item.id) if project_item: project_item.device_name = item.device_name project_item.device_model = item.device_model project_item.device_unit = item.device_unit project_item.device_count = float(item.device_count) project_item.standard_no = item.standard_no self._project_store.update_sub_project_item(project_item) return True else: return False def delete_sub_project_item(self, item_id:int): project_item = self._project_store.query_sub_project_item_by_id(item_id) if project_item: self._project_store.delete_sub_project_item_by_id(project_item.id) return True else: return False def start_process_item(self, item_id:int): project_item = self._project_store.query_sub_project_item_by_id(item_id) if not project_item: return False, '项目不存在' thread = threading.Thread(target=self._process_sub_project_item, args=(project_item,)) thread.start() return True,"" def _process_sub_project_item(self, sub_project_item:SubProjectItemModel): if data_process.process_item(sub_project_item): return data_send.send_item_data(sub_project_item),"" else: self._logger.error(f"分析处理失败:{sub_project_item.device_name}") return False, '分析处理失败' def start_send_item(self, item_id:int): project_item = self._project_store.query_sub_project_item_by_id(item_id) if not project_item: return False, '项目不存在' thread = threading.Thread(target=self._send_sub_project_item, args=(project_item,)) thread.start() return True,"" def _send_sub_project_item(self, sub_project_item:SubProjectItemModel): if data_send.send_item_data(sub_project_item): return True, '' else: self._logger.error(f"上传失败:{sub_project_item.device_name}") return False, '上传数据失败'