from utils.logger_helper import LoggerHelper
from utils.ai_helper import AiHelper
from stores.data_store_interface import IDataStore
from models.collect_data import CollectData
from models.process_data import ProcessData


class DataProcess:
    """Runs AI extraction over collected URLs and persists ProcessData records."""

    logger = LoggerHelper.get_logger()
    _store = None

    def __init__(self, store: IDataStore):
        self._store = store

    @property
    def store(self) -> IDataStore:
        return self._store

    def process(self):
        """Process every URL that is pending in the store."""
        try:
            urls = self.store.query_urls_to_process()
            for url in urls:
                self._process_item(url)
            self.store.save_process_data(True)
        except Exception as e:
            self.logger.error(f"Exception occurred during data processing: {e}")

    def _process_item(self, url: str) -> None:
        """Look up the collected item for a URL, extract structured data, and
        insert or merge the resulting process record."""
        self.logger.info("START ==>" + url)
        item = self.store.query_one_collect_by_url(url)
        if not item:
            self.logger.info("END: NOT FOUND URL==>" + url)
            return
        if item.status == 1:
            self.logger.info("ALREADY URL==>" + url)
            return
        data = self._ai_process(item)
        if data:
            old = None
            if data.no:
                old = self.store.query_one_process_by_no(data.no)
            if not old:
                # First record for this number: fill in source URL and keyword.
                data.url = url
                data.keyword = item.keyword
                self.store.insert_process_data(data)
            else:
                # A record with this number already exists; if it came from a
                # different URL, append this URL to its other_urls list.
                if old.url != url:
                    if old.other_urls:
                        old.other_urls += f",{url}"
                    else:
                        old.other_urls = url
                    # The existing record is keyed by its own URL; data.url is
                    # not populated in this branch.
                    self.store.set_process_other_urls(old.url, old.other_urls)
                self.logger.info(f"ALREADY No.: {data.no} URL: {old.other_urls}")
        self.logger.info("END ==>" + url)

    def _ai_process(self, item: CollectData) -> ProcessData | None:
        """Run AI extraction on the collected content; return None on failure."""
        try:
            data = AiHelper().call_ai(item.content)
            return data
        except Exception as e:
            self.logger.error(f"AI data extraction failed: {item.url} {e}")
            return None

    # def _generate_unique_id(self) -> str:
    #     from datetime import datetime
    #     current_time = datetime.now().strftime("%Y%m%d%H%M%S%f")
    #     thread_id = threading.current_thread().ident
    #     unique_id = f"{current_time}-{thread_id}"
    #     return unique_id
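
# --- Usage sketch (illustrative addition, not part of the original file) ---
# DataProcess only needs an object implementing IDataStore with the methods
# called above (query_urls_to_process, query_one_collect_by_url, ...).
# `MySqlDataStore` and its import path are hypothetical placeholders; substitute
# whatever concrete IDataStore implementation the project actually provides.
#
# if __name__ == "__main__":
#     from stores.mysql_data_store import MySqlDataStore  # hypothetical import
#     store = MySqlDataStore()
#     DataProcess(store).process()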