job_runner.py 11 KB


  1. # import threading
  2. from datetime import datetime
  3. import schedule
  4. from dateutil import parser
  5. import utils
  6. from jobs.data_clean import DataClean
  7. from jobs.data_collector import DataCollector
  8. from jobs.data_process import DataProcess
  9. from jobs.data_send import DataSend
  10. from models.url_setting import UrlSetting
  11. from stores.mysql_data_store import MysqlDataStore
  12. class JobRunner:
  13. _data_collector = None
  14. store = MysqlDataStore() # 复用 store 对象
  15. def run_job(self, is_run_now=True):
  16. try:
  17. utils.get_logger().info("加载任务")
  18. collect_time = utils.get_config_value("job.collect")
  19. process_time = utils.get_config_value("job.process")
  20. send_email_time = utils.get_config_value("job.send_email")
  21. clean_data_time = utils.get_config_value("job.clean_data")
  22. collect_times = self._validate_and_format_time(collect_time, ["06:00"])
  23. for time in collect_times:
  24. utils.get_logger().info(f"{time} 执行 采集处理数据 任务")
  25. schedule.every().day.at(time).do(self._collect_process_job)
  26. process_times = self._validate_and_format_time(
  27. process_time, ["10:00", "15:00", "19:00"]
  28. )
  29. for time in process_times:
  30. utils.get_logger().info(f"{time} 执行 AI处理数据 任务")
  31. schedule.every().day.at(time).do(self._process_job)
  32. send_email_times = self._validate_and_format_time(
  33. send_email_time, ["08:20", "14:00"]
  34. )
  35. for time in send_email_times:
  36. utils.get_logger().info(f"{time} 执行 发送邮件 任务")
  37. schedule.every().day.at(time).do(self._send_job)
  38. if utils.get_config_int("job.send_current_month_report_day") > 0:
  39. report_time = utils.get_config_value(
  40. "job.send_current_month_report_time"
  41. )
  42. times = self._validate_and_format_time(report_time, ["08:20"])
  43. for time in times:
  44. utils.get_logger().info(
  45. f"每月{str(self._get_current_month_report_day()).rjust(2,"0")}日 {time} 执行 发送当月报告 任务"
  46. )
  47. schedule.every().day.at(time).do(
  48. self._send_current_month_report_job
  49. )
  50. if utils.get_config_int("job.send_prev_month_report_day") > 0:
  51. report_time = utils.get_config_value("job.send_prev_month_report_time")
  52. times = self._validate_and_format_time(report_time, ["08:20"])
  53. for time in times:
  54. utils.get_logger().info(
  55. f"每月{str(self._get_prev_month_report_day()).rjust(2,"0")}日 {time} 执行 发送上月报告 任务"
  56. )
  57. schedule.every().day.at(time).do(self._send_prev_month_report_job)
  58. clean_data_times = self._validate_and_format_time(
  59. clean_data_time, ["00:05"]
  60. )
  61. utils.get_logger().info(f"{clean_data_times[0]} 执行 清理数据 任务")
  62. schedule.every().day.at(clean_data_times[0]).do(self._clean_job)
  63. urls = UrlSetting().fetch_all()
  64. if not urls or len(urls) == 0:
  65. utils.get_logger().error("未找到任何 URL 设置")
  66. return
  67. utils.get_logger().info(f"共找到 {len(urls)} 个 URL 设置")
  68. for url in urls:
  69. utils.get_logger().info(f"{url}")
  70. if is_run_now and utils.get_config_bool("job.run_now"):
  71. utils.get_logger().info("立即执行采集任务")
  72. self._collect_process_job()
  73. # self._clean_job()
  74. # self._process_job()
  75. # self._send_job()
  76. # DataSend(self.store).send_report_current_month()
  77. except Exception as e:
  78. utils.get_logger().error(f"应用程序停止: {e}")
  79. raise e
  80. def restart_job(self):
  81. schedule.clear()
  82. utils.get_logger().info("定时配置更新,重启任务")
  83. self.run_job(False)
  84. def stop_job(self):
  85. schedule.clear()
  86. self._stop_data_collector()
  87. def _stop_data_collector(self):
  88. if self._data_collector:
  89. self._data_collector.close()
  90. # def _collect_process_job(self):
  91. # threading.Thread(target=self._collect_process).start()
  92. def _collect_process_job(self):
  93. try:
  94. utils.get_logger().info("开始执行 数据采集处理 任务")
  95. url_setting = UrlSetting()
  96. for url_setting in url_setting.fetch_all():
  97. self._data_collector = None
  98. try:
  99. utils.get_logger().info(f"开始采集: {url_setting.url}")
  100. self._data_collector = DataCollector(
  101. url_setting.adapter_type,
  102. url_setting.url,
  103. url_setting.username,
  104. url_setting.password,
  105. self.store,
  106. )
  107. self._data_collector.collect(url_setting.keywords)
  108. utils.get_logger().info(f"采集完成: {url_setting.url}")
  109. except Exception as e:
  110. self._send_error_email(
  111. "数据采集",
  112. f"\n Type: {url_setting.adapter_type} \n Url: {url_setting.url}\n 错误: {str(e)}",
  113. )
  114. utils.get_logger().error(f"采集发生异常: {e}")
  115. finally:
  116. self._stop_data_collector()
  117. try:
  118. utils.get_logger().info(f"开始AI处理: {url_setting.url}")
  119. data_process = DataProcess(self.store)
  120. data_process.process()
  121. except Exception as e:
  122. self._send_error_email(
  123. "AI数据处理",
  124. f"\n Type: {url_setting.adapter_type} \n Url: {url_setting.url}\n 错误: {str(e)}",
  125. )
  126. utils.get_logger().error(f"AI处理发生异常: {e}")
  127. break # 中断当前 URL 设置的处理
  128. utils.get_logger().info("数据采集处理 任务执行完毕")
  129. except Exception as e:
  130. utils.get_logger().error(f"数据采集处理 任务执行失败: {e}")
  131. # def _process_job(self):
  132. # threading.Thread(target=self._process).start()
  133. def _process_job(self):
  134. try:
  135. utils.get_logger().info("开始执行 AI处理数据 任务")
  136. data_process = DataProcess(self.store)
  137. data_process.process()
  138. utils.get_logger().info("AI处理数据 任务执行完毕")
  139. except Exception as e:
  140. self._send_error_email("AI数据处理", f"\n 错误: {str(e)}")
  141. utils.get_logger().error(f"AI任务 执行失败: {e}")
  142. def _send_job(self):
  143. try:
  144. utils.get_logger().info("开始执行 邮件发送 任务")
  145. DataSend(self.store).send()
  146. utils.get_logger().info("邮件发送 任务执行完毕")
  147. except Exception as e:
  148. self._send_error_email("邮件发送", f"\n 错误: {str(e)}")
  149. utils.get_logger().error(f"邮件发送 任务执行失败: {e}")
  150. def _send_current_month_report_job(self):
  151. try:
  152. if datetime.today().day == self._get_current_month_report_day():
  153. utils.get_logger().info("开始执行 邮件发送当月报告 任务")
  154. DataSend(self.store).send_report_current_month()
  155. utils.get_logger().info("邮件发送当月报告 任务执行完毕")
  156. except Exception as e:
  157. self._send_error_email("邮件发送", f"\n 错误: {str(e)}")
  158. utils.get_logger().error(f"邮件发送当月报告 任务执行失败: {e}")
  159. @staticmethod
  160. def _get_current_month_report_day():
  161. day = utils.get_config_int("job.send_current_month_report_day", 30)
  162. if datetime.today().month == 2 and day > 28:
  163. day = 28
  164. if datetime.today().month in [4, 6, 9, 11] and day > 30:
  165. day = 30
  166. if day > 31:
  167. day = 31
  168. return day
  169. def _send_prev_month_report_job(self):
  170. try:
  171. if datetime.today().day == self._get_prev_month_report_day():
  172. utils.get_logger().info("开始执行 邮件发送上月报告 任务")
  173. DataSend(self.store).send_report_prev_month()
  174. utils.get_logger().info("邮件发送上月报告 任务执行完毕")
  175. except Exception as e:
  176. self._send_error_email("邮件发送", f"\n 错误: {str(e)}")
  177. utils.get_logger().error(f"邮件发送上月报告 任务执行失败: {e}")
  178. @staticmethod
  179. def _get_prev_month_report_day():
  180. day = utils.get_config_int("job.send_prev_month_report_day", 1)
  181. if datetime.today().month == 2 and day > 28:
  182. day = 28
  183. if datetime.today().month in [4, 6, 9, 11] and day > 30:
  184. day = 30
  185. if day > 31:
  186. day = 31
  187. return day
  188. def _clean_job(self):
  189. try:
  190. utils.get_logger().info("开始执行 清理数据 任务")
  191. DataClean().clean()
  192. utils.get_logger().info("清理数据 任务执行完毕")
  193. except Exception as e:
  194. self._send_error_email("清理数据", f"\n 错误: {str(e)}")
  195. utils.get_logger().error(f"清理数据 任务执行失败: {e}")
  196. @staticmethod
  197. def _validate_and_format_time(time_str, default_time: list):
  198. """验证并格式化时间字符串"""
  199. if not time_str:
  200. return default_time
  201. time_str = str(time_str).strip().replace(",", ",")
  202. # 分割字符串为列表
  203. items = [item.strip().strip("'").strip('"') for item in time_str.split(",")]
  204. # 初始化结果列表
  205. formatted_times = []
  206. for item in items:
  207. if not item:
  208. continue # 跳过空字符串
  209. try:
  210. item = item.replace(":", ":")
  211. # 使用 dateutil.parser 解析时间字符串
  212. parsed_time = parser.parse(item).time().strftime("%H:%M:%S")
  213. formatted_times.append(parsed_time)
  214. except Exception as e:
  215. utils.get_logger().error(f"配置时间解析错误: {item},: {e} ")
  216. if len(formatted_times) == 0:
  217. utils.get_logger().error(f"解析时间失败,使用默认时间 {default_time}")
  218. return default_time
  219. return formatted_times
  220. @staticmethod
  221. def _send_error_email(title: str, error: str) -> None:
  222. email = utils.get_config_value("email.error_email")
  223. utils.get_logger().info(f"发送错误邮件: {email}")
  224. if not email:
  225. return
  226. title = f"{title}异常"
  227. content = f"{title},请及时处理。\n\n异常信息:{error}"
  228. utils.send_email(email, title, content, False, None)