|
@@ -1,34 +1,32 @@
|
|
|
-import schedule
|
|
|
+import schedule,threading
|
|
|
from dateutil import parser
|
|
|
from datetime import datetime
|
|
|
|
|
|
from utils.logger_helper import LoggerHelper
|
|
|
from utils.config_helper import ConfigHelper
|
|
|
+from utils.email_helper import EmailHelper
|
|
|
from stores.mysql_data_store import MysqlDataStore
|
|
|
from models.url_setting import UrlSetting
|
|
|
-from main.data_collector import DataCollector
|
|
|
-from main.data_process import DataProcess
|
|
|
-from main.data_send import DataSend
|
|
|
-from utils.email_helper import EmailHelper
|
|
|
+from jobs.data_collector import DataCollector
|
|
|
+from jobs.data_process import DataProcess
|
|
|
+from jobs.data_send import DataSend
|
|
|
+from jobs.data_clean import DataClean
|
|
|
|
|
|
|
|
|
-class Runner:
|
|
|
+class JobRunner:
|
|
|
logger = LoggerHelper.get_logger()
|
|
|
config = ConfigHelper()
|
|
|
store = MysqlDataStore() # 复用 store 对象
|
|
|
|
|
|
- def run(self):
|
|
|
+ def run_job(self, is_run_now=True):
|
|
|
try:
|
|
|
- self.logger.info("应用程序已启动!")
|
|
|
- urls = UrlSetting().fetch_all()
|
|
|
- if not urls or len(urls) == 0:
|
|
|
- self.logger.error("未找到任何 URL 设置")
|
|
|
- return
|
|
|
- self.logger.info(f"共找到 {len(urls)} 个 URL 设置")
|
|
|
+ self.logger.info("加载任务")
|
|
|
+
|
|
|
|
|
|
- collect_time = self.config.get("schedule.collect")
|
|
|
- process_time = self.config.get("schedule.process")
|
|
|
- send_email_time = self.config.get("schedule.send_email")
|
|
|
+ collect_time = self.config.get("job.collect")
|
|
|
+ process_time = self.config.get("job.process")
|
|
|
+ send_email_time = self.config.get("job.send_email")
|
|
|
+ clean_data_time = self.config.get("job.clean_data")
|
|
|
|
|
|
collect_times = self._validate_and_format_time(
|
|
|
collect_time, ["06:00"])
|
|
@@ -48,31 +46,53 @@ class Runner:
|
|
|
self.logger.info(f"{time} 执行 发送邮件 任务")
|
|
|
schedule.every().day.at(time).do(self._send_job)
|
|
|
|
|
|
- if self.config.get_int("schedule.send_current_month_report_day")>0:
|
|
|
- report_time = self.config.get("schedule.send_current_month_report_time")
|
|
|
+ if self.config.get_int("job.send_current_month_report_day")>0:
|
|
|
+ report_time = self.config.get("job.send_current_month_report_time")
|
|
|
times = self._validate_and_format_time(report_time,["08:20"])
|
|
|
for time in times:
|
|
|
- self.logger.info(f"每月{self._get_current_month_report_day()}日 {time} 执行 发送当月报告 任务")
|
|
|
+ self.logger.info(f"每月{str(self._get_current_month_report_day()).rjust(2,"0")}日 {time} 执行 发送当月报告 任务")
|
|
|
schedule.every().day.at(time).do(self._send_prev_month_report_job)
|
|
|
|
|
|
- if self.config.get_int("schedule.send_prev_month_report_day")>0:
|
|
|
- report_time = self.config.get("schedule.send_prev_month_report_time")
|
|
|
+ if self.config.get_int("job.send_prev_month_report_day")>0:
|
|
|
+ report_time = self.config.get("job.send_prev_month_report_time")
|
|
|
times = self._validate_and_format_time(report_time, ["08:20"])
|
|
|
for time in times:
|
|
|
- self.logger.info(f"每月{self._get_prev_month_report_day()}日 {time} 执行 发送上月报告 任务")
|
|
|
+ self.logger.info(f"每月{str(self._get_prev_month_report_day()).rjust(2,"0")}日 {time} 执行 发送上月报告 任务")
|
|
|
schedule.every().day.at(time).do(self._send_prev_month_report_job)
|
|
|
|
|
|
- if self.config.get_bool("schedule.run_now"):
|
|
|
- self.logger.info("立即执行任务")
|
|
|
+ clean_data_times = self._validate_and_format_time(
|
|
|
+ clean_data_time, ["00:05"])
|
|
|
+ self.logger.info(f"{clean_data_times[0]} 执行 清理数据 任务")
|
|
|
+ schedule.every().day.at(clean_data_times[0]).do(self._clean_job)
|
|
|
+
|
|
|
+ urls = UrlSetting().fetch_all()
|
|
|
+ if not urls or len(urls) == 0:
|
|
|
+ self.logger.error("未找到任何 URL 设置")
|
|
|
+ return
|
|
|
+ self.logger.info(f"共找到 {len(urls)} 个 URL 设置")
|
|
|
+ for url in urls:
|
|
|
+ self.logger.info(f"{url}")
|
|
|
+
|
|
|
+ if is_run_now and self.config.get_bool("job.run_now"):
|
|
|
+ self.logger.info("立即执行采集任务")
|
|
|
self._collect_process_job()
|
|
|
- self._send_job()
|
|
|
+ # self._clean_job()
|
|
|
# self._process_job()
|
|
|
+ # self._send_job()
|
|
|
|
|
|
except Exception as e:
|
|
|
self.logger.error(f"应用程序停止: {e}")
|
|
|
raise e
|
|
|
|
|
|
def restart_job(self):
    """Re-register the scheduled jobs after the timing configuration changed.

    Clears the jobs held by the ``schedule`` module, logs the restart, and
    calls ``run_job`` with ``is_run_now`` set to False so that the jobs are
    registered again without the immediate collection run.
    """
    schedule.clear()
    self.logger.info("定时配置更新,重启任务")
    # False: only rebuild the schedule, skip the "run now" branch of run_job.
    self.run_job(is_run_now=False)
|
|
|
+
|
|
|
def _collect_process_job(self):
    """Start ``_collect_process`` on a new thread and return immediately.

    The thread is started and not joined, so the caller does not wait for
    the collect-and-process routine to finish.
    """
    worker = threading.Thread(target=self._collect_process)
    worker.start()
|
|
|
+
|
|
|
+ def _collect_process(self):
|
|
|
try:
|
|
|
self.logger.info("开始执行 数据采集处理 任务")
|
|
|
url_setting = UrlSetting()
|
|
@@ -116,6 +136,9 @@ class Runner:
|
|
|
self.logger.error(f"数据采集处理 任务执行失败: {e}")
|
|
|
|
|
|
def _process_job(self):
    """Start ``_process`` on a new thread and return immediately.

    The thread is started and not joined, so the caller does not wait for
    the processing routine to finish.
    """
    worker = threading.Thread(target=self._process)
    worker.start()
|
|
|
+
|
|
|
+ def _process(self):
|
|
|
try:
|
|
|
self.logger.info("开始执行 AI处理数据 任务")
|
|
|
data_process = DataProcess(self.store)
|
|
@@ -145,7 +168,7 @@ class Runner:
|
|
|
self.logger.error(f"邮件发送当月报告 任务执行失败: {e}")
|
|
|
|
|
|
def _get_current_month_report_day(self):
|
|
|
- day = self.config.get_int("schedule.send_current_month_report_day",30)
|
|
|
+ day = self.config.get_int("job.send_current_month_report_day",30)
|
|
|
if datetime.today().month==2 and day > 28 :
|
|
|
day = 28
|
|
|
if datetime.today().month in [4,6,9,11] and day > 30:
|
|
@@ -165,7 +188,7 @@ class Runner:
|
|
|
self.logger.error(f"邮件发送上月报告 任务执行失败: {e}")
|
|
|
|
|
|
def _get_prev_month_report_day(self):
|
|
|
- day = self.config.get_int("schedule.send_prev_month_report_day",1)
|
|
|
+ day = self.config.get_int("job.send_prev_month_report_day",1)
|
|
|
if datetime.today().month == 2 and day > 28:
|
|
|
day = 28
|
|
|
if datetime.today().month in [4, 6, 9, 11] and day > 30:
|
|
@@ -173,6 +196,16 @@ class Runner:
|
|
|
if day > 31:
|
|
|
day = 31
|
|
|
return day
|
|
|
+
|
|
|
def _clean_job(self):
    """Run the data-cleaning job and report any failure.

    Logs the start, calls ``DataClean().clean()``, then logs completion.
    Any exception raised along the way is passed to ``_send_error_email``
    and then written to the error log; it is not re-raised.
    """
    try:
        self.logger.info("开始执行 清理数据 任务")
        cleaner = DataClean()
        cleaner.clean()
        self.logger.info("清理数据 任务执行完毕")
    except Exception as exc:
        # Notify by email first, then record the failure in the log.
        detail = f"\n 错误: {str(exc)}"
        self._send_error_email("清理数据", detail)
        self.logger.error(f"清理数据 任务执行失败: {exc}")
|
|
|
+
|
|
|
def _validate_and_format_time(self, time_str, default_time: list):
|
|
|
"""验证并格式化时间字符串"""
|
|
|
if not time_str:
|
|
@@ -210,3 +243,4 @@ class Runner:
|
|
|
title = f"{title}异常"
|
|
|
content = f"{title},请及时处理。\n\n异常信息:{error}"
|
|
|
email_helper.send_email(email, title, content, False, None)
|
|
|
+
|