# process_data.py
from datetime import datetime
from typing import Iterable, List, Optional

import utils
from utils.mysql_helper import MySQLHelper
  4. class ProcessData:
  5. def __init__(
  6. self,
  7. no=None,
  8. title=None,
  9. url=None,
  10. keyword=None,
  11. date=None,
  12. area=None,
  13. address=None,
  14. summary=None,
  15. release_date=None,
  16. devices=None,
  17. attach_path=None,
  18. status=None,
  19. create_time=None,
  20. send_time=None,
  21. other_urls=None,
  22. prompt_tokens=None,
  23. completion_tokens=None,
  24. total_tokens=None,
  25. remark=None,
  26. ):
  27. self.no = no
  28. self.title = title
  29. self.url = url
  30. self.date = date
  31. if not area:
  32. area = utils.get_config_value("default_area", "全国")
  33. self.area = area.replace(" ", "")
  34. self.keyword = keyword
  35. self.address = address
  36. self.summary = summary
  37. self.release_date = release_date
  38. self.devices = devices
  39. self.attach_path = attach_path
  40. self.status = status
  41. self.create_time = create_time or datetime.now()
  42. self.send_time = send_time
  43. self.other_urls = other_urls
  44. self.prompt_tokens = prompt_tokens
  45. self.completion_tokens = completion_tokens
  46. self.total_tokens = total_tokens
  47. self.remark = remark
  48. def __repr__(self):
  49. return (
  50. f"ProcessData(no={self.no}, title={self.title}, date={self.date}, "
  51. f"area={self.area}, address={self.address}, summary={self.summary}, "
  52. f"status={self.status}, create_time={self.create_time}, "
  53. f"send_time={self.send_time}, remark={self.remark})"
  54. )
  55. _insert_query = """
  56. INSERT IGNORE INTO t_data (no, title, url, keyword, date, area, address, summary, release_date, devices, attach_path, status, create_time, prompt_tokens, completion_tokens, total_tokens)
  57. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  58. """
  59. # _update_query = """
  60. # UPDATE t_collect_data SET status = 1 WHERE url = %s;
  61. # """
  62. def insert(self, process_data):
  63. if not isinstance(process_data, self.__class__):
  64. raise TypeError("process_data 不是 ProcessData 的实例")
  65. insert_params = (
  66. process_data.no,
  67. process_data.title,
  68. process_data.url,
  69. process_data.keyword,
  70. process_data.date,
  71. process_data.area,
  72. process_data.address,
  73. process_data.summary,
  74. process_data.release_date,
  75. process_data.devices,
  76. process_data.attach_path,
  77. 0,
  78. datetime.now(),
  79. process_data.prompt_tokens,
  80. process_data.completion_tokens,
  81. process_data.total_tokens,
  82. )
  83. # update_params = (process_data.url, )
  84. with MySQLHelper() as db_helper:
  85. db_helper.execute_non_query(self._insert_query, insert_params)
  86. # db_helper.execute_non_query(self._update_query, update_params)
  87. def insert_batch(self, process_data_list):
  88. if not all(
  89. isinstance(process_data, self.__class__)
  90. for process_data in process_data_list
  91. ):
  92. raise TypeError("process_data_list 中的所有元素必须是 ProcessData 的实例")
  93. insert_params = [
  94. (
  95. process_data.no,
  96. process_data.title,
  97. process_data.url,
  98. process_data.keyword,
  99. process_data.date,
  100. process_data.area,
  101. process_data.address,
  102. process_data.summary,
  103. process_data.release_date,
  104. process_data.devices,
  105. process_data.attach_path,
  106. 0,
  107. datetime.now(),
  108. process_data.prompt_tokens,
  109. process_data.completion_tokens,
  110. process_data.total_tokens,
  111. )
  112. for process_data in process_data_list
  113. ]
  114. # update_params = [(process_data.url, )
  115. # for process_data in process_data_list]
  116. with MySQLHelper() as db_helper:
  117. db_helper.execute_non_query(self._insert_query, insert_params)
  118. affected_rows = db_helper.connection.affected_rows()
  119. utils.get_logger().info(f"成功插入 {affected_rows} 条数据")
  120. # for param in update_params:
  121. # db_helper.execute_non_query(self._update_query, param)
  122. return affected_rows
  123. _one_url_query = (
  124. "SELECT url,no,other_urls,attach_path FROM t_data WHERE url = %s LIMIT 1"
  125. )
  126. def fetch_one_process_by_url(self, url: str):
  127. with MySQLHelper() as db_helper:
  128. result = db_helper.fetch_one(self._one_url_query, (url,))
  129. if not result:
  130. return None
  131. data = ProcessData(
  132. url=result["url"],
  133. no=result["no"],
  134. other_urls=result["other_urls"],
  135. attach_path=result["attach_path"],
  136. )
  137. return data
  138. _one_no_query = (
  139. "SELECT url,no,other_urls,attach_path FROM t_data WHERE no = %s LIMIT 1"
  140. )
  141. def fetch_one_process_by_no(self, no: str):
  142. with MySQLHelper() as db_helper:
  143. result = db_helper.fetch_one(self._one_no_query, (no,))
  144. if not result:
  145. return None
  146. data = ProcessData(
  147. url=result["url"],
  148. no=result["no"],
  149. other_urls=result["other_urls"],
  150. attach_path=result["attach_path"],
  151. )
  152. return data
  153. _not_send_query = "SELECT no, title, url, keyword, date, area, address, summary, attach_path, release_date FROM t_data WHERE status = 0"
  154. def fetch_not_send(self):
  155. with MySQLHelper() as db_helper:
  156. results = db_helper.execute_query(self._not_send_query)
  157. data = [ProcessData(**result) for result in results]
  158. return data
  159. _set_send_query = "UPDATE t_data SET status = 1, send_time = %s WHERE no = %s"
  160. def set_send(self, no):
  161. with MySQLHelper() as db_helper:
  162. params = (datetime.now(), no)
  163. db_helper.execute_non_query(self._set_send_query, params)
  164. _update_other_urls_query = "UPDATE t_data SET other_urls = %s WHERE url = %s"
  165. def set_other_urls(self, url, other_urls):
  166. with MySQLHelper() as db_helper:
  167. params = (other_urls, url)
  168. db_helper.execute_non_query(self._update_other_urls_query, params)
  169. _delete_before_date_query = "DELETE FROM t_data WHERE date < %s"
  170. def delete_before_date(self, date: str):
  171. """
  172. 删除指定日期之前的数据
  173. :param date: 日期字符串,格式为 YYYY-MM-DD
  174. :return: 删除的行数
  175. """
  176. with MySQLHelper() as db_helper:
  177. params = (date,)
  178. db_helper.execute_non_query(self._delete_before_date_query, params)
  179. affected_rows = db_helper.connection.affected_rows()
  180. utils.get_logger().info(
  181. f"删除 {date} 之前共 {affected_rows} 条 招标处理记录。"
  182. )
  183. return affected_rows