# data_collector.py
  1. import importlib
  2. from typing import Optional
  3. from selenium.webdriver.remote.webdriver import WebDriver
  4. import adapters
  5. import utils
  6. from adapters.data_collection_adapter_interface import IDataCollectionAdapter
  7. from stores.data_store_interface import IDataStore
  8. class DataCollector:
  9. """数据采集器"""
  10. def __init__(
  11. self, adapter_type: str, url: str, un: str, up: str, store: IDataStore = None
  12. ):
  13. """
  14. 初始化数据采集器
  15. Args:
  16. adapter_type: 适配器类型
  17. url: 目标URL
  18. un: 用户名
  19. up: 密码
  20. store: 数据存储器(可选)
  21. """
  22. self._adapter: Optional[IDataCollectionAdapter] = None
  23. self._store: Optional[IDataStore] = None
  24. self._retry_count = 0
  25. self._max_retries = utils.get_config_int("adapter.max_retries", 3)
  26. try:
  27. self._adapter = self._gen_adapter(adapter_type, url)
  28. if store:
  29. self._store = store
  30. # 登录处理
  31. if un and up:
  32. self.adapter.login(un, up)
  33. except Exception as e:
  34. self.logger.error(f"初始化采集器失败: {e}")
  35. raise
  36. @property
  37. def logger(self):
  38. return utils.get_logger()
  39. @property
  40. def driver(self) -> WebDriver:
  41. return self.adapter.driver
  42. @property
  43. def store(self) -> IDataStore:
  44. return self._store
  45. @property
  46. def adapter(self) -> IDataCollectionAdapter:
  47. return self._adapter
  48. def collect(self, keywords: str):
  49. """
  50. 执行数据采集
  51. Args:
  52. keywords: 搜索关键词,多个关键词用逗号分隔
  53. """
  54. if not self.store:
  55. raise Exception("未设置存储器")
  56. try:
  57. self.logger.info(f"开始采集数据, 关键词: {keywords}")
  58. self._retry_count = 0
  59. while self._retry_count < self._max_retries:
  60. try:
  61. adapters.collect(self.adapter, keywords, self.store)
  62. break
  63. except Exception as e:
  64. self._retry_count += 1
  65. if self._retry_count >= self._max_retries:
  66. self.logger.error(f"采集失败,已达最大重试次数: {e}")
  67. raise
  68. self.logger.warning(
  69. f"采集失败,准备第{self._retry_count}次重试: {e}"
  70. )
  71. self._reset_adapter()
  72. except Exception as e:
  73. self.logger.error(f"采集过程发生异常: {e}")
  74. raise
  75. def close(self):
  76. """关闭采集器,释放资源"""
  77. try:
  78. pass
  79. except Exception as e:
  80. self.logger.error(f"关闭采集器失败: {e}")
  81. def _reset_adapter(self):
  82. """重置适配器状态"""
  83. try:
  84. self._adapter = self._gen_adapter(
  85. self.adapter.adapter_type, self.adapter.url
  86. )
  87. except Exception as e:
  88. self.logger.error(f"重置适配器失败: {e}")
  89. raise
  90. @staticmethod
  91. def _gen_adapter(adapter_type: str, url: str) -> IDataCollectionAdapter:
  92. """
  93. 生成数据源适配器
  94. Args:
  95. adapter_type: 适配器类型
  96. url: 目标URL
  97. Returns:
  98. IDataCollectionAdapter: 适配器实例
  99. """
  100. adapter_model_name = utils.get_config_value(
  101. f"adapter.{adapter_type}.model_name"
  102. )
  103. adapter_class_name = utils.get_config_value(
  104. f"adapter.{adapter_type}.class_name"
  105. )
  106. if not adapter_class_name:
  107. raise Exception("不支持的适配器类型")
  108. try:
  109. utils.get_logger().info(
  110. f"生成适配器 TYPE:{adapter_type},适配器: {adapter_class_name},URL:{url}"
  111. )
  112. adapter_module = importlib.import_module(f"adapters.{adapter_model_name}")
  113. adapter_class = getattr(adapter_module, adapter_class_name)
  114. adapter = adapter_class(url)
  115. return adapter
  116. except ImportError as e:
  117. raise ImportError(f"无法导入适配器模块 {adapter_model_name}") from e
  118. except AttributeError as e:
  119. raise AttributeError(
  120. f"适配器模块 {adapter_model_name} 中找不到类 {adapter_class_name}"
  121. ) from e