data_process.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. from models.process_result_data import ProcessResultData
  2. from utils.logger_helper import LoggerHelper
  3. from utils.config_helper import ConfigHelper
  4. from utils.ai_helper import AiHelper
  5. from stores.data_store_interface import IDataStore
  6. from models.collect_data import CollectData
  7. from models.process_data import ProcessData
  class DataProcess:
      """Extract structured records from collected pages with an LLM and store them.

      Each URL returned by ``store.query_urls_to_process()`` is looked up as a
      collected item and, depending on ``item.data_type``, sent to the AI helper
      together with one of two prompt templates:

      * ``data_type == 0``: ``_ai_process_1`` builds a ``ProcessData`` that is
        read/written through the ``*_process_*`` store methods;
      * any other value: ``_ai_process_2`` builds a ``ProcessResultData`` that
        is read/written through the ``*_process_result_*`` store methods.

      When an extracted record carries a ``no`` that already exists among the
      stored records of the same kind, it is not inserted again; the URL is
      appended to the existing record's ``other_urls`` instead.

      The prompts are read from the config keys ``ai.system_prompt``,
      ``ai.prompt_template_1`` and ``ai.prompt_template_2``, with the
      ``DEFAULT_*`` class constants passed as fallbacks.
      """

      logger = LoggerHelper.get_logger()
      config = ConfigHelper()
      _store = None

      DEFAULT_AI_SYSTEM_PROMPT = "请帮我分析以下文字,提取出关键信息,并以json格式字符串返回,如果部分信息为空,则该字段返回为空。"
      # Appended after the collected content for data_type 0 items.
      DEFAULT_AI_PROMPT_TEMPLATE_1 = (
          "在以上内容中提取信息:\n"
          "编号(no) 、标题(title)、在哪个城市招标(area)、开标的时间(date)、开标的地点(address)、发布时间(release_date)、150字左右的招标条件要求及联系方式等内容摘要(summary), 设备(devices)。\n"
          "提取出相关设备的名称信息, 多个设备以逗号分割。\n"
          "返回包含no, title, area, date, address, release_date, summary, devices字段的json格式字符串,没有找到或未提供的信息json字段为空。\n"
      )
      # Appended after the collected content for all other items.
      DEFAULT_AI_PROMPT_TEMPLATE_2 = (
          "在以上内容中提取信息:\n"
          "编号(no) 、标题(title)、公告时间(date)、标中的总价格(price)、标中的公司,多个以逗号分割(bidder)、150-300字的标的物说明,标的物价格,公司的明细等内容摘要(summary),设备(devices)。\n"
          "提取出相关设备的名称信息,多个设备以逗号分割。返回包含no,title,date,price,bidder,summary字段的json格式字符串,没有找到或未提供的信息json字段为空 "
      )

      def __init__(self, store: IDataStore):
          """Bind the data store and resolve the AI prompts from config.

          Args:
              store: Store used to read collected items and to read/write
                  processed records.
          """
          self._store = store
          self._ai_system_prompt = self.config.get(
              "ai.system_prompt", self.DEFAULT_AI_SYSTEM_PROMPT)
          self._ai_prompt_template_1 = self.config.get(
              "ai.prompt_template_1", self.DEFAULT_AI_PROMPT_TEMPLATE_1)
          self._ai_prompt_template_2 = self.config.get(
              "ai.prompt_template_2", self.DEFAULT_AI_PROMPT_TEMPLATE_2)

      @property
      def store(self) -> IDataStore:
          """The data store passed to the constructor."""
          return self._store

      def process(self):
          """Process every pending URL, then persist both kinds of records.

          Errors inside a single item are logged by ``_process_item`` and do not
          stop the loop; this method only fails on errors from querying the
          URLs or saving.

          Raises:
              Exception: wraps the underlying error, which is chained as
                  ``__cause__``.
          """
          try:
              urls = self.store.query_urls_to_process()
              for item in urls:
                  self._process_item(item)
              self.store.save_process_data(True)
              self.store.save_process_result_data(True)
          except Exception as e:
              self.logger.error(f"数据处理发生异常: {e}")
              raise Exception(f"数据处理发生异常: {e}") from e

      def _process_item(self, url: str) -> None:
          """Extract and store the collected page at *url*; errors are logged, not raised.

          The URL is skipped when no collected item exists for it, when the
          item's ``status`` is 1, or when a processed record for the URL is
          already stored.
          """
          try:
              self.logger.info(f"START ==>URL:{url}")
              item = self.store.query_one_collect_by_url(url)
              if not item:
                  self.logger.info(f"END==> NOT FOUND URL:{url}")
                  return
              if item.status == 1:
                  self.logger.info(f"ALREADY1 URL:{url}")
                  return
              # data_type 0 -> ProcessData, anything else -> ProcessResultData;
              # every store call for this item must target the matching kind.
              is_process = item.data_type == 0
              existing = (self.store.query_one_process_by_url(url)
                          if is_process
                          else self.store.query_one_process_result_by_url(url))
              if existing:
                  self.logger.info(f"ALREADY2 [{item.data_type}] URL==> {url}")
                  return
              data = (self._ai_process_1(item) if is_process
                      else self._ai_process_2(item))
              if data:
                  self._save_or_merge(item, url, data, is_process)
              self.logger.info("END ==>" + url)
          except Exception as e:
              self.logger.error(f"数据处理发生异常: {url} {e}")

      def _save_or_merge(self, item: CollectData, url: str,
                         data: ProcessData | ProcessResultData,
                         is_process: bool) -> None:
          """Insert *data*, or record *url* on a stored record with the same ``no``.

          Args:
              item: Collected item whose ``keyword`` and ``attach_path`` are
                  copied onto a newly inserted record.
              url: URL the record was extracted from.
              data: Record returned by ``_ai_process_1`` / ``_ai_process_2``.
              is_process: True for ``ProcessData`` (data_type 0), False for
                  ``ProcessResultData``.
          """
          old = None
          if data.no:
              # Look for a duplicate among records of the same kind as *data*,
              # i.e. the same table it would be inserted into.
              old = (self.store.query_one_process_by_no(data.no)
                     if is_process
                     else self.store.query_one_process_result_by_no(data.no))
          if not old:
              data.url = url
              data.keyword = item.keyword
              data.attach_path = item.attach_path
              if is_process:
                  self.store.insert_process_data(data)
              else:
                  self.store.insert_process_result_data(data)
              return
          known_urls = old.other_urls.split(",") if old.other_urls else []
          # Skip the URL when it is the record's own URL or already listed, so
          # re-processing a page does not append it a second time.
          if old.url != url and url not in known_urls:
              old.other_urls = (f"{old.other_urls},{url}" if old.other_urls
                                else url)
              # Address the existing record by its own URL (data.url is never
              # assigned on this path). NOTE(review): assumes the
              # set_*_other_urls store methods look the record up by url.
              if is_process:
                  self.store.set_process_other_urls(old.url, old.other_urls)
              else:
                  self.store.set_process_result_other_urls(
                      old.url, old.other_urls)
          self.logger.info(f"ALREADY 编号: {data.no} URL:{old.other_urls}")

      @staticmethod
      def _normalize_area(area):
          """Strip province/city prefixes from an AI-extracted area string.

          For each marker, "省" first and then "市", the segment that follows
          the first occurrence of the marker (up to the next occurrence) is
          kept. When that segment is empty, e.g. "深圳市", the segment before
          the marker is kept instead of producing an empty string. Examples:
          "广东省深圳市" -> "深圳", "深圳市南山区" -> "南山区".
          Non-string values such as ``None`` are returned unchanged.
          """
          if not isinstance(area, str):
              return area
          for marker in ("省", "市"):
              parts = area.split(marker)
              area = parts[1] if len(parts) > 1 and parts[1] else parts[0]
          return area

      def _ai_process_1(self, item: CollectData) -> ProcessData | None:
          """Ask the AI helper to extract announcement fields from *item*.

          Returns:
              A ``ProcessData`` built from the AI response, or ``None`` when the
              AI call or the conversion raises (the error is logged).
          """
          try:
              data = AiHelper().call_openai(
                  self._ai_system_prompt,
                  f"{item.content} {self._ai_prompt_template_1}")
              return ProcessData(
                  no=data.get("no"),
                  title=data.get("title"),
                  date=data.get("date"),
                  area=self._normalize_area(data.get("area")),
                  address=data.get("address"),
                  devices=data.get("devices"),
                  summary=data.get("summary"),
                  release_date=data.get("release_date"),
                  prompt_tokens=data.get("prompt_tokens"),
                  completion_tokens=data.get("completion_tokens"),
                  total_tokens=data.get("total_tokens"),
              )
          except Exception as e:
              self.logger.error(f"AI 提取数据失败1: {item.url} {e}")
              return None

      def _ai_process_2(self, item: CollectData) -> ProcessResultData | None:
          """Ask the AI helper to extract result fields from *item*.

          Returns:
              A ``ProcessResultData`` built from the AI response, or ``None``
              when the AI call or the conversion raises (the error is logged).
          """
          try:
              data = AiHelper().call_openai(
                  self._ai_system_prompt,
                  f"{item.content} {self._ai_prompt_template_2}")
              return ProcessResultData(
                  no=data.get("no"),
                  title=data.get("title"),
                  date=data.get("date"),
                  price=data.get("price"),
                  bidder=data.get("bidder"),
                  summary=data.get("summary"),
                  prompt_tokens=data.get("prompt_tokens"),
                  completion_tokens=data.get("completion_tokens"),
                  total_tokens=data.get("total_tokens"),
              )
          except Exception as e:
              self.logger.error(f"AI 提取数据失败2: {item.url} {e}")
              return None