123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
# import threading
import calendar
from datetime import datetime

import schedule
from dateutil import parser

import utils
from jobs.data_clean import DataClean
from jobs.data_collector import DataCollector
from jobs.data_process import DataProcess
from jobs.data_send import DataSend
from models.url_setting import UrlSetting
from stores.mysql_data_store import MysqlDataStore
class JobRunner:
    """Register the periodic jobs with ``schedule`` and run them.

    Job times are read from configuration (``job.*`` keys). Every job wrapper
    catches its own exceptions, logs them and sends a notification e-mail, so
    a failing job does not propagate out of the scheduled callback.
    """

    # Collector used by the collection job that is currently running (None
    # when idle); kept on the instance so stop_job() can close it.
    _data_collector = None
    store = MysqlDataStore()  # Reuse a single store object across jobs

    def run_job(self, is_run_now=True):
        """Load the configured times and register all jobs.

        Args:
            is_run_now: When true and the ``job.run_now`` config flag is set,
                the collect-and-process job is also executed once, right
                after the jobs have been registered.

        Raises:
            Exception: Any error raised while loading is logged and re-raised.
        """
        try:
            utils.get_logger().info("加载任务")
            collect_time = utils.get_config_value("job.collect")
            process_time = utils.get_config_value("job.process")
            send_email_time = utils.get_config_value("job.send_email")
            clean_data_time = utils.get_config_value("job.clean_data")

            self._schedule_daily(
                self._validate_and_format_time(collect_time, ["06:00"]),
                "采集处理数据",
                self._collect_process_job,
            )
            self._schedule_daily(
                self._validate_and_format_time(
                    process_time, ["10:00", "15:00", "19:00"]
                ),
                "AI处理数据",
                self._process_job,
            )
            self._schedule_daily(
                self._validate_and_format_time(send_email_time, ["08:20", "14:00"]),
                "发送邮件",
                self._send_job,
            )

            # Monthly reports are registered as daily jobs; the job itself
            # checks whether today is the configured day of the month.
            if utils.get_config_int("job.send_current_month_report_day") > 0:
                report_time = utils.get_config_value(
                    "job.send_current_month_report_time"
                )
                report_times = self._validate_and_format_time(report_time, ["08:20"])
                # Built outside the f-string: nesting the same quote type
                # inside an f-string only parses on Python 3.12+.
                day_label = str(self._get_current_month_report_day()).rjust(2, "0")
                self._schedule_daily(
                    report_times,
                    "发送当月报告",
                    self._send_current_month_report_job,
                    prefix=f"每月{day_label}日 ",
                )

            if utils.get_config_int("job.send_prev_month_report_day") > 0:
                report_time = utils.get_config_value("job.send_prev_month_report_time")
                report_times = self._validate_and_format_time(report_time, ["08:20"])
                day_label = str(self._get_prev_month_report_day()).rjust(2, "0")
                self._schedule_daily(
                    report_times,
                    "发送上月报告",
                    self._send_prev_month_report_job,
                    prefix=f"每月{day_label}日 ",
                )

            clean_data_times = self._validate_and_format_time(
                clean_data_time, ["00:05"]
            )
            # Only the first configured time is used for the cleanup job.
            self._schedule_daily(clean_data_times[:1], "清理数据", self._clean_job)

            urls = UrlSetting().fetch_all()
            if not urls:
                utils.get_logger().error("未找到任何 URL 设置")
                return
            utils.get_logger().info(f"共找到 {len(urls)} 个 URL 设置")
            for url in urls:
                utils.get_logger().info(f"{url}")

            if is_run_now and utils.get_config_bool("job.run_now"):
                utils.get_logger().info("立即执行采集任务")
                self._collect_process_job()
        except Exception as e:
            utils.get_logger().error(f"应用程序停止: {e}")
            raise  # bare raise keeps the original traceback

    @staticmethod
    def _schedule_daily(times, description, job, prefix=""):
        """Log and register *job* to run every day at each time in *times*."""
        for at_time in times:
            utils.get_logger().info(f"{prefix}{at_time} 执行 {description} 任务")
            schedule.every().day.at(at_time).do(job)

    def restart_job(self):
        """Drop every registered job and register them again from config.

        The immediate run is suppressed (``run_job(False)``).
        """
        schedule.clear()
        utils.get_logger().info("定时配置更新,重启任务")
        self.run_job(False)

    def stop_job(self):
        """Drop every registered job and close the active collector, if any."""
        schedule.clear()
        self._stop_data_collector()

    def _stop_data_collector(self):
        """Close the active data collector, if there is one."""
        collector = self._data_collector
        if collector:
            # Forget the collector first so it is never closed twice.
            self._data_collector = None
            collector.close()

    @staticmethod
    def _format_url_error(url_setting, error):
        """Build the e-mail error text for a failure tied to one URL setting."""
        return (
            f"\n Type: {url_setting.adapter_type} \n Url: {url_setting.url}"
            f"\n 错误: {str(error)}"
        )

    def _collect_process_job(self):
        """Collect data for every URL setting, running AI processing after each.

        A collection failure for one URL is logged and reported by e-mail, and
        AI processing is still attempted afterwards. An AI processing failure
        ends the loop.
        """
        try:
            utils.get_logger().info("开始执行 数据采集处理 任务")
            for url_setting in UrlSetting().fetch_all():
                self._data_collector = None
                try:
                    utils.get_logger().info(f"开始采集: {url_setting.url}")
                    self._data_collector = DataCollector(
                        url_setting.adapter_type,
                        url_setting.url,
                        url_setting.username,
                        url_setting.password,
                        self.store,
                    )
                    self._data_collector.collect(url_setting.keywords)
                    utils.get_logger().info(f"采集完成: {url_setting.url}")
                except Exception as e:
                    # Log before notifying so the error is recorded even if
                    # the notification itself has problems.
                    utils.get_logger().error(f"采集发生异常: {e}")
                    self._send_error_email(
                        "数据采集", self._format_url_error(url_setting, e)
                    )
                finally:
                    self._stop_data_collector()

                try:
                    utils.get_logger().info(f"开始AI处理: {url_setting.url}")
                    DataProcess(self.store).process()
                except Exception as e:
                    utils.get_logger().error(f"AI处理发生异常: {e}")
                    self._send_error_email(
                        "AI数据处理", self._format_url_error(url_setting, e)
                    )
                    # NOTE(review): the original comment describes this as
                    # aborting the current URL setting, but `break` also skips
                    # every remaining URL setting in this run; confirm intent.
                    break
            utils.get_logger().info("数据采集处理 任务执行完毕")
        except Exception as e:
            utils.get_logger().error(f"数据采集处理 任务执行失败: {e}")

    def _process_job(self):
        """Run AI processing on the shared store."""
        try:
            utils.get_logger().info("开始执行 AI处理数据 任务")
            DataProcess(self.store).process()
            utils.get_logger().info("AI处理数据 任务执行完毕")
        except Exception as e:
            utils.get_logger().error(f"AI任务 执行失败: {e}")
            self._send_error_email("AI数据处理", f"\n 错误: {str(e)}")

    def _send_job(self):
        """Run the regular e-mail sending job."""
        try:
            utils.get_logger().info("开始执行 邮件发送 任务")
            DataSend(self.store).send()
            utils.get_logger().info("邮件发送 任务执行完毕")
        except Exception as e:
            utils.get_logger().error(f"邮件发送 任务执行失败: {e}")
            self._send_error_email("邮件发送", f"\n 错误: {str(e)}")

    def _send_current_month_report_job(self):
        """Send the current-month report if today is the configured day."""
        try:
            if datetime.today().day == self._get_current_month_report_day():
                utils.get_logger().info("开始执行 邮件发送当月报告 任务")
                DataSend(self.store).send_report_current_month()
                utils.get_logger().info("邮件发送当月报告 任务执行完毕")
        except Exception as e:
            utils.get_logger().error(f"邮件发送当月报告 任务执行失败: {e}")
            self._send_error_email("邮件发送", f"\n 错误: {str(e)}")

    @staticmethod
    def _clamp_to_month_end(day):
        """Return *day*, limited to the last day of the current month.

        Uses the real month length, so February yields 29 in leap years.
        """
        today = datetime.today()
        last_day = calendar.monthrange(today.year, today.month)[1]
        return min(day, last_day)

    @staticmethod
    def _get_current_month_report_day():
        """Return the day of this month on which the current-month report runs.

        Reads ``job.send_current_month_report_day`` (default 30) and clamps it
        to the length of the current month.
        """
        return JobRunner._clamp_to_month_end(
            utils.get_config_int("job.send_current_month_report_day", 30)
        )

    def _send_prev_month_report_job(self):
        """Send the previous-month report if today is the configured day."""
        try:
            if datetime.today().day == self._get_prev_month_report_day():
                utils.get_logger().info("开始执行 邮件发送上月报告 任务")
                DataSend(self.store).send_report_prev_month()
                utils.get_logger().info("邮件发送上月报告 任务执行完毕")
        except Exception as e:
            utils.get_logger().error(f"邮件发送上月报告 任务执行失败: {e}")
            self._send_error_email("邮件发送", f"\n 错误: {str(e)}")

    @staticmethod
    def _get_prev_month_report_day():
        """Return the day of this month on which the previous-month report runs.

        Reads ``job.send_prev_month_report_day`` (default 1) and clamps it to
        the length of the current month.
        """
        return JobRunner._clamp_to_month_end(
            utils.get_config_int("job.send_prev_month_report_day", 1)
        )

    def _clean_job(self):
        """Run the data cleanup job."""
        try:
            utils.get_logger().info("开始执行 清理数据 任务")
            DataClean().clean()
            utils.get_logger().info("清理数据 任务执行完毕")
        except Exception as e:
            utils.get_logger().error(f"清理数据 任务执行失败: {e}")
            self._send_error_email("清理数据", f"\n 错误: {str(e)}")

    @staticmethod
    def _validate_and_format_time(time_str, default_time: list) -> list:
        """Parse a comma-separated list of times into ``HH:MM:SS`` strings.

        Args:
            time_str: Configured value, e.g. ``"06:00, 18:30"``. Items may be
                wrapped in single or double quotes; full-width commas and
                colons are accepted.
            default_time: List returned unchanged when *time_str* is empty or
                none of its items can be parsed.

        Returns:
            The parsed times formatted as ``HH:MM:SS``, or *default_time*.
            Items that fail to parse are logged and skipped.
        """
        if not time_str:
            return default_time

        # Normalise the full-width comma (U+FF0C) to an ASCII comma. Written
        # as an escape so the two characters cannot be confused in an editor.
        normalized = str(time_str).strip().replace("\uff0c", ",")
        candidates = [
            piece.strip().strip("'").strip('"') for piece in normalized.split(",")
        ]

        formatted_times = []
        for candidate in candidates:
            if not candidate:
                continue  # skip empty items such as a trailing comma
            try:
                # Normalise the full-width colon (U+FF1A) before parsing.
                candidate = candidate.replace("\uff1a", ":")
                parsed_time = parser.parse(candidate).time().strftime("%H:%M:%S")
                formatted_times.append(parsed_time)
            except Exception as e:
                # Deliberately broad: one bad config item must never stop the
                # remaining items from being scheduled.
                utils.get_logger().error(f"配置时间解析错误: {candidate},: {e} ")

        if not formatted_times:
            utils.get_logger().error(f"解析时间失败,使用默认时间 {default_time}")
            return default_time
        return formatted_times

    @staticmethod
    def _send_error_email(title: str, error: str) -> None:
        """Send an error notification to the ``email.error_email`` address.

        Does nothing when no address is configured. A failure to send is
        logged instead of raised, because this runs inside ``except`` handlers
        and must not replace the error being reported.

        Args:
            title: Short name of the failing task; ``异常`` is appended.
            error: Error details placed in the message body.
        """
        try:
            email = utils.get_config_value("email.error_email")
            utils.get_logger().info(f"发送错误邮件: {email}")
            if not email:
                return
            title = f"{title}异常"
            content = f"{title},请及时处理。\n\n异常信息:{error}"
            utils.send_email(email, title, content, False, None)
        except Exception as e:
            utils.get_logger().error(f"发送错误邮件失败: {e}")
|