# mysql_data_store.py
  1. from utils.logger_helper import LoggerHelper
  2. from utils.config_helper import ConfigHelper
  3. from stores.data_store_interface import IDataStore
  4. from models.collect_data import CollectData
  5. from models.process_data import ProcessData
  6. from models.area_email import AreaEmail
  7. class MysqlDataStore(IDataStore):
  8. logger = LoggerHelper.get_logger()
  9. config = ConfigHelper()
  10. _collectData = CollectData()
  11. _processData = ProcessData()
  12. _areaEmail = AreaEmail()
  13. def __init__(self):
  14. size = self.config.get('save.collect_batch_size')
  15. if not size:
  16. size = 1
  17. self._collect_size = int(size)
  18. self._collect_list = []
  19. size = self.config.get('save.process_batch_size')
  20. if not size:
  21. size = 1
  22. self._process_size = int(size)
  23. self._process_list = []
  24. def insert_collect_data(self,
  25. url: str,
  26. keyword: str,
  27. content: str,
  28. is_batch=True):
  29. data = CollectData(url, keyword, content, 0)
  30. if not is_batch:
  31. self._collectData.insert(data)
  32. else:
  33. self._collect_list.append(data)
  34. self.save_collect_data()
  35. def save_collect_data(self, is_force=False):
  36. if is_force or len(self._collect_list) >= self._collect_size:
  37. self.logger.info("批量保存到数据库,数量: " + str(len(self._collect_list)))
  38. self._collectData.insert_batch(self._collect_list)
  39. self._collect_list = []
  40. def query_urls_to_process(self):
  41. return self._collectData.fetch_urls_to_process()
  42. def query_one_collect_by_url(self, url):
  43. return self._collectData.fetch_one_collect_by_url(url)
  44. def query_one_process_by_no(self, no):
  45. return self._processData.fetch_one_process_by_no(no)
  46. def insert_process_data(self, data: ProcessData, is_batch=True):
  47. if not is_batch:
  48. self._processData.insert(data)
  49. else:
  50. self._process_list.append(data)
  51. self.save_process_data()
  52. # 插入到数据库时会把CollectData设为已处理
  53. def save_process_data(self, is_force=False):
  54. if is_force or len(self._process_list) >= self._process_size:
  55. self.logger.info("批量保存到数据库,数量: " + str(len(self._process_list)))
  56. self._processData.insert_batch(self._process_list)
  57. self._process_list = []
  58. def set_process_other_urls(self, url, other_urls: str):
  59. return self._processData.set_other_urls(url, other_urls)
  60. def check_url_is_process(self, url: str) -> bool:
  61. return self._processData.check_is_process_by_url(url)
  62. def query_to_send(self):
  63. return self._processData.fetch_no_send()
  64. def set_send(self, no: str):
  65. self._processData.set_send(no)
  66. def get_email_by_area(self, area: str) -> str:
  67. return self._areaEmail.fetch_one_by_area(area)