ccgp_data_collection_adapter.py

from time import sleep

from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as ec

import utils
from adapters.data_collection_adapter_interface import IDataCollectionAdapter
from stores.data_store_interface import IDataStore

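# NOTE: this adapter references several members that are not defined in this file
# and are presumably provided by IDataCollectionAdapter or the surrounding framework:
# self.driver, self.store, self._wait_until, self._check_is_collect_by_url,
# self._check_content, self._save_db, self.batch_save_key and self.search_day_key.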
class CcgpDataCollectionAdapter(IDataCollectionAdapter):
    """
    Data collection adapter for the China Government Procurement website (CCGP).
    """

    def __init__(self, url: str, store: IDataStore = None):
        self._url = url
        self._store = store
        self._driver = None
        self._keyword = None
        self._adapter_type = "ccgp"
        self._next_count = 0
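
    # Login is a no-op here; the search flow below does not use an authenticated session.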
    def login(self, username: str, password: str) -> None:
        pass
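
    # Entry point for one collection run: search by keyword, walk the result list,
    # then flush the batch to the store if batch saving is enabled in the config.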
    def _collect(self, keyword: str):
        items = self._search(keyword)
        if len(items) <= 0:
            return
        self._process_list(items)
        if utils.get_config_bool(self.batch_save_key):
            self.store.save_collect_data(True)
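
    # Opens the search page, submits the keyword, optionally narrows the date range
    # (config key self.search_day_key, default "近1周" / last week), logs the page
    # count, and returns the result links of the first result page.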
    def _search(self, keyword: str) -> list:
        try:
            if not keyword:
                raise Exception("搜索关键字不能为空")
            self.driver.get(self._url)
            if not self._wait_until(
                ec.presence_of_element_located((By.ID, "searchForm"))
            ):
                return []
            search_el = self.driver.find_element(By.ID, "kw")
            sleep(2)
            search_el.clear()
            search_el.send_keys(keyword)
            search_btn = self.driver.find_element(
                By.XPATH, "//form[@id='searchForm']/input[@id='doSearch2']"
            )
            sleep(1)
            search_btn.click()
            self._next_count = 0
            if not self._wait_until(
                ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result"))
            ):
                return []
            default_search_txt = "近1周"
            search_txt = utils.get_config_value(self.search_day_key, default_search_txt)
            utils.get_logger().debug(f"搜索日期条件: {search_txt}")
            if search_txt != default_search_txt:
                last_els = self.driver.find_elements(By.XPATH, "//ul[@id='datesel']/li")
                for last_el in last_els:
                    if search_txt == last_el.text:
                        sleep(1)
                        last_el.click()
                        break
                if not self._wait_until(
                    ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result"))
                ):
                    return []
            else:
                sleep(1)
            try:
                p_els = self.driver.find_elements(
                    By.XPATH, "//body/div[@class='vT_z']/div/div/p"
                )
                if len(p_els) > 0:
                    utils.get_logger().debug(f" {p_els[0].text}")
                else:
                    a_links = self.driver.find_elements(
                        By.XPATH, "//div[@class='vT-srch-result-list']/p/a"
                    )
                    count = len(a_links)
                    if count > 1:
                        count = count - 1
                    utils.get_logger().debug(f"共查询到 {count} 页,每页 20 条")
            except Exception as e:
                utils.get_logger().error(f"搜索失败[尝试查询页数]: {e}")
            items = self.driver.find_elements(
                By.XPATH, "//ul[@class='vT-srch-result-list-bid']/li/a"
            )
            return items
        except TimeoutException as e:
            raise Exception(f"搜索失败 [{self._adapter_type}] [超时]: {e}")
        except NoSuchElementException as e:
            raise Exception(f"搜索失败 [{self._adapter_type}] [找不到元素]: {e}")
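
    # Processes one page of result links, then recurses into the next page until
    # pagination is exhausted.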
    def _process_list(self, items: list) -> list:
        if not items:
            return []
        for item in items:
            self._process_item(item)
            sleep(2)
        next_items = self._next_page()
        if len(next_items) <= 0:
            return []
        return self._process_list(next_items)
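
    # Clicks the "next" pager link if present and returns the links of the new
    # result page; an absent pager link marks the end of pagination.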
    def _next_page(self) -> list:
        try:
            next_path = "//div[@class='vT-srch-result-list']/p/a[@class='next']"
            try:
                btn = self.driver.find_element(By.XPATH, next_path)
            except NoSuchElementException:
                utils.get_logger().debug(f"翻页结束 [{self._adapter_type}]")
                return []
            btn.click()
            self._next_count += 1
            utils.get_logger().debug(
                f"下一页[{self._next_count+1}]: {self.driver.current_url}"
            )
            sleep(1)
            if not self._wait_until(
                ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result"))
            ):
                return []
            items = self.driver.find_elements(
                By.XPATH, "//ul[@class='vT-srch-result-list-bid']/li/a"
            )
            return items
        except NoSuchElementException as e:
            raise Exception(f"翻页失败 [{self._adapter_type}] [找不到元素]: {e}")
        except TimeoutException as e:
            raise Exception(f"翻页失败 [{self._adapter_type}] [超时]: {e}")
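
    # Opens a single result in a new tab, classifies the announcement, saves it via
    # _save_db and downloads any attachments, then closes the tab and switches back
    # to the result list. Items already collected (matched by URL) are skipped.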
    def _process_item(self, item):
        main_handle = self.driver.current_window_handle
        close = False  # only close the detail tab once we have actually switched to it
        try:
            url = item.get_attribute("href")
            if self._check_is_collect_by_url(url):
                return
            utils.get_logger().debug("跳转详情")
            sleep(1)
            item.click()
            if not self._wait_until(ec.number_of_windows_to_be(2)):
                return
            handles = self.driver.window_handles
            for handle in handles:
                if handle != main_handle:
                    self.driver.switch_to.window(handle)
                    close = True
                    break
            if not self._wait_until(
                ec.presence_of_element_located((By.TAG_NAME, "body"))
            ):
                return
            content = self.driver.find_element(
                By.XPATH, "//div[@class='vF_deail_maincontent']"
            ).text
            # Exclude "其他公告" (other announcements)
            if self._check_type("其他公告"):
                self._save_db(url, content, 3, is_invalid=True)
                return
            # Classify the announcement: 1 for award/deal/termination notices, 0 for bid notices
            data_type = (
                1
                if self._check_type("中标公告")
                or self._check_type("成交公告")
                or self._check_type("终止公告")
                else 0
            )
            if self._check_content(content):
                attach_str = self._attach_download()
                self._save_db(url, content, data_type, attach_str)
            else:
                self._save_db(url, content, data_type, is_invalid=True)
        except TimeoutException as e:
            utils.get_logger().error(
                f"采集发生异常 [{self._adapter_type}] Timeout: {self.driver.current_url}。Exception: {e}"
            )
        except NoSuchElementException as e:
            utils.get_logger().error(
                f"采集发生异常 [{self._adapter_type}] NoSuchElement: {self.driver.current_url}。Exception: {e}"
            )
            raise Exception(f"采集失败 [{self._adapter_type}] [找不到元素]: {e}")
        finally:
            if close:
                sleep(1)
                self.driver.close()
                self.driver.switch_to.window(main_handle)
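
    # Returns True if the current page contains a link whose visible text equals
    # type_str (e.g. "中标公告"), logging the matched type.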
    def _check_type(self, type_str: str) -> bool:
        links = self.driver.find_elements(By.LINK_TEXT, type_str)
        if len(links) > 0:
            utils.get_logger().info(f"{type_str}")
            return True
        return False
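
    # Collects attachment links from the bid attachment table and from anchors marked
    # ignore='1', de-duplicates them by URL, downloads each file via
    # utils.download_remote_file, and returns the local paths as a comma-separated
    # string (empty when nothing was downloaded).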
    def _attach_download(self):
        paths = []
        attach_els = self.driver.find_elements(
            By.XPATH, "//td[@class='bid_attachtab_content']/a"
        )
        attach_2_els = self.driver.find_elements(By.XPATH, "//a[@ignore='1']")
        # Merge the two candidate lists
        all_attachments = attach_els + attach_2_els
        utils.get_logger().debug(
            f"附件检索数量: {len(attach_els)}/{len(attach_2_els)}/{len(all_attachments)}"
        )
        attach_urls = []
        if len(all_attachments) > 0:
            for attach_el in all_attachments:
                attach_url = attach_el.get_attribute("href")
                if attach_url not in attach_urls:
                    attach_urls.append(attach_url)
                else:
                    utils.get_logger().info(f"重复附件: {attach_url}")
                    continue
                file_name = (
                    attach_el.text
                    or attach_el.get_attribute("download")
                    or attach_url.split("/")[-1]
                )
                if not file_name:
                    continue
                # Check that file_name carries a file extension
                if "." not in file_name:
                    utils.get_logger().warning(
                        f"文件名 {file_name} 不包含扩展名,跳过下载。"
                    )
                    continue
                utils.get_logger().debug(
                    f"开始下载附件: {file_name} 链接: {attach_url}"
                )
                path = utils.download_remote_file(attach_url, file_name)
                if path:
                    utils.get_logger().debug(f"下载附件路径: {path}")
                    paths.append(path)
                else:
                    utils.get_logger().warning(f"下载附件失败: {file_name}")
        attach_str = ",".join(paths)
        if attach_str:
            utils.get_logger().info(f"附件下载完成: {attach_str}")
        return attach_str