123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- from models.process_result_data import ProcessResultData
- from utils.logger_helper import LoggerHelper
- from utils.config_helper import ConfigHelper
- from utils.ai_helper import AiHelper
- from stores.data_store_interface import IDataStore
- from models.collect_data import CollectData
- from models.process_data import ProcessData
class DataProcess:
    """Turn collected pages into structured records with the help of an AI model.

    For every URL returned by the store, the collected row is loaded, sent to
    the model together with an extraction prompt, and the parsed answer is
    written back through the store.

    Rows with ``data_type == 0`` are processed with prompt template 1 and
    become ``ProcessData``; every other row is processed with prompt template 2
    and becomes ``ProcessResultData``.
    """

    logger = LoggerHelper.get_logger()
    config = ConfigHelper()
    _store = None

    # Built-in prompts, used when the configuration does not override them
    # (keys "ai.system_prompt", "ai.prompt_template_1", "ai.prompt_template_2").
    DEFAULT_AI_SYSTEM_PROMPT = "请帮我分析以下文字,提取出关键信息,并以json格式字符串返回,如果部分信息为空,则该字段返回为空。"
    DEFAULT_AI_PROMPT_TEMPLATE_1 = """在以上内容中提取信息:
编号(no) 、标题(title)、在哪个城市招标(area)、开标的时间(date)、开标的地点(address)、发布时间(release_date)、150字左右的招标条件要求及联系方式等内容摘要(summary), 设备(devices)。
提取出相关设备的名称信息, 多个设备以逗号分割。
返回包含no, title, area, date, address, release_date, summary, devices字段的json格式字符串,没有找到或未提供的信息json字段为空。
"""
    # NOTE(review): this prompt asks for devices but does not list it among the
    # returned fields, and _ai_process_2 does not read it - confirm intent.
    DEFAULT_AI_PROMPT_TEMPLATE_2 = """在以上内容中提取信息:
编号(no) 、标题(title)、公告时间(date)、标中的总价格(price)、标中的公司,多个以逗号分割(bidder)、150-300字的标的物说明,标的物价格,公司的明细等内容摘要(summary),设备(devices)。
提取出相关设备的名称信息,多个设备以逗号分割。返回包含no,title,date,price,bidder,summary字段的json格式字符串,没有找到或未提供的信息json字段为空 """

    def __init__(self, store: IDataStore):
        """Bind the processor to *store* and resolve the prompts from config.

        Args:
            store: Data store used for every query and write of this processor.
        """
        self._store = store
        self._ai_system_prompt = self.config.get(
            "ai.system_prompt", self.DEFAULT_AI_SYSTEM_PROMPT)
        self._ai_prompt_template_1 = self.config.get(
            "ai.prompt_template_1", self.DEFAULT_AI_PROMPT_TEMPLATE_1)
        self._ai_prompt_template_2 = self.config.get(
            "ai.prompt_template_2", self.DEFAULT_AI_PROMPT_TEMPLATE_2)

    @property
    def store(self) -> IDataStore:
        """Return the data store passed to the constructor."""
        return self._store

    def process(self):
        """Process every pending URL, then ask the store to save both data sets.

        Failures of a single URL are logged inside ``_process_item`` and do not
        stop the run. A failure while listing URLs or saving is logged and
        re-raised.

        Raises:
            Exception: Wraps any error from the URL query or the save calls;
                the original error is attached as ``__cause__``.
        """
        try:
            for url in self.store.query_urls_to_process():
                self._process_item(url)
            # NOTE(review): the meaning of the True argument is defined by the
            # store implementation (presumably "flush now") - confirm.
            self.store.save_process_data(True)
            self.store.save_process_result_data(True)
        except Exception as e:
            self.logger.error(f"数据处理发生异常: {e}")
            # Chain the cause so the original traceback is not lost.
            raise Exception(f"数据处理发生异常: {e}") from e

    def _process_item(self, url: str) -> None:
        """Extract and store the record for one collected URL.

        The URL is skipped when no collected row exists, when the row has
        ``status == 1``, or when a processed record for the same URL is already
        stored. Any exception is logged and swallowed so that the remaining
        URLs of the run are still handled.

        Args:
            url: URL of the collected row to process.
        """
        try:
            self.logger.info(f"START ==>URL:{url}")
            item = self.store.query_one_collect_by_url(url)
            if not item:
                self.logger.info(f"END==> NOT FOUND URL:{url}")
                return
            if item.status == 1:
                self.logger.info(f"ALREADY1 URL:{url}")
                return

            is_type_0 = item.data_type == 0
            if is_type_0:
                existing = self.store.query_one_process_by_url(url)
            else:
                existing = self.store.query_one_process_result_by_url(url)
            if existing:
                self.logger.info(f"ALREADY2 [{item.data_type}] URL==> {url}")
                return

            if is_type_0:
                data = self._ai_process_1(item)
            else:
                data = self._ai_process_2(item)
            if data:
                self._store_extracted(item, url, data)
            self.logger.info("END ==>" + url)
        except Exception as e:
            self.logger.error(f"数据处理发生异常: {url} {e}")

    def _store_extracted(self, item: CollectData, url: str, data) -> None:
        """Insert *data*, or fold *url* into an existing record with the same number.

        Args:
            item: Collected row the data was extracted from.
            url: URL of the collected row.
            data: Record returned by ``_ai_process_1`` or ``_ai_process_2``.
        """
        is_type_0 = item.data_type == 0

        old = None
        if data.no:
            # Look for a duplicate in the SAME table the record would be
            # inserted into; the other table legitimately holds records that
            # share the number and must not suppress this insert.
            if is_type_0:
                old = self.store.query_one_process_by_no(data.no)
            else:
                old = self.store.query_one_process_result_by_no(data.no)

        if not old:
            data.url = url
            data.keyword = item.keyword
            data.attach_path = item.attach_path
            if is_type_0:
                self.store.insert_process_data(data)
            else:
                self.store.insert_process_result_data(data)
            return

        if old.url != url:
            known_urls = old.other_urls.split(",") if old.other_urls else []
            # Do not record the same alternative URL twice on re-processing.
            if url not in known_urls:
                known_urls.append(url)
                old.other_urls = ",".join(known_urls)
                # The stored record is identified by its own URL; the freshly
                # extracted ``data`` has no URL assigned on this path.
                if is_type_0:
                    self.store.set_process_other_urls(
                        old.url, old.other_urls)
                else:
                    self.store.set_process_result_other_urls(
                        old.url, old.other_urls)
        self.logger.info(f"ALREADY 编号: {data.no} URL:{old.other_urls}")

    @staticmethod
    def _strip_region_prefix(area, marker: str):
        """Reduce *area* around the administrative suffix *marker*.

        When *marker* occurs in *area*, the text following its first
        occurrence is returned; if nothing follows (e.g. ``"南京市"``), the text
        before it is returned instead. Non-string values and strings without
        the marker are returned unchanged.
        """
        if not isinstance(area, str) or marker not in area:
            return area
        parts = area.split(marker)
        # split() always yields at least two parts here, so test for an empty
        # trailing part rather than for the number of parts.
        return parts[1] or parts[0]

    def _ai_process_1(self, item: CollectData) -> ProcessData | None:
        """Extract a ``ProcessData`` from *item* using prompt template 1.

        The ``area`` value is reduced first around "省" and then around "市".

        Args:
            item: Collected row whose ``content`` is sent to the model.

        Returns:
            The extracted record, or ``None`` when the AI call or the parsing
            of its answer raised (the error is logged).
        """
        try:
            data = AiHelper().call_openai(
                self._ai_system_prompt,
                f"{item.content} {self._ai_prompt_template_1}")
            area = data.get("area")
            for marker in ("省", "市"):
                area = self._strip_region_prefix(area, marker)
            return ProcessData(
                no=data.get("no"),
                title=data.get("title"),
                date=data.get("date"),
                area=area,
                address=data.get("address"),
                devices=data.get("devices"),
                summary=data.get("summary"),
                release_date=data.get("release_date"),
                prompt_tokens=data.get("prompt_tokens"),
                completion_tokens=data.get("completion_tokens"),
                total_tokens=data.get("total_tokens"),
            )
        except Exception as e:
            self.logger.error(f"AI 提取数据失败1: {item.url} {e}")
            return None

    def _ai_process_2(self, item: CollectData) -> ProcessResultData | None:
        """Extract a ``ProcessResultData`` from *item* using prompt template 2.

        Args:
            item: Collected row whose ``content`` is sent to the model.

        Returns:
            The extracted record, or ``None`` when the AI call or the parsing
            of its answer raised (the error is logged).
        """
        try:
            data = AiHelper().call_openai(
                self._ai_system_prompt,
                f"{item.content} {self._ai_prompt_template_2}")
            return ProcessResultData(
                no=data.get("no"),
                title=data.get("title"),
                date=data.get("date"),
                price=data.get("price"),
                bidder=data.get("bidder"),
                summary=data.get("summary"),
                prompt_tokens=data.get("prompt_tokens"),
                completion_tokens=data.get("completion_tokens"),
                total_tokens=data.get("total_tokens"),
            )
        except Exception as e:
            self.logger.error(f"AI 提取数据失败2: {item.url} {e}")
            return None
|