process_result_data.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. import json
  2. import utils
  3. from datetime import datetime
  4. from utils.mysql_helper import MySQLHelper
  5. class InstrumentData:
  6. def __init__(
  7. self,
  8. company: str,
  9. name: str,
  10. manufacturer: str,
  11. model: str,
  12. quantity: int,
  13. unit_price: float,
  14. ):
  15. self.company = company # 中标单位名称,参与竞标并中标的公司名称
  16. self.name = name # 仪器名称,例如:红外光谱仪
  17. self.manufacturer = manufacturer # 仪器厂商,例如:赛默飞、Bruker
  18. self.model = model # 仪器的型号 / 规格,例如:NIR25S
  19. self.quantity = quantity # 中标仪器的数量,台数,例如:2
  20. self.unit_price = unit_price # 仪器的单价,单位转
  21. def to_str(self) -> str:
  22. return f"[名称:{self.name},中标单位:{self.company},仪器厂商:{self.manufacturer},仪器规格:{self.model},数量:{self.quantity},单价:{self.unit_price}]"
  23. class ProcessResultData:
  24. def __init__(
  25. self,
  26. no=None,
  27. title=None,
  28. url=None,
  29. keyword=None,
  30. provice=None,
  31. city=None,
  32. date=None,
  33. instruments=None,
  34. instruments_o=None,
  35. summary=None,
  36. attach_path=None,
  37. status=None,
  38. create_time=None,
  39. send_time=None,
  40. other_urls=None,
  41. prompt_tokens=None,
  42. completion_tokens=None,
  43. total_tokens=None,
  44. remark=None,
  45. ):
  46. self.no = no
  47. self.title = title
  48. self.url = url
  49. self.keyword = keyword
  50. self.date = date
  51. self.instruments_str = ""
  52. self.instruments = []
  53. self.set_instruments(instruments, instruments_o)
  54. self.provice = provice.replace("省", "").replace("市", "") if provice else ""
  55. self.city = (
  56. city.replace("市", "").replace("区", "").replace("县", "") if city else ""
  57. )
  58. if self.provice == self.city:
  59. self.provice = ""
  60. self.summary = summary
  61. self.attach_path = attach_path
  62. self.status = status
  63. self.create_time = create_time or datetime.now()
  64. self.send_time = send_time
  65. self.other_urls = other_urls
  66. self.prompt_tokens = prompt_tokens
  67. self.completion_tokens = completion_tokens
  68. self.total_tokens = total_tokens
  69. self.remark = remark
  70. def __repr__(self):
  71. return (
  72. f"ProcessResultData(no={self.no}, title={self.title}, date={self.date}, "
  73. f"keyword={self.keyword}, provice={self.provice},city={self.city},instruments={self.instruments_str} summary={self.summary}, attach_path={self.attach_path}, "
  74. f"status={self.status}, create_time={self.create_time}, "
  75. f"send_time={self.send_time}, remark={self.remark})"
  76. )
  77. def set_instruments(self, instruments_str: str, instruments):
  78. if instruments is None:
  79. instruments = []
  80. if instruments_str:
  81. self.instruments_str = instruments_str
  82. self.instruments = [
  83. InstrumentData(**instrument)
  84. for instrument in json.loads(self.instruments_str)
  85. ]
  86. else:
  87. self.instruments = instruments or []
  88. self.instruments_str = (
  89. json.dumps(
  90. instruments,
  91. ensure_ascii=False,
  92. )
  93. if len(instruments) > 0
  94. else ""
  95. )
  96. _insert_query = """
  97. INSERT IGNORE INTO t_data_result (no, title, url, keyword, date, provice, city, instruments, summary, attach_path, status, create_time, prompt_tokens, completion_tokens, total_tokens)
  98. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  99. """
  100. # _update_query = """
  101. # UPDATE t_collect_data SET status = 1 WHERE url = %s;
  102. # """
  103. def insert(self, process_result_data):
  104. if not isinstance(process_result_data, self.__class__):
  105. raise TypeError("process_result_data 不是 ProcessResultData 的实例")
  106. insert_params = (
  107. process_result_data.no,
  108. process_result_data.title,
  109. process_result_data.url,
  110. process_result_data.keyword,
  111. process_result_data.date,
  112. process_result_data.provice,
  113. process_result_data.city,
  114. process_result_data.instruments_str,
  115. process_result_data.summary,
  116. process_result_data.attach_path,
  117. 0,
  118. datetime.now(),
  119. process_result_data.prompt_tokens,
  120. process_result_data.completion_tokens,
  121. process_result_data.total_tokens,
  122. )
  123. # update_params = (process_result_data.url, )
  124. with MySQLHelper() as db_helper:
  125. db_helper.execute_non_query(self._insert_query, insert_params)
  126. # db_helper.execute_non_query(self._update_query, update_params)
  127. utils.get_logger().info(f"共插入 1 条结果处理数据")
  128. def insert_batch(self, process_result_data_list):
  129. if not all(
  130. isinstance(process_result_data, self.__class__)
  131. for process_result_data in process_result_data_list
  132. ):
  133. raise TypeError(
  134. "process_result_data_list 中的所有元素必须是 ProcessResultData 的实例"
  135. )
  136. insert_params = [
  137. (
  138. process_result_data.no,
  139. process_result_data.title,
  140. process_result_data.url,
  141. process_result_data.keyword,
  142. process_result_data.date,
  143. process_result_data.provice,
  144. process_result_data.city,
  145. process_result_data.instruments_str,
  146. process_result_data.summary,
  147. process_result_data.attach_path,
  148. 0,
  149. datetime.now(),
  150. process_result_data.prompt_tokens,
  151. process_result_data.completion_tokens,
  152. process_result_data.total_tokens,
  153. )
  154. for process_result_data in process_result_data_list
  155. ]
  156. # update_params = [(process_result_data.url, )
  157. # for process_result_data in process_result_data_list]
  158. with MySQLHelper() as db_helper:
  159. db_helper.execute_non_query(self._insert_query, insert_params)
  160. affected_rows = db_helper.connection.affected_rows()
  161. utils.get_logger().info(f"共插入 {affected_rows} 条结果处理数据")
  162. # for param in update_params:
  163. # db_helper.execute_non_query(self._update_query, param)
  164. return affected_rows
  165. _one_url_query = """
  166. SELECT url,no,other_urls,attach_path FROM t_data_result WHERE url = %s LIMIT 1
  167. """
  168. def fetch_one_process_by_url(self, url: str):
  169. with MySQLHelper() as db_helper:
  170. result = db_helper.fetch_one(self._one_url_query, (url,))
  171. if not result:
  172. return None
  173. data = ProcessResultData(
  174. no=result["no"],
  175. url=result["url"],
  176. attach_path=result["attach_path"],
  177. other_urls=result["other_urls"],
  178. )
  179. return data
  180. _one_no_query = """
  181. SELECT url,no,other_urls,attach_path FROM t_data_result WHERE no = %s LIMIT 1
  182. """
  183. def fetch_one_process_by_no(self, no: str):
  184. with MySQLHelper() as db_helper:
  185. result = db_helper.fetch_one(self._one_no_query, (no,))
  186. if not result:
  187. return None
  188. data = ProcessResultData(
  189. no=result["no"],
  190. url=result["url"],
  191. attach_path=result["attach_path"],
  192. other_urls=result["other_urls"],
  193. )
  194. return data
  195. _not_send_query = "SELECT no, title, url, keyword, date, provice, city, instruments, summary, attach_path, status, create_time, send_time FROM t_data_result WHERE status = 0"
  196. def fetch_not_send(self):
  197. with MySQLHelper() as db_helper:
  198. results = db_helper.execute_query(self._not_send_query)
  199. data = [ProcessResultData(**result) for result in results]
  200. return data
  201. _update_send_status_query = """
  202. UPDATE t_data_result SET status = 1, send_time = %s WHERE no = %s
  203. """
  204. def set_send(self, no):
  205. with MySQLHelper() as db_helper:
  206. params = (datetime.now(), no)
  207. db_helper.execute_non_query(self._update_send_status_query, params)
  208. _update_other_urls_query = "UPDATE t_data_result SET other_urls = %s WHERE url = %s"
  209. def set_other_urls(self, url, other_urls):
  210. with MySQLHelper() as db_helper:
  211. params = (other_urls, url)
  212. db_helper.execute_non_query(self._update_other_urls_query, params)
  213. _query_report = (
  214. "select * from t_data_result where create_time between %s and %s ORDER BY date"
  215. )
  216. def fetch_to_report_by_date(self, start_date, end_date):
  217. """
  218. 获取需要生成报表的数据
  219. :param start_date:
  220. :param end_date:
  221. :return:
  222. """
  223. with MySQLHelper() as db_helper:
  224. params = (start_date, end_date)
  225. results = db_helper.execute_query(self._query_report, params)
  226. data = [ProcessResultData(**result) for result in results]
  227. return data
  228. _delete_before_date_query = "DELETE FROM t_data_result WHERE create_time < %s"
  229. def delete_before_date(self, date: str):
  230. """
  231. 删除指定日期之前的数据
  232. :param date:
  233. :return:
  234. """
  235. with MySQLHelper() as db_helper:
  236. params = (date,)
  237. db_helper.execute_non_query(self._delete_before_date_query, params)
  238. affected_rows = db_helper.connection.affected_rows()
  239. utils.get_logger().info(
  240. f"删除 {date} 之前共 {affected_rows} 条 中标处理记录。"
  241. )
  242. return affected_rows