123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- from time import sleep
- from selenium.common.exceptions import TimeoutException, NoSuchElementException
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as ec
- import utils
- from adapters.data_collection_adapter_interface import IDataCollectionAdapter
- from stores.data_store_interface import IDataStore
class CcgpDataCollectionAdapter(IDataCollectionAdapter):
    """Data collection adapter for the China Government Procurement site (ccgp).

    Drives a Selenium WebDriver through the site's search form, walks the
    paginated result list, opens each announcement in its own window, and
    stores the announcement text (plus downloaded attachments) through the
    store.

    NOTE(review): ``self.driver``, ``self.store``, ``self.batch_save_key``,
    ``self.search_day_key``, ``_wait_until``, ``_check_is_collect_by_url``,
    ``_check_content`` and ``_save_db`` are not defined in this class; they
    are presumably provided by ``IDataCollectionAdapter`` -- confirm there.
    """

    # Locators shared by the initial search and by pagination.
    _RESULT_CONTAINER_CLASS = "vT-srch-result"
    _RESULT_LINKS_XPATH = "//ul[@class='vT-srch-result-list-bid']/li/a"

    def __init__(self, url: str, store: IDataStore = None):
        """Remember the search page URL and the (optional) data store.

        Args:
            url: Address of the search page loaded by ``_search``.
            store: Data store used for persistence; may be ``None``.
        """
        self._url = url
        self._store = store
        self._driver = None
        self._keyword = None
        self._adapter_type = "ccgp"
        # Number of "next page" clicks performed for the current search.
        self._next_count = 0

    def login(self, username: str, password: str) -> None:
        """No-op: this adapter performs no login step."""
        pass

    def _collect(self, keyword: str):
        """Search for ``keyword`` and process every result page.

        When the batch-save configuration flag is enabled, the store is asked
        to flush the collected data once all pages have been processed.
        """
        items = self._search(keyword)
        if not items:
            return
        self._process_list(items)
        if utils.get_config_bool(self.batch_save_key):
            self.store.save_collect_data(True)

    def _search(self, keyword: str) -> list:
        """Submit the search form and return the first page of result links.

        Args:
            keyword: Text typed into the search box; must be non-empty.

        Returns:
            The ``<a>`` elements of the first result page, or ``[]`` when one
            of the expected page elements does not show up.

        Raises:
            Exception: If ``keyword`` is empty, or when Selenium raises a
                timeout / missing-element error (re-raised with the adapter
                type in the message and the original error as the cause).
        """
        try:
            if not keyword:
                raise Exception("搜索关键字不能为空")
            self.driver.get(self._url)
            if not self._wait_until(
                ec.presence_of_element_located((By.ID, "searchForm"))
            ):
                return []
            search_el = self.driver.find_element(By.ID, "kw")
            sleep(2)
            search_el.clear()
            search_el.send_keys(keyword)
            search_btn = self.driver.find_element(
                By.XPATH, "//form[@id='searchForm']/input[@id='doSearch2']"
            )
            sleep(1)
            search_btn.click()
            # A new search starts again from the first page.
            self._next_count = 0
            if not self._wait_until(
                ec.presence_of_element_located(
                    (By.CLASS_NAME, self._RESULT_CONTAINER_CLASS)
                )
            ):
                return []

            # Only touch the date filter when the configured range differs
            # from the default one.
            default_search_txt = "近1周"
            search_txt = utils.get_config_value(
                self.search_day_key, default_search_txt
            )
            utils.get_logger().debug(f"搜索日期条件: {search_txt}")
            if search_txt != default_search_txt:
                last_els = self.driver.find_elements(
                    By.XPATH, "//ul[@id='datesel']/li"
                )
                for last_el in last_els:
                    if search_txt == last_el.text:
                        sleep(1)
                        last_el.click()
                        break
                if not self._wait_until(
                    ec.presence_of_element_located(
                        (By.CLASS_NAME, self._RESULT_CONTAINER_CLASS)
                    )
                ):
                    return []
            else:
                sleep(1)

            # Best-effort logging of the result summary; a failure here must
            # not abort the search, hence the broad catch.
            try:
                p_els = self.driver.find_elements(
                    By.XPATH, "//body/div[@class='vT_z']/div/div/p"
                )
                if p_els:
                    utils.get_logger().debug(f" {p_els[0].text}")
                else:
                    a_links = self.driver.find_elements(
                        By.XPATH, "//div[@class='vT-srch-result-list']/p/a"
                    )
                    count = len(a_links)
                    # Presumably discounts the trailing "next" link -- verify
                    # against the page markup.
                    if count > 1:
                        count = count - 1
                    utils.get_logger().debug(f"共查询到 {count} 页,每页 20 条")
            except Exception as e:
                utils.get_logger().error(f"搜索失败[尝试查询页数]: {e}")

            return self.driver.find_elements(By.XPATH, self._RESULT_LINKS_XPATH)
        except TimeoutException as e:
            raise Exception(f"搜索失败 [{self._adapter_type}] [超时]: {e}") from e
        except NoSuchElementException as e:
            raise Exception(
                f"搜索失败 [{self._adapter_type}] [找不到元素]: {e}"
            ) from e

    def _process_list(self, items: list) -> list:
        """Process ``items`` and every following result page.

        Iterates instead of recursing so that the number of result pages is
        not bounded by the interpreter's recursion limit.

        Args:
            items: Result links of the current page.

        Returns:
            Always ``[]`` (kept for compatibility with existing callers).
        """
        while items:
            for item in items:
                self._process_item(item)
            sleep(2)
            # An empty list from _next_page ends the loop.
            items = self._next_page()
        return []

    def _next_page(self) -> list:
        """Click the "next" link and return the result links of the new page.

        Returns:
            The ``<a>`` elements of the next page, or ``[]`` when there is no
            "next" link or the result container does not appear.

        Raises:
            Exception: When Selenium raises a timeout / missing-element error
                after the "next" link was found.
        """
        try:
            next_path = "//div[@class='vT-srch-result-list']/p/a[@class='next']"
            try:
                btn = self.driver.find_element(By.XPATH, next_path)
            except NoSuchElementException:
                # No "next" link: the last page has been reached.
                utils.get_logger().debug(f"翻页结束 [{self._adapter_type}]")
                return []
            btn.click()
            self._next_count += 1
            utils.get_logger().debug(
                f"下一页[{self._next_count+1}]: {self.driver.current_url}"
            )
            sleep(1)
            if not self._wait_until(
                ec.presence_of_element_located(
                    (By.CLASS_NAME, self._RESULT_CONTAINER_CLASS)
                )
            ):
                return []
            return self.driver.find_elements(By.XPATH, self._RESULT_LINKS_XPATH)
        except NoSuchElementException as e:
            raise Exception(
                f"翻页失败 [{self._adapter_type}] [找不到元素]: {e}"
            ) from e
        except TimeoutException as e:
            raise Exception(f"翻页失败 [{self._adapter_type}] [超时]: {e}") from e

    def _process_item(self, item):
        """Open one result link in its detail window and store its content.

        Links whose URL ``_check_is_collect_by_url`` reports as already
        collected are skipped without being opened.

        Args:
            item: ``<a>`` element taken from the result list.

        Raises:
            Exception: When an expected element is missing on the detail
                page. Timeouts are only logged.
        """
        main_handle = self.driver.current_window_handle
        close = True
        try:
            url = item.get_attribute("href")
            if self._check_is_collect_by_url(url):
                # Nothing was opened, so there is nothing to clean up.
                close = False
                return
            utils.get_logger().debug("跳转详情")
            sleep(1)
            item.click()
            if not self._wait_until(ec.number_of_windows_to_be(2)):
                return
            for handle in self.driver.window_handles:
                if handle != main_handle:
                    self.driver.switch_to.window(handle)
                    break
            if not self._wait_until(
                ec.presence_of_element_located((By.TAG_NAME, "body"))
            ):
                return
            content = self.driver.find_element(
                By.XPATH, "//div[@class='vF_deail_maincontent']"
            ).text
            # Pages carrying an "其他公告" link are stored as invalid, type 3.
            if self._check_type("其他公告"):
                self._save_db(url, content, 3, is_invalid=True)
                return
            # Type 1 when the page links to one of the result-style
            # announcements below, otherwise type 0.
            # NOTE(review): the meaning of the 0/1/3 codes is defined by
            # _save_db / the store -- confirm there.
            data_type = (
                1
                if self._check_type("中标公告")
                or self._check_type("成交公告")
                or self._check_type("终止公告")
                else 0
            )
            if self._check_content(content):
                attach_str = self._attach_download()
                self._save_db(url, content, data_type, attach_str)
            else:
                self._save_db(url, content, data_type, is_invalid=True)
        except TimeoutException as e:
            utils.get_logger().error(
                f"采集发生异常 [{self._adapter_type}] Timeout: {self.driver.current_url}。Exception: {e}"
            )
        except NoSuchElementException as e:
            utils.get_logger().error(
                f"采集发生异常 [{self._adapter_type}] NoSuchElement: {self.driver.current_url}。Exception: {e}"
            )
            raise Exception(
                f"采集失败 [{self._adapter_type}] [找不到元素]: {e}"
            ) from e
        finally:
            if close:
                sleep(1)
                # Close only windows other than the result list. Closing
                # "the current window" unconditionally would destroy the main
                # window whenever no detail window opened or the switch never
                # happened.
                for handle in self.driver.window_handles:
                    if handle != main_handle:
                        self.driver.switch_to.window(handle)
                        self.driver.close()
                self.driver.switch_to.window(main_handle)

    def _check_type(self, type_str: str) -> bool:
        """Return True when the current page has a link whose text is ``type_str``."""
        links = self.driver.find_elements(By.LINK_TEXT, type_str)
        if links:
            utils.get_logger().info(f"{type_str}")
            return True
        return False

    def _attach_download(self):
        """Download the attachments linked from the current detail page.

        Attachment anchors are gathered from two locators, de-duplicated by
        URL, and handed to ``utils.download_remote_file``. Anchors without an
        ``href`` and names without a ``.`` are skipped.

        Returns:
            Comma-separated paths of the successfully downloaded files; an
            empty string when nothing was downloaded.
        """
        paths = []
        attach_els = self.driver.find_elements(
            By.XPATH, "//td[@class='bid_attachtab_content']/a"
        )
        attach_2_els = self.driver.find_elements(By.XPATH, "//a[@ignore='1']")
        # Merge the matches of both locators.
        all_attachments = attach_els + attach_2_els
        utils.get_logger().debug(
            f"附件检索数量: {len(attach_els)}/{len(attach_2_els)}/{len(all_attachments)}"
        )
        seen_urls = set()
        for attach_el in all_attachments:
            attach_url = attach_el.get_attribute("href")
            if not attach_url:
                # Without an href there is nothing to download, and the
                # file-name fallback below would fail on None.
                continue
            if attach_url in seen_urls:
                utils.get_logger().info(f"重复附件: {attach_url}")
                continue
            seen_urls.add(attach_url)
            file_name = (
                attach_el.text
                or attach_el.get_attribute("download")
                or attach_url.split("/")[-1]
            )
            if not file_name:
                continue
            # Require something that looks like a file extension.
            if "." not in file_name:
                utils.get_logger().warning(
                    f"文件名 {file_name} 不包含扩展名,跳过下载。"
                )
                continue
            utils.get_logger().debug(
                f"开始下载附件: {file_name} 链接: {attach_url}"
            )
            path = utils.download_remote_file(attach_url, file_name)
            if path:
                utils.get_logger().debug(f"下载附件路径: {path}")
                paths.append(path)
            else:
                utils.get_logger().warning(f"下载附件失败: {file_name}")
        attach_str = ",".join(paths)
        if attach_str:
            utils.get_logger().info(f"附件下载完成: {attach_str}")
        return attach_str
|