123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- import importlib
- from typing import Optional
- from selenium.webdriver.remote.webdriver import WebDriver
- import adapters
- import utils
- from adapters.data_collection_adapter_interface import IDataCollectionAdapter
- from stores.data_store_interface import IDataStore
class DataCollector:
    """Data collector: drives a data-source adapter and writes results to a store.

    The concrete adapter class is resolved at runtime from configuration
    (see ``_gen_adapter``). A failed collection is retried with a freshly
    created, re-authenticated adapter, up to ``adapter.max_retries`` total
    attempts.

    Instances can be used as a context manager; ``close()`` runs on exit.
    """

    def __init__(
        self,
        adapter_type: str,
        url: str,
        un: str,
        up: str,
        store: "IDataStore" = None,
    ):
        """Initialize the data collector.

        Args:
            adapter_type: Adapter type key; used to look up
                ``adapter.<adapter_type>.model_name`` / ``class_name`` in config.
            url: Target URL handed to the adapter constructor.
            un: Username. Login happens only if both ``un`` and ``up`` are truthy.
            up: Password.
            store: Optional data store; ``collect`` requires one.

        Raises:
            Exception: Any error from adapter creation or login is logged,
                the partially built adapter is released, and the error re-raised.
        """
        self._adapter: Optional["IDataCollectionAdapter"] = None
        self._store: Optional["IDataStore"] = store if store else None
        # Remembered so _reset_adapter can rebuild AND re-authenticate an adapter
        # without relying on attributes of the adapter object itself.
        self._adapter_type = adapter_type
        self._url = url
        self._username = un
        self._password = up
        self._retry_count = 0
        # Total number of collection attempts. Clamped to >= 1 so a zero or
        # negative config value cannot make collect() silently do nothing.
        self._max_retries = max(1, utils.get_config_int("adapter.max_retries", 3))
        try:
            self._adapter = self._gen_adapter(adapter_type, url)
            self._login()
        except Exception as e:
            self.logger.error(f"初始化采集器失败: {e}")
            # Don't leak the adapter (and any WebDriver it holds) on failure.
            self.close()
            raise

    @property
    def logger(self):
        """Logger obtained from ``utils.get_logger()``."""
        return utils.get_logger()

    @property
    def driver(self) -> "WebDriver":
        """WebDriver exposed by the current adapter."""
        return self.adapter.driver

    @property
    def store(self) -> "IDataStore":
        """Configured data store, or ``None`` if none was given."""
        return self._store

    @property
    def adapter(self) -> "IDataCollectionAdapter":
        """Current adapter; ``None`` after ``close()`` or a failed reset."""
        return self._adapter

    def __enter__(self) -> "DataCollector":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Always release resources; never suppress the exception (returns None).
        self.close()

    def collect(self, keywords: str):
        """Run data collection, retrying with a fresh adapter on failure.

        Args:
            keywords: Search keywords; multiple keywords separated by commas.

        Raises:
            Exception: If no store is configured; the last collection error
                once the attempt limit is reached; or any error raised while
                rebuilding the adapter between attempts.
        """
        if not self.store:
            raise Exception("未设置存储器")
        self.logger.info(f"开始采集数据, 关键词: {keywords}")
        self._retry_count = 0
        # _max_retries >= 1, so this loop always ends in return or raise.
        while True:
            try:
                adapters.collect(self.adapter, keywords, self.store)
                return
            except Exception as e:
                self._retry_count += 1
                if self._retry_count >= self._max_retries:
                    self.logger.error(f"采集失败,已达最大重试次数: {e}")
                    raise
                self.logger.warning(
                    f"采集失败,准备第{self._retry_count}次重试: {e}"
                )
                self._reset_adapter()

    def close(self):
        """Close the collector and release the adapter's resources.

        Safe to call more than once; release errors are logged, not raised.
        """
        self._release_adapter()

    def _login(self) -> None:
        """Log the current adapter in when both credentials are present."""
        if self._username and self._password:
            self.adapter.login(self._username, self._password)

    def _release_adapter(self) -> None:
        """Detach the current adapter and release it, best effort."""
        adapter, self._adapter = self._adapter, None
        if adapter is None:
            return
        try:
            # NOTE(review): IDataCollectionAdapter's interface isn't visible
            # here. Prefer an adapter-provided close(); otherwise quit the
            # WebDriver it exposes via `.driver`. Confirm against the interface.
            closer = getattr(adapter, "close", None)
            if callable(closer):
                closer()
            else:
                driver = getattr(adapter, "driver", None)
                if driver is not None:
                    driver.quit()
        except Exception as e:
            self.logger.error(f"释放适配器失败: {e}")

    def _reset_adapter(self):
        """Replace the adapter with a fresh, re-authenticated instance.

        The old adapter is released first so its WebDriver is not leaked.
        """
        self._release_adapter()
        try:
            self._adapter = self._gen_adapter(self._adapter_type, self._url)
            # A brand-new adapter is not logged in; restore the session.
            self._login()
        except Exception as e:
            self.logger.error(f"重置适配器失败: {e}")
            raise

    @staticmethod
    def _gen_adapter(adapter_type: str, url: str) -> "IDataCollectionAdapter":
        """Create a data-source adapter from configuration.

        Args:
            adapter_type: Adapter type key for the ``adapter.<type>.*`` config.
            url: Target URL passed to the adapter constructor.

        Returns:
            A new instance of ``adapters.<model_name>.<class_name>``.

        Raises:
            Exception: If model or class name is not configured.
            ImportError: If the adapter module cannot be imported.
            AttributeError: If the module has no such class.
        """
        adapter_model_name = utils.get_config_value(
            f"adapter.{adapter_type}.model_name"
        )
        adapter_class_name = utils.get_config_value(
            f"adapter.{adapter_type}.class_name"
        )
        if not adapter_model_name or not adapter_class_name:
            raise Exception("不支持的适配器类型")
        utils.get_logger().info(
            f"生成适配器 TYPE:{adapter_type},适配器: {adapter_class_name},URL:{url}"
        )
        try:
            adapter_module = importlib.import_module(f"adapters.{adapter_model_name}")
        except ImportError as e:
            raise ImportError(f"无法导入适配器模块 {adapter_model_name}") from e
        try:
            adapter_class = getattr(adapter_module, adapter_class_name)
        except AttributeError as e:
            raise AttributeError(
                f"适配器模块 {adapter_model_name} 中找不到类 {adapter_class_name}"
            ) from e
        # Constructed outside the try blocks so errors raised inside the
        # adapter's own __init__ are not misreported as import/lookup failures.
        return adapter_class(url)
|