
Update: optimize the main loop code in main.py

YueYunyun 6 months ago
parent
commit
00f90e6a74

+ 4 - 4
SourceCode/TenderCrawler/app/adapters/data_collection_adapter_interface.py

@@ -24,7 +24,7 @@ class IDataCollectionAdapter(ABC):
     _keywords = None
     _keyword_array = None
     _error_count = 0
-    _max_error_count = utils.get_config_int("adapter.max_error_count", 5)
+    _max_error_count = utils.get_config_int("adapter.max_error_count", 3)
 
     @property
     def search_day_key(self) -> str:
@@ -124,12 +124,12 @@ class IDataCollectionAdapter(ABC):
             self._wait(timeout, poll_frequency).until(method)
         except TimeoutException as e:
             self._error_count += 1
-            utils.get_logger().error(
-                f"Data collection timed out [{self._error_count}/{self._max_error_count}]"
-            )
             if self._error_count > self._max_error_count:
                 raise e
+            utils.get_logger().error(
+                f"Data collection timed out [{self._error_count}/{self._max_error_count}]"
+            )
             self._wait_until(method)
 
     @abstractmethod
     def _collect(self, keyword: str) -> None:
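
Note: the retry-on-timeout logic this hunk touches reduces to the following minimal sketch (assuming Selenium's WebDriverWait; the standalone function and its defaults are illustrative, not taken from the repo):

    # Hedged sketch of the adapter's wait-and-retry pattern (assumes selenium).
    from selenium.common.exceptions import TimeoutException
    from selenium.webdriver.support.wait import WebDriverWait

    def wait_until(driver, method, timeout=10, max_error_count=3, error_count=0):
        try:
            WebDriverWait(driver, timeout).until(method)
        except TimeoutException:
            error_count += 1
            if error_count > max_error_count:
                raise  # give up after the configured number of timeouts
            # log the timeout, then retry recursively, as the adapter does
            wait_until(driver, method, timeout, max_error_count, error_count)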

+ 24 - 14
SourceCode/TenderCrawler/app/jobs/job_runner.py

@@ -1,4 +1,4 @@
-import threading
+# import threading
 from datetime import datetime
 
 import schedule
@@ -15,6 +15,7 @@ from stores.mysql_data_store import MysqlDataStore
 
 class JobRunner:
 
+    _data_collector = None
    store = MysqlDataStore()  # Reuse the store object
 
     def run_job(self, is_run_now=True):
@@ -54,7 +55,9 @@ class JobRunner:
                     utils.get_logger().info(
                         f"Day {str(self._get_current_month_report_day()).rjust(2, '0')} of each month at {time}: run 'send current month report' task"
                     )
-                    schedule.every().day.at(time).do(self._send_prev_month_report_job)
+                    schedule.every().day.at(time).do(
+                        self._send_current_month_report_job
+                    )
 
             if utils.get_config_int("job.send_prev_month_report_day") > 0:
                 report_time = utils.get_config_value("job.send_prev_month_report_time")
@@ -95,25 +98,33 @@ class JobRunner:
         utils.get_logger().info("定时配置更新,重启任务")
         self.run_job(False)
 
-    def _collect_process_job(self):
-        threading.Thread(target=self._collect_process).start()
+    def stop_job(self):
+        schedule.clear()
+        self._stop_data_collector()
+
+    def _stop_data_collector(self):
+        if self._data_collector:
+            self._data_collector.close()
 
-    def _collect_process(self):
+    # def _collect_process_job(self):
+    #     threading.Thread(target=self._collect_process).start()
+
+    def _collect_process_job(self):
         try:
             utils.get_logger().info("开始执行 数据采集处理 任务")
             url_setting = UrlSetting()
             for url_setting in url_setting.fetch_all():
-                data_collector = None
+                self._data_collector = None
                 try:
                     utils.get_logger().info(f"开始采集: {url_setting.url}")
-                    data_collector = DataCollector(
+                    self._data_collector = DataCollector(
                         url_setting.adapter_type,
                         url_setting.url,
                         url_setting.username,
                         url_setting.password,
                         self.store,
                     )
-                    data_collector.collect(url_setting.keywords)
+                    self._data_collector.collect(url_setting.keywords)
                     utils.get_logger().info(f"采集完成: {url_setting.url}")
                 except Exception as e:
                     self._send_error_email(
@@ -122,8 +133,7 @@ class JobRunner:
                     )
                     utils.get_logger().error(f"采集发生异常: {e}")
                 finally:
-                    if data_collector:
-                        data_collector.close()
+                    self._stop_data_collector()
 
                 try:
                     utils.get_logger().info(f"开始AI处理: {url_setting.url}")
@@ -140,10 +150,10 @@ class JobRunner:
         except Exception as e:
             utils.get_logger().error(f"数据采集处理 任务执行失败: {e}")
 
-    def _process_job(self):
-        threading.Thread(target=self._process).start()
+    # def _process_job(self):
+    #     threading.Thread(target=self._process).start()
 
-    def _process(self):
+    def _process_job(self):
         try:
             utils.get_logger().info("开始执行 AI处理数据 任务")
             data_process = DataProcess(self.store)
@@ -218,7 +228,7 @@ class JobRunner:
         """验证并格式化时间字符串"""
         if not time_str:
             return default_time
-        time_str = time_str.strip().replace(",", ",")
+        time_str = str(time_str).strip().replace(",", ",")
         # Split the string into a list
         items = [item.strip().strip("'").strip('"') for item in time_str.split(",")]
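
Note: the str() guard added above protects against config values that arrive as non-strings (e.g. an unquoted time in YAML). A standalone sketch of the normalization step (function name and default are illustrative):

    # Hedged sketch of the time-string normalization above.
    def normalize_times(time_str, default_time="08:00"):
        if not time_str:
            return [default_time]
        # Coerce to str, then map fullwidth commas (,) to ASCII commas
        time_str = str(time_str).strip().replace(",", ",")
        return [item.strip().strip("'").strip('"') for item in time_str.split(",")]

    print(normalize_times("20:00,12:00"))  # -> ['20:00', '12:00']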
 

+ 80 - 18
SourceCode/TenderCrawler/app/main.py

@@ -1,31 +1,93 @@
 import datetime
+import signal
+import sys
 import time
+from typing import Optional
 
 import schedule
 
 import utils
 from jobs.job_runner import JobRunner
 
-DEFAULT_USER_SLEEP_INTERVAL = 10  # default polling interval: 10 seconds
 
-utils.get_logger().info("应用程序启动...")
+class Application:
 
-job = JobRunner()
-job.run_job()
+    def __init__(self):
+        self.logger = utils.get_logger()
+        self.running = True
+        self.job: Optional[JobRunner] = None
+        self.interval = utils.get_config_int("job.sleep_interval", 10)  # default: 10 seconds
 
-interval = utils.get_config_int("job.sleep_interval", DEFAULT_USER_SLEEP_INTERVAL)
+        # Register signal handlers
+        signal.signal(signal.SIGINT, self._handle_shutdown)
+        signal.signal(signal.SIGTERM, self._handle_shutdown)
 
-if __name__ == "__main__":
-    while True:
-        schedule.run_pending()
-        now = datetime.datetime.now()
-        time.sleep(interval)
-        # Reload config and jobs
-        if now.minute == 0 and now.second <= interval:
-            job_id = utils.get_config_int("job.event_id")
+    def _handle_shutdown(self, signum, frame):
+        """Handle shutdown signals"""
+        self.logger.info(f"Received shutdown signal {signum}; closing application...")
+        self.running = False
+        if self.job:
+            self.job.stop_job()
+
+    def _reload_config(self) -> bool:
+        """Reload configuration; return whether jobs need to restart"""
+        try:
+            old_job_id = utils.get_config_int("job.event_id")
             utils.reload_config()
-            interval = utils.get_config_int(
-                "job.sleep_interval", DEFAULT_USER_SLEEP_INTERVAL
-            )
-            if job_id != utils.get_config_int("job.event_id"):
-                job.restart_job()
+            self.interval = utils.get_config_int("job.sleep_interval", 10)
+            new_job_id = utils.get_config_int("job.event_id")
+
+            return old_job_id != new_job_id
+        except Exception as e:
+            self.logger.error(f"重新加载配置失败: {e}")
+            return False
+
+    def _check_reload(self, now: datetime.datetime):
+        """Check whether the configuration should be reloaded"""
+        try:
+            # Reload the configuration at the top of every hour
+            if now.minute == 0 and now.second <= self.interval:
+                self.logger.info("Reloading configuration...")
+                if self._reload_config():
+                    self.logger.info("Job ID changed; restarting jobs...")
+                    self.job.restart_job()
+                else:
+                    self.logger.info("Configuration reloaded; no job restart needed")
+        except Exception as e:
+            self.logger.error(f"Configuration reload check failed: {e}")
+
+    def run(self):
+        """Run the application"""
+        try:
+            self.logger.info("Starting application...")
+
+            # Initialize jobs
+            self.job = JobRunner()
+            self.job.run_job()
+
+            self.logger.info(f"Application started. Schedule polling interval: {self.interval}s")
+
+            # Main loop
+            while self.running:
+                try:
+                    now = datetime.datetime.now()
+                    schedule.run_pending()
+                    time.sleep(self.interval)
+                    self._check_reload(now)
+                except Exception as e:
+                    self.logger.error(f"Exception in main loop: {e}")
+                    time.sleep(self.interval)
+
+        except Exception as e:
+            self.logger.error(f"Application run failed: {e}")
+            sys.exit(1)
+        finally:
+            self.logger.info("Shutting down application...")
+            if self.job:
+                self.job.stop_job()
+            self.logger.info("Application shut down")
+
+
+if __name__ == "__main__":
+    app = Application()
+    app.run()
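
Note: stripped of the reload and shutdown handling, the main loop above is the standard polling pattern of the schedule package (the job and time below are illustrative):

    # Hedged sketch of the schedule polling loop main.py is built around.
    import time
    import schedule

    schedule.every().day.at("20:00").do(lambda: print("collect"))
    while True:
        schedule.run_pending()  # runs any job whose scheduled time has passed
        time.sleep(10)          # jobs fire on the first tick after falling due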

+ 6 - 3
SourceCode/TenderCrawler/app/models/collect_data.py

@@ -68,8 +68,10 @@ class CollectData:
             )
             if collect_data.status == self.INVALID:
                 db_helper.execute_non_query(self._insert_query_history, params)
+                utils.get_logger().info(f"成功插入 1 条无效历史数据")
             else:
                 db_helper.execute_non_query(self._insert_query, params)
+                utils.get_logger().info(f"成功插入 1 条有效数据")
 
     def insert_batch(self, collect_data_list):
         if not all(
@@ -108,11 +110,12 @@ class CollectData:
         with MySQLHelper() as db_helper:
             db_helper.execute_non_query(self._insert_query, params)
             affected_rows1 = db_helper.connection.affected_rows()
-            utils.get_logger().info(f"成功插入 {affected_rows1} 条有效数据")
             db_helper.execute_non_query(self._insert_query_history, params2)
             affected_rows2 = db_helper.connection.affected_rows()
-            utils.get_logger().info(f"成功插入 {affected_rows2} 条无效历史数据")
-            return affected_rows1 + affected_rows2
+            utils.get_logger().info(
+                f"共插入 {affected_rows2} 条数据。 {affected_rows1} 条有效数据,{affected_rows2-affected_rows1} 条无效历史数据。"
+            )
+            return affected_rows2
 
     # def insert_url(self, url: str, keyword: str, content: str):
     #     with MySQLHelper() as db_helper:
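
Note: connection.affected_rows() reports the rows touched by the most recent statement only, so the revised total assumes the history insert (params2) covers every record in the batch. Assuming pymysql underneath MySQLHelper (an assumption, not shown in this diff), the batch insert presumably reduces to:

    # Hedged sketch, assuming pymysql; MySQLHelper internals are an assumption.
    import pymysql

    def insert_batch(conn: pymysql.connections.Connection, query: str, rows: list):
        with conn.cursor() as cursor:
            cursor.executemany(query, rows)  # one round trip for the whole batch
        conn.commit()
        return conn.affected_rows()          # rows touched by the last statement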

+ 2 - 1
SourceCode/TenderCrawler/app/models/process_data.py

@@ -94,6 +94,7 @@ class ProcessData:
         with MySQLHelper() as db_helper:
             db_helper.execute_non_query(self._insert_query, insert_params)
             # db_helper.execute_non_query(self._update_query, update_params)
+            utils.get_logger().info(f"共插入 1 条处理数据")
 
     def insert_batch(self, process_data_list):
         if not all(
@@ -130,7 +131,7 @@ class ProcessData:
         with MySQLHelper() as db_helper:
             db_helper.execute_non_query(self._insert_query, insert_params)
             affected_rows = db_helper.connection.affected_rows()
-            utils.get_logger().info(f"成功插入 {affected_rows} 条数据")
+            utils.get_logger().info(f"共插入 {affected_rows} 条处理数据")
             # for param in update_params:
             #     db_helper.execute_non_query(self._update_query, param)
             return affected_rows

+ 5 - 2
SourceCode/TenderCrawler/app/models/process_result_data.py

@@ -86,6 +86,7 @@ class ProcessResultData:
         with MySQLHelper() as db_helper:
             db_helper.execute_non_query(self._insert_query, insert_params)
             # db_helper.execute_non_query(self._update_query, update_params)
+            utils.get_logger().info(f"共插入 1 条结果处理数据")
 
     def insert_batch(self, process_result_data_list):
         if not all(
@@ -122,7 +123,7 @@ class ProcessResultData:
         with MySQLHelper() as db_helper:
             db_helper.execute_non_query(self._insert_query, insert_params)
             affected_rows = db_helper.connection.affected_rows()
-            utils.get_logger().info(f"成功插入 {affected_rows} 条数据")
+            utils.get_logger().info(f"共插入 {affected_rows} 条结果处理数据")
             # for param in update_params:
             #     db_helper.execute_non_query(self._update_query, param)
             return affected_rows
@@ -185,7 +186,9 @@ class ProcessResultData:
             params = (other_urls, url)
             db_helper.execute_non_query(self._update_other_urls_query, params)
 
-    _query_report = "select * from t_data_result where create_time between %s and %s"
+    _query_report = (
+        "select * from t_data_result where create_time between %s and %s ORDER BY date"
+    )
 
     def fetch_to_report_by_date(self, start_date, end_date):
         """

+ 8 - 8
SourceCode/TenderCrawler/app/stores/mysql_data_store.py

@@ -26,7 +26,7 @@ class MysqlDataStore(IDataStore):
     def insert_collect_data(self, data: CollectData, is_batch=True):
         if not is_batch:
             self._collectData.insert(data)
-            utils.get_logger().info(f"保存 采集数据 到数据库: {data.url}")
+            # utils.get_logger().info(f"保存 采集数据 到数据库: {data.url}")
         else:
             self._collect_list.append(data)
             self.save_collect_data()
@@ -35,9 +35,9 @@ class MysqlDataStore(IDataStore):
         if (is_force and len(self._collect_list) > 0) or len(
             self._collect_list
         ) >= self._collect_size:
-            utils.get_logger().info(
-                "Batch-saved collected data to database, count: " + str(len(self._collect_list))
-            )
+            # utils.get_logger().info(
+            #     "Batch-saved collected data to database, count: " + str(len(self._collect_list))
+            # )
             self._collectData.insert_batch(self._collect_list)
             self._collect_list = []
 
@@ -57,7 +57,7 @@ class MysqlDataStore(IDataStore):
         if not is_batch:
             self._processData.insert(data)
             self._collectData.set_process(data.url)
-            utils.get_logger().info(f"保存 处理数据 到数据库: {data.url}")
+            # utils.get_logger().info(f"保存 处理数据 到数据库: {data.url}")
         else:
             self._process_list.append(data)
             self.save_process_data()
@@ -67,9 +67,9 @@ class MysqlDataStore(IDataStore):
         if (is_force and len(self._process_list) > 0) or len(
             self._process_list
         ) >= self._process_size:
-            utils.get_logger().info(
-                f"Batch-saved processed data to database, count: {str(len(self._process_list))}"
-            )
+            # utils.get_logger().info(
+            #     f"Batch-saved processed data to database, count: {str(len(self._process_list))}"
+            # )
             self._processData.insert_batch(self._process_list)
             urls = [item.url for item in self._process_list]
             self._collectData.set_process_list(urls)
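
Note: the per-row log lines are commented out because the store buffers rows and flushes them in bulk; the flush-on-threshold pattern is roughly (class name and threshold are illustrative):

    # Hedged sketch of the store's buffer-and-flush batching.
    class BufferedStore:
        def __init__(self, insert_batch, batch_size=50):
            self._insert_batch = insert_batch  # e.g. CollectData.insert_batch
            self._buffer = []
            self._batch_size = batch_size

        def add(self, row):
            self._buffer.append(row)
            self.flush()

        def flush(self, is_force=False):
            # Flush when forced (and non-empty) or when the buffer is full
            if (is_force and self._buffer) or len(self._buffer) >= self._batch_size:
                self._insert_batch(self._buffer)
                self._buffer = []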

+ 1 - 1
SourceCode/TenderCrawler/docker-compose.yml

@@ -60,7 +60,7 @@ services:
       - APP_JOB__COLLECT=20:00,12:00
       - APP_JOB__PROCESS=23:00,4:00,13:00
       - APP_JOB__SEND_EMAIL=08:20,14:00
-      - APP_JOB__RUN_NOW=1
+      - APP_JOB__RUN_NOW=0
       - APP_SELENIUM__REMOTE_DRIVER_URL=http://y_selenium:4444/wd/hub
     volumes:
       - /home/docker/tender-crawler_v2/app/config.yml:/app/config.yml