123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- from time import sleep
- from selenium.common.exceptions import TimeoutException, NoSuchElementException
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as ec
- from selenium.webdriver.support.wait import WebDriverWait
- import utils
- from adapters.data_collection_adapter_interface import IDataCollectionAdapter
- from stores.data_store_interface import IDataStore
class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
    """
    Data collection adapter for the China Bidding website ("chinabidding").

    Drives a Selenium browser session to log in, search the tender-announcement
    and award-result listings for a keyword, walk every result page and store
    the text of each detail page.

    NOTE(review): ``self.driver``, ``self.store``, ``self.batch_save_key``,
    ``self.search_day_key``, ``_wait_until``, ``_check_is_collect_by_url``,
    ``_check_content`` and ``_save_db`` are not defined in this class; they are
    presumably provided by ``IDataCollectionAdapter`` -- confirm against the
    base class.
    """

    def __init__(self, url: str, store: IDataStore = None):
        """
        Store the site URL and the optional data store.

        :param url: entry page of the site; reloaded before each listing search.
        :param store: data store for collected records (may be ``None``).
        """
        self._url = url
        self._store = store
        self._driver = None
        self._keyword = None
        self._adapter_type = "chinabidding"
        # Number of "next page" clicks performed during the current search.
        self._next_count = 0

    def login(self, username: str, password: str) -> None:
        """
        Open the login form, submit the credentials and wait for the site
        content to appear.

        :param username: account name typed into the ``username`` field.
        :param password: password typed into the ``userpass`` field.
        :raises Exception: when an expected element is missing or a wait
            times out; the original Selenium exception is chained as cause.
        """
        try:
            login_el = self.driver.find_element(
                By.XPATH, "//div[@id='loginRight']/a[@class='login']"
            )
            login_el.click()
            # Up to 10 seconds per wait, polling once per second.
            wait = WebDriverWait(self.driver, 10, 1)
            wait.until(ec.presence_of_element_located((By.ID, "userpass")))
            un_el = self.driver.find_element(By.ID, "username")
            un_el.send_keys(username)
            pass_el = self.driver.find_element(By.ID, "userpass")
            pass_el.send_keys(password)
            login_btn = self.driver.find_element(By.ID, "login-button")
            login_btn.click()
            wait.until(ec.presence_of_element_located((By.ID, "site-content")))
        except TimeoutException as e:
            raise Exception(f"登录失败 [{self._adapter_type}] [超时]: {e}") from e
        except NoSuchElementException as e:
            raise Exception(
                f"登录失败 [{self._adapter_type}] [找不到元素]: {e}"
            ) from e

    def _collect(self, keyword: str):
        """
        Collect both listing types for ``keyword``: type 0 (tender
        announcements) first, then type 1 (award-result announcements).

        When the ``batch_save_key`` configuration flag is true, the store is
        asked to save the collected data once at the end.
        """
        items = self._search_by_type(keyword, 0)
        self._process_list(items, 0)
        sleep(2)
        items = self._search_by_type(keyword, 1)
        self._process_list(items, 1)
        if utils.get_config_bool(self.batch_save_key):
            self.store.save_collect_data(True)

    def _search_by_type(self, keyword: str, data_type):
        """
        Open the listing page for ``data_type`` and run the keyword search.

        :param keyword: text to search for.
        :param data_type: ``0`` selects the tender-announcement listing; any
            other value selects the award-result listing.
        :return: link elements of the first result page, or ``[]`` when the
            listing window did not open.
        :raises Exception: when an expected element is missing or a wait
            times out; the original Selenium exception is chained as cause.
        """
        try:
            self.driver.get(self._url)
            if data_type == 0:
                utils.get_logger().info("开始采集 招标公告")
                el = self.driver.find_element(
                    By.XPATH, "//div[@id='z-b-g-g']/h2/a[@class='more']"
                )
            else:
                utils.get_logger().info("开始采集 中标结果公告")
                el = self.driver.find_element(
                    By.XPATH, "//div[@id='z-b-jg-gg']/h2/a[@class='more']"
                )
            el.click()
            # The "more" link is expected to open the listing in a second window.
            if not self._wait_until(ec.number_of_windows_to_be(2)):
                return []
            # Close the window we came from and continue in the remaining one.
            self.driver.close()
            self.driver.switch_to.window(self.driver.window_handles[0])
            return self._search(keyword)
        except TimeoutException as e:
            raise Exception(f"搜索失败 [{self._adapter_type}] [超时]: {e}") from e
        except NoSuchElementException as e:
            raise Exception(
                f"搜索失败 [{self._adapter_type}] [找不到元素]: {e}"
            ) from e

    def _search(self, keyword: str) -> list:
        """
        Submit ``keyword`` through the search form, optionally apply the
        configured date filter, and return the result links of the first page.

        :return: link elements of the first result page, or ``[]`` when one of
            the waits for the page content fails.
        """
        if not self._wait_until(
            ec.presence_of_element_located((By.ID, "searchBidProjForm"))
        ):
            return []
        search_el = self.driver.find_element(
            By.XPATH, "//form[@id='searchBidProjForm']/ul/li/input[@id='fullText']"
        )
        search_el.clear()
        search_el.send_keys(keyword)
        search_btn = self.driver.find_element(
            By.XPATH, "//form[@id='searchBidProjForm']/ul/li/button"
        )
        search_btn.click()
        # A new search starts again from the first page.
        self._next_count = 0
        if not self._wait_until(
            ec.presence_of_element_located((By.ID, "site-content"))
        ):
            return []
        default_search_txt = "全部"
        search_txt = utils.get_config_value(self.search_day_key, default_search_txt)
        utils.get_logger().debug(f"搜索日期条件: {search_txt}")
        if search_txt != default_search_txt:
            # The date filter is selected by clicking the link whose text
            # equals the configured value.
            last_el = self.driver.find_element(By.LINK_TEXT, search_txt)
            sleep(1)
            last_el.click()
            if not self._wait_until(
                ec.presence_of_element_located((By.ID, "site-content"))
            ):
                return []
        else:
            sleep(1)
        # Best-effort page-count logging; a failure here must not abort the search.
        try:
            a_links = self.driver.find_elements(
                By.XPATH, "//form[@id='pagerSubmitForm']/a"
            )
            count = len(a_links)
            if count > 1:
                # Presumably one of the pager links is the "next" button rather
                # than a page number -- TODO confirm against the page markup.
                count = count - 1
            utils.get_logger().debug(f"共查询到 {count} 页,每页 10 条")
        except Exception as e:
            utils.get_logger().error(f"搜索失败[尝试查询页数]: {e}")
        items = self.driver.find_elements(By.XPATH, "//ul[@class='as-pager-body']/li/a")
        return items

    def _process_list(self, items: list, data_type) -> list:
        """
        Process every item of the current page, then keep moving to the next
        page until no more items are returned.

        Implemented as a loop instead of one recursive call per page, so the
        number of result pages is not limited by the interpreter's recursion
        depth.

        :param items: link elements of the current result page.
        :param data_type: listing type passed through to ``_process_item``.
        :return: always ``[]`` once all pages have been processed.
        """
        while items:
            for item in items:
                self._process_item(item, data_type)
            sleep(2)
            items = self._next_page()
        return []

    def _next_page(self) -> list:
        """
        Click the pager's "next" link and return the links of the new page.

        :return: link elements of the next page; ``[]`` when there is no
            "next" link or the wait for the page content fails.
        :raises Exception: when an element is missing or a wait times out
            after the "next" link was found; the original Selenium exception
            is chained as cause.
        """
        try:
            try:
                btn = self.driver.find_element(
                    By.XPATH, "//form[@id='pagerSubmitForm']/a[@class='next']"
                )
            except NoSuchElementException:
                # A missing "next" link is treated as the end of the results.
                utils.get_logger().debug(f"翻页结束 [{self._adapter_type}]")
                return []
            btn.click()
            self._next_count += 1
            utils.get_logger().debug(
                f"下一页[{self._next_count+1}]: {self.driver.current_url}"
            )
            if not self._wait_until(
                ec.presence_of_element_located((By.ID, "site-content"))
            ):
                return []
            items = self.driver.find_elements(
                By.XPATH, "//ul[@class='as-pager-body']/li/a"
            )
            return items
        except NoSuchElementException as e:
            raise Exception(
                f"翻页失败 [{self._adapter_type}] [找不到元素]: {e}"
            ) from e
        except TimeoutException as e:
            raise Exception(f"翻页失败 [{self._adapter_type}] [超时]: {e}") from e

    def _process_item(self, item, data_type):
        """
        Open one result link in its detail window, save the page content and
        return to the list window.

        Items whose URL ``_check_is_collect_by_url`` reports as already
        collected are skipped without opening a window. Content that fails
        ``_check_content`` is still saved, flagged with ``is_invalid=True``.

        :param item: link element taken from the result list.
        :param data_type: listing type stored alongside the content.
        :raises Exception: when an expected element is missing; the original
            Selenium exception is chained as cause.
        """
        main_handle = self.driver.current_window_handle
        # Becomes True only after the driver has switched to the detail window.
        # Cleanup must not run before that: the current window would still be
        # the list window, and closing it would break the rest of the crawl.
        detail_opened = False
        try:
            url = item.get_attribute("href")
            if self._check_is_collect_by_url(url):
                return
            item.click()
            if not self._wait_until(ec.number_of_windows_to_be(2)):
                return
            for handle in self.driver.window_handles:
                if handle != main_handle:
                    self.driver.switch_to.window(handle)
                    detail_opened = True
                    break
            # Use the URL of the page that actually loaded, not the link's href.
            url = self.driver.current_url
            utils.get_logger().debug("跳转详情")
            if not self._wait_until(
                ec.presence_of_element_located((By.CLASS_NAME, "content"))
            ):
                return
            content = self.driver.find_element(By.CLASS_NAME, "content").text
            if self._check_content(content):
                self._save_db(url, content, data_type)
            else:
                self._save_db(url, content, data_type, is_invalid=True)
        except TimeoutException as e:
            # A timeout on one item is logged and skipped so the remaining
            # items can still be processed.
            utils.get_logger().error(
                f"采集发生异常 [{self._adapter_type}] Timeout: {self.driver.current_url}。Exception: {e}"
            )
        except NoSuchElementException as e:
            utils.get_logger().error(
                f"采集发生异常 [{self._adapter_type}] NoSuchElement: {self.driver.current_url}。Exception: {e}"
            )
            raise Exception(
                f"采集失败 [{self._adapter_type}] [找不到元素]: {e}"
            ) from e
        finally:
            if detail_opened:
                sleep(2)
                self.driver.close()
                self.driver.switch_to.window(main_handle)
|