# data_process.py
from utils.logger_helper import LoggerHelper
from utils.ai_helper import AiHelper
from stores.data_store_interface import IDataStore
from models.collect_data import CollectData
from models.process_data import ProcessData
  6. class DataProcess:
  7. logger = LoggerHelper.get_logger()
  8. _store = None
  9. def __init__(self, store: IDataStore):
  10. self._store = store
  11. @property
  12. def store(self) -> IDataStore:
  13. return self._store
  14. def process(self):
  15. try:
  16. urls = self.store.query_urls_to_process()
  17. for item in urls:
  18. self._process_item(item)
  19. self.store.save_process_data(True)
  20. except Exception as e:
  21. self.logger.error(f"数据处理过程中发生异常: {e}")
  22. def _process_item(self, url: str) -> None:
  23. self.logger.info("START ==>" + url)
  24. item = self.store.query_one_collect_by_url(url)
  25. if not item:
  26. self.logger.info("END: NOT FOUND URL==>" + url)
  27. return
  28. if item.status == 1:
  29. self.logger.info("ALREADY URL==>" + url)
  30. return
  31. data = self._ai_process(item)
  32. if data:
  33. old = None
  34. if data.no:
  35. old = self.store.query_one_process_by_no(data.no)
  36. if not old:
  37. data.url = url
  38. data.keyword = item.keyword
  39. self.store.insert_process_data(data)
  40. else:
  41. if old.url != url:
  42. if old.other_urls:
  43. old.other_urls += f",{url}"
  44. else:
  45. old.other_urls = url
  46. self.store.set_process_other_urls(data.url, old.other_urls)
  47. self.logger.info(f"ALREADY 编号: {data.no} URL:{old.other_urls}")
  48. self.logger.info("END ==>" + url)
  49. def _ai_process(self, item: CollectData) -> ProcessData | None:
  50. try:
  51. data = AiHelper().call_ai(item.content)
  52. return data
  53. except Exception as e:
  54. self.logger.error(f"AI 提取数据失败: {item.url} {e}")
  55. return None
  56. # def _generate_unique_id(self) -> str:
  57. # from datetime import datetime
  58. # current_time = datetime.now().strftime("%Y%m%d%H%M%S%f")
  59. # thread_id = threading.current_thread().ident
  60. # unique_id = f"{current_time}-{thread_id}"
  61. # return unique_id