process_data.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. from datetime import datetime
  2. from utils.mysql_helper import MySQLHelper
  3. from utils.config_helper import ConfigHelper
  4. from utils.logger_helper import LoggerHelper
  class ProcessData:
      """One processed record plus the MySQL helpers that persist and query it.

      An instance is a plain attribute holder. Most attributes correspond to
      columns named in the ``t_data`` queries below; ``remark`` is stored on
      the instance but does not appear in any query in this class.

      The persistence methods work on the record passed to them (or on
      explicit keys), not on ``self``; ``self`` only supplies the query
      constants, the logger and the class used for type checks.
      """

      logger = LoggerHelper.get_logger()

      # INSERT IGNORE: MySQL skips, instead of failing on, rows that would
      # violate a unique key, so re-inserting a known record is a no-op.
      _insert_query = """
          INSERT IGNORE INTO t_data (no, title, url, keyword, date, area, address, summary, release_date, devices, attach_path, status, create_time)
          VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
      """

      # Flags the row with the same url in t_collect_data (status = 1).
      # NOTE(review): presumably "1" means "already processed" -- confirm
      # against the t_collect_data schema.
      _update_query = """
          UPDATE t_collect_data SET status = 1 WHERE url = %s;
      """

      _one_query = """
          SELECT url,no,other_urls,attach_path FROM t_data WHERE no = %s LIMIT 1
      """

      def __init__(self,
                   no=None,
                   title=None,
                   url=None,
                   keyword=None,
                   date=None,
                   area=None,
                   address=None,
                   summary=None,
                   release_date=None,
                   devices=None,
                   attach_path=None,
                   status=None,
                   create_time=None,
                   send_time=None,
                   other_urls=None,
                   remark=None):
          """Store the given field values on the instance.

          Every argument is optional and stored as given, except:

          * ``area`` -- when falsy, the ``default_area`` config value is
            used (falling back to "全国"); all spaces are then removed.
          * ``create_time`` -- when falsy, the current local time is used.
          """
          self.no = no
          self.title = title
          self.url = url
          self.date = date
          if not area:
              area = ConfigHelper().get("default_area", "全国")
          self.area = area.replace(" ", "")
          self.keyword = keyword
          self.address = address
          self.summary = summary
          self.release_date = release_date
          self.devices = devices
          self.attach_path = attach_path
          self.status = status
          self.create_time = create_time or datetime.now()
          self.send_time = send_time
          self.other_urls = other_urls
          self.remark = remark

      def __repr__(self):
          """Return a debugging summary of the main fields."""
          return (
              f"ProcessData(no={self.no}, title={self.title}, date={self.date}, "
              f"area={self.area}, address={self.address}, summary={self.summary}, "
              f"status={self.status}, create_time={self.create_time}, "
              f"send_time={self.send_time}, remark={self.remark})")

      @staticmethod
      def _build_insert_params(process_data, created_at):
          """Return the parameter tuple for ``_insert_query`` for one record.

          The order matches the column list of ``_insert_query``. ``status``
          is always written as 0 and ``create_time`` as ``created_at``; the
          record's own ``status`` / ``create_time`` attributes are not used.
          """
          return (
              process_data.no,
              process_data.title,
              process_data.url,
              process_data.keyword,
              process_data.date,
              process_data.area,
              process_data.address,
              process_data.summary,
              process_data.release_date,
              process_data.devices,
              process_data.attach_path,
              0,
              created_at,
          )

      def insert(self, process_data):
          """Insert one record into ``t_data`` and flag its source row.

          After the insert, the ``t_collect_data`` row with the same url is
          updated to ``status = 1``.

          Raises:
              TypeError: if ``process_data`` is not an instance of this class.
          """
          if not isinstance(process_data, self.__class__):
              raise TypeError("process_data 不是 ProcessData 的实例")

          insert_params = self._build_insert_params(process_data,
                                                    datetime.now())
          with MySQLHelper() as db_helper:
              db_helper.execute_non_query(self._insert_query, insert_params)
              db_helper.execute_non_query(self._update_query,
                                          (process_data.url, ))

      def insert_batch(self, process_data_list):
          """Insert several records and flag each of their source rows.

          Args:
              process_data_list: iterable of instances of this class. It is
                  materialised once, so single-pass iterables (generators)
                  are handled correctly.

          Returns:
              The value of ``connection.affected_rows()`` read right after
              the insert call, or 0 when the input is empty (no database
              connection is opened in that case).

          Raises:
              TypeError: if any element is not an instance of this class.
          """
          # Snapshot first: the input is walked more than once below, which
          # would silently yield nothing for an already-consumed generator.
          records = list(process_data_list)
          if not all(isinstance(record, self.__class__) for record in records):
              raise TypeError("process_data_list 中的所有元素必须是 ProcessData 的实例")
          if not records:
              return 0

          # One timestamp for the whole batch keeps create_time consistent.
          created_at = datetime.now()
          insert_params = [
              self._build_insert_params(record, created_at)
              for record in records
          ]

          with MySQLHelper() as db_helper:
              # The whole list is handed to the helper in one call.
              # NOTE(review): assumes execute_non_query accepts a list of
              # parameter tuples (executemany-style) -- confirm in MySQLHelper.
              db_helper.execute_non_query(self._insert_query, insert_params)
              affected_rows = db_helper.connection.affected_rows()
              self.logger.info(f"成功插入 {affected_rows} 条数据")
              for record in records:
                  db_helper.execute_non_query(self._update_query,
                                              (record.url, ))
          return affected_rows

      def fetch_one_process_by_no(self, no: str):
          """Look up a single ``t_data`` row by its ``no``.

          Returns:
              A ``ProcessData`` carrying only ``url``, ``no``, ``other_urls``
              and ``attach_path`` from the row (other attributes take the
              constructor defaults), or ``None`` when no row matches.
          """
          with MySQLHelper() as db_helper:
              result = db_helper.fetch_one(self._one_query, (no, ))
              if not result:
                  return None
              return ProcessData(url=result["url"],
                                 no=result["no"],
                                 other_urls=result["other_urls"],
                                 attach_path=result["attach_path"])

      def fetch_no_send(self):
          """Return every ``t_data`` row whose status is 0 as ``ProcessData``.

          0 is the status value written by ``insert`` / ``insert_batch``;
          ``set_send`` changes it to 1. An empty list is returned when the
          query yields nothing.
          """
          query = ("SELECT no, title, url, keyword, date, area, address, "
                   "summary, attach_path, release_date "
                   "FROM t_data WHERE status = 0")
          with MySQLHelper() as db_helper:
              results = db_helper.execute_query(query)
              # Rows are expanded as keyword arguments, so they must be
              # mappings keyed by column name.
              # `or []` tolerates a helper that returns None for "no rows".
              return [ProcessData(**row) for row in results or []]

      def set_send(self, no):
          """Set ``status = 1`` and ``send_time`` = now on the row with ``no``."""
          query = """
              UPDATE t_data
              SET status = 1, send_time = %s
              WHERE no = %s
          """
          with MySQLHelper() as db_helper:
              db_helper.execute_non_query(query, (datetime.now(), no))

      def set_other_urls(self, url, other_urls):
          """Store ``other_urls`` on the ``t_data`` row with ``url``.

          The ``t_collect_data`` row with the same url is then updated to
          ``status = 1``, using the same statement as ``insert``.
          """
          query = """
              UPDATE t_data
              SET other_urls = %s
              WHERE url = %s
          """
          with MySQLHelper() as db_helper:
              db_helper.execute_non_query(query, (other_urls, url))
              db_helper.execute_non_query(self._update_query, (url, ))

      def check_is_process_by_url(self, url):
          """Return True when ``t_data`` already holds a row with this url."""
          # Only existence matters, so fetch one constant instead of whole rows.
          query = "SELECT 1 FROM t_data WHERE url = %s LIMIT 1"
          with MySQLHelper() as db_helper:
              results = db_helper.execute_query(query, (url, ))
          return bool(results)