
Update: optimize the data collection adapters

YueYunyun 6 months ago
parent
commit
fe623efeda

+ 2 - 2
SourceCode/TenderCrawler/app/adapters/__init__.py

@@ -2,8 +2,8 @@ from adapters.data_collection_adapter_interface import IDataCollectionAdapter
 from stores.data_store_interface import IDataStore
 
 
-def collect(adapter: IDataCollectionAdapter, keyword: str, store: IDataStore = None):
-    adapter.collect(keyword, store)
+def collect(adapter: IDataCollectionAdapter, keywords: str, store: IDataStore = None):
+    adapter.collect(keywords, store)
 
 
 def teardown(adapter: IDataCollectionAdapter):
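
Callers no longer split keywords themselves: `collect` now receives the whole comma-separated string, and the per-keyword loop lives in the adapter base class (see the interface diff below). A minimal usage sketch, assuming a duck-typed stub in place of a real adapter (not a repo class):

    import adapters

    class StubAdapter:
        """Duck-typed stand-in for an IDataCollectionAdapter (assumption)."""
        def collect(self, keywords, store):
            for keyword in keywords.split(","):
                print("collecting:", keyword)

        def teardown(self):
            pass

    adapters.collect(StubAdapter(), "红外光谱仪,拉曼光谱仪", store=None)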

+ 20 - 21
SourceCode/TenderCrawler/app/adapters/ccgp_data_collection_adapter.py

@@ -3,7 +3,7 @@ from time import sleep
 from selenium.common.exceptions import TimeoutException, NoSuchElementException
 from selenium.webdriver.common.by import By
 from selenium.webdriver.support import expected_conditions as ec
-from selenium.webdriver.support.wait import WebDriverWait
+
 
 import utils
 from adapters.data_collection_adapter_interface import IDataCollectionAdapter
@@ -21,14 +21,12 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
         self._driver = None
         self._keyword = None
         self._adapter_type = "ccgp"
+        self._next_count = 0
 
     def login(self, username: str, password: str) -> None:
         pass
 
-    def collect(self, keyword: str, store: IDataStore):
-        if store:
-            self._store = store
-        self._keyword = keyword
+    def _collect(self, keyword: str):
         items = self._search(keyword)
         self._process_list(items)
         if utils.get_config_bool(self.batch_save_key):
@@ -38,8 +36,7 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
         try:
             if not keyword:
                 raise Exception("搜索关键字不能为空")
-            wait = WebDriverWait(self.driver, 30, 1)
-            wait.until(ec.presence_of_element_located((By.ID, "searchForm")))
+            self._wait_until(ec.presence_of_element_located((By.ID, "searchForm")))
             search_el = self.driver.find_element(By.ID, "kw")
             sleep(2)
             search_el.clear()
@@ -49,12 +46,13 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
             )
             sleep(1)
             search_btn.click()
-            wait.until(
+            self._next_count = 0
+            self._wait_until(
                 ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result"))
             )
             default_search_txt = "近1周"
             search_txt = utils.get_config_value(self.search_day_key, default_search_txt)
-            utils.get_logger().info(f"搜索关键字: {keyword},搜索条件: {search_txt}")
+            utils.get_logger().debug(f"搜索日期条件: {search_txt}")
             if search_txt != default_search_txt:
                 last_els = self.driver.find_elements(By.XPATH, "//ul[@id='datesel']/li")
                 for last_el in last_els:
@@ -62,7 +60,7 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
                         sleep(1)
                         last_el.click()
                         break
-                wait.until(
+                self._wait_until(
                     ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result"))
                 )
             else:
@@ -72,7 +70,7 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
                     By.XPATH, "//body/div[@class='vT_z']/div/div/p"
                 )
                 if len(p_els) > 0:
-                    utils.get_logger().info(f" {p_els[0].text}")
+                    utils.get_logger().debug(f" {p_els[0].text}")
                 else:
                     a_links = self.driver.find_elements(
                         By.XPATH, "//div[@class='vT-srch-result-list']/p/a"
@@ -80,7 +78,7 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
                     count = len(a_links)
                     if count > 1:
                         count = count - 1
-                    utils.get_logger().info(f"共查询到 {count} 页,每页 20 条")
+                    utils.get_logger().debug(f"共查询到 {count} 页,每页 20 条")
             except Exception as e:
                 utils.get_logger().error(f"搜索失败[尝试查询页数]: {e}")
             items = self.driver.find_elements(
@@ -103,17 +101,19 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
 
     def _next_page(self) -> list:
         try:
-            wait = WebDriverWait(self.driver, 10, 1)
             next_path = "//div[@class='vT-srch-result-list']/p/a[@class='next']"
             try:
                 btn = self.driver.find_element(By.XPATH, next_path)
             except NoSuchElementException:
-                utils.get_logger().info(f"翻页结束 [{self._adapter_type}]")
+                utils.get_logger().debug(f"翻页结束 [{self._adapter_type}]")
                 return []
             btn.click()
-            utils.get_logger().info(f"跳转到下页: {self.driver.current_url}")
-            sleep(5)
-            wait.until(
+            self._next_count += 1
+            utils.get_logger().debug(
+                f"下一页[{self._next_count+1}]: {self.driver.current_url}"
+            )
+            sleep(1)
+            self._wait_until(
                 ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result"))
             )
             items = self.driver.find_elements(
@@ -123,11 +123,10 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
         except NoSuchElementException as e:
             raise Exception(f"翻页失败 [{self._adapter_type}] [找不到元素]: {e}")
         except TimeoutException as e:
-            raise Exception(f"翻页结束 [{self._adapter_type}] [超时]: {e}")
+            raise Exception(f"翻页失败 [{self._adapter_type}] [超时]: {e}")
 
     def _process_item(self, item):
         main_handle = self.driver.current_window_handle
-        wait = WebDriverWait(self.driver, 10, 1)
         close = True
         try:
             url = item.get_attribute("href")
@@ -137,13 +136,13 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
             utils.get_logger().debug(f"跳转详情")
             sleep(1)
             item.click()
-            wait.until(ec.number_of_windows_to_be(2))
+            self._wait_until(ec.number_of_windows_to_be(2))
             handles = self.driver.window_handles
             for handle in handles:
                 if handle != main_handle:
                     self.driver.switch_to.window(handle)
                     break
-            wait.until(ec.presence_of_element_located((By.TAG_NAME, "body")))
+            self._wait_until(ec.presence_of_element_located((By.TAG_NAME, "body")))
 
             content = self.driver.find_element(
                 By.XPATH, "//div[@class='vF_deail_maincontent']"

+ 21 - 26
SourceCode/TenderCrawler/app/adapters/chinabidding_data_collection_adapter.py

@@ -3,7 +3,6 @@ from time import sleep
 from selenium.common.exceptions import TimeoutException, NoSuchElementException
 from selenium.webdriver.common.by import By
 from selenium.webdriver.support import expected_conditions as ec
-from selenium.webdriver.support.wait import WebDriverWait
 
 import utils
 from adapters.data_collection_adapter_interface import IDataCollectionAdapter
@@ -21,6 +20,7 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
         self._driver = None
         self._keyword = None
         self._adapter_type = "chinabidding"
+        self._next_count = 0
 
     def login(self, username: str, password: str) -> None:
         try:
@@ -28,24 +28,20 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
                 By.XPATH, "//div[@id='loginRight']/a[@class='login']"
             )
             login_el.click()
-            wait = WebDriverWait(self.driver, 10, 1)
-            wait.until(ec.presence_of_element_located((By.ID, "userpass")))
+            self._wait_until(ec.presence_of_element_located((By.ID, "userpass")))
             un_el = self.driver.find_element(By.ID, "username")
             un_el.send_keys(username)
             pass_el = self.driver.find_element(By.ID, "userpass")
             pass_el.send_keys(password)
             login_btn = self.driver.find_element(By.ID, "login-button")
             login_btn.click()
-            wait.until(ec.presence_of_element_located((By.ID, "site-content")))
+            self._wait_until(ec.presence_of_element_located((By.ID, "site-content")))
         except TimeoutException as e:
             raise Exception(f"登录失败 [{self._adapter_type}] [超时]: {e}")
         except NoSuchElementException as e:
             raise Exception(f"登录失败 [{self._adapter_type}] [找不到元素]: {e}")
 
-    def collect(self, keyword: str, store: IDataStore):
-        if store:
-            self._store = store
-        self._keyword = keyword
+    def _collect(self, keyword: str):
         items = self._search_by_type(keyword, 0)
         self._process_list(items, 0)
         sleep(2)
@@ -68,8 +64,7 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
                     By.XPATH, "//div[@id='z-b-jg-gg']/h2/a[@class='more']"
                 )
             el.click()
-            wait = WebDriverWait(self.driver, 10, 1)
-            wait.until(ec.number_of_windows_to_be(2))
+            self._wait_until(ec.number_of_windows_to_be(2))
             self.driver.close()
             self.driver.switch_to.window(self.driver.window_handles[0])
             return self._search(keyword)
@@ -79,8 +74,7 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
             raise Exception(f"搜索失败 [{self._adapter_type}] [找不到元素]: {e}")
 
     def _search(self, keyword: str) -> list:
-        wait = WebDriverWait(self.driver, 10, 1)
-        wait.until(ec.presence_of_element_located((By.ID, "searchBidProjForm")))
+        self._wait_until(ec.presence_of_element_located((By.ID, "searchBidProjForm")))
         search_el = self.driver.find_element(
             By.XPATH, "//form[@id='searchBidProjForm']/ul/li/input[@id='fullText']"
         )
@@ -90,15 +84,16 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
             By.XPATH, "//form[@id='searchBidProjForm']/ul/li/button"
         )
         search_btn.click()
-        wait.until(ec.presence_of_element_located((By.ID, "site-content")))
+        self._next_count = 0
+        self._wait_until(ec.presence_of_element_located((By.ID, "site-content")))
         default_search_txt = "全部"
         search_txt = utils.get_config_value(self.search_day_key, default_search_txt)
-        utils.get_logger().info(f"搜索关键字: {keyword},搜索条件: {search_txt}")
+        utils.get_logger().debug(f"搜索日期条件: {search_txt}")
         if search_txt != default_search_txt:
             last_el = self.driver.find_element(By.LINK_TEXT, search_txt)
             sleep(1)
             last_el.click()
-            wait.until(ec.presence_of_element_located((By.ID, "site-content")))
+            self._wait_until(ec.presence_of_element_located((By.ID, "site-content")))
         else:
             sleep(1)
         try:
@@ -108,7 +103,7 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
             count = len(a_links)
             if count > 1:
                 count = count - 1
-            utils.get_logger().info(f"共查询到 {count} 页,每页 10 条")
+            utils.get_logger().debug(f"共查询到 {count} 页,每页 10 条")
         except Exception as e:
             utils.get_logger().error(f"搜索失败[尝试查询页数]: {e}")
         items = self.driver.find_elements(By.XPATH, "//ul[@class='as-pager-body']/li/a")
@@ -125,17 +120,19 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
 
     def _next_page(self) -> list:
         try:
-            wait = WebDriverWait(self.driver, 10, 1)
             try:
                 btn = self.driver.find_element(
                     By.XPATH, "//form[@id='pagerSubmitForm']/a[@class='next']"
                 )
             except NoSuchElementException:
-                utils.get_logger().info(f"翻页结束 [{self._adapter_type}]")
+                utils.get_logger().debug(f"翻页结束 [{self._adapter_type}]")
                 return []
             btn.click()
-            utils.get_logger().info(f"跳转到下页: {self.driver.current_url}")
-            wait.until(ec.presence_of_element_located((By.ID, "site-content")))
+            self._next_count += 1
+            utils.get_logger().debug(
+                f"下一页[{self._next_count+1}]: {self.driver.current_url}"
+            )
+            self._wait_until(ec.presence_of_element_located((By.ID, "site-content")))
             items = self.driver.find_elements(
                 By.XPATH, "//ul[@class='as-pager-body']/li/a"
             )
@@ -143,7 +140,7 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
         except NoSuchElementException as e:
             raise Exception(f"翻页失败 [{self._adapter_type}] [找不到元素]: {e}")
         except TimeoutException as e:
-            raise Exception(f"翻页结束 [{self._adapter_type}] [超时]: {e}")
+            raise Exception(f"翻页失败 [{self._adapter_type}] [超时]: {e}")
 
     def _process_item(self, item, data_type):
         main_handle = self.driver.current_window_handle
@@ -154,17 +151,15 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
                 close = False
                 return
             item.click()
-            wait = WebDriverWait(self.driver, 10, 1)
-            wait.until(ec.number_of_windows_to_be(2))
+            self._wait_until(ec.number_of_windows_to_be(2))
             handles = self.driver.window_handles
             for handle in handles:
                 if handle != main_handle:
                     self.driver.switch_to.window(handle)
                     break
             url = self.driver.current_url
-            # utils.get_logger().info(f"跳转详情")
-            print(".", end="")
-            wait.until(ec.presence_of_element_located((By.CLASS_NAME, "content")))
+            utils.get_logger().debug(f"跳转详情")
+            self._wait_until(ec.presence_of_element_located((By.CLASS_NAME, "content")))
             content = self.driver.find_element(By.CLASS_NAME, "content").text
             if self._check_content(content):
                 self._save_db(url, content, data_type)
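
`_next_page` in both adapters follows the same shape: click the "next" link until it disappears, counting pages via the new `_next_count`. A reduced sketch of that loop, with placeholder XPaths rather than the repo's locators:

    from selenium.common.exceptions import NoSuchElementException
    from selenium.webdriver.common.by import By

    def iterate_pages(driver, next_xpath: str, item_xpath: str):
        """Yield (page_number, items) until the 'next' link disappears."""
        page = 1
        while True:
            yield page, driver.find_elements(By.XPATH, item_xpath)
            try:
                btn = driver.find_element(By.XPATH, next_xpath)
            except NoSuchElementException:
                break  # last page: no 'next' link
            btn.click()
            page += 1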

+ 78 - 41
SourceCode/TenderCrawler/app/adapters/data_collection_adapter_interface.py

@@ -1,6 +1,9 @@
 from abc import ABC, abstractmethod
+from typing import Callable, Union, Literal
 
 from selenium import webdriver
+from selenium.common.exceptions import TimeoutException
+from selenium.webdriver.support.wait import WebDriverWait, D, T
 
 import drivers
 import utils
@@ -16,8 +19,12 @@ class IDataCollectionAdapter(ABC):
     _url = ""
     _store = None
     _driver = None
-    _keyword = None
     _adapter_type = ""
+    _cur_keyword = None
+    _keywords = None
+    _keyword_array = None
+    _error_count = 0
+    _max_error_count = utils.get_config_int("adapter.max_error_count", 5)
 
     @property
     def search_day_key(self) -> str:
@@ -36,20 +43,56 @@ class IDataCollectionAdapter(ABC):
         return self._url
 
     @property
-    def keyword(self):
-        return self._keyword
+    def cur_keyword(self):
+        return self._cur_keyword
+
+    @property
+    def keywords(self):
+        return self._keywords
+
+    @property
+    def keyword_array(self):
+        return self._keyword_array
 
     @property
     def driver(self) -> webdriver:
         if not self._driver:
-            self._driver = self._create_driver()
+            try:
+                self._driver = drivers.gen_driver(self.url)
+            except Exception as e:
+                raise Exception(f"创建驱动器失败: {e}")
         return self._driver
 
-    def _create_driver(self) -> webdriver:
-        try:
-            return drivers.gen_driver(self.url)
-        except Exception as e:
-            raise Exception(f"创建驱动器失败: {e}")
+    def collect(self, keywords: str, store: IDataStore) -> None:
+        """
+        采集入口:按英文逗号拆分 keywords,逐个调用 _collect 处理
+
+        :param keywords: 搜索关键字,多个以英文逗号分隔
+        :param store: 数据储存库
+        :type keywords: str
+        :raises Exception: 如果采集失败,应抛出异常
+        """
+        if store:
+            self._store = store
+        if not keywords:
+            raise Exception("未指定搜索关键字")
+        utils.get_logger().info(f"开始采集: {keywords}")
+        self._error_count = 0
+        self._keyword_array = keywords.split(",")
+        count = 0
+        for keyword in self._keyword_array:
+            if not keyword:
+                continue
+            try:
+                count += 1
+                self._cur_keyword = keyword
+                utils.get_logger().info(f"采集关键字[{count}]: {keyword}")
+                self._error_count = 0
+                self._collect(keyword)
+            except Exception as e:
+                raise Exception(f"采集数据失败: {e}")
 
     @abstractmethod
     def login(self, username: str, password: str) -> None:
@@ -68,42 +111,34 @@ class IDataCollectionAdapter(ABC):
         except Exception as e:
             raise Exception(f"登录失败: {e}")
 
-    @abstractmethod
-    def _search(self, keyword: str) -> list:
-        """
-        根据关键字搜索,返回搜索结果列表
+    def _wait(self, timeout=20, poll_frequency=1):
+        return WebDriverWait(self.driver, timeout, poll_frequency)
 
-        :param keyword: 搜索关键字
-        :type keyword: str
-        :return: 搜索结果列表
-        :rtype: list
-        :raises Exception: 如果搜索失败,应抛出异常
-        """
+    def _wait_until(
+        self,
+        method: Callable[[D], Union[Literal[False], T]],
+        timeout=20,
+        poll_frequency=1,
+    ):
         try:
-            results = []
-            # 实现搜索逻辑
-            return results if results else []
-        except Exception as e:
-            raise Exception(f"搜索失败: {e}")
+            self._wait(timeout, poll_frequency).until(method)
+        except TimeoutException as e:
+            self._error_count += 1
+            utils.get_logger().error(
+                f"采集数据 超时 [{self._error_count}/{self._max_error_count}]"
+            )
+            if self._error_count > self._max_error_count:
+                raise e
+            self._wait_until(method)
 
     @abstractmethod
-    def collect(self, keyword: str, store: IDataStore) -> None:
+    def _collect(self, keyword: str) -> None:
         """
-        处理搜索结果列表,返回处理后的数据列表
-
-        :param keyword: 搜索结果列表
-        :param store: 数据储存库
+        根据关键字采集
+        :param keyword: 搜索关键字
         :type keyword: str
-        :return: 处理后的数据列表
-        :rtype: list
-        :raises Exception: 如果处理失败,应抛出异常
         """
-        try:
-            if keyword:
-                # 实现处理逻辑
-                pass
-        except Exception as e:
-            raise Exception(f"处理失败: {e}")
+        pass
 
     def teardown(self) -> None:
         """
@@ -120,7 +155,7 @@ class IDataCollectionAdapter(ABC):
     def _check_is_collect_by_url(self, url: str) -> bool:
         old = self.store.query_one_collect_url(url)
         if old:
-            utils.get_logger().info(f"已采集过: {url}")
+            utils.get_logger().debug(f"已采集过: {url}")
             return True
         return False
 
@@ -144,13 +179,15 @@ class IDataCollectionAdapter(ABC):
 
     def _save_db(self, url, content, data_type=0, attach_str=None, is_invalid=False):
         if not self.store:
-            utils.get_logger().info(f"DataStore 未指定: {url},关键字{self.keyword}")
+            utils.get_logger().info(
+                f"DataStore 未指定: {url},关键字{self.cur_keyword}"
+            )
             return False
         else:
             status = 2 if is_invalid else 0
             data = CollectData(
                 url=url,
-                keyword=self.keyword,
+                keyword=self.cur_keyword,
                 content=content,
                 data_type=data_type,
                 attach_path=attach_str,

+ 2 - 0
SourceCode/TenderCrawler/app/config.yml

@@ -1,5 +1,6 @@
 #file: noinspection SpellCheckingInspection,SpellCheckingInspection,SpellCheckingInspection
 adapter:
+  max_error_count: 5
   chinabidding:
     #search_day: '今天'
     search_day: '近一周'
@@ -15,6 +16,7 @@ adapter:
 default_area: '全国'
 logger:
   file-path: './logs/'
+  level: 'debug'
 save:
   collect_data_key: '红外光谱仪,拉曼光谱仪'
   collect_batch_size: 100
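
For illustration, a hedged sketch of reading the two new keys with plain PyYAML; the repo's own loader (`utils`/`config`, with env-var overrides as seen in docker-compose.yml below) is assumed to differ:

    import yaml  # pip install pyyaml

    with open("config.yml", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)

    # Defaults mirror those used in the diff.
    max_errors = int(cfg.get("adapter", {}).get("max_error_count", 5))
    log_level = cfg.get("logger", {}).get("level", "INFO")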

+ 4 - 2
SourceCode/TenderCrawler/app/jobs/data_collector.py

@@ -43,8 +43,10 @@ class DataCollector:
     def set_store(self, store: IDataStore) -> None:
         self._store = store
 
-    def collect(self, keyword: str):
-        adapters.collect(self.adapter, keyword, self.store)
+    def collect(self, keywords: str):
+        if not self.store:
+            raise Exception("未设置存储器")
+        adapters.collect(self.adapter, keywords, self.store)
 
     def close(self):
         utils.get_logger().info(f"关闭浏览器驱动,URL: {self.adapter.url}")

+ 1 - 4
SourceCode/TenderCrawler/app/jobs/job_runner.py

@@ -113,10 +113,7 @@ class JobRunner:
                         url_setting.password,
                         self.store,
                     )
-                    keywords = url_setting.keywords
-                    keyword_array = keywords.split(",")
-                    for keyword in keyword_array:
-                        data_collector.collect(keyword)
+                    data_collector.collect(url_setting.keywords)
                     utils.get_logger().info(f"采集完成: {url_setting.url}")
                 except Exception as e:
                     self._send_error_email(

+ 16 - 4
SourceCode/TenderCrawler/app/utils/logger_helper.py

@@ -16,7 +16,6 @@ class LoggerHelper:
     _log_file_name = f"{config.get("logger.file_name", "crawler")}.log"
     _log_file_path = config.get("logger.file_path", "./logs")
     _log_level_string = config.get("logger.level", "INFO")
-    _log_level = logging.getLevelName(_log_level_string)
 
     def __new__(cls, *args, **kwargs):
         """
@@ -39,8 +38,9 @@ class LoggerHelper:
         """
         初始化日志记录器,包括设置日志级别、创建处理器和格式化器,并将它们组合起来
         """
+        log_level = self._get_log_level()
         self._logger = logging.getLogger("app_logger")
-        self._logger.setLevel(self._log_level)
+        self._logger.setLevel(log_level)
 
         if not os.path.exists(self._log_file_path):
             os.makedirs(self._log_file_path)
@@ -53,11 +53,11 @@ class LoggerHelper:
             backupCount=7,
             encoding="utf-8",
         )
-        file_handler.setLevel(logging.INFO)
+        file_handler.setLevel(log_level)
 
         # 创建控制台处理器
         console_handler = logging.StreamHandler()
-        console_handler.setLevel(logging.INFO)
+        console_handler.setLevel(logging.DEBUG)
 
         # 创建格式化器
         formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
@@ -70,6 +70,18 @@ class LoggerHelper:
         self._logger.addHandler(file_handler)
         self._logger.addHandler(console_handler)
 
+    def _get_log_level(self):
+        try:
+            # 尝试将字符串转换为 logging 模块中的日志级别常量
+            log_level = getattr(logging, self._log_level_string.upper())
+            if not isinstance(log_level, int):
+                raise ValueError
+            return log_level
+        except (AttributeError, ValueError):
+            raise ValueError(
+                f"配置logger出错: Unknown level: '{self._log_level_string}'"
+            )
+
     @classmethod
     def get_logger(cls):
         """

+ 2 - 1
SourceCode/TenderCrawler/docker-compose.yml

@@ -56,10 +56,11 @@ services:
       #      - APP_AI__KEY=
       #      - APP_AI__URL=http://192.168.0.109:7580/api/chat
       #      - APP_AI__MODEL=qwen2.5:7b
+      - APP_LOGGER__LEVEL=INFO
       - APP_JOB__COLLECT=20:00,12:00
       - APP_JOB__PROCESS=23:00,4:00,13:00
       - APP_JOB__SEND_EMAIL=08:20,14:00
-      - APP_JOB__RUN_NOW=0
+      - APP_JOB__RUN_NOW=1
       - APP_SELENIUM__REMOTE_DRIVER_URL=http://y_selenium:4444/wd/hub
     volumes:
       - /home/docker/tender-crawler_v2/app/config.yml:/app/config.yml