job_runner.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. import threading
  2. from datetime import datetime
  3. import schedule
  4. from dateutil import parser
  5. import utils
  6. from jobs.data_clean import DataClean
  7. from jobs.data_collector import DataCollector
  8. from jobs.data_process import DataProcess
  9. from jobs.data_send import DataSend
  10. from models.url_setting import UrlSetting
  11. from stores.mysql_data_store import MysqlDataStore
  class JobRunner:
      """Register the application's periodic jobs with ``schedule`` and run them.

      Collection and AI processing are started on their own threads; the
      e-mail, report and clean-up jobs run directly in whatever thread invokes
      the scheduled callback.
      """

      # One store instance shared by every job so it is not rebuilt per run.
      # NOTE(review): it is used from worker threads as well - confirm that
      # MysqlDataStore is safe to share between threads.
      store = MysqlDataStore()

      def run_job(self, is_run_now=True):
          """Register every scheduled job and optionally start a collection now.

          Schedule times are read from the ``job.*`` configuration keys; an
          empty or unparsable value falls back to the built-in default.

          Args:
              is_run_now: When true and the ``job.run_now`` config flag is also
                  set, the collect-and-process job is started immediately
                  after registration.

          Raises:
              Exception: Any error raised while registering jobs is logged and
                  re-raised unchanged.
          """
          try:
              utils.get_logger().info("加载任务")
              collect_time = utils.get_config_value("job.collect")
              process_time = utils.get_config_value("job.process")
              send_email_time = utils.get_config_value("job.send_email")
              clean_data_time = utils.get_config_value("job.clean_data")

              self._schedule_daily(
                  self._validate_and_format_time(collect_time, ["06:00"]),
                  self._collect_process_job,
                  "采集处理数据",
              )
              self._schedule_daily(
                  self._validate_and_format_time(
                      process_time, ["10:00", "15:00", "19:00"]
                  ),
                  self._process_job,
                  "AI处理数据",
              )
              self._schedule_daily(
                  self._validate_and_format_time(
                      send_email_time, ["08:20", "14:00"]
                  ),
                  self._send_job,
                  "发送邮件",
              )

              # Monthly reports are registered as daily jobs; each job checks
              # the day of month itself when it fires. The day written to the
              # log is the one that applies to the month of registration.
              if utils.get_config_int("job.send_current_month_report_day") > 0:
                  report_time = utils.get_config_value(
                      "job.send_current_month_report_time"
                  )
                  day_label = str(self._get_current_month_report_day()).rjust(2, "0")
                  self._schedule_daily(
                      self._validate_and_format_time(report_time, ["08:20"]),
                      self._send_current_month_report_job,
                      "发送当月报告",
                      prefix=f"每月{day_label}日 ",
                  )
              if utils.get_config_int("job.send_prev_month_report_day") > 0:
                  report_time = utils.get_config_value(
                      "job.send_prev_month_report_time"
                  )
                  day_label = str(self._get_prev_month_report_day()).rjust(2, "0")
                  self._schedule_daily(
                      self._validate_and_format_time(report_time, ["08:20"]),
                      self._send_prev_month_report_job,
                      "发送上月报告",
                      prefix=f"每月{day_label}日 ",
                  )

              # Only the first configured clean-up time is used.
              clean_data_times = self._validate_and_format_time(
                  clean_data_time, ["00:05"]
              )
              self._schedule_daily(clean_data_times[:1], self._clean_job, "清理数据")

              urls = UrlSetting().fetch_all()
              if not urls:
                  utils.get_logger().error("未找到任何 URL 设置")
                  return
              utils.get_logger().info(f"共找到 {len(urls)} 个 URL 设置")
              for url in urls:
                  utils.get_logger().info(f"{url}")

              if is_run_now and utils.get_config_bool("job.run_now"):
                  utils.get_logger().info("立即执行采集任务")
                  self._collect_process_job()
          except Exception as e:
              utils.get_logger().error(f"应用程序停止: {e}")
              # Bare raise keeps the original traceback intact.
              raise

      def _schedule_daily(self, times, job, description, prefix=""):
          """Log and register *job* to run every day at each entry of *times*."""
          for at_time in times:
              utils.get_logger().info(f"{prefix}{at_time} 执行 {description} 任务")
              schedule.every().day.at(at_time).do(job)

      def restart_job(self):
          """Drop all registered jobs and register them again.

          The immediate collection run is suppressed on restart.
          """
          schedule.clear()
          utils.get_logger().info("定时配置更新,重启任务")
          self.run_job(False)

      def _collect_process_job(self):
          """Start the collect-and-process job on a new thread."""
          threading.Thread(target=self._collect_process).start()

      def _collect_process(self):
          """Collect data for every URL setting, running AI processing after each.

          A collection failure for one URL is reported and the loop carries on
          to AI processing. An AI-processing failure is reported and stops the
          loop. Nothing is raised to the caller.
          """
          try:
              utils.get_logger().info("开始执行 数据采集处理 任务")
              for setting in UrlSetting().fetch_all():
                  data_collector = None
                  try:
                      utils.get_logger().info(f"开始采集: {setting.url}")
                      data_collector = DataCollector(
                          setting.adapter_type,
                          setting.url,
                          setting.username,
                          setting.password,
                          self.store,
                      )
                      data_collector.collect(setting.keywords)
                      utils.get_logger().info(f"采集完成: {setting.url}")
                  except Exception as e:
                      # Log before notifying so the error is recorded even if
                      # the notification cannot be delivered.
                      utils.get_logger().error(f"采集发生异常: {e}")
                      self._send_error_email(
                          "数据采集",
                          f"\n Type: {setting.adapter_type} \n Url: {setting.url}\n 错误: {e}",
                      )
                  finally:
                      if data_collector:
                          data_collector.close()

                  try:
                      utils.get_logger().info(f"开始AI处理: {setting.url}")
                      DataProcess(self.store).process()
                  except Exception as e:
                      utils.get_logger().error(f"AI处理发生异常: {e}")
                      self._send_error_email(
                          "AI数据处理",
                          f"\n Type: {setting.adapter_type} \n Url: {setting.url}\n 错误: {e}",
                      )
                      # Stops the whole run: the remaining URL settings are
                      # skipped once AI processing has failed.
                      break
              utils.get_logger().info("数据采集处理 任务执行完毕")
          except Exception as e:
              utils.get_logger().error(f"数据采集处理 任务执行失败: {e}")

      def _process_job(self):
          """Start the AI-processing job on a new thread."""
          threading.Thread(target=self._process).start()

      def _process(self):
          """Run AI processing once; failures are logged and reported by e-mail."""
          try:
              utils.get_logger().info("开始执行 AI处理数据 任务")
              DataProcess(self.store).process()
              utils.get_logger().info("AI处理数据 任务执行完毕")
          except Exception as e:
              utils.get_logger().error(f"AI任务 执行失败: {e}")
              self._send_error_email("AI数据处理", f"\n 错误: {e}")

      def _send_job(self):
          """Send the regular e-mail; failures are logged and reported by e-mail."""
          try:
              utils.get_logger().info("开始执行 邮件发送 任务")
              DataSend(self.store).send()
              utils.get_logger().info("邮件发送 任务执行完毕")
          except Exception as e:
              utils.get_logger().error(f"邮件发送 任务执行失败: {e}")
              self._send_error_email("邮件发送", f"\n 错误: {e}")

      def _send_current_month_report_job(self):
          """Send the current-month report when today is the configured day."""
          try:
              if datetime.today().day == self._get_current_month_report_day():
                  utils.get_logger().info("开始执行 邮件发送当月报告 任务")
                  DataSend(self.store).send_report_current_month()
                  utils.get_logger().info("邮件发送当月报告 任务执行完毕")
          except Exception as e:
              utils.get_logger().error(f"邮件发送当月报告 任务执行失败: {e}")
              self._send_error_email("邮件发送", f"\n 错误: {e}")

      @staticmethod
      def _clamp_to_current_month(day):
          """Return *day* limited to the last calendar day of the current month."""
          # Function-scope import keeps this block self-contained.
          import calendar

          today = datetime.today()
          last_day = calendar.monthrange(today.year, today.month)[1]
          return min(day, last_day)

      @staticmethod
      def _get_current_month_report_day():
          """Return the day of month on which the current-month report is sent.

          Reads ``job.send_current_month_report_day`` (default 30) and limits it
          to the length of the current month, leap years included.
          """
          day = utils.get_config_int("job.send_current_month_report_day", 30)
          return JobRunner._clamp_to_current_month(day)

      def _send_prev_month_report_job(self):
          """Send the previous-month report when today is the configured day."""
          try:
              if datetime.today().day == self._get_prev_month_report_day():
                  utils.get_logger().info("开始执行 邮件发送上月报告 任务")
                  DataSend(self.store).send_report_prev_month()
                  utils.get_logger().info("邮件发送上月报告 任务执行完毕")
          except Exception as e:
              utils.get_logger().error(f"邮件发送上月报告 任务执行失败: {e}")
              self._send_error_email("邮件发送", f"\n 错误: {e}")

      @staticmethod
      def _get_prev_month_report_day():
          """Return the day of month on which the previous-month report is sent.

          Reads ``job.send_prev_month_report_day`` (default 1) and limits it to
          the length of the current month, leap years included.
          """
          day = utils.get_config_int("job.send_prev_month_report_day", 1)
          return JobRunner._clamp_to_current_month(day)

      def _clean_job(self):
          """Run the data clean-up; failures are logged and reported by e-mail."""
          try:
              utils.get_logger().info("开始执行 清理数据 任务")
              DataClean().clean()
              utils.get_logger().info("清理数据 任务执行完毕")
          except Exception as e:
              utils.get_logger().error(f"清理数据 任务执行失败: {e}")
              self._send_error_email("清理数据", f"\n 错误: {e}")

      @staticmethod
      def _validate_and_format_time(time_str, default_time: list):
          """Parse a comma-separated list of times into ``HH:MM:SS`` strings.

          Args:
              time_str: Raw configuration value, e.g. ``"08:20, 14:00"``. Items
                  may be wrapped in single or double quotes.
              default_time: List returned as-is when *time_str* is empty or
                  none of its items can be parsed.

          Returns:
              The successfully parsed times formatted as ``HH:MM:SS``, or
              *default_time*. Items that fail to parse are logged and skipped.
          """
          if not time_str:
              return default_time

          # Accept the full-width comma (U+FF0C) as a separator too.
          normalized = time_str.strip().replace("\uff0c", ",")

          formatted_times = []
          for raw_item in normalized.split(","):
              item = raw_item.strip().strip("'").strip('"')
              if not item:
                  continue  # skip empty entries such as a trailing comma
              try:
                  # Accept the full-width colon (U+FF1A) inside a time.
                  item = item.replace("\uff1a", ":")
                  parsed_time = parser.parse(item).time().strftime("%H:%M:%S")
                  formatted_times.append(parsed_time)
              except Exception as e:
                  utils.get_logger().error(f"配置时间解析错误: {item},: {e} ")

          if not formatted_times:
              utils.get_logger().error(f"解析时间失败,使用默认时间 {default_time}")
              return default_time
          return formatted_times

      @staticmethod
      def _send_error_email(title: str, error: str) -> None:
          """Send a best-effort failure notification to ``email.error_email``.

          Does nothing when no address is configured. This is called from
          ``except`` handlers, so a failure to send is logged rather than
          raised.

          Args:
              title: Name of the failed task; "异常" is appended to form the
                  subject.
              error: Error details placed in the message body.
          """
          try:
              email = utils.get_config_value("email.error_email")
              if not email:
                  return
              utils.get_logger().info(f"发送错误邮件: {email}")
              title = f"{title}异常"
              content = f"{title},请及时处理。\n\n异常信息:{error}"
              utils.send_email(email, title, content, False, None)
          except Exception as e:
              utils.get_logger().error(f"发送错误邮件失败: {e}")