import csv
import json
import os

import tools.utils as utils
from tools.stores.mysql_store import MysqlStore
from tools.models.standard_model import StandardModel


class ImageExtractor:
    """Extract table rows from images with an AI helper, store them, export CSV.

    Workflow as implemented below:

    * ``extract(file_name)`` walks ``<base>/img/<file_name>/``, sends every
      PNG/JPEG to the AI helper and stores the returned rows through
      ``MysqlStore``. Images whose rows were stored are moved to
      ``<base>/img_complete/<file_name>/``.
    * ``export()`` writes one CSV file per key of the mapping returned by
      ``MysqlStore.query_standard_group_by_book()``.
    """

    # One entry per table column: (JSON key used in the prompt,
    # StandardModel keyword/attribute name, CSV header text).
    # Keep the JSON keys in sync with the schema inside ``_user_prompt``.
    _COLUMNS = (
        ("a", "book_number", "书号"),
        ("b", "quota_number", "定额编号"),
        ("c", "quota_name", "定额名称"),
        ("d", "work_content", "工作内容"),
        ("e", "unit", "单位"),
        ("f", "basic_quota", "基本定额"),
        ("g", "base_price", "基价(元)"),
        ("h", "unit_weight", "单重(t)"),
        ("i", "labor_cost", "工费"),
        ("j", "material_cost", "料费"),
        ("k", "machine_cost", "机费"),
        ("l", "main_material", "主材"),
    )

    # File suffixes (lower-case) that are treated as images.
    _IMAGE_SUFFIXES = ('.png', '.jpg', '.jpeg')

    def __init__(self):
        self._logger = utils.get_logger()
        self._db_store = MysqlStore()
        # Root directory for input images, processed images and CSV output.
        self._base_path = "./temp_files/images/output"
        # Destination for processed images; set on every extract() call.
        self._complete_path = ""
        self._ai = utils.AiHelper()
        # Paths of images that failed during the current extract() run.
        self._err_files: list[str] = []
        self._file_name = ""
        # Not used by the active FastGPT call; it was only passed to the
        # alternative call_openai_with_image(...) backend.
        self._sys_prompt = "请提取图片中的表格,用json格式输出。"
        self._user_prompt = """提取表格信息,要求:
1. 提取结构化信息:```typescript
type item {
    a: string; //书号
    b: string; //定额编号
    c:string; //定额名称
    d: string; //工作内容
    e: string; //单位
    f: string; //基本定额
    g: float; //基价(元)
    h: float; //单重(t)
    i: float; //工费
    j: float; //料费
    k: float; //机费
    l: string; //主材
}
```
2. 提取的文字中间的空格需要保留,数据没有就留空
3. 确保符号提取准确,例如 kg,m²,m³,直径符号∅等
4. 返回压缩成一行的item数组的json字符串
"""

    def extract(self, file_name: str):
        """Process every image found under ``<base>/img/<file_name>/``.

        Resets the per-run failure list, makes sure the "complete" directory
        exists, then hands each ``.png``/``.jpg``/``.jpeg`` file (searched
        recursively, suffix compared case-insensitively) to
        ``extract_image``. Failed images are reported in one final error log
        entry.

        Args:
            file_name: Name of the sub-directory holding the images.

        Returns:
            None. Errors raised while walking the directory are logged, not
            propagated.
        """
        self._file_name = file_name
        self._err_files = []
        path = f"{self._base_path}/img/{self._file_name}/"
        self._complete_path = f"{self._base_path}/img_complete/{self._file_name}/"
        os.makedirs(self._complete_path, exist_ok=True)
        try:
            self._logger.info(f"开始处理目录: {path}")
            # Nothing to do when the source directory is missing.
            if not os.path.exists(path):
                self._logger.error(f"目录不存在: {path}")
                return
            for root, _dirs, files in os.walk(path):
                for file in files:
                    if file.lower().endswith(self._IMAGE_SUFFIXES):
                        self.extract_image(os.path.join(root, file))
            self._logger.info(f"目录处理完成: {path}")
            if self._err_files:
                self._logger.error(f"----【处理图片失败】-----: {self._err_files}")
        except Exception as e:
            self._logger.error(f"处理目录失败 {path}: {e}")

    def extract_image(self, image_path: str) -> None:
        """Run the AI extraction for one image and store the result.

        The image is moved into the "complete" directory only when
        ``save_to_db`` reports success. Otherwise it stays where it is and is
        appended to the failure list so that it can be retried.

        Args:
            image_path: Path of the image file to process.
        """
        try:
            self._logger.info(f"开始处理图片: {image_path}")
            api_key = utils.get_config_value("fastgpt.api_key")
            content = self._ai.call_fastgpt_ai_with_image(
                image_path, self._user_prompt, api_key
            )
            if not self.save_to_db(content):
                # Nothing was persisted: keep the file in the input directory
                # instead of marking it as processed.
                self._err_files.append(image_path)
                self._logger.error(
                    f"处理图片失败 {image_path}: 识别结果未能保存到数据库"
                )
                return
            # Move the file to the processed directory after a successful save.
            os.rename(
                image_path,
                os.path.join(self._complete_path, os.path.basename(image_path)),
            )
            self._logger.info(f"图片处理完成: {image_path}")
        except Exception as e:
            self._err_files.append(image_path)
            self._logger.error(f"处理图片失败 {image_path}: {e}")

    def save_to_db(self, data_list: str | list) -> bool:
        """Store extracted rows in the database.

        Args:
            data_list: Either a JSON string that decodes to a list of row
                dicts keyed ``'a'`` .. ``'l'``, or the already decoded list.

        Returns:
            ``False`` when nothing was persisted although it should have
            been: the payload could not be decoded/iterated, or it contained
            rows and none of them was stored. ``True`` otherwise, which
            includes an empty list and partially stored payloads. Rows that
            fail individually are logged and skipped; because ``False`` is
            only returned when no row was stored, a retry cannot duplicate
            rows written by this call.
        """
        try:
            self._logger.info(f"开始保存图片内到数据库:{data_list}")
            rows = json.loads(data_list) if isinstance(data_list, str) else data_list
            total = 0
            stored = 0
            for item in rows:
                total += 1
                try:
                    standard = StandardModel(
                        **{attr: item[key] for key, attr, _header in self._COLUMNS}
                    )
                    if self._db_store.insert_standard(standard):
                        stored += 1
                    else:
                        self._logger.error(f"保存数据到数据库失败: {item}")
                except Exception as e:
                    # Best effort per row: log and continue with the next one.
                    self._logger.error(f"保存图片内容失败: {e}")
            return stored > 0 or total == 0
        except Exception as e:
            self._logger.error(f"保存图片内容失败: {e}")
            return False

    def export(self):
        """Write the stored rows to CSV files, one file per group key.

        Reads the mapping returned by
        ``MysqlStore.query_standard_group_by_book()`` and writes
        ``<base>/csv/<key>.csv`` for each key, with a header row followed by
        one row per stored object. Files are encoded as ``utf-8-sig``
        (presumably so spreadsheet tools detect the encoding; confirm with
        the consumers of these files).

        Returns:
            The mapping that was exported, or ``None`` when an error occurred
            (the error is logged).
        """
        try:
            self._logger.info("开始导出数据库数据")
            data = self._db_store.query_standard_group_by_book()
            header = [title for _key, _attr, title in self._COLUMNS]
            attrs = [attr for _key, attr, _title in self._COLUMNS]
            for k, v in data.items():
                csv_file = f"{self._base_path}/csv/{k}.csv"
                # Make sure the target directory exists.
                os.makedirs(os.path.dirname(csv_file), exist_ok=True)
                with open(csv_file, 'w', newline='', encoding='utf-8-sig') as f:
                    writer = csv.writer(f)
                    writer.writerow(header)
                    # Flatten each model object into a row in column order.
                    writer.writerows(
                        [getattr(item, attr) for attr in attrs] for item in v
                    )
            self._logger.info("成功导出数据库数据")
            return data
        except Exception as e:
            self._logger.error(f"导出数据库数据失败: {e}")