data_collector.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import importlib
  2. from selenium import webdriver
  3. from stores.data_store_interface import IDataStore
  4. from stores.default_data_store import DefaultDataStore
  5. from adapters.data_collection_adapter_interface import IDataCollectionAdapter
  6. from utils.logger_helper import LoggerHelper
  7. from utils.config_helper import ConfigHelper
  8. class DataCollector:
  9. logger = LoggerHelper.get_logger()
  10. config = ConfigHelper()
  11. _adapter = None
  12. _driver = None
  13. _store = None
  14. # 使用字典映射域名和适配器类
  15. # _adapterModelMap = {"chinabidding": "chinabidding_data_collection_adapter"}
  16. # _adapterClassMap = {"chinabidding": "ChinabiddingDataCollectionAdapter"}
  17. def __init__(self,
  18. adapter_type: str,
  19. url: str,
  20. un: str,
  21. up: str,
  22. store: IDataStore = None):
  23. self._adapter = self._gen_adapter(adapter_type, url)
  24. self._driver = self.adapter.driver
  25. # if type == "chinabidding":
  26. # return
  27. self.adapter.login(un, up)
  28. if store:
  29. self._store = store
  30. else:
  31. self._store = DefaultDataStore()
  32. @property
  33. def driver(self) -> webdriver:
  34. return self._driver
  35. @property
  36. def store(self) -> IDataStore:
  37. return self._store
  38. @property
  39. def adapter(self) -> IDataCollectionAdapter:
  40. return self._adapter
  41. def set_store(self, store: IDataStore) -> None:
  42. self._store = store
  43. def collect(self, keyword: str):
  44. self.adapter.collect(keyword, self.store)
  45. def close(self):
  46. self.logger.info(f"关闭浏览器驱动,URL: {self.adapter.url}")
  47. self.adapter.teardown()
  48. def _gen_adapter(self, adapter_type: str, url: str):
  49. adapter_model_name = self.config.get(f"adapter.{adapter_type}.model_name")
  50. adapter_class_name = self.config.get(f"adapter.{adapter_type}.class_name")
  51. if adapter_class_name:
  52. try:
  53. self.logger.info(
  54. f"生成适配器 TYPE:{adapter_type},适配器: {adapter_class_name},URL:{url}")
  55. # 使用 importlib 动态导入模块
  56. adapter_module = importlib.import_module(
  57. f"adapters.{adapter_model_name}")
  58. adapter_class = getattr(adapter_module, adapter_class_name)
  59. adapter = adapter_class(url)
  60. except ImportError as e:
  61. raise ImportError(f"无法导入适配器模块 {adapter_model_name}") from e
  62. except AttributeError as e:
  63. raise AttributeError(
  64. f"适配器模块 {adapter_model_name} 中找不到类 {adapter_class_name}"
  65. ) from e
  66. else:
  67. raise Exception("不支持的适配器类型")
  68. return adapter