pre_process.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. import utils, pandas as pd,os
  2. from pathlib import Path
  3. from models.project_data import ProjectModel, ProjectItemModel
  4. from stores.mysql_store import MysqlStore
  5. from ai.fast_gpt import FastGPTAi
  6. class PreProcess:
  7. def __init__(self):
  8. self._logger= utils.get_logger()
  9. self._store= MysqlStore()
  10. # self._ai = utils.AiHelper()
  11. self._ai = FastGPTAi()
  12. self._ai_sys_prompt = "从给定信息中提取结构化信息,并返回json压缩成一行的字符串,如果部分信息为空或nan,则该字段返回为空。"
  13. self.separate_ai_calls = False
  14. self._data={}
  15. def run(self, project: ProjectModel, separate_ai_calls=True) ->bool:
  16. try:
  17. self._logger.info(f"开始预处理项目:{project.project_name}[{project.project_no}_{project.standard_version}] ")
  18. self._store.update_project_status(project.id,21)
  19. self.separate_ai_calls = separate_ai_calls
  20. raw_data = self.read_excels(f"{utils.get_config_value("file.source_path","./temp_files")}/project/{project.project_no}/")
  21. if self.separate_ai_calls:
  22. for filename, sheets in raw_data.items():
  23. excel_data = self.normalize_data({filename: sheets})
  24. self.call_ai(project.id, excel_data)
  25. else:
  26. excel_data = self.normalize_data(raw_data)
  27. self.call_ai(project.id, excel_data)
  28. project.items = self._data[project.id].items
  29. self._store.re_insert_project(project)
  30. del self._data[project.id]
  31. self._logger.info(f"结束预处理项目:{project.project_name}[{project.project_no}_{project.standard_version}] [设备条数:{len(project.items)}]")
  32. self._store.update_project_status(project.id,31)
  33. return True
  34. except Exception as e:
  35. self._logger.error(f"预处理项目失败:[{project.project_no}] 错误: {e}")
  36. self._store.update_project_status(project.id,11)
  37. return False
  38. def read_excels(self, folder_path):
  39. path = Path(folder_path)
  40. # 验证路径是否存在
  41. if not path.exists():
  42. raise FileNotFoundError(f"目录不存在: {path.resolve()}")
  43. # 验证是否是目录
  44. if not path.is_dir():
  45. raise NotADirectoryError(f"路径不是目录: {path.resolve()}")
  46. excel_files = list(path.glob('*'))
  47. if not excel_files:
  48. self._logger.warning(f"警告:未找到任何文件")
  49. raw_data = {}
  50. for file in excel_files:
  51. try:
  52. # 使用确定的表头行读取整个 Excel 文件
  53. # 读取所有sheet
  54. sheets = pd.read_excel(file, sheet_name=None)
  55. sheet_data = {}
  56. for sheet_name, df in sheets.items():
  57. # 读取前几行以确定表头
  58. header_row = self.determine_header_row(df,file.name)
  59. if header_row >= 0:
  60. # 使用确定的表头行重新读取Sheet
  61. df = df.iloc[header_row:] # 直接使用确定的表头行
  62. df.columns = df.iloc[0] # 设置表头
  63. df = df[1:] # 去掉表头行
  64. df.reset_index(drop=True, inplace=True) # 重置索引
  65. sheet_data[sheet_name] = df
  66. raw_data[file.name] = sheet_data
  67. self._logger.debug(f"读取 {file.name} 的 {len(sheet_data)} 个sheet")
  68. except Exception as e:
  69. self._logger.error(f"读取 {file.name} 失败: {e}")
  70. return raw_data
  71. def determine_header_row(self, df,file_name):
  72. if any('序号' in str(cell) or 'No.' in str(cell) for cell in df.head()):
  73. self._logger.debug(f"{file_name}表头包含“序号”")
  74. return -1
  75. for i, row in df.iterrows():
  76. if any('序号' in str(cell) or 'No.' in str(cell) for cell in row):
  77. self._logger.debug(f"{file_name}找到包含“序号”的行:{i}")
  78. return i
  79. # 如果没有找到包含“序号”的行,默认使用第0行作为表头
  80. self._logger.warning(f"{file_name}未找到包含“序号”的行,使用默认的第1行作为表头")
  81. return -1
  82. @staticmethod
  83. def normalize_data(raw_data):
  84. excel_data = []
  85. for filename, sheets in raw_data.items():
  86. for sheet_name, df in sheets.items():
  87. # 统一编码/去除空行等
  88. df = df.map(lambda x: x.strip() if isinstance(x, str) else x)
  89. # df.columns = ['名称', '型号规格','单位']
  90. # 去除空行
  91. df = df.dropna(how='all')
  92. # 去除空行 及 型号规格为空
  93. # df = df.dropna(subset=['型号规格'], how='any')
  94. excel_data.append({
  95. "filename": filename,
  96. "sheet": sheet_name,
  97. # "data": df.to_dict(orient='records')
  98. "data": df.to_markdown()
  99. })
  100. return excel_data
  101. @staticmethod
  102. def prompt_template(excel_data):
  103. prompt = """
  104. 请从以下表格数据中提取结构化信息,要求:
  105. 1. 识别字段类型:数值/文本/日期
  106. 2. 提取结构化信息:```typescript
  107. export interface device {
  108. n: string; //物料名称
  109. m: string; //型号规格
  110. u:string; //单位
  111. c: float; //数量
  112. }
  113. ```
  114. 3. 返回结构体device的数组的json数组格式,压缩成一行
  115. """
  116. prompt += "数据记录:\n"
  117. for data in excel_data:
  118. # for row in data['data']:
  119. # prompt += f"{row}\n"
  120. prompt += f"{data['data']}\n"
  121. print("AI_PROMPT: " + prompt)
  122. return prompt
  123. def call_ai(self, project_id:int, excel_data):
  124. project = ProjectModel(project_id=project_id)
  125. api_key = utils.get_config_value("fastgpt.api_key_pre_process")
  126. if self.separate_ai_calls:
  127. # 初始化self._data[project_no],避免在循环中重复初始化
  128. for data in excel_data:
  129. prompt = self.prompt_template([data])
  130. response = self._ai.call_ai(prompt,api_key)
  131. project.items.extend(self.format_data(project_id, response))
  132. # response = {'data': [{'n': '阻燃型导线穿管敷设', 'm': 'WDZB1-BYJ-750V-2.5mm2', 'u': '米', 'c': 900.0}, {'n': '阻燃型导线穿管敷设', 'm': 'WDZB1-BYJ-750V-4mm2', 'u': '米', 'c': 800.0}, {'n': '耐火型导线穿管敷设', 'm': 'WDZB1N-BYJ-750V-2.5mm2', 'u': '米', 'c': 200.0}, {'n': '防火堵料', 'm': '', 'u': '公斤', 'c': 10.0}, {'n': '防火漆', 'm': '10kg/', '': '桶', 'c': 2.0}, {'n': '镀锌钢管', 'm': 'SC20', 'u': '米', 'c': 580.0}, {'n': '接地线', 'm': '热浸镀锌扁钢25x4', 'u': '米', 'c': 50.0}, {'n': '局部等电位端子箱', 'm': '', 'u': '个', 'c': 2.0}, {'n': '双联单控照明开关', 'm': '~250V 10A', 'u': '个', 'c': 4.0}, {'n': '密闭双联单控照明开关', 'm': '~250V 10A', 'u': '个', 'c': 4.0}, {'n': '配合空调室外机移位', 'm': '', 'u': '项', 'c': 1.0}, {'n': '应急照明灯', 'm': '220V,10W', 'u': '套', 'c': 4.0}, {'n': '门禁', 'm': '', 'u': '套', 'c': 4.0}, {'n': '配电线路改移', 'm': '开槽、移点位等', 'u': '项', 'c': 1.0}, {'n': '烘手器插座', 'm': '220V,10A,密闭型', 'u': '个', 'c': 2.0}], 'completion_tokens': 439, 'prompt_tokens': 766, 'total_tokens': 1205}
  133. # 更新数据部分
  134. # project.items.extend(self.format_data(project_id, response["data"]))
  135. # project.completion_tokens += response["completion_tokens"]
  136. # project.prompt_tokens += response["prompt_tokens"]
  137. # project.total_tokens += response["total_tokens"]
  138. else:
  139. prompt = self.prompt_template(excel_data)
  140. response = self._ai.call_ai(prompt,api_key)
  141. project.items.extend(self.format_data(project_id, response))
  142. # project.completion_tokens = response["completion_tokens"]
  143. # project.prompt_tokens = response["prompt_tokens"]
  144. # project.total_tokens = response["total_tokens"]
  145. # project.items = self.format_data(project_id, response["data"])
  146. self._data[project_id] = project
  147. def format_data(self,project_id,new_data) ->list[ProjectItemModel]:
  148. formatted_data = []
  149. for data in new_data:
  150. try:
  151. item = ProjectItemModel(
  152. project_id=project_id,
  153. device_name="",
  154. device_model="",
  155. device_unit="",
  156. )
  157. if data.get('n'):
  158. item.device_name = data['n']
  159. if data.get('m'):
  160. item.device_model = data['m']
  161. if data.get('u'):
  162. item.device_unit = data['u']
  163. if data.get('c'):
  164. item.device_count = data['c']
  165. formatted_data.append(item)
  166. except Exception as e:
  167. self._logger.error(f"格式化数据时出错: {data} {e}")
  168. return formatted_data
  169. def run_1(self, project: ProjectModel) ->bool:
  170. self._logger.info(f"开始预处理项目:{project.project_name}[{project.project_no}_{project.standard_version}] ")
  171. self._store.update_project_status(project.id, 21)
  172. file_path = f"{utils.get_config_value("file.source_path", "./temp_files")}/project/{project.project_no}/"
  173. file = os.listdir(file_path)[0]
  174. if not file:
  175. self._logger.error(f"项目:{project.project_no} 没有找到文件")
  176. return False
  177. if not self.check_file_type(file):
  178. self._logger.error(f"项目:{project.project_no} 文件格式不正确")
  179. return False
  180. try:
  181. prompt="""从上传的表格数据中提取信息,要求:
  182. 1. 识别字段类型:数值/文本/日期
  183. 2. 提取信息结构体:```typescript
  184. type item {
  185. n: string; //物料名称
  186. m: string; //型号规格
  187. u:string; //单位
  188. c: float; //数量,数量多列的话要求和
  189. }
  190. ```
  191. 3. 返回压缩成一行的item数组的json字符串
  192. """
  193. api_key= utils.get_config_value("fastgpt.api_key_pre_process")
  194. data = self._ai.call_ai_with_file(file_path+file, prompt,api_key)
  195. # data = utils.AiHelper().call_openai_with_file(file_path+file,"", user_prompt=prompt,api_model="qwen-max")
  196. if isinstance(data, str):
  197. import json
  198. data= json.loads(data)
  199. res_data = self.format_data(project.id, data)
  200. if len(res_data)<=0:
  201. self._logger.error(f"项目:{project.project_no} 文件处理失败: {data}")
  202. self._store.update_project_status(project.id, 11)
  203. return False
  204. project.items = res_data
  205. self._store.re_insert_project(project)
  206. self._logger.info(
  207. f"结束预处理项目:{project.project_name}[{project.project_no}_{project.standard_version}] [设备条数:{len(project.items)}]")
  208. self._store.update_project_status(project.id, 31)
  209. return True
  210. except Exception as e:
  211. self._logger.error(f"项目:{project.project_no} 文件处理失败: {e}")
  212. self._store.update_project_status(project.id,11)
  213. return False
  214. @staticmethod
  215. def check_file_type(file_name:str) ->bool:
  216. file_type = file_name.split('.')[-1]
  217. if file_type not in ['xlsx','xls']:
  218. return False
  219. return True