1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- from utils.logger_helper import LoggerHelper
- from utils.config_helper import ConfigHelper
- from stores.data_store_interface import IDataStore
- from models.collect_data import CollectData
- from models.process_data import ProcessData
- from models.area_email import AreaEmail
class MysqlDataStore(IDataStore):
    """IDataStore implementation that delegates persistence to the model classes.

    Collect and process rows can be written one at a time or buffered in
    memory and written in batches. Batch sizes come from the config keys
    ``save.collect_batch_size`` and ``save.process_batch_size``; a missing or
    falsy value falls back to 1.

    Buffered rows are only written once a buffer reaches its batch size or
    when ``save_collect_data(is_force=True)`` /
    ``save_process_data(is_force=True)`` is called, so callers that buffer
    should force a final flush when they are done.
    """

    logger = LoggerHelper.get_logger()
    config = ConfigHelper()
    # Model objects created once at class level and shared by every store
    # instance; all persistence calls below go through them.
    _collectData = CollectData()
    _processData = ProcessData()
    _areaEmail = AreaEmail()

    def __init__(self):
        """Load the configured batch sizes and start with empty buffers."""
        self._collect_size = self._read_batch_size('save.collect_batch_size')
        self._collect_list = []
        self._process_size = self._read_batch_size('save.process_batch_size')
        self._process_list = []

    def _read_batch_size(self, key):
        """Return the batch size stored under ``key``, defaulting to 1.

        A missing or falsy config value yields 1. Any other value is passed
        through ``int()``, whose conversion errors propagate to the caller.
        """
        size = self.config.get(key)
        return int(size) if size else 1

    def insert_collect_data(self,
                            url: str,
                            keyword: str,
                            content: str,
                            is_batch=True):
        """Store one collected page.

        Args:
            url: Address the content was collected from.
            keyword: Keyword associated with the collected content.
            content: The collected content itself.
            is_batch: When true (default) the row is buffered and written
                once the collect buffer reaches the configured batch size;
                when false it is inserted immediately.
        """
        # The trailing 0 is forwarded to CollectData as its fourth argument;
        # its meaning is defined by that model, not here.
        data = CollectData(url, keyword, content, 0)
        if not is_batch:
            self._collectData.insert(data)
            return
        self._collect_list.append(data)
        self.save_collect_data()

    def save_collect_data(self, is_force=False):
        """Flush the collect buffer when it is full, or whenever forced.

        Args:
            is_force: Write whatever is buffered even if the batch size has
                not been reached.
        """
        # Nothing buffered: skip the log line and the pointless database call
        # (a forced flush on an empty buffer used to issue an empty batch).
        if not self._collect_list:
            return
        if is_force or len(self._collect_list) >= self._collect_size:
            self.logger.info("批量保存到数据库,数量: " + str(len(self._collect_list)))
            self._collectData.insert_batch(self._collect_list)
            # Rebind only after a successful insert so a failed batch stays
            # buffered instead of being silently dropped.
            self._collect_list = []

    def query_urls_to_process(self):
        """Return the result of ``CollectData.fetch_urls_to_process()``."""
        return self._collectData.fetch_urls_to_process()

    def query_one_collect_by_url(self, url):
        """Return the collect record looked up by ``url``."""
        return self._collectData.fetch_one_collect_by_url(url)

    def query_one_process_by_no(self, no):
        """Return the process record looked up by ``no``."""
        return self._processData.fetch_one_process_by_no(no)

    def insert_process_data(self, data: ProcessData, is_batch=True):
        """Store one processed record.

        Args:
            data: The record to persist.
            is_batch: When true (default) the record is buffered and written
                once the process buffer reaches the configured batch size;
                when false it is inserted immediately.
        """
        if not is_batch:
            self._processData.insert(data)
            return
        self._process_list.append(data)
        self.save_process_data()

    def save_process_data(self, is_force=False):
        """Flush the process buffer when it is full, or whenever forced.

        Per the original author's note, inserting process rows into the
        database also marks the corresponding CollectData as processed; that
        happens inside the model layer and is not visible in this class.

        Args:
            is_force: Write whatever is buffered even if the batch size has
                not been reached.
        """
        # Nothing buffered: skip the log line and the pointless database call
        # (a forced flush on an empty buffer used to issue an empty batch).
        if not self._process_list:
            return
        if is_force or len(self._process_list) >= self._process_size:
            self.logger.info("批量保存到数据库,数量: " + str(len(self._process_list)))
            self._processData.insert_batch(self._process_list)
            # Rebind only after a successful insert so a failed batch stays
            # buffered instead of being silently dropped.
            self._process_list = []

    def set_process_other_urls(self, url, other_urls: str):
        """Forward to ``ProcessData.set_other_urls`` and return its result."""
        return self._processData.set_other_urls(url, other_urls)

    def check_url_is_process(self, url: str) -> bool:
        """Return ``ProcessData.check_is_process_by_url(url)``."""
        return self._processData.check_is_process_by_url(url)

    def query_to_send(self):
        """Return the result of ``ProcessData.fetch_no_send()``."""
        return self._processData.fetch_no_send()

    def set_send(self, no: str):
        """Forward ``no`` to ``ProcessData.set_send``."""
        self._processData.set_send(no)

    def get_email_by_area(self, area: str) -> str:
        """Return ``AreaEmail.fetch_one_by_area(area)``."""
        return self._areaEmail.fetch_one_by_area(area)
|