# process_data.py 7.7 KB


  1. import utils
  2. from datetime import datetime
  3. from utils.mysql_helper import MySQLHelper
  4. class ProcessData:
  5. def __init__(
  6. self,
  7. no=None,
  8. title=None,
  9. url=None,
  10. keyword=None,
  11. date=None,
  12. province=None,
  13. city=None,
  14. address=None,
  15. budget=None,
  16. summary=None,
  17. release_date=None,
  18. devices=None,
  19. attach_path=None,
  20. status=None,
  21. create_time=None,
  22. send_time=None,
  23. other_urls=None,
  24. prompt_tokens=None,
  25. completion_tokens=None,
  26. total_tokens=None,
  27. remark=None,
  28. ):
  29. self.no = no
  30. self.title = title
  31. self.url = url
  32. self.date = date
  33. self.province = province.replace("省", "").replace("市", "") if province else ""
  34. self.city = (
  35. city.replace("市", "").replace("区", "").replace("县", "") if city else ""
  36. )
  37. if self.province == self.city:
  38. self.province = ""
  39. self.keyword = keyword
  40. self.budget = budget
  41. self.address = address
  42. self.summary = summary
  43. self.release_date = release_date
  44. self.devices = devices
  45. self.attach_path = attach_path
  46. self.status = status
  47. self.create_time = create_time or datetime.now()
  48. self.send_time = send_time
  49. self.other_urls = other_urls
  50. self.prompt_tokens = prompt_tokens
  51. self.completion_tokens = completion_tokens
  52. self.total_tokens = total_tokens
  53. self.remark = remark
  54. def __repr__(self):
  55. return (
  56. f"ProcessData(no={self.no}, title={self.title}, date={self.date}, "
  57. f"province={self.province},city={self.city}, address={self.address}, summary={self.summary}, "
  58. f"status={self.status}, create_time={self.create_time}, "
  59. f"send_time={self.send_time}, remark={self.remark})"
  60. )
  61. _insert_query = """
  62. INSERT IGNORE INTO t_data (no, title, url, keyword, date, province, city, address, budget, summary, release_date, devices, attach_path, status, create_time, prompt_tokens, completion_tokens, total_tokens)
  63. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  64. """
  65. # _update_query = """
  66. # UPDATE t_collect_data SET status = 1 WHERE url = %s;
  67. # """
  68. def insert(self, process_data):
  69. if not isinstance(process_data, self.__class__):
  70. raise TypeError("process_data 不是 ProcessData 的实例")
  71. insert_params = (
  72. process_data.no,
  73. process_data.title,
  74. process_data.url,
  75. process_data.keyword,
  76. process_data.date,
  77. process_data.province,
  78. process_data.city,
  79. process_data.address,
  80. process_data.budget,
  81. process_data.summary,
  82. process_data.release_date,
  83. process_data.devices,
  84. process_data.attach_path,
  85. 0,
  86. datetime.now(),
  87. process_data.prompt_tokens,
  88. process_data.completion_tokens,
  89. process_data.total_tokens,
  90. )
  91. # update_params = (process_data.url, )
  92. with MySQLHelper() as db_helper:
  93. db_helper.execute_non_query(self._insert_query, insert_params)
  94. # db_helper.execute_non_query(self._update_query, update_params)
  95. utils.get_logger().info(f"共插入 1 条处理数据")
  96. def insert_batch(self, process_data_list):
  97. if not all(
  98. isinstance(process_data, self.__class__)
  99. for process_data in process_data_list
  100. ):
  101. raise TypeError("process_data_list 中的所有元素必须是 ProcessData 的实例")
  102. insert_params = [
  103. (
  104. process_data.no,
  105. process_data.title,
  106. process_data.url,
  107. process_data.keyword,
  108. process_data.date,
  109. process_data.province,
  110. process_data.city,
  111. process_data.address,
  112. process_data.budget,
  113. process_data.summary,
  114. process_data.release_date,
  115. process_data.devices,
  116. process_data.attach_path,
  117. 0,
  118. datetime.now(),
  119. process_data.prompt_tokens,
  120. process_data.completion_tokens,
  121. process_data.total_tokens,
  122. )
  123. for process_data in process_data_list
  124. ]
  125. # update_params = [(process_data.url, )
  126. # for process_data in process_data_list]
  127. with MySQLHelper() as db_helper:
  128. db_helper.execute_non_query(self._insert_query, insert_params)
  129. affected_rows = db_helper.connection.affected_rows()
  130. utils.get_logger().info(f"共插入 {affected_rows} 条处理数据")
  131. # for param in update_params:
  132. # db_helper.execute_non_query(self._update_query, param)
  133. return affected_rows
  134. _one_url_query = (
  135. "SELECT url,no,other_urls,attach_path FROM t_data WHERE url = %s LIMIT 1"
  136. )
  137. def fetch_one_process_by_url(self, url: str):
  138. with MySQLHelper() as db_helper:
  139. result = db_helper.fetch_one(self._one_url_query, (url,))
  140. if not result:
  141. return None
  142. data = ProcessData(
  143. url=result["url"],
  144. no=result["no"],
  145. other_urls=result["other_urls"],
  146. attach_path=result["attach_path"],
  147. )
  148. return data
  149. _one_no_query = (
  150. "SELECT url,no,other_urls,attach_path FROM t_data WHERE no = %s LIMIT 1"
  151. )
  152. def fetch_one_process_by_no(self, no: str):
  153. with MySQLHelper() as db_helper:
  154. result = db_helper.fetch_one(self._one_no_query, (no,))
  155. if not result:
  156. return None
  157. data = ProcessData(
  158. url=result["url"],
  159. no=result["no"],
  160. other_urls=result["other_urls"],
  161. attach_path=result["attach_path"],
  162. )
  163. return data
  164. _not_send_query = "SELECT no, title, url, keyword, devices,date, city, address, budget, summary, attach_path, release_date FROM t_data WHERE status = 0"
  165. def fetch_not_send(self):
  166. with MySQLHelper() as db_helper:
  167. results = db_helper.execute_query(self._not_send_query)
  168. data = [ProcessData(**result) for result in results]
  169. return data
  170. _set_send_query = "UPDATE t_data SET status = 1, send_time = %s WHERE no = %s"
  171. def set_send(self, no):
  172. with MySQLHelper() as db_helper:
  173. params = (datetime.now(), no)
  174. db_helper.execute_non_query(self._set_send_query, params)
  175. _update_other_urls_query = "UPDATE t_data SET other_urls = %s WHERE url = %s"
  176. def set_other_urls(self, url, other_urls):
  177. with MySQLHelper() as db_helper:
  178. params = (other_urls, url)
  179. db_helper.execute_non_query(self._update_other_urls_query, params)
  180. _delete_before_date_query = "DELETE FROM t_data WHERE date < %s"
  181. def delete_before_date(self, date: str):
  182. """
  183. 删除指定日期之前的数据
  184. :param date: 日期字符串,格式为 YYYY-MM-DD
  185. :return: 删除的行数
  186. """
  187. with MySQLHelper() as db_helper:
  188. params = (date,)
  189. db_helper.execute_non_query(self._delete_before_date_query, params)
  190. affected_rows = db_helper.connection.affected_rows()
  191. utils.get_logger().info(
  192. f"删除 {date} 之前共 {affected_rows} 条 招标处理记录。"
  193. )
  194. return affected_rows