123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- from time import sleep
- from typing import List, Optional
- from selenium.common.exceptions import TimeoutException, NoSuchElementException
- from selenium.webdriver.common.by import By
- from selenium.webdriver.support import expected_conditions as ec
- import utils
- from adapters.data_collection_adapter_interface import IDataCollectionAdapter
- class CCGPDataCollectionAdapter(IDataCollectionAdapter):
- """中国政府采购网数据采集适配器"""
- def __init__(self, url: str):
- """初始化适配器
- Args:
- url: 目标网站URL
- """
- super().__init__(url, "ccgp", "近1周")
- def login(self, username: str, password: str) -> None:
- """登录网站(CCGP无需登录)"""
- pass
- def _collect(self, keyword: str) -> None:
- """执行数据采集
- Args:
- keyword: 单个搜索关键词
- """
- try:
- # 获取搜索时间范围
- self.logger.info(f"开始采集关键词: {keyword}, 时间范围: {self._search_txt}")
- # 搜索数据
- items = self._search(keyword)
- if not items:
- return
- # 处理数据列表
- self._process_list(items)
- except Exception as e:
- self.logger.error(f"采集失败: {e}")
- raise
- def _search(self, keyword: str) -> List:
- """搜索数据
- Args:
- keyword: 搜索关键词
- Returns:
- List: 搜索结果列表
- """
- # 打开搜索页面
- self.driver.get(self.url)
- # 等待搜索框
- self._wait_for(
- ec.presence_of_element_located((By.ID, "searchForm")),
- message="搜索框加载超时",
- )
- # 输入关键词
- search_el = self.driver.find_element(By.ID, "kw")
- sleep(2)
- search_el.clear()
- search_el.send_keys(keyword)
- # 点击搜索
- search_btn = self.driver.find_element(
- By.XPATH, "//form[@id='searchForm']/input[@id='doSearch2']"
- )
- sleep(1)
- search_btn.click()
- # 等待结果加载
- self._next_count = 0
- self._wait_for(
- ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result")),
- message="搜索结果加载超时",
- )
- # 设置时间范围
- self._set_search_date()
- # 获取结果列表
- items = self.driver.find_elements(
- By.XPATH, "//ul[@class='vT-srch-result-list-bid']/li/a"
- )
- return items
- def _set_search_date(self) -> None:
- """设置搜索时间范围"""
- try:
- if self._search_txt != self._default_search_txt:
- last_els = self.driver.find_elements(By.XPATH, "//ul[@id='datesel']/li")
- for last_el in last_els:
- if self._search_txt == last_el.text:
- sleep(1)
- last_el.click()
- break
- self._wait_for(
- ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result")),
- message="设置时间范围后页面加载超时",
- )
- else:
- sleep(1)
- except Exception as e:
- self.logger.error(f"设置时间范围失败: {e}")
- def _process_list(self, items: List) -> None:
- """处理数据列表
- Args:
- items: 数据列表
- """
- if not items:
- return
- # 处理当前页
- for item in items:
- self._process_item(item)
- sleep(2)
- # 处理下一页
- next_items = self._next_page()
- if next_items:
- self._process_list(next_items)
- def _next_page(self) -> Optional[List]:
- """获取下一页数据"""
- try:
- # 查找下一页按钮
- next_path = "//div[@class='vT-srch-result-list']/p/a[@class='next']"
- try:
- btn = self.driver.find_element(By.XPATH, next_path)
- except NoSuchElementException:
- self.logger.debug("已到最后一页")
- return None
- # 点击下一页
- btn.click()
- self._next_count += 1
- self.logger.debug(f"下一页[{self._next_count+1}]")
- sleep(1)
- # 等待页面加载
- self._wait_for(
- ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result")),
- message="下一页加载超时",
- )
- # 获取数据列表
- items = self.driver.find_elements(
- By.XPATH, "//ul[@class='vT-srch-result-list-bid']/li/a"
- )
- return items
- except NoSuchElementException as e:
- raise Exception(f"页面元素未找到: {e}")
- def _process_item(self, item) -> None:
- """处理单条数据"""
- main_handle = self.driver.current_window_handle
- close = True
- try:
- # 检查URL是否已采集
- url = item.get_attribute("href")
- if self._check_is_collect_by_url(url):
- close = False
- return
- # 打开详情页
- self.logger.debug("打开详情页")
- sleep(1)
- item.click()
- # 切换窗口
- self._wait_for(ec.number_of_windows_to_be(2), message="新窗口打开超时")
- handles = self.driver.window_handles
- for handle in handles:
- if handle != main_handle:
- self.driver.switch_to.window(handle)
- break
- # 等待页面加载
- self._wait_for(
- ec.presence_of_element_located((By.TAG_NAME, "body")),
- message="详情页加载超时",
- )
- # 获取内容
- content = self.driver.find_element(
- By.XPATH, "//div[@class='vF_deail_maincontent']"
- ).text
- # 判断公告类型
- if self._check_type("其他公告"):
- self._save_db(url, content, 3, is_invalid=True)
- return
- data_type = (
- 1
- if self._check_type("中标公告")
- or self._check_type("成交公告")
- or self._check_type("终止公告")
- else 0
- )
- # 检查关键词并保存
- if self._check_content(content):
- attach_str = self._attach_download()
- self._save_db(url, content, data_type, attach_str)
- else:
- self._save_db(url, content, data_type, is_invalid=True)
- except TimeoutException as e:
- self.logger.error(f"处理数据超时: {e}")
- except NoSuchElementException as e:
- self.logger.error(f"页面元素未找到: {e}")
- raise
- finally:
- if close:
- sleep(1)
- self.driver.close()
- self.driver.switch_to.window(main_handle)
- def _check_type(self, type_str: str) -> bool:
- """检查公告类型
- Args:
- type_str: 类型文本
- Returns:
- bool: 是否匹配
- """
- links = self.driver.find_elements(By.LINK_TEXT, type_str)
- if links:
- self.logger.info(f"公告类型: {type_str}")
- return True
- return False
- def _attach_download(self) -> Optional[str]:
- """下载附件
- Returns:
- str: 附件路径
- """
- paths = []
- # 查找附件链接
- attach_els = self.driver.find_elements(
- By.XPATH, "//td[@class='bid_attachtab_content']/a"
- )
- attach_2_els = self.driver.find_elements(By.XPATH, "//a[@ignore='1']")
- all_attachments = attach_els + attach_2_els
- self.logger.debug(
- f"附件数量: {len(attach_els)}/{len(attach_2_els)}/{len(all_attachments)}"
- )
- # 下载附件
- attach_urls = []
- for attach_el in all_attachments:
- try:
- # 获取附件信息
- attach_url = attach_el.get_attribute("href")
- if attach_url in attach_urls:
- self.logger.info(f"重复附件: {attach_url}")
- continue
- attach_urls.append(attach_url)
- # 获取文件名
- file_name = (
- attach_el.text
- or attach_el.get_attribute("download")
- or attach_url.split("/")[-1]
- )
- if not file_name or "." not in file_name:
- self.logger.warning(f"无效文件名: {file_name}")
- continue
- # 下载文件
- self.logger.debug(f"下载附件: {file_name}")
- path = utils.download_remote_file(attach_url, file_name)
- if path:
- self.logger.debug(f"下载成功: {path}")
- paths.append(path)
- else:
- self.logger.warning(f"下载失败: {file_name}")
- except Exception as e:
- self.logger.error(f"处理附件失败: {e}")
- continue
- # 返回附件路径
- attach_str = ",".join(paths)
- if attach_str:
- self.logger.info(f"附件下载完成: {attach_str}")
- return attach_str
|