123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- from time import sleep
- from selenium import webdriver
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support.wait import WebDriverWait
- from selenium.webdriver.support import expected_conditions as ec
- from selenium.common.exceptions import TimeoutException, NoSuchElementException
- from drivers.driver_creator import DriverCreator
- from stores.data_store_interface import IDataStore
- from adapters.data_collection_adapter_interface import IDataCollectionAdapter
- from utils.logger_helper import LoggerHelper
- from utils.config_helper import ConfigHelper
class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
    """Data collection adapter for the China Bidding website.

    Drives a Selenium browser through login, keyword search and result
    paging, and passes the body text of each detail page to an
    ``IDataStore``.
    """

    logger = LoggerHelper.get_logger()

    # Explicit-wait settings shared by every WebDriverWait in this adapter.
    _WAIT_TIMEOUT = 10
    _WAIT_POLL = 1
    # Anchors of the result entries on a search result page.
    _RESULT_LINKS_XPATH = "//ul[@class='as-pager-body']/li/a"

    def __init__(self, url: str):
        """Store the target URL; store, driver and keyword start unset."""
        self._url = url
        self._store = None
        self._driver = None
        self._keyword = None

    @property
    def store(self) -> IDataStore:
        """Data store set by ``collect`` (``None`` until then)."""
        return self._store

    @property
    def url(self):
        """URL passed to the constructor."""
        return self._url

    @property
    def keyword(self):
        """Keyword of the most recent ``search`` call (``None`` before)."""
        return self._keyword

    @property
    def driver(self):
        """Driver instance, created on first access and then cached."""
        if self._driver is None:
            self._driver = self.create_driver()
        return self._driver

    def _wait(self, driver) -> WebDriverWait:
        """Build the explicit wait used throughout the adapter."""
        return WebDriverWait(driver, self._WAIT_TIMEOUT, self._WAIT_POLL)

    def create_driver(self) -> webdriver.Remote:
        """Create a driver for ``self.url`` through ``DriverCreator``.

        NOTE(review): the return annotation assumes ``gen_remote_driver``
        yields a remote WebDriver -- confirm against ``DriverCreator``.

        Raises:
            Exception: if the driver cannot be created.
        """
        try:
            return DriverCreator().gen_remote_driver(self.url)
        except Exception as e:
            raise Exception(f"创建驱动器失败: {e}") from e

    def login(self, driver, username: str, password: str) -> None:
        """Open the login form, submit the credentials and wait for content.

        Raises:
            Exception: on timeout or when a form element is missing.
        """
        try:
            driver.find_element(
                By.XPATH, "//div[@id='loginRight']/a[@class='login']").click()
            wait = self._wait(driver)
            # The form counts as rendered once the password field exists.
            wait.until(ec.presence_of_element_located((By.ID, "userpass")))
            driver.find_element(By.ID, "username").send_keys(username)
            driver.find_element(By.ID, "userpass").send_keys(password)
            driver.find_element(By.ID, "login-button").click()
            wait.until(ec.presence_of_element_located((By.ID, "site-content")))
        except TimeoutException as e:
            raise Exception(f"登录失败 [超时]: {e}") from e
        except NoSuchElementException as e:
            raise Exception(f"登录失败 [找不到元素]: {e}") from e

    def search(self, driver, keyword: str) -> list:
        """Search for ``keyword`` and return the first page of result links.

        The keyword is remembered on the instance so collected records can
        be tagged with it later.

        Returns:
            The anchor elements of the result entries on the first page.

        Raises:
            Exception: on timeout or when a page element is missing.
        """
        try:
            self._keyword = keyword
            wait = self._wait(driver)
            wait.until(
                ec.presence_of_element_located((By.ID, "projSearchForm")))
            driver.find_element(By.ID, "fullText").send_keys(keyword)
            driver.find_element(
                By.XPATH, "//form[@id='projSearchForm']/button").click()
            wait.until(ec.presence_of_element_located((By.ID, "site-content")))

            # Narrow the results by clicking the link whose text comes from
            # configuration; the fallback text means "last three days".
            search_txt = (
                ConfigHelper().get("adapter.chinabidding.search_day")
                or "近三天"
            )
            self.logger.info(f"搜索关键字: {keyword},搜索条件: {search_txt}")
            driver.find_element(By.LINK_TEXT, search_txt).click()
            wait.until(ec.presence_of_element_located((By.ID, "site-content")))

            # The page count is informational only, so a failure here is
            # logged and must not abort the search.
            try:
                pager_links = driver.find_elements(
                    By.XPATH, "//form[@id='pagerSubmitForm']/a")
                count = len(pager_links)
                # Presumably one pager anchor is the "next" link rather
                # than a page number -- TODO confirm against the markup.
                if count > 1:
                    count -= 1
                self.logger.info(f"共查询到 {count} 页")
            except Exception as e:
                self.logger.error(f"搜索失败[尝试查询页数]: {e}")

            return driver.find_elements(By.XPATH, self._RESULT_LINKS_XPATH)
        except TimeoutException as e:
            raise Exception(f"搜索失败 [超时]: {e}") from e
        except NoSuchElementException as e:
            raise Exception(f"搜索失败 [找不到元素]: {e}") from e

    def collect(self, driver, items: list, store: IDataStore):
        """Collect every item on this and all following result pages.

        Args:
            driver: Driver positioned on a search result page.
            items: Result link elements of the current page.
            store: Destination store; a falsy value keeps the store that
                is already set on the instance.
        """
        if store:
            self._store = store
        self._process_list(driver, items)
        # Same tolerance as _save: a missing store is logged, not fatal.
        if self.store:
            self.store.save_collect_data(True)
        else:
            self.logger.info("DataStore 未指定,跳过保存")

    def _next_page(self, driver) -> list:
        """Click the pager's "next" link and return the new page's links.

        Returns:
            The result link elements of the next page, or ``[]`` when a
            wait times out, which is treated as the end of paging.

        Raises:
            Exception: when a page element cannot be found.
        """
        try:
            wait = self._wait(driver)
            next_path = "//form[@id='pagerSubmitForm']/a[@class='next']"
            # wait.until returns the located element, so no second lookup.
            next_btn = wait.until(
                ec.presence_of_element_located((By.XPATH, next_path)))
            next_btn.click()
            self.logger.info(f"跳转到下页: {driver.current_url}")
            wait.until(ec.presence_of_element_located((By.ID, "site-content")))
            return driver.find_elements(By.XPATH, self._RESULT_LINKS_XPATH)
        except NoSuchElementException as e:
            raise Exception(f"翻页失败 [找不到元素]: {e}") from e
        except TimeoutException:
            self.logger.info("翻页结束")
            return []

    def _close_extra_windows(self, driver, keep_handle) -> None:
        """Close every window except ``keep_handle`` and switch back to it."""
        try:
            for handle in driver.window_handles:
                if handle != keep_handle:
                    driver.switch_to.window(handle)
                    driver.close()
        except Exception as e:
            # Best-effort cleanup: a failure here must not mask the error
            # (if any) that is already propagating from the caller.
            self.logger.error(f"关闭多余窗口失败: {e}")
        driver.switch_to.window(keep_handle)

    def _process_item(self, driver, item):
        """Open one result link in its new window and store the page text.

        Items whose ``href`` is already in the store are skipped. Timeouts
        are logged and swallowed so one slow page does not stop the run.

        Raises:
            Exception: when an expected element cannot be found.
        """
        current_handle = driver.current_window_handle
        try:
            url = item.get_attribute('href')
            # Skip the duplicate check when no store is configured, the
            # same tolerance _save applies.
            old = self.store.query_one_collect_by_url(url) if self.store else None
            if old:
                self.logger.info(f"已采集过: {url}")
                return
            item.click()
            wait = self._wait(driver)
            wait.until(ec.number_of_windows_to_be(2))
            detail_handle = next(
                (h for h in driver.window_handles if h != current_handle),
                None)
            if detail_handle is not None:
                driver.switch_to.window(detail_handle)
            # NOTE(review): the record is saved under the detail page's
            # current URL while the duplicate check uses the link's href;
            # if the site redirects, the two may never match -- confirm.
            url = driver.current_url
            self.logger.info(f"跳转详情: {url}")
            wait.until(ec.presence_of_element_located((By.TAG_NAME, "body")))
            content = driver.find_element(By.TAG_NAME, "body").text
            self._save(url, content)
            # Pauses around closing the detail window.
            sleep(1)
            driver.close()
            sleep(2)
        except TimeoutException as e:
            # Deliberately not re-raised: skip this item and carry on.
            self.logger.error(
                f"采集发生异常 Timeout: {driver.current_url}。Exception: {e}")
        except NoSuchElementException as e:
            self.logger.error(
                f"采集发生异常 NoSuchElement: {driver.current_url}。Exception: {e}")
            raise Exception(f"采集失败 [找不到元素]: {e}") from e
        finally:
            # A failure after the detail window opened would leave it
            # behind and break every later number_of_windows_to_be(2)
            # wait, so close any leftovers before returning to the list.
            self._close_extra_windows(driver, current_handle)

    def _save(self, url, content):
        """Insert one collected page into the store, or log if none is set."""
        if not self.store:
            self.logger.info(f"DataStore 未指定: {url},关键字{self.keyword}")
        else:
            self.store.insert_collect_data(url, self.keyword, content, True)

    def _process_list(self, driver, items: list) -> list:
        """Process ``items`` and then each following page until none remain.

        Iterative rather than recursive, so the number of result pages is
        not bounded by the interpreter's recursion limit.

        Returns:
            Always an empty list.
        """
        while items:
            for item in items:
                self._process_item(driver, item)
            sleep(2)
            items = self._next_page(driver)
        return []

    def teardown(self, driver) -> None:
        """Quit ``driver`` and forget it if it is the cached instance.

        Raises:
            Exception: if quitting the driver fails.
        """
        try:
            if driver:
                driver.quit()
        except Exception as e:
            raise Exception(f"关闭驱动器失败: {e}") from e
        finally:
            # Drop the cached reference so the ``driver`` property creates
            # a fresh session instead of returning a dead one.
            if driver is not None and driver is self._driver:
                self._driver = None
|