Răsfoiți Sursa

Update 优化采集逻辑

YueYunyun 2 săptămâni în urmă
părinte
comite
28e42242d5

+ 200 - 0
SourceCode/TenderCrawler/README.md

@@ -0,0 +1,200 @@
+# 招投标数据采集系统
+
+## 项目说明
+
+本系统用于自动采集和处理招投标信息,主要功能包括:
+
+- 数据采集:从多个招投标网站自动采集数据
+- 数据处理:使用 AI 处理和分析采集的数据
+- 数据分发:通过邮件发送处理后的数据
+- 数据清理:定期清理历史数据
+
+## 系统架构
+
+### 核心模块
+
+- 数据采集模块 (DataCollector)
+- 数据处理模块 (DataProcess)
+- 数据发送模块 (DataSend)
+- 数据清理模块 (DataClean)
+
+### 关键组件
+
+- 适配器 (Adapters): 负责对接不同的数据源
+- 存储层 (Stores): 负责数据持久化
+- 工具类 (Utils): 提供通用功能支持
+
+## 配置说明
+
+### 基础配置
+
+```yaml
+# 数据采集配置
+save:
+  collect_data_key: '红外光谱仪,拉曼光谱仪' # 采集数据关键词过滤
+  collect_batch_size: 100 # 采集数据批量保存大小
+  process_batch_size: 1 # AI处理数据批量大小
+  attach_file_path: './temp_files/attaches/' # 附件保存路径
+  report_file_path: './temp_files/report/' # 报表保存路径
+
+# 任务调度配置
+job:
+  event_id: 1 # 任务ID,改变此值会触发任务重新加载
+  sleep_interval: 10 # 任务检查间隔(秒)
+  collect: '06:00,22:00' # 每天数据采集时间点
+  process: '07:00,10:00' # 每天数据处理时间点
+  send_email: '08:20,14:00' # 每天邮件发送时间点
+  clean_data: '00:05' # 每天数据清理时间点
+
+  # 月度报告配置
+  send_current_month_report_day: 30 # 每月发送当月报告的日期
+  send_current_month_report_time: '08:20' # 发送当月报告的时间点
+  send_prev_month_report_day: 1 # 每月发送上月报告的日期
+  send_prev_month_report_time: '08:20' # 发送上月报告的时间点
+
+  run_now: false # 是否立即执行一次任务
+
+# 数据清理配置
+clean:
+  day: 30 # 默认清理天数(所有类型默认使用此值)
+  attach: 30 # 附件清理天数
+  log: 30 # 日志清理天数
+  collect_data: 30 # 采集数据清理天数
+  process_data: 30 # 招标数据清理天数
+  process_result_data: 60 # 中标数据清理天数(最小45天)
+  report: 90 # 报表清理天数(最小60天)
+
+# AI配置
+ai:
+  key: 'your-api-key' # AI API密钥
+  url: 'https://api-url' # AI API地址
+  model: 'model-name' # 使用的模型名称
+  # 系统提示词
+  system_prompt: '从给定信息中提取出关键信息,并以给定的类型返回json字符串,如果部分信息为空,则该字段返回为空' 
+  # 模板1 处理招标信息
+  prompt_template_1: '在以上内容中提取招标信息:
+    ```typescript
+    export interface Tender { //招标信息
+    no: string; // 招标项目编号
+    title: string; // 招标公告标题
+    province: string; // 招标单位省份
+    city: string; // 招标单位城市
+    date: string; // 项目开标的时间
+    address: string; // 项目开标的地点
+    release_date: string; // 招标信息的发布时间
+    summary: string; // 100字左右的招标条件,联系方式等内容摘要
+    devices: string; // 只涉及到光谱仪相关的设备,其他无关设备不需要,多个设备以逗号分隔,例如 红外光谱仪,拉曼光谱仪等
+    }
+    ```' 
+  # 模板2 处理中标信息
+  prompt_template_2: '在以上内容中提取中标信息:
+    ```typescript
+    export interface Instrument { // 中标仪器信息
+    company: string; // 中标单位名称,参与竞标并中标的公司名称
+    name: string; // 仪器名称,例如:红外光谱仪
+    manufacturer: string; // 仪器厂商,例如:赛默飞、Bruker
+    model: string; // 仪器的型号/规格,例如:NIR25S
+    quantity: number; // 中标仪器的数量,台数,例如:2
+    unit_price: number; // 仪器的单价,单位转换为元,例如:178000.00
+    }
+    export interface BiddingAcceptance { //中标信息
+    no: string; // 项目编号
+    title: string; // 中标公告标题
+    date: string; // 中标公告时间
+    province: string; // 招标单位省份
+    city: string; // 招标单位城市
+    summary: string; // 公告摘要信息,100字左右
+    instruments: Instrument[]; // 中标设备的信息
+    }
+    ```'
+# 邮件配置
+email:
+  smtp_server: 'smtp.example.com' # SMTP服务器地址
+  smtp_port: 465 # SMTP端口
+  smtp_user: 'user@example.com' # SMTP用户名
+  smtp_password: 'password' # SMTP密码
+  from_email: 'from@example.com' # 发件人地址
+  error_email: 'error@example.com' # 错误通知邮箱
+
+# 数据库配置
+mysql:
+  host: 'localhost' # 数据库主机
+  port: 3306 # 数据库端口
+  db: 'database_name' # 数据库名
+  user: 'root' # 数据库用户名
+  password: 'password' # 数据库密码
+  charset: 'utf8mb4' # 字符集
+```
+
+### 数据源配置
+
+```yaml
+adapter:
+  max_retries: 3 # 最大重试次数
+  # 中国政府采购网配置
+  ccgp:
+    search_day: '近3日' # 搜索时间范围
+    model_name: 'ccgp_data_collection_adapter' # 适配器模块名
+    class_name: 'CCGPDataCollectionAdapter' # 适配器类名
+    batch_save: false # 是否批量保存数据
+
+  # 中国采购与招标网配置
+  chinabidding:
+    search_day: '近一周' # 搜索时间范围
+    model_name: 'chinabidding_data_collection_adapter' # 适配器模块名
+    class_name: 'ChinabiddingDataCollectionAdapter' # 适配器类名
+    batch_save: true # 是否批量保存数据
+
+# Selenium配置
+selenium:
+  remote_driver_url: 'http://127.0.0.1:3534/wd/hub' # WebDriver地址
+```
+
+### 日志配置
+
+```yaml
+logger:
+  file-path: './logs/' # 日志文件路径
+  level: 'debug' # 日志级别
+```
+
+## 部署说明
+
+### 环境要求
+
+- Python 3.8+
+- MySQL 5.7+
+- Selenium WebDriver
+
+### 安装步骤
+
+1. 安装依赖: `pip install -r requirements.txt`
+2. 配置数据库: 执行 `init.sql`
+3. 修改配置: 编辑 `config.yml`
+4. 启动应用: `python app/main.py`
+
+### 目录结构
+
+```
+app/
+├── adapters/           # 数据源适配器
+├── drivers/            # 浏览器驱动
+├── jobs/               # 任务处理模块
+├── models/             # 数据模型
+├── stores/             # 数据存储
+├── utils/              # 工具类
+├── config.yml          # 配置文件
+└── main.py             # 主程序
+```
+
+## 数据采集说明
+
+### 采集流程
+
+1. 系统按配置的时间点(`job.collect`)自动启动采集任务
+2. 根据配置的数据源(`adapter`)和关键词(`save.collect_data_key`)进行数据采集
+3. 采集到的数据经过关键词过滤后保存到数据库
+4. 相关附件下载到指定目录(`save.attach_file_path`)
+
+
+

+ 0 - 4
SourceCode/TenderCrawler/app/adapters/__init__.py

@@ -4,7 +4,3 @@ from stores.data_store_interface import IDataStore
 
 def collect(adapter: IDataCollectionAdapter, keywords: str, store: IDataStore = None):
     adapter.collect(keywords, store)
-
-
-def teardown(adapter: IDataCollectionAdapter):
-    adapter.teardown()

+ 194 - 135
SourceCode/TenderCrawler/app/adapters/ccgp_data_collection_adapter.py

@@ -1,172 +1,213 @@
 from time import sleep
+from typing import List, Optional
 
 from selenium.common.exceptions import TimeoutException, NoSuchElementException
 from selenium.webdriver.common.by import By
 from selenium.webdriver.support import expected_conditions as ec
 
-
 import utils
 from adapters.data_collection_adapter_interface import IDataCollectionAdapter
-from stores.data_store_interface import IDataStore
 
 
-class CcgpDataCollectionAdapter(IDataCollectionAdapter):
-    """
-    中国政府采购网数据采集适配器
-    """
+class CCGPDataCollectionAdapter(IDataCollectionAdapter):
+    """中国政府采购网数据采集适配器"""
 
-    def __init__(self, url: str, store: IDataStore = None):
-        self._url = url
-        self._store = store
-        self._driver = None
-        self._keyword = None
-        self._adapter_type = "ccgp"
-        self._next_count = 0
+    def __init__(self, url: str):
+        """初始化适配器
+
+        Args:
+            url: 目标网站URL
+        """
+        super().__init__(url, "ccgp", "近1周")
 
     def login(self, username: str, password: str) -> None:
+        """登录网站(CCGP无需登录)"""
         pass
 
-    def _collect(self, keyword: str):
-        items = self._search(keyword)
-        if len(items) <= 0:
-            return
-        self._process_list(items)
-        if utils.get_config_bool(self.batch_save_key):
-            self.store.save_collect_data(True)
+    def _collect(self, keyword: str) -> None:
+        """执行数据采集
 
-    def _search(self, keyword: str) -> list:
+        Args:
+            keyword: 单个搜索关键词
+        """
         try:
-            if not keyword:
-                raise Exception("搜索关键字不能为空")
-            self.driver.get(self._url)
-            if not self._wait_until(
-                ec.presence_of_element_located((By.ID, "searchForm"))
-            ):
-                return []
-            search_el = self.driver.find_element(By.ID, "kw")
-            sleep(2)
-            search_el.clear()
-            search_el.send_keys(keyword)
-            search_btn = self.driver.find_element(
-                By.XPATH, "//form[@id='searchForm']/input[@id='doSearch2']"
-            )
-            sleep(1)
-            search_btn.click()
-            self._next_count = 0
-            if not self._wait_until(
-                ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result"))
-            ):
-                return []
-            default_search_txt = "近1周"
-            search_txt = utils.get_config_value(self.search_day_key, default_search_txt)
-            utils.get_logger().debug(f"搜索日期条件: {search_txt}")
-            if search_txt != default_search_txt:
+            # 获取搜索时间范围
+
+            self.logger.info(f"开始采集关键词: {keyword}, 时间范围: {self._search_txt}")
+
+            # 搜索数据
+            items = self._search(keyword)
+            if not items:
+                return
+
+            # 处理数据列表
+            self._process_list(items)
+
+        except Exception as e:
+            self.logger.error(f"采集失败: {e}")
+            raise
+
+    def _search(self, keyword: str) -> List:
+        """搜索数据
+
+        Args:
+            keyword: 搜索关键词
+
+        Returns:
+            List: 搜索结果列表
+        """
+        # 打开搜索页面
+        self.driver.get(self.url)
+
+        # 等待搜索框
+        self._wait_for(
+            ec.presence_of_element_located((By.ID, "searchForm")),
+            message="搜索框加载超时",
+        )
+
+        # 输入关键词
+        search_el = self.driver.find_element(By.ID, "kw")
+        sleep(2)
+        search_el.clear()
+        search_el.send_keys(keyword)
+
+        # 点击搜索
+        search_btn = self.driver.find_element(
+            By.XPATH, "//form[@id='searchForm']/input[@id='doSearch2']"
+        )
+        sleep(1)
+        search_btn.click()
+
+        # 等待结果加载
+        self._next_count = 0
+        self._wait_for(
+            ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result")),
+            message="搜索结果加载超时",
+        )
+
+        # 设置时间范围
+        self._set_search_date()
+
+        # 获取结果列表
+        items = self.driver.find_elements(
+            By.XPATH, "//ul[@class='vT-srch-result-list-bid']/li/a"
+        )
+        return items
+
+    def _set_search_date(self) -> None:
+        """设置搜索时间范围"""
+        try:
+            if self._search_txt != self._default_search_txt:
                 last_els = self.driver.find_elements(By.XPATH, "//ul[@id='datesel']/li")
                 for last_el in last_els:
-                    if search_txt == last_el.text:
+                    if self._search_txt == last_el.text:
                         sleep(1)
                         last_el.click()
                         break
-                if not self._wait_until(
-                    ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result"))
-                ):
-                    return []
+
+                self._wait_for(
+                    ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result")),
+                    message="设置时间范围后页面加载超时",
+                )
             else:
                 sleep(1)
-            try:
-                p_els = self.driver.find_elements(
-                    By.XPATH, "//body/div[@class='vT_z']/div/div/p"
-                )
-                if len(p_els) > 0:
-                    utils.get_logger().debug(f" {p_els[0].text}")
-                else:
-                    a_links = self.driver.find_elements(
-                        By.XPATH, "//div[@class='vT-srch-result-list']/p/a"
-                    )
-                    count = len(a_links)
-                    if count > 1:
-                        count = count - 1
-                    utils.get_logger().debug(f"共查询到 {count} 页,每页 20 条")
-            except Exception as e:
-                utils.get_logger().error(f"搜索失败[尝试查询页数]: {e}")
-            items = self.driver.find_elements(
-                By.XPATH, "//ul[@class='vT-srch-result-list-bid']/li/a"
-            )
-            return items
-        except TimeoutException as e:
-            raise Exception(f"搜索失败 [{self._adapter_type}] [超时]: {e}")
-        except NoSuchElementException as e:
-            raise Exception(f"搜索失败 [{self._adapter_type}] [找不到元素]: {e}")
 
-    def _process_list(self, items: list) -> list:
+        except Exception as e:
+            self.logger.error(f"设置时间范围失败: {e}")
+
+    def _process_list(self, items: List) -> None:
+        """处理数据列表
+
+        Args:
+            items: 数据列表
+        """
         if not items:
-            return []
+            return
+
+        # 处理当前页
         for item in items:
             self._process_item(item)
         sleep(2)
+
+        # 处理下一页
         next_items = self._next_page()
-        if len(items) <= 0:
-            return []
-        return self._process_list(next_items)
+        if next_items:
+            self._process_list(next_items)
 
-    def _next_page(self) -> list:
+    def _next_page(self) -> Optional[List]:
+        """获取下一页数据"""
         try:
+            # 查找下一页按钮
             next_path = "//div[@class='vT-srch-result-list']/p/a[@class='next']"
             try:
                 btn = self.driver.find_element(By.XPATH, next_path)
             except NoSuchElementException:
-                utils.get_logger().debug(f"翻页结束 [{self._adapter_type}]")
-                return []
+                self.logger.debug("已到最后一页")
+                return None
+
+            # 点击下一页
             btn.click()
             self._next_count += 1
-            utils.get_logger().debug(
-                f"下一页[{self._next_count+1}]: {self.driver.current_url}"
-            )
+            self.logger.debug(f"下一页[{self._next_count+1}]")
             sleep(1)
-            if not self._wait_until(
-                ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result"))
-            ):
-                return []
+
+            # 等待页面加载
+            self._wait_for(
+                ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result")),
+                message="下一页加载超时",
+            )
+
+            # 获取数据列表
             items = self.driver.find_elements(
                 By.XPATH, "//ul[@class='vT-srch-result-list-bid']/li/a"
             )
             return items
+
         except NoSuchElementException as e:
-            raise Exception(f"翻页失败 [{self._adapter_type}] [找不到元素]: {e}")
-        except TimeoutException as e:
-            raise Exception(f"翻页失败 [{self._adapter_type}] [超时]: {e}")
+            raise Exception(f"页面元素未找到: {e}")
 
-    def _process_item(self, item):
+    def _process_item(self, item) -> None:
+        """处理单条数据"""
         main_handle = self.driver.current_window_handle
         close = True
+
         try:
+            # 检查URL是否已采集
             url = item.get_attribute("href")
             if self._check_is_collect_by_url(url):
                 close = False
                 return
-            utils.get_logger().debug(f"跳转详情")
+
+            # 打开详情页
+            self.logger.debug("打开详情页")
             sleep(1)
             item.click()
-            if not self._wait_until(ec.number_of_windows_to_be(2)):
-                return
+
+            # 切换窗口
+            self._wait_for(ec.number_of_windows_to_be(2), message="新窗口打开超时")
+
             handles = self.driver.window_handles
             for handle in handles:
                 if handle != main_handle:
                     self.driver.switch_to.window(handle)
                     break
-            if not self._wait_until(
-                ec.presence_of_element_located((By.TAG_NAME, "body"))
-            ):
-                return
 
+            # 等待页面加载
+            self._wait_for(
+                ec.presence_of_element_located((By.TAG_NAME, "body")),
+                message="详情页加载超时",
+            )
+
+            # 获取内容
             content = self.driver.find_element(
                 By.XPATH, "//div[@class='vF_deail_maincontent']"
             ).text
-            # 排除其他公告
+
+            # 判断公告类型
             if self._check_type("其他公告"):
                 self._save_db(url, content, 3, is_invalid=True)
                 return
-            # 判断是否为投标公告
+
             data_type = (
                 1
                 if self._check_type("中标公告")
@@ -174,20 +215,19 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
                 or self._check_type("终止公告")
                 else 0
             )
+
+            # 检查关键词并保存
             if self._check_content(content):
                 attach_str = self._attach_download()
                 self._save_db(url, content, data_type, attach_str)
             else:
                 self._save_db(url, content, data_type, is_invalid=True)
+
         except TimeoutException as e:
-            utils.get_logger().error(
-                f"采集发生异常 [{self._adapter_type}] Timeout: {self.driver.current_url}。Exception: {e}"
-            )
+            self.logger.error(f"处理数据超时: {e}")
         except NoSuchElementException as e:
-            utils.get_logger().error(
-                f"采集发生异常 [{self._adapter_type}] NoSuchElement: {self.driver.current_url}。Exception: {e}"
-            )
-            raise Exception(f"采集失败 [{self._adapter_type}] [找不到元素]: {e}")
+            self.logger.error(f"页面元素未找到: {e}")
+            raise
         finally:
             if close:
                 sleep(1)
@@ -195,56 +235,75 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
                 self.driver.switch_to.window(main_handle)
 
     def _check_type(self, type_str: str) -> bool:
+        """检查公告类型
+
+        Args:
+            type_str: 类型文本
+
+        Returns:
+            bool: 是否匹配
+        """
         links = self.driver.find_elements(By.LINK_TEXT, type_str)
-        if len(links) > 0:
-            utils.get_logger().info(f"{type_str}")
+        if links:
+            self.logger.info(f"公告类型: {type_str}")
             return True
         return False
 
-    def _attach_download(self):
+    def _attach_download(self) -> Optional[str]:
+        """下载附件
+
+        Returns:
+            str: 附件路径
+        """
         paths = []
 
+        # 查找附件链接
         attach_els = self.driver.find_elements(
             By.XPATH, "//td[@class='bid_attachtab_content']/a"
         )
         attach_2_els = self.driver.find_elements(By.XPATH, "//a[@ignore='1']")
-        # 合并两个列表
         all_attachments = attach_els + attach_2_els
-        utils.get_logger().debug(
-            f"附件检索数量: {len(attach_els)}/{len(attach_2_els)}/{len(all_attachments)}"
+
+        self.logger.debug(
+            f"附件数量: {len(attach_els)}/{len(attach_2_els)}/{len(all_attachments)}"
         )
+
+        # 下载附件
         attach_urls = []
-        if len(all_attachments) > 0:
-            for attach_el in all_attachments:
+        for attach_el in all_attachments:
+            try:
+                # 获取附件信息
                 attach_url = attach_el.get_attribute("href")
-                if attach_url not in attach_urls:
-                    attach_urls.append(attach_url)
-                else:
-                    utils.get_logger().info(f"重复附件: {attach_url}")
+                if attach_url in attach_urls:
+                    self.logger.info(f"重复附件: {attach_url}")
                     continue
+                attach_urls.append(attach_url)
+
+                # 获取文件名
                 file_name = (
                     attach_el.text
                     or attach_el.get_attribute("download")
                     or attach_url.split("/")[-1]
                 )
-                if not file_name:
-                    continue
-                # 检查 file_name 是否包含文件扩展名
-                if "." not in file_name:
-                    utils.get_logger().warning(
-                        f"文件名 {file_name} 不包含扩展名,跳过下载。"
-                    )
+                if not file_name or "." not in file_name:
+                    self.logger.warning(f"无效文件名: {file_name}")
                     continue
-                utils.get_logger().debug(
-                    f"开始下载附件: {file_name} 链接: {attach_url}"
-                )
+
+                # 下载文件
+                self.logger.debug(f"下载附件: {file_name}")
                 path = utils.download_remote_file(attach_url, file_name)
                 if path:
-                    utils.get_logger().debug(f"下载附件路径: {path}")
+                    self.logger.debug(f"下载成功: {path}")
                     paths.append(path)
                 else:
-                    utils.get_logger().warning(f"下载附件失败: {file_name}")
+                    self.logger.warning(f"下载失败: {file_name}")
+
+            except Exception as e:
+                self.logger.error(f"处理附件失败: {e}")
+                continue
+
+        # 返回附件路径
         attach_str = ",".join(paths)
         if attach_str:
-            utils.get_logger().info(f"附件下载完成: {attach_str}")
+            self.logger.info(f"附件下载完成: {attach_str}")
         return attach_str

+ 198 - 117
SourceCode/TenderCrawler/app/adapters/chinabidding_data_collection_adapter.py

@@ -1,206 +1,287 @@
 from time import sleep
+from typing import List, Optional
 
 from selenium.common.exceptions import TimeoutException, NoSuchElementException
 from selenium.webdriver.common.by import By
 from selenium.webdriver.support import expected_conditions as ec
-from selenium.webdriver.support.wait import WebDriverWait
 
 import utils
 from adapters.data_collection_adapter_interface import IDataCollectionAdapter
-from stores.data_store_interface import IDataStore
 
 
 class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
-    """
-    中国招标网数据采集适配器
-    """
-
-    def __init__(self, url: str, store: IDataStore = None):
-        self._url = url
-        self._store = store
-        self._driver = None
-        self._keyword = None
-        self._adapter_type = "chinabidding"
-        self._next_count = 0
+    """中国采购与招标网数据采集适配器"""
+
+    def __init__(self, url: str):
+        """初始化适配器
+
+        Args:
+            url: 目标网站URL
+        """
+        super().__init__(url, "chinabidding", "全部")
 
     def login(self, username: str, password: str) -> None:
+        """登录网站
+
+        Args:
+            username: 用户名
+            password: 密码
+        """
         try:
+            # 点击登录按钮
             login_el = self.driver.find_element(
                 By.XPATH, "//div[@id='loginRight']/a[@class='login']"
             )
             login_el.click()
-            wait = WebDriverWait(self.driver, 10, 1)
-            wait.until(ec.presence_of_element_located((By.ID, "userpass")))
-            # if not self._wait_until(
-            #     ec.presence_of_element_located((By.ID, "userpass"))
-            # ):
-            #     raise TimeoutException(f"id='userpass' 元素没有找到")
+
+            # 等待登录框加载
+            self._wait_for(
+                ec.presence_of_element_located((By.ID, "userpass")),
+                timeout=10,
+                message="登录框加载超时",
+            )
+
+            # 输入用户名密码
             un_el = self.driver.find_element(By.ID, "username")
             un_el.send_keys(username)
             pass_el = self.driver.find_element(By.ID, "userpass")
             pass_el.send_keys(password)
+
+            # 点击登录
             login_btn = self.driver.find_element(By.ID, "login-button")
             login_btn.click()
-            wait.until(ec.presence_of_element_located((By.ID, "site-content")))
-            # if not self._wait_until(ec.presence_of_element_located((By.ID, "site-content"))):
-            #     raise TimeoutException(f"id='site-content' 元素没有找到")
+
+            # 等待登录成功
+            self._wait_for(
+                ec.presence_of_element_located((By.ID, "site-content")),
+                message="登录成功页面加载超时",
+            )
+            self.logger.info("登录成功")
+
         except TimeoutException as e:
-            raise Exception(f"登录失败 [{self._adapter_type}] [超时]: {e}")
+            raise Exception(f"登录超时: {e}")
         except NoSuchElementException as e:
-            raise Exception(f"登录失败 [{self._adapter_type}] [找不到元素]: {e}")
+            raise Exception(f"页面元素未找到: {e}")
 
-    def _collect(self, keyword: str):
-        items = self._search_by_type(keyword, 0)
-        self._process_list(items, 0)
-        sleep(2)
-        items = self._search_by_type(keyword, 1)
-        self._process_list(items, 1)
-        if utils.get_config_bool(self.batch_save_key):
-            self.store.save_collect_data(True)
+    def _collect(self, keyword: str) -> None:
+        """执行数据采集
 
-    def _search_by_type(self, keyword: str, data_type):
+        Args:
+            keyword: 单个搜索关键词
+        """
         try:
-            self.driver.get(self._url)
-            if data_type == 0:
-                utils.get_logger().info(f"开始采集 招标公告")
-                el = self.driver.find_element(
-                    By.XPATH, "//div[@id='z-b-g-g']/h2/a[@class='more']"
-                )
-            else:
-                utils.get_logger().info(f"开始采集 中标结果公告")
-                el = self.driver.find_element(
-                    By.XPATH, "//div[@id='z-b-jg-gg']/h2/a[@class='more']"
-                )
-            el.click()
-            if not self._wait_until(ec.number_of_windows_to_be(2)):
-                return []
-            self.driver.close()
-            self.driver.switch_to.window(self.driver.window_handles[0])
-            return self._search(keyword)
-        except TimeoutException as e:
-            raise Exception(f"搜索失败 [{self._adapter_type}] [超时]: {e}")
-        except NoSuchElementException as e:
-            raise Exception(f"搜索失败 [{self._adapter_type}] [找不到元素]: {e}")
 
-    def _search(self, keyword: str) -> list:
-        if not self._wait_until(
-            ec.presence_of_element_located((By.ID, "searchBidProjForm"))
-        ):
-            return []
+            self.logger.info(f"开始采集关键词: {keyword}, 时间范围: {self._search_txt}")
+
+            # 采集招标公告
+            self.logger.info("开始采集招标公告")
+            items = self._search_by_type(keyword, 0)
+            self._process_list(items, 0)
+            sleep(2)
+
+            # 采集中标公告
+            self.logger.info("开始采集中标公告")
+            items = self._search_by_type(keyword, 1)
+            self._process_list(items, 1)
+
+        except Exception as e:
+            self.logger.error(f"采集失败: {e}")
+            raise
+
+    def _search_by_type(self, keyword: str, data_type: int) -> List:
+        """根据类型搜索数据
+
+        Args:
+            keyword: 搜索关键词
+            data_type: 数据类型(0:招标,1:中标)
+
+        Returns:
+            List: 搜索结果列表
+        """
+        # 打开首页
+        self.driver.get(self.url)
+
+        # 选择公告类型
+        if data_type == 0:
+            el = self.driver.find_element(
+                By.XPATH, "//div[@id='z-b-g-g']/h2/a[@class='more']"
+            )
+        else:
+            el = self.driver.find_element(
+                By.XPATH, "//div[@id='z-b-jg-gg']/h2/a[@class='more']"
+            )
+        el.click()
+
+        # 切换窗口
+        self._wait_for(ec.number_of_windows_to_be(2), message="新窗口打开超时")
+
+        self.driver.close()
+        self.driver.switch_to.window(self.driver.window_handles[0])
+
+        # 执行搜索
+        return self._search(keyword)
+
+    def _search(self, keyword: str) -> List:
+        """执行搜索"""
+        # 等待搜索框加载
+        self._wait_for(
+            ec.presence_of_element_located((By.ID, "searchBidProjForm")),
+            message="搜索框加载超时",
+        )
+
+        # 输入关键词
         search_el = self.driver.find_element(
             By.XPATH, "//form[@id='searchBidProjForm']/ul/li/input[@id='fullText']"
         )
         search_el.clear()
         search_el.send_keys(keyword)
+
+        # 点击搜索
         search_btn = self.driver.find_element(
             By.XPATH, "//form[@id='searchBidProjForm']/ul/li/button"
         )
         search_btn.click()
+
+        # 等待结果加载
         self._next_count = 0
-        if not self._wait_until(
-            ec.presence_of_element_located((By.ID, "site-content"))
-        ):
-            return []
-        default_search_txt = "全部"
-        search_txt = utils.get_config_value(self.search_day_key, default_search_txt)
-        utils.get_logger().debug(f"搜索日期条件: {search_txt}")
-        if search_txt != default_search_txt:
-            last_el = self.driver.find_element(By.LINK_TEXT, search_txt)
-            sleep(1)
-            last_el.click()
-            if not self._wait_until(
-                ec.presence_of_element_located((By.ID, "site-content"))
-            ):
-                return []
-        else:
-            sleep(1)
-        try:
-            a_links = self.driver.find_elements(
-                By.XPATH, "//form[@id='pagerSubmitForm']/a"
-            )
-            count = len(a_links)
-            if count > 1:
-                count = count - 1
-            utils.get_logger().debug(f"共查询到 {count} 页,每页 10 条")
-        except Exception as e:
-            utils.get_logger().error(f"搜索失败[尝试查询页数]: {e}")
+        self._wait_for(
+            ec.presence_of_element_located((By.ID, "site-content")),
+            message="搜索结果加载超时",
+        )
+
+        # 设置时间范围
+        self._set_search_date()
+
+        # 获取结果列表
         items = self.driver.find_elements(By.XPATH, "//ul[@class='as-pager-body']/li/a")
         return items
 
-    def _process_list(self, items: list, data_type) -> list:
+    def _set_search_date(self) -> None:
+        """设置搜索时间范围"""
+        try:
+            if self._search_txt != self._default_search_txt:
+                last_el = self.driver.find_element(By.LINK_TEXT, self._search_txt)
+                sleep(1)
+                last_el.click()
+
+                self._wait_for(
+                    ec.presence_of_element_located((By.ID, "site-content")),
+                    message="设置时间范围后页面加载超时",
+                )
+            else:
+                sleep(1)
+
+        except Exception as e:
+            self.logger.error(f"设置时间范围失败: {e}")
+
+    def _process_list(self, items: List, data_type: int) -> None:
+        """处理数据列表
+
+        Args:
+            items: 数据列表
+            data_type: 数据类型(0:招标,1:中标)
+        """
         if not items:
-            return []
+            return
+
+        # 处理当前页
         for item in items:
             self._process_item(item, data_type)
         sleep(2)
+
+        # 处理下一页
         next_items = self._next_page()
-        return self._process_list(next_items, data_type)
+        if next_items:
+            self._process_list(next_items, data_type)
+
+    def _next_page(self) -> Optional[List]:
+        """获取下一页数据
 
-    def _next_page(self) -> list:
+        Returns:
+            List: 下一页数据列表
+        """
         try:
+            # 查找下一页按钮
             try:
                 btn = self.driver.find_element(
                     By.XPATH, "//form[@id='pagerSubmitForm']/a[@class='next']"
                 )
             except NoSuchElementException:
-                utils.get_logger().debug(f"翻页结束 [{self._adapter_type}]")
-                return []
+                self.logger.debug("已到最后一页")
+                return None
+
+            # 点击下一页
             btn.click()
             self._next_count += 1
-            utils.get_logger().debug(
-                f"下一页[{self._next_count+1}]: {self.driver.current_url}"
+            self.logger.debug(f"下一页[{self._next_count+1}]")
+
+            # 等待页面加载
+            self._wait_for(
+                ec.presence_of_element_located((By.ID, "site-content")),
+                message="下一页加载超时",
             )
-            if not self._wait_until(
-                ec.presence_of_element_located((By.ID, "site-content"))
-            ):
-                return []
+
+            # 获取数据列表
             items = self.driver.find_elements(
                 By.XPATH, "//ul[@class='as-pager-body']/li/a"
             )
             return items
+
         except NoSuchElementException as e:
-            raise Exception(f"翻页失败 [{self._adapter_type}] [找不到元素]: {e}")
-        except TimeoutException as e:
-            raise Exception(f"翻页失败 [{self._adapter_type}] [超时]: {e}")
+            raise Exception(f"页面元素未找到: {e}")
 
-    def _process_item(self, item, data_type):
+    def _process_item(self, item, data_type: int) -> None:
+        """处理单条数据
+
+        Args:
+            item: 数据项
+            data_type: 数据类型(0:招标,1:中标)
+        """
         main_handle = self.driver.current_window_handle
         close = True
+
         try:
+            # 检查URL是否已采集
             url = item.get_attribute("href")
             if self._check_is_collect_by_url(url):
                 close = False
                 return
+
+            # 打开详情页
             item.click()
-            if not self._wait_until(ec.number_of_windows_to_be(2)):
-                return
+            self._wait_for(ec.number_of_windows_to_be(2), message="新窗口打开超时")
+
+            # 切换窗口
             handles = self.driver.window_handles
             for handle in handles:
                 if handle != main_handle:
                     self.driver.switch_to.window(handle)
                     break
+
+            # 获取URL
             url = self.driver.current_url
-            utils.get_logger().debug(f"跳转详情")
-            if not self._wait_until(
-                ec.presence_of_element_located((By.CLASS_NAME, "content"))
-            ):
-                return
+            self.logger.debug(f"打开详情页: {url}")
+
+            # 等待内容加载
+            self._wait_for(
+                ec.presence_of_element_located((By.CLASS_NAME, "content")),
+                message="详情页加载超时",
+            )
+
+            # 获取内容
             content = self.driver.find_element(By.CLASS_NAME, "content").text
+
+            # 检查关键词并保存
             if self._check_content(content):
                 self._save_db(url, content, data_type)
             else:
                 self._save_db(url, content, data_type, is_invalid=True)
 
         except TimeoutException as e:
-            utils.get_logger().error(
-                f"采集发生异常 [{self._adapter_type}] Timeout: {self.driver.current_url}。Exception: {e}"
-            )
-            # raise Exception(f"采集失败 [超时]: {e}")
+            self.logger.error(f"处理数据超时: {e}")
         except NoSuchElementException as e:
-            utils.get_logger().error(
-                f"采集发生异常 [{self._adapter_type}] NoSuchElement: {self.driver.current_url}。Exception: {e}"
-            )
-            raise Exception(f"采集失败 [{self._adapter_type}] [找不到元素]: {e}")
+            self.logger.error(f"页面元素未找到: {e}")
+            raise
         finally:
             if close:
                 sleep(2)

+ 208 - 167
SourceCode/TenderCrawler/app/adapters/data_collection_adapter_interface.py

@@ -1,9 +1,10 @@
 from abc import ABC, abstractmethod
-from typing import Callable, Union, Literal
+from typing import Optional, List, Any
+from datetime import datetime
 
-from selenium import webdriver
+from selenium.webdriver.remote.webdriver import WebDriver
+from selenium.webdriver.support.wait import WebDriverWait
 from selenium.common.exceptions import TimeoutException
-from selenium.webdriver.support.wait import WebDriverWait, D, T
 
 import drivers
 import utils
@@ -12,221 +13,261 @@ from stores.data_store_interface import IDataStore
 
 
 class IDataCollectionAdapter(ABC):
-    """
-    数据收集适配器抽象类
-    """
-
-    _url = ""
-    _store = None
-    _driver = None
-    _adapter_type = ""
-    _cur_keyword = None
-    _keywords = None
-    _keyword_array = None
-    _error_count = 0
-    _max_error_count = utils.get_config_int("adapter.max_error_count", 3)
-
-    _err_keywords = {}
+    """数据采集适配器基类"""
+
+    def __init__(self, url: str, adapter_type: str, default_search_txt: str = ""):
+        """Initialise adapter state and create the WebDriver.
+
+        Args:
+            url: Target site URL; also handed to ``drivers.gen_driver``.
+            adapter_type: Adapter identifier used to build the
+                ``adapter.<type>.search_day`` and ``adapter.<type>.batch_save``
+                configuration keys.
+            default_search_txt: Default passed to ``utils.get_config_value``
+                when looking up the search-range text.
+
+        Raises:
+            Exception: Whatever ``drivers.gen_driver`` raises is logged and
+                re-raised unchanged.
+        """
+        self._url = url
+        # Must be assigned before search_day_key is read below, and must not
+        # be reset afterwards: the adapter_type property and both config-key
+        # properties are derived from it.
+        self._adapter_type = adapter_type
+        self._default_search_txt = default_search_txt
+        self._search_txt = utils.get_config_value(
+            self.search_day_key, default_search_txt
+        )
+        self._next_count = 0
+
+        self._store: Optional[IDataStore] = None
+        self._driver: Optional[WebDriver] = None
+        self._keyword = None
+        self._keywords: List[str] = []
+        self._timeout = utils.get_config_int("selenium.page_load_timeout", 30)
+        self._max_retries = utils.get_config_int("adapter.max_retries", 3)
+        self._retry_keywords: dict = {}  # keyword -> number of attempts so far
+
+        try:
+            # Create the WebDriver for the target site
+            self._driver = drivers.gen_driver(url)
+            self.logger.info(f"初始化WebDriver成功: {url}")
+        except Exception as e:
+            self.logger.error(f"初始化WebDriver失败: {e}")
+            raise
 
     @property
-    def search_day_key(self) -> str:
-        return f"adapter.{self._adapter_type}.search_day"
+    def logger(self):
+        return utils.get_logger()
 
     @property
-    def batch_save_key(self) -> str:
-        return f"adapter.{self._adapter_type}.batch_save"
+    def driver(self) -> WebDriver:
+        return self._driver
 
     @property
     def store(self) -> IDataStore:
         return self._store
 
     @property
-    def url(self):
+    def url(self) -> str:
         return self._url
 
     @property
-    def cur_keyword(self):
-        return self._cur_keyword
+    def keyword(self) -> str:
+        return self._keyword
 
     @property
-    def keywords(self):
-        return self._keywords
+    def adapter_type(self) -> str:
+        return self._adapter_type
 
     @property
-    def keyword_array(self):
-        return self._keyword_array
+    def batch_save_key(self) -> str:
+        return f"adapter.{self._adapter_type}.batch_save"
 
     @property
-    def driver(self) -> webdriver:
-        if not self._driver:
-            try:
-                self._driver = drivers.gen_driver(self.url)
-            except Exception as e:
-                raise Exception(f"创建驱动器失败: {e}")
-        return self._driver
+    def search_day_key(self) -> str:
+        return f"adapter.{self._adapter_type}.search_day"
 
-    def collect(self, keywords: str, store: IDataStore) -> None:
-        """
-        处理搜索结果列表,返回处理后的数据列表
-
-        :param keywords: 搜索结果列表
-        :param store: 数据储存库
-        :type keywords: str
-        :return: 处理后的数据列表
-        :rtype: list
-        :raises Exception: 如果处理失败,应抛出异常
+    def collect(self, keyword: str, store: Optional[IDataStore] = None) -> None:
+        """Collect data for every keyword, retrying the ones that timed out.
+
+        Each keyword is collected once; keywords whose collection raised
+        TimeoutException are retried until they succeed, fail with another
+        error, or reach ``self._max_retries`` attempts. The WebDriver is
+        always released through ``cleanup()`` when this method returns.
+
+        Args:
+            keyword: Search keywords; several may be given separated by
+                commas (split with ``utils.to_array``).
+            store: Data store used for saving; may be None.
+
+        Raises:
+            Exception: Any error raised outside the per-keyword handlers is
+                logged and re-raised.
         """
-        if store:
+        try:
             self._store = store
-        if not keywords:
-            raise Exception("未指定搜索关键字")
-        utils.get_logger().info(f"开始采集: {keywords}")
-        self._error_count = 0
-        self._keyword_array = keywords.split(",")
-        count = 0
-        for keyword in self._keyword_array:
-            if not keyword:
-                continue
-            try:
-                count += 1
-                self._cur_keyword = keyword
-                utils.get_logger().info(f"采集关键字[{count}]: {keyword}")
-                self._error_count = 0
-                self._collect(keyword)
-                if self.cur_keyword in self._err_keywords:
-                    del self._err_keywords[self.cur_keyword]  # 删除键
-            except Exception as e:
-                utils.get_logger().error(f"==> {e}")
-            # except Exception as e:
-            #     raise Exception(f"采集数据失败: {e}")
-        self._collect_error_keywords()
+            self._keyword = keyword
+            self._keywords = utils.to_array(keyword)
+            self._retry_keywords.clear()
 
-    @abstractmethod
-    def login(self, username: str, password: str) -> None:
-        """
-        如果需要登录,则登录后跳转到搜索页面(不自动跳转的需要手动执行)
+            # First pass over every keyword
+            for kw in self._keywords:
+                # Track the keyword being processed so that _check_content
+                # and _save_db work with a single keyword rather than the
+                # whole comma-separated string.
+                self._keyword = kw
+                try:
+                    self.logger.debug(f"开始采集关键词: {kw}")
+                    self._collect(kw)
+                except TimeoutException as e:
+                    self.logger.warning(f"采集关键词 {kw} 超时: {e}")
+                    self._retry_keywords[kw] = 1
+                    continue
+                except Exception as e:
+                    self.logger.error(f"采集关键词 {kw} 失败: {e}")
+                    continue
 
-        :param username: 用户名
-        :type username: str
-        :param password: 密码
-        :type password: str
-        :raises Exception: 如果登录失败,应抛出异常
-        """
-        try:
-            # 实现登录逻辑
-            pass
-        except Exception as e:
-            raise Exception(f"登录失败: {e}")
+            # Retry the keywords that timed out
+            while self._retry_keywords:
+                # Iterate over a snapshot: entries are deleted inside the loop
+                retry_kws = list(self._retry_keywords.keys())
+                for kw in retry_kws:
+                    retry_count = self._retry_keywords[kw]
+                    if retry_count >= self._max_retries:
+                        self.logger.error(f"关键词 {kw} 超过最大重试次数")
+                        del self._retry_keywords[kw]
+                        continue
+
+                    self._keyword = kw
+                    try:
+                        self.logger.info(f"重试采集关键词[{retry_count}]: {kw}")
+                        self._collect(kw)
+                        del self._retry_keywords[kw]
+                    except TimeoutException as e:
+                        self.logger.warning(f"重试采集关键词 {kw} 超时: {e}")
+                        self._retry_keywords[kw] = retry_count + 1
+                        continue
+                    except Exception as e:
+                        # Non-timeout failures are not retried
+                        self.logger.error(f"重试采集关键词 {kw} 失败: {e}")
+                        del self._retry_keywords[kw]
+                        continue
 
-    def _wait(self, timeout=20, poll_frequency=1):
-        return WebDriverWait(self.driver, timeout, poll_frequency)
+            # Batch save; skipped when no store was supplied
+            if self.store and utils.get_config_bool(self.batch_save_key):
+                self.store.save_collect_data(True)
 
-    def _wait_until(
+        except Exception as e:
+            self.logger.error(f"采集失败: {e}")
+            raise
+        finally:
+            self.cleanup()
+
+    def _wait_for(
         self,
-        method: Callable[[D], Union[Literal[False], T]],
-        timeout=20,
-        poll_frequency=1,
-    ) -> bool:
-        try:
-            self._wait(timeout, poll_frequency).until(method)
-            return True
-        except TimeoutException:
-            err_count = (
-                self._err_keywords[self.cur_keyword]
-                if self.cur_keyword in self._err_keywords
-                else 0
-            )
-            err_count += 1
-            utils.get_logger().error(
-                f"采集数据 超时 [{self.cur_keyword}][{err_count}/{self._max_error_count}]"
-            )
-            self._err_keywords[self.cur_keyword] = err_count
-            if err_count > self._max_error_count:
-                del self._err_keywords[self.cur_keyword]  # 删除键
-            return False
-            # raise TimeoutException(
-            #     f"采集数据 超时 {self.cur_keyword} [{err_count}/{self._max_error_count}]"
-            # )
-
-    def _collect_error_keywords(self):
-        if not self._err_keywords:
-            return
-        for keyword, err_count in self._err_keywords.items():
-            try:
-                utils.get_logger().info(
-                    f"重新采集错误关键字[{err_count}/{self._max_error_count}]: {keyword}"
-                )
-                self._cur_keyword = keyword
-                self._collect(keyword)
-                if self.cur_keyword in self._err_keywords:
-                    del self._err_keywords[self.cur_keyword]  # 删除键
-            except Exception as e:
-                utils.get_logger().error(f"失败: {e}")
-        self._collect_error_keywords()
+        condition: Any,
+        timeout: Optional[int] = None,
+        message: Optional[str] = None,
+    ) -> Any:
+        """等待条件满足
 
-    @abstractmethod
-    def _collect(self, keyword: str) -> None:
-        """
-        根据关键字采集
-        :param keyword: 搜索关键字
-        :type keyword: str
-        """
-        pass
+        Args:
+            condition: 等待条件
+            timeout: 超时时间(秒),默认使用全局超时时间
+            message: 超时错误消息
+
+        Returns:
+            Any: 条件满足时的返回值
 
-    def teardown(self) -> None:
+        Raises:
+            TimeoutException: 等待超时
         """
-        关闭浏览器驱动器
+        if not timeout:
+            timeout = self._timeout
+
+        wait = WebDriverWait(self.driver, timeout)
+        return wait.until(condition, message)
+
+    def _check_is_collect_by_url(self, url: str) -> bool:
+        """检查URL是否已采集
 
-        :raises Exception: 如果关闭驱动器失败,应抛出异常
+        Args:
+            url: 目标URL
+
+        Returns:
+            bool: 是否已采集
         """
+        if not self.store:
+            return False
         try:
-            if self.driver:
-                self.driver.quit()
+            old = self.store.query_one_collect_url(url)
+            if old:
+                self.logger.debug(f"URL已采集: {url}")
+                return True
+            return False
         except Exception as e:
-            raise Exception(f"关闭驱动器失败: {e}")
+            self.logger.error(f"检查URL采集状态失败: {e}")
+            return False
 
-    def _check_is_collect_by_url(self, url: str) -> bool:
-        old = self.store.query_one_collect_url(url)
-        if old:
-            utils.get_logger().debug(f"已采集过: {url}")
-            return True
-        return False
+    def _check_content(self, content: str) -> bool:
+        """检查内容是否包含关键词
 
-    def _check_content(self, content) -> bool:
-        collect_data_key = utils.get_config_value("save.collect_data_key")
-        if not collect_data_key:
-            utils.get_logger().info("未配置 save.collect_data_key,跳过内容检查")
+        Args:
+            content: 内容文本
+
+        Returns:
+            bool: 是否包含关键词
+        """
+        if not content:
+            return False
+
+        # 使用当前正在处理的关键词进行匹配
+        if self._keyword in content:
+            self.logger.info(f"内容包含关键词: {self._keyword}")
             return True
-        # utils.get_logger().info(f"检查数据有效性: {collect_data_key}")
-        collect_data_key = collect_data_key.replace(",", ",")
-        keys = collect_data_key.split(",")
-        keys = [key.strip() for key in keys]
-        for key in keys:
-            key = key.strip()
-            # utils.get_logger().info(f"检查数据有效性: {key}")
-            if key in content:
-                utils.get_logger().info(f"有效数据: {self.driver.current_url}")
-                return True
 
         return False
 
-    def _save_db(self, url, content, data_type=0, attach_str=None, is_invalid=False):
+    def _save_db(
+        self,
+        url: str,
+        content: str,
+        data_type: int = 0,
+        attach_str: str = None,
+        is_invalid: bool = False,
+    ) -> bool:
+        """保存数据到数据库
+
+        Args:
+            url: 数据URL
+            content: 数据内容
+            data_type: 数据类型(0:招标,1:中标)
+            attach_str: 附件路径
+            is_invalid: 是否无效数据
+
+        Returns:
+            bool: 是否保存成功
+        """
         if not self.store:
-            utils.get_logger().info(
-                f"DataStore 未指定: {url},关键字{self.cur_keyword}"
-            )
+            self.logger.info(f"未设置存储器: {url}")
             return False
-        else:
+
+        try:
             status = 2 if is_invalid else 0
             data = CollectData(
                 url=url,
-                keyword=self.cur_keyword,
+                keyword=self.keyword,
                 content=content,
                 data_type=data_type,
                 attach_path=attach_str,
                 status=status,
+                create_time=datetime.now(),
             )
             self.store.insert_collect_data(
                 data, utils.get_config_bool(self.batch_save_key)
             )
             return True
+        except Exception as e:
+            self.logger.error(f"保存数据失败: {e}")
+            return False
+
+    def cleanup(self):
+        """Release the WebDriver held by this adapter.
+
+        Errors raised by ``quit()`` are logged instead of propagated. The
+        driver reference is dropped afterwards, so calling this method again
+        is a no-op rather than a second ``quit()`` on a closed session.
+        """
+        try:
+            if self.driver:
+                self.driver.quit()
+        except Exception as e:
+            self.logger.error(f"清理资源失败: {e}")
+        finally:
+            self._driver = None
+
+    @abstractmethod
+    def login(self, username: str, password: str) -> None:
+        """Log in to the target site; concrete adapters must implement this.
+
+        Args:
+            username: Account user name.
+            password: Account password.
+        """
+
+    @abstractmethod
+    def _collect(self, keyword: str) -> None:
+        """Collect data for one keyword; concrete adapters must implement this.
+
+        Args:
+            keyword: The search keyword to collect.
+        """

+ 50 - 45
SourceCode/TenderCrawler/app/config.yml

@@ -1,18 +1,20 @@
 #file: noinspection SpellCheckingInspection,SpellCheckingInspection,SpellCheckingInspection
 adapter:
-  max_error_count: 5
+  max_retries: 3
   chinabidding:
     #search_day: '今天'
     search_day: '近一周'
     model_name: 'chinabidding_data_collection_adapter'
     class_name: 'ChinabiddingDataCollectionAdapter'
     batch_save: True
+    timeout: 30
   ccgp:
     #search_day: '今日'
     search_day: '近3日'
     model_name: 'ccgp_data_collection_adapter'
-    class_name: 'CcgpDataCollectionAdapter'
+    class_name: 'CCGPDataCollectionAdapter'
     batch_save: False
+    timeout: 30
 default_area: '全国'
 logger:
   file-path: './logs/'
@@ -31,53 +33,53 @@ mysql:
   password: Iwb-2024
   charset: utf8mb4
 ai:
-#  url: http://192.168.0.109:7580/api/chat
-#  model: qwen2.5:7b
+  #  url: http://192.168.0.109:7580/api/chat
+  #  model: qwen2.5:7b
   key: sk-febca8fea4a247f096cedeea9f185520
   url: https://dashscope.aliyuncs.com/compatible-mode/v1
   model: qwen-plus
   max_tokens: 1024
-  system_prompt: "从给定信息中提取出关键信息,并以给定的类型返回json字符串,如果部分信息为空,则该字段返回为空"
-  prompt_template_1: "在以上内容中提取招标信息:
-            ```typescript
-            export interface Tender { //招标信息
-                no: string; // 招标项目编号
-                title: string; // 招标公告标题
-                province: string; // 招标单位省份
-                city: string; // 招标单位城市
-                date: string; // 项目开标的时间
-                address: string; // 项目开标的地点
-                release_date: string; // 招标信息的发布时间
-                summary: string; // 100字左右的招标条件,联系方式等内容摘要
-                devices: string; // 只涉及到光谱仪相关的设备,其他无关设备不需要,多个设备以逗号分割 ,例如 红外光谱仪,拉曼光谱仪等
-            }
-            ```"
-  prompt_template_2: "在以上内容中提取中标信息:
-            ```typescript
-            export interface Instrument { // 中标仪器信息
-              company: string; // 中标单位名称,参与竞标并中标的公司名称
-              name: string; // 仪器名称,例如:红外光谱仪
-              manufacturer: string; // 仪器厂商,例如:赛默飞、Bruker
-              model: string; // 仪器的型号/规格,例如:NIR25S
-              quantity: number; // 中标仪器的数量,台数,例如:2
-              unit_price: number; // 仪器的单价,单位转换为元,例如:178000.00
-            }
-            export interface BiddingAcceptance { //中标信息
-              no: string; // 项目编号
-              title: string; // 中标公告标题
-              date: string; // 中标公告时间
-              province: string; // 招标单位省份
-              city: string; // 招标单位城市
-              summary: string; // 公告摘要信息,100字左右
-              instruments: Instrument[]; // 中标设备的信息
-            }
-            ```"
+  system_prompt: '从给定信息中提取出关键信息,并以给定的类型返回json字符串,如果部分信息为空,则该字段返回为空'
+  prompt_template_1: '在以上内容中提取招标信息:
+    ```typescript
+    export interface Tender { //招标信息
+    no: string; // 招标项目编号
+    title: string; // 招标公告标题
+    province: string; // 招标单位省份
+    city: string; // 招标单位城市
+    date: string; // 项目开标的时间
+    address: string; // 项目开标的地点
+    release_date: string; // 招标信息的发布时间
+    summary: string; // 100字左右的招标条件,联系方式等内容摘要
+    devices: string; // 只涉及到光谱仪相关的设备,其他无关设备不需要,多个设备以逗号分割 ,例如 红外光谱仪,拉曼光谱仪等
+    }
+    ```'
+  prompt_template_2: '在以上内容中提取中标信息:
+    ```typescript
+    export interface Instrument { // 中标仪器信息
+    company: string; // 中标单位名称,参与竞标并中标的公司名称
+    name: string; // 仪器名称,例如:红外光谱仪
+    manufacturer: string; // 仪器厂商,例如:赛默飞、Bruker
+    model: string; // 仪器的型号/规格,例如:NIR25S
+    quantity: number; // 中标仪器的数量,台数,例如:2
+    unit_price: number; // 仪器的单价,单位转换为元,例如:178000.00
+    }
+    export interface BiddingAcceptance { //中标信息
+    no: string; // 项目编号
+    title: string; // 中标公告标题
+    date: string; // 中标公告时间
+    province: string; // 招标单位省份
+    city: string; // 招标单位城市
+    summary: string; // 公告摘要信息,100字左右
+    instruments: Instrument[]; // 中标设备的信息
+    }
+    ```'
 email:
-#  smtp_server: smtp.exmail.qq.com
-#  smtp_port: 465
-#  smtp_user: yueyy@iwbnet.com
-#  smtp_password: EXN38AtT97FX635c
-#  from_email: yueyy@iwbnet.com
+  #  smtp_server: smtp.exmail.qq.com
+  #  smtp_port: 465
+  #  smtp_user: yueyy@iwbnet.com
+  #  smtp_password: EXN38AtT97FX635c
+  #  from_email: yueyy@iwbnet.com
   smtp_server: smtp.163.com
   smtp_port: 465
   smtp_user: yueyunyun88@163.com
@@ -98,7 +100,10 @@ job:
   clean_data: 00:05 # 每日清理数据时间
   run_now: false
 selenium:
-  remote_driver_url: http://127.0.0.1:3534/wd/hub
+  remote_driver_url: 'http://127.0.0.1:3534/wd/hub'
+  page_load_timeout: 30
+  implicit_wait: 10
+  headless: true
 clean:
   day: 30 # 清理多少天前的数据 0不清理
   # 下面的没有配置 默认使用 day 的配置

+ 109 - 43
SourceCode/TenderCrawler/app/jobs/data_collector.py

@@ -1,36 +1,56 @@
 import importlib
+from typing import Optional
 
-from selenium import webdriver
+from selenium.webdriver.remote.webdriver import WebDriver
 
 import adapters
 import utils
 from adapters.data_collection_adapter_interface import IDataCollectionAdapter
 from stores.data_store_interface import IDataStore
-from stores.default_data_store import DefaultDataStore
 
 
 class DataCollector:
-
-    _adapter = None
-    _driver = None
-    _store = None
+    """数据采集器"""
 
     def __init__(
         self, adapter_type: str, url: str, un: str, up: str, store: IDataStore = None
     ):
-        self._adapter = self._gen_adapter(adapter_type, url)
-        self._driver = self.adapter.driver
-        # if type == "chinabidding":
-        #     return
-        self.adapter.login(un, up)
-        if store:
-            self._store = store
-        else:
-            self._store = DefaultDataStore()
+        """
+        初始化数据采集器
+
+        Args:
+            adapter_type: 适配器类型
+            url: 目标URL
+            un: 用户名
+            up: 密码
+            store: 数据存储器(可选)
+        """
+        self._adapter: Optional[IDataCollectionAdapter] = None
+        self._store: Optional[IDataStore] = None
+        self._retry_count = 0
+        self._max_retries = utils.get_config_int("adapter.max_retries", 3)
+
+        try:
+            self._adapter = self._gen_adapter(adapter_type, url)
+
+            if store:
+                self._store = store
+
+            # 登录处理
+            if un and up:
+                self.adapter.login(un, up)
+
+        except Exception as e:
+            self.logger.error(f"初始化采集器失败: {e}")
+            raise
+
+    @property
+    def logger(self):
+        return utils.get_logger()
 
     @property
-    def driver(self) -> webdriver:
-        return self._driver
+    def driver(self) -> WebDriver:
+        return self.adapter.driver
 
     @property
     def store(self) -> IDataStore:
@@ -40,43 +60,89 @@ class DataCollector:
     def adapter(self) -> IDataCollectionAdapter:
         return self._adapter
 
-    def set_store(self, store: IDataStore) -> None:
-        self._store = store
-
     def collect(self, keywords: str):
+        """
+        Run a collection, retrying with a freshly generated adapter on failure.
+
+        Args:
+            keywords: Search keywords; several may be given separated by commas.
+
+        Raises:
+            Exception: When no store is set, when resetting the adapter
+                fails, or the last error once the attempt limit is reached.
+        """
         if not self.store:
             raise Exception("未设置存储器")
-        adapters.collect(self.adapter, keywords, self.store)
+
+        try:
+            self.logger.info(f"开始采集数据, 关键词: {keywords}")
+            self._retry_count = 0
+            # Always make at least one attempt, even when max_retries is
+            # configured as 0 or a negative number; otherwise the loop would
+            # be skipped and nothing would be collected without any error.
+            max_attempts = max(1, self._max_retries)
+
+            while True:
+                try:
+                    adapters.collect(self.adapter, keywords, self.store)
+                    break
+                except Exception as e:
+                    self._retry_count += 1
+                    if self._retry_count >= max_attempts:
+                        self.logger.error(f"采集失败,已达最大重试次数: {e}")
+                        raise
+                    self.logger.warning(
+                        f"采集失败,准备第{self._retry_count}次重试: {e}"
+                    )
+                    self._reset_adapter()
+
+        except Exception as e:
+            self.logger.error(f"采集过程发生异常: {e}")
+            raise
 
     def close(self):
-        utils.get_logger().info(f"关闭浏览器驱动,URL: {self.adapter.url}")
-        adapters.teardown(self.adapter)
+        """Close the collector and release the adapter's resources.
+
+        Delegates to the adapter's ``cleanup()`` so the WebDriver is released
+        even when ``collect`` never reached the adapter. Failures are logged
+        and not raised.
+        """
+        try:
+            if self._adapter is not None:
+                self._adapter.cleanup()
+        except Exception as e:
+            self.logger.error(f"关闭采集器失败: {e}")
+
+    def _reset_adapter(self):
+        """Swap the current adapter for a newly generated one.
+
+        The new adapter is built from the current adapter's type and URL.
+        Failures are logged and re-raised.
+        """
+        try:
+            current = self.adapter
+            adapter_type = current.adapter_type
+            target_url = current.url
+            self._adapter = self._gen_adapter(adapter_type, target_url)
+        except Exception as err:
+            self.logger.error(f"重置适配器失败: {err}")
+            raise
 
     @staticmethod
-    def _gen_adapter(adapter_type: str, url: str):
+    def _gen_adapter(adapter_type: str, url: str) -> IDataCollectionAdapter:
+        """
+        生成数据源适配器
+
+        Args:
+            adapter_type: 适配器类型
+            url: 目标URL
+
+        Returns:
+            IDataCollectionAdapter: 适配器实例
+        """
         adapter_model_name = utils.get_config_value(
             f"adapter.{adapter_type}.model_name"
         )
         adapter_class_name = utils.get_config_value(
             f"adapter.{adapter_type}.class_name"
         )
-        if adapter_class_name:
-            try:
-                utils.get_logger().info(
-                    f"生成适配器 TYPE:{adapter_type},适配器: {adapter_class_name},URL:{url}"
-                )
-                # 使用 importlib 动态导入模块
-                adapter_module = importlib.import_module(
-                    f"adapters.{adapter_model_name}"
-                )
-                adapter_class = getattr(adapter_module, adapter_class_name)
-                adapter = adapter_class(url)
-            except ImportError as e:
-                raise ImportError(f"无法导入适配器模块 {adapter_model_name}") from e
-            except AttributeError as e:
-                raise AttributeError(
-                    f"适配器模块 {adapter_model_name} 中找不到类 {adapter_class_name}"
-                ) from e
-        else:
+
+        if not adapter_class_name:
             raise Exception("不支持的适配器类型")
-        return adapter
+
+        try:
+            utils.get_logger().info(
+                f"生成适配器 TYPE:{adapter_type},适配器: {adapter_class_name},URL:{url}"
+            )
+            adapter_module = importlib.import_module(f"adapters.{adapter_model_name}")
+            adapter_class = getattr(adapter_module, adapter_class_name)
+            adapter = adapter_class(url)
+            return adapter
+
+        except ImportError as e:
+            raise ImportError(f"无法导入适配器模块 {adapter_model_name}") from e
+        except AttributeError as e:
+            raise AttributeError(
+                f"适配器模块 {adapter_model_name} 中找不到类 {adapter_class_name}"
+            ) from e