123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- import utils, pandas as pd,os
- from pathlib import Path
- from models.project_data import ProjectModel, ProjectItemModel
- from stores.mysql_store import MysqlStore
- from ai.fast_gpt import FastGPTAi
- class PreProcess:
- def __init__(self):
- self._logger= utils.get_logger()
- self._store= MysqlStore()
- # self._ai = utils.AiHelper()
- self._ai = FastGPTAi()
- self._ai_sys_prompt = "从给定信息中提取结构化信息,并返回json压缩成一行的字符串,如果部分信息为空或nan,则该字段返回为空。"
- self.separate_ai_calls = False
- self._data={}
- def run(self, project: ProjectModel, separate_ai_calls=True) ->bool:
- try:
- self._logger.info(f"开始预处理项目:{project.project_name}[{project.project_no}_{project.standard_version}] ")
- self._store.update_project_status(project.id,21)
- self.separate_ai_calls = separate_ai_calls
- raw_data = self.read_excels(f"{utils.get_config_value("file.source_path","./temp_files")}/project/{project.project_no}/")
- if self.separate_ai_calls:
- for filename, sheets in raw_data.items():
- excel_data = self.normalize_data({filename: sheets})
- self.call_ai(project.id, excel_data)
- else:
- excel_data = self.normalize_data(raw_data)
- self.call_ai(project.id, excel_data)
- project.items = self._data[project.id].items
- self._store.re_insert_project(project)
- del self._data[project.id]
- self._logger.info(f"结束预处理项目:{project.project_name}[{project.project_no}_{project.standard_version}] [设备条数:{len(project.items)}]")
- self._store.update_project_status(project.id,31)
- return True
- except Exception as e:
- self._logger.error(f"预处理项目失败:[{project.project_no}] 错误: {e}")
- self._store.update_project_status(project.id,11)
- return False
- def read_excels(self, folder_path):
- path = Path(folder_path)
- # 验证路径是否存在
- if not path.exists():
- raise FileNotFoundError(f"目录不存在: {path.resolve()}")
- # 验证是否是目录
- if not path.is_dir():
- raise NotADirectoryError(f"路径不是目录: {path.resolve()}")
- excel_files = list(path.glob('*'))
- if not excel_files:
- self._logger.warning(f"警告:未找到任何文件")
- raw_data = {}
- for file in excel_files:
- try:
- # 使用确定的表头行读取整个 Excel 文件
- # 读取所有sheet
- sheets = pd.read_excel(file, sheet_name=None)
- sheet_data = {}
- for sheet_name, df in sheets.items():
- # 读取前几行以确定表头
- header_row = self.determine_header_row(df,file.name)
- if header_row >= 0:
- # 使用确定的表头行重新读取Sheet
- df = df.iloc[header_row:] # 直接使用确定的表头行
- df.columns = df.iloc[0] # 设置表头
- df = df[1:] # 去掉表头行
- df.reset_index(drop=True, inplace=True) # 重置索引
- sheet_data[sheet_name] = df
- raw_data[file.name] = sheet_data
- self._logger.debug(f"读取 {file.name} 的 {len(sheet_data)} 个sheet")
- except Exception as e:
- self._logger.error(f"读取 {file.name} 失败: {e}")
- return raw_data
- def determine_header_row(self, df,file_name):
- if any('序号' in str(cell) or 'No.' in str(cell) for cell in df.head()):
- self._logger.debug(f"{file_name}表头包含“序号”")
- return -1
- for i, row in df.iterrows():
- if any('序号' in str(cell) or 'No.' in str(cell) for cell in row):
- self._logger.debug(f"{file_name}找到包含“序号”的行:{i}")
- return i
- # 如果没有找到包含“序号”的行,默认使用第0行作为表头
- self._logger.warning(f"{file_name}未找到包含“序号”的行,使用默认的第1行作为表头")
- return -1
- @staticmethod
- def normalize_data(raw_data):
- excel_data = []
- for filename, sheets in raw_data.items():
- for sheet_name, df in sheets.items():
- # 统一编码/去除空行等
- df = df.map(lambda x: x.strip() if isinstance(x, str) else x)
- # df.columns = ['名称', '型号规格','单位']
- # 去除空行
- df = df.dropna(how='all')
- # 去除空行 及 型号规格为空
- # df = df.dropna(subset=['型号规格'], how='any')
- excel_data.append({
- "filename": filename,
- "sheet": sheet_name,
- # "data": df.to_dict(orient='records')
- "data": df.to_markdown()
- })
- return excel_data
- @staticmethod
- def prompt_template(excel_data):
- prompt = """
- 请从以下表格数据中提取结构化信息,要求:
- 1. 识别字段类型:数值/文本/日期
- 2. 提取结构化信息:```typescript
- export interface device {
- n: string; //物料名称
- m: string; //型号规格
- u:string; //单位
- c: float; //数量
- }
- ```
- 3. 返回结构体device的数组的json数组格式,压缩成一行
- """
- prompt += "数据记录:\n"
- for data in excel_data:
- # for row in data['data']:
- # prompt += f"{row}\n"
- prompt += f"{data['data']}\n"
- print("AI_PROMPT: " + prompt)
- return prompt
- def call_ai(self, project_id:int, excel_data):
- project = ProjectModel(project_id=project_id)
- api_key = utils.get_config_value("fastgpt.api_key_pre_process")
- if self.separate_ai_calls:
- # 初始化self._data[project_no],避免在循环中重复初始化
- for data in excel_data:
- prompt = self.prompt_template([data])
- response = self._ai.call_ai(prompt,api_key)
- project.items.extend(self.format_data(project_id, response))
- # response = {'data': [{'n': '阻燃型导线穿管敷设', 'm': 'WDZB1-BYJ-750V-2.5mm2', 'u': '米', 'c': 900.0}, {'n': '阻燃型导线穿管敷设', 'm': 'WDZB1-BYJ-750V-4mm2', 'u': '米', 'c': 800.0}, {'n': '耐火型导线穿管敷设', 'm': 'WDZB1N-BYJ-750V-2.5mm2', 'u': '米', 'c': 200.0}, {'n': '防火堵料', 'm': '', 'u': '公斤', 'c': 10.0}, {'n': '防火漆', 'm': '10kg/', '': '桶', 'c': 2.0}, {'n': '镀锌钢管', 'm': 'SC20', 'u': '米', 'c': 580.0}, {'n': '接地线', 'm': '热浸镀锌扁钢25x4', 'u': '米', 'c': 50.0}, {'n': '局部等电位端子箱', 'm': '', 'u': '个', 'c': 2.0}, {'n': '双联单控照明开关', 'm': '~250V 10A', 'u': '个', 'c': 4.0}, {'n': '密闭双联单控照明开关', 'm': '~250V 10A', 'u': '个', 'c': 4.0}, {'n': '配合空调室外机移位', 'm': '', 'u': '项', 'c': 1.0}, {'n': '应急照明灯', 'm': '220V,10W', 'u': '套', 'c': 4.0}, {'n': '门禁', 'm': '', 'u': '套', 'c': 4.0}, {'n': '配电线路改移', 'm': '开槽、移点位等', 'u': '项', 'c': 1.0}, {'n': '烘手器插座', 'm': '220V,10A,密闭型', 'u': '个', 'c': 2.0}], 'completion_tokens': 439, 'prompt_tokens': 766, 'total_tokens': 1205}
- # 更新数据部分
- # project.items.extend(self.format_data(project_id, response["data"]))
- # project.completion_tokens += response["completion_tokens"]
- # project.prompt_tokens += response["prompt_tokens"]
- # project.total_tokens += response["total_tokens"]
- else:
- prompt = self.prompt_template(excel_data)
- response = self._ai.call_ai(prompt,api_key)
- project.items.extend(self.format_data(project_id, response))
- # project.completion_tokens = response["completion_tokens"]
- # project.prompt_tokens = response["prompt_tokens"]
- # project.total_tokens = response["total_tokens"]
- # project.items = self.format_data(project_id, response["data"])
- self._data[project_id] = project
- def format_data(self,project_id,new_data) ->list[ProjectItemModel]:
- formatted_data = []
- for data in new_data:
- try:
- item = ProjectItemModel(
- project_id=project_id,
- device_name="",
- device_model="",
- device_unit="",
- )
- if data.get('n'):
- item.device_name = data['n']
- if data.get('m'):
- item.device_model = data['m']
- if data.get('u'):
- item.device_unit = data['u']
- if data.get('c'):
- item.device_count = data['c']
- formatted_data.append(item)
- except Exception as e:
- self._logger.error(f"格式化数据时出错: {data} {e}")
- return formatted_data
- def run_1(self, project: ProjectModel) ->bool:
- self._logger.info(f"开始预处理项目:{project.project_name}[{project.project_no}_{project.standard_version}] ")
- self._store.update_project_status(project.id, 21)
- file_path = f"{utils.get_config_value("file.source_path", "./temp_files")}/project/{project.project_no}/"
- file = os.listdir(file_path)[0]
- if not file:
- self._logger.error(f"项目:{project.project_no} 没有找到文件")
- return False
- if not self.check_file_type(file):
- self._logger.error(f"项目:{project.project_no} 文件格式不正确")
- return False
- try:
- prompt="""从上传的表格数据中提取信息,要求:
- 1. 识别字段类型:数值/文本/日期
- 2. 提取信息结构体:```typescript
- type item {
- n: string; //物料名称
- m: string; //型号规格
- u:string; //单位
- c: float; //数量,数量多列的话要求和
- }
- ```
- 3. 返回压缩成一行的item数组的json字符串
- """
- api_key= utils.get_config_value("fastgpt.api_key_pre_process")
- data = self._ai.call_ai_with_file(file_path+file, prompt,api_key)
- # data = utils.AiHelper().call_openai_with_file(file_path+file,"", user_prompt=prompt,api_model="qwen-max")
- if isinstance(data, str):
- import json
- data= json.loads(data)
- res_data = self.format_data(project.id, data)
- if len(res_data)<=0:
- self._logger.error(f"项目:{project.project_no} 文件处理失败: {data}")
- self._store.update_project_status(project.id, 11)
- return False
- project.items = res_data
- self._store.re_insert_project(project)
- self._logger.info(
- f"结束预处理项目:{project.project_name}[{project.project_no}_{project.standard_version}] [设备条数:{len(project.items)}]")
- self._store.update_project_status(project.id, 31)
- return True
- except Exception as e:
- self._logger.error(f"项目:{project.project_no} 文件处理失败: {e}")
- self._store.update_project_status(project.id,11)
- return False
- @staticmethod
- def check_file_type(file_name:str) ->bool:
- file_type = file_name.split('.')[-1]
- if file_type not in ['xlsx','xls']:
- return False
- return True
|