Add 采集中标报告 (add collection of bid-award reports)

YueYunyun, 8 months ago
Commit 84b0a049bc

+ 1 - 1
SourceCode/TenderCrawler/.env

@@ -1,6 +1,6 @@
 SELENIUM_CHROME_PORT=3534
 MYSQL_ROOT_PASSWORD=123456qwertyu
-MYSQL_DATABASE=iwb_data_collect_v1.0
+MYSQL_DATABASE=iwb_data_collect_v2.0
 MYSQL_USER=iwb_data
 MYSQL_PASSWORD=123456iwb
 MYSQL_PORT=3535

+ 57 - 47
SourceCode/TenderCrawler/app/adapters/ccgp_data_collection_adapter.py

@@ -80,9 +80,9 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
                                          "//ul[@class='vT-srch-result-list-bid']/li/a")
             return items
         except TimeoutException as e:
-            raise Exception(f"搜索失败 [超时]: {e}")
+            raise Exception(f"搜索失败 [{self._adapter_type}] [超时]: {e}")
         except NoSuchElementException as e:
-            raise Exception(f"搜索失败 [找不到元素]: {e}")
+            raise Exception(f"搜索失败 [{self._adapter_type}] [找不到元素]: {e}")
 
 
     def _process_list(self,  items: list) -> list:
@@ -99,20 +99,22 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
         try:
             wait = WebDriverWait(self.driver, 10, 1)
             next_path = "//div[@class='vT-srch-result-list']/p/a[@class='next']"
-            wait.until(ec.presence_of_element_located((By.XPATH, next_path)))
-            btn = self.driver.find_element(By.XPATH, next_path)
+            try:
+                btn = self.driver.find_element(By.XPATH, next_path)
+            except NoSuchElementException:
+                self.logger.info(f"翻页结束 [{self._adapter_type}]")
+                return []
             btn.click()
             self.logger.info(f"跳转到下页: {self.driver.current_url}")
             sleep(5)
-            wait.until(ec.presence_of_element_located((By.ID, "vT-srch-result")))
+            wait.until(ec.presence_of_element_located((By.CLASS_NAME, "vT-srch-result")))
             items = self.driver.find_elements(By.XPATH,
                                          "//ul[@class='vT-srch-result-list-bid']/li/a")
             return items
         except NoSuchElementException as e:
-            raise Exception(f"翻页失败 [找不到元素]: {e}")
-        except TimeoutException:
-            self.logger.info("翻页结束")
-            return []
+            raise Exception(f"翻页失败 [{self._adapter_type}] [找不到元素]: {e}")
+        except TimeoutException as e:
+            raise Exception(f"翻页结束 [{self._adapter_type}] [超时]: {e}")
 
     def _process_item(self,  item):
         main_handle = self.driver.current_window_handle
@@ -123,7 +125,8 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
             if self._check_is_collect_by_url(url):
                 close = False
                 return
-            self.logger.info(f"跳转详情")
+            # self.logger.info(f"跳转详情")
+            print(".", end="")
             sleep(1)
             item.click()
             wait.until(ec.number_of_windows_to_be(2))
@@ -133,49 +136,24 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
                     self.driver.switch_to.window(handle)
                     break
             wait.until(ec.presence_of_element_located((By.TAG_NAME, "body")))
-            # 判断是否为投标公告
-            if self._check_type("中标公告") or  self._check_type("成交公告") or self._check_type("终止公告"):
-                self._save_db(url, "", is_invalid=True)
-                return
+
             content = self.driver.find_element(By.XPATH, "//div[@class='vF_deail_maincontent']").text
+            # 判断公告类型:中标/成交/终止/其他公告按中标结果公告处理(data_type=1),其余按招标公告处理(data_type=0)
+            data_type = 1 if self._check_type("中标公告") or self._check_type("成交公告") or self._check_type(
+                    "终止公告") or self._check_type("其他公告") else 0
+
             if self._check_content(content):
-                paths = []
-
-                attach_els = self.driver.find_elements(By.XPATH, "//td[@class='bid_attachtab_content']/a")
-                attach_2_els = self.driver.find_elements(By.XPATH, "//a[@ignore='1']")
-
-                # 合并两个列表
-                all_attachments = attach_els + attach_2_els
-                attach_urls = []
-                if len(all_attachments) > 0:
-                    for attach_el in attach_els:
-                        attach_url = attach_el.get_attribute('href')
-                        if attach_url not in attach_urls:
-                            attach_urls.append(attach_url)
-                        else:
-                            self.logger.info(f"重复附件: {attach_url}")
-                            continue
-                        file_name =  attach_el.text or attach_el.get_attribute('download') or attach_url.split('/')[-1]
-                        if not file_name:
-                            continue
-                        # 检查 file_name 是否包含文件扩展名
-                        if '.' not in file_name:
-                            self.logger.warning(f"文件名 {file_name} 不包含扩展名,跳过下载。")
-                            continue
-                        path = self.file_helper.download_remote_file(attach_url, file_name)
-                        if path:
-                            paths.append(path)
-                attach_str = ",".join(paths)
-                self._save_db(url, content, attach_str)
+                attach_str = self._attach_download()
+                self._save_db(url, content, data_type, attach_str)
             else:
-                self._save_db(url, content, is_invalid=True)
+                self._save_db(url, content, data_type, is_invalid=True)
         except TimeoutException as e:
             self.logger.error(
-                f"采集发生异常 Timeout: {self.driver.current_url}。Exception: {e}")
+                f"采集发生异常 [{self._adapter_type}] Timeout: {self.driver.current_url}。Exception: {e}")
         except NoSuchElementException as e:
             self.logger.error(
-                f"采集发生异常 NoSuchElement: {self.driver.current_url}。Exception: {e}")
-            raise Exception(f"采集失败 [找不到元素]: {e}")
+                f"采集发生异常 [{self._adapter_type}] NoSuchElement: {self.driver.current_url}。Exception: {e}")
+            raise Exception(f"采集失败 [{self._adapter_type}] [找不到元素]: {e}")
         finally:
             if close:
                 sleep(1)
@@ -185,7 +163,39 @@ class CcgpDataCollectionAdapter(IDataCollectionAdapter):
     def _check_type(self,type_str: str)->bool:
         links = self.driver.find_elements(By.LINK_TEXT, type_str)
         if len(links) > 0:
-            self.logger.info(f"{type_str},跳过")
+            self.logger.info(f"{type_str}")
             return True
         return False
 
+
+    def _attach_download(self):
+        paths = []
+
+        attach_els = self.driver.find_elements(By.XPATH, "//td[@class='bid_attachtab_content']/a")
+        attach_2_els = self.driver.find_elements(By.XPATH, "//a[@ignore='1']")
+
+        # 合并两个列表
+        all_attachments = attach_els + attach_2_els
+        attach_urls = []
+        if len(all_attachments) > 0:
+            for attach_el in all_attachments:
+                attach_url = attach_el.get_attribute('href')
+                if attach_url not in attach_urls:
+                    attach_urls.append(attach_url)
+                else:
+                    self.logger.info(f"重复附件: {attach_url}")
+                    continue
+                file_name = attach_el.text or attach_el.get_attribute('download') or attach_url.split('/')[-1]
+                if not file_name:
+                    continue
+                # 检查 file_name 是否包含文件扩展名
+                if '.' not in file_name:
+                    self.logger.warning(f"文件名 {file_name} 不包含扩展名,跳过下载。")
+                    continue
+                path = self.file_helper.download_remote_file(attach_url, file_name)
+                if path:
+                    paths.append(path)
+        attach_str = ",".join(paths)
+        if attach_str:
+            self.logger.info(f"附件下载完成: {attach_str}")
+        return attach_str
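Distilled to plain data, the dedup and filename-fallback chain in `_attach_download` (link text → download attribute → URL basename, skip names without an extension) looks like this. A standalone sketch; `pick_attachments` and its tuple input are illustrative, not part of the codebase:

    def pick_attachments(links):
        """links: (href, link_text, download_attr) tuples, as pulled from the
        two XPath queries above. Returns deduplicated (url, file_name) pairs
        worth downloading."""
        seen, picked = set(), []
        for href, text, download in links:
            if not href or href in seen:
                continue  # 重复附件
            seen.add(href)
            name = text or download or href.split('/')[-1]
            if name and '.' in name:  # only names with a file extension
                picked.append((href, name))
        return picked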

+ 77 - 75
SourceCode/TenderCrawler/app/adapters/chinabidding_data_collection_adapter.py

@@ -15,7 +15,6 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
     中国招标网数据采集适配器
     """
 
-
     def __init__(self, url: str,store:IDataStore=None):
         self._url = url
         self._store = store
@@ -23,25 +22,6 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
         self._keyword = None
         self._adapter_type = "chinabidding"
 
-    # @property
-    # def store(self) -> IDataStore:
-    #     return self._store
-    #
-    # @property
-    # def url(self):
-    #     return self._url
-    #
-    # @property
-    # def keyword(self):
-    #     return self._keyword
-    #
-    # @property
-    # def driver(self)->webdriver:
-    #     if not self._driver:
-    #         self._driver = self._create_driver()
-    #     return self._driver
-
-
     def login(self, username: str, password: str) -> None:
         try:
             login_el = self.driver.find_element(
@@ -57,73 +37,95 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
             login_btn.click()
             wait.until(ec.presence_of_element_located((By.ID, "site-content")))
         except TimeoutException as e:
-            raise Exception(f"登录失败 [超时]: {e}")
+            raise Exception(f"登录失败 [{self._adapter_type}] [超时]: {e}")
         except NoSuchElementException as e:
-            raise Exception(f"登录失败 [找不到元素]: {e}")
+            raise Exception(f"登录失败 [{self._adapter_type}] [找不到元素]: {e}")
 
 
     def collect(self, keyword: str, store: IDataStore):
         if store:
             self._store = store
         self._keyword = keyword
-        items = self._search(keyword)
-        self._process_list(items)
+        items = self._search_by_type(keyword, 0)
+        self._process_list(items, 0)
+        sleep(2)
+        items = self._search_by_type(keyword, 1)
+        self._process_list(items, 1)
         if self.config.get_bool(self.batch_save_key):
             self.store.save_collect_data(True)
 
-    def _search(self, keyword: str) -> list:
+    def _search_by_type(self, keyword: str, data_type):
         try:
-            wait = WebDriverWait(self.driver, 10, 1)
-            wait.until(
-                ec.presence_of_element_located((By.ID, "projSearchForm")))
-            search_el = self.driver.find_element(By.ID, "fullText")
-            search_el.send_keys("")
-            search_el.send_keys(keyword)
-            search_btn = self.driver.find_element(
-                By.XPATH, "//form[@id='projSearchForm']/button")
-            search_btn.click()
-            wait.until(ec.presence_of_element_located((By.ID, "site-content")))
-            default_search_txt = "近3日"
-            search_txt = self.config.get(self.search_day_key, default_search_txt)
-            self.logger.info(f"搜索关键字: {keyword},搜索条件: {search_txt}")
-            if search_txt != default_search_txt:
-                last_el = self.driver.find_element(By.LINK_TEXT, search_txt)
-                last_el.click()
-                wait.until(ec.presence_of_element_located((By.ID, "site-content")))
+            self.driver.get(self._url)
+            if data_type == 0:
+                self.logger.info(f"开始采集 招标公告")
+                el = self.driver.find_element(By.XPATH, "//div[@id='z-b-g-g']/h2/a[@class='more']")
             else:
-                sleep(1)
-            try:
-                a_links = self.driver.find_elements(
-                    By.XPATH, "//form[@id='pagerSubmitForm']/a")
-                count = len(a_links)
-                if count > 1:
-                    count = count - 1
-                self.logger.info(f"共查询到 {count} 页")
-            except Exception as e:
-                self.logger.error(f"搜索失败[尝试查询页数]: {e}")
-            items = self.driver.find_elements(By.XPATH,
-                                              "//ul[@class='as-pager-body']/li/a")
-            return items
+                self.logger.info(f"开始采集 中标结果公告")
+                el = self.driver.find_element(By.XPATH, "//div[@id='z-b-jg-gg']/h2/a[@class='more']")
+            el.click()
+            wait = WebDriverWait(self.driver, 10, 1)
+            wait.until(ec.number_of_windows_to_be(2))
+            self.driver.close()
+            self.driver.switch_to.window(self.driver.window_handles[0])
+            return self._search(keyword)
         except TimeoutException as e:
-            raise Exception(f"搜索失败 [超时]: {e}")
+            raise Exception(f"搜索失败 [{self._adapter_type}] [超时]: {e}")
         except NoSuchElementException as e:
-            raise Exception(f"搜索失败 [找不到元素]: {e}")
+            raise Exception(f"搜索失败 [{self._adapter_type}] [找不到元素]: {e}")
 
-    def _process_list(self, items: list) -> list:
+
+    def _search(self, keyword: str) -> list:
+        wait = WebDriverWait(self.driver, 10, 1)
+        wait.until(
+            ec.presence_of_element_located((By.ID, "searchBidProjForm")))
+        search_el = self.driver.find_element(By.XPATH, "//form[@id='searchBidProjForm']/ul/li/input[@id='fullText']")
+        search_el.clear()
+        search_el.send_keys(keyword)
+        search_btn = self.driver.find_element(
+            By.XPATH, "//form[@id='searchBidProjForm']/ul/li/button")
+        search_btn.click()
+        wait.until(ec.presence_of_element_located((By.ID, "site-content")))
+        default_search_txt = "全部"
+        search_txt = self.config.get(self.search_day_key, default_search_txt)
+        self.logger.info(f"搜索关键字: {keyword},搜索条件: {search_txt}")
+        if search_txt != default_search_txt:
+            last_el = self.driver.find_element(By.LINK_TEXT, search_txt)
+            sleep(1)
+            last_el.click()
+            wait.until(ec.presence_of_element_located((By.ID, "site-content")))
+        else:
+            sleep(1)
+        try:
+            a_links = self.driver.find_elements(
+                By.XPATH, "//form[@id='pagerSubmitForm']/a")
+            count = len(a_links)
+            if count > 1:
+                count = count - 1
+            self.logger.info(f"共查询到 {count} 页")
+        except Exception as e:
+            self.logger.error(f"搜索失败[尝试查询页数]: {e}")
+        items = self.driver.find_elements(By.XPATH,
+                                          "//ul[@class='as-pager-body']/li/a")
+        return items
+
+    def _process_list(self, items: list, data_type) -> list:
         if not items:
             return []
         for item in items:
-            self._process_item(item)
+            self._process_item(item, data_type)
         sleep(2)
         next_items = self._next_page()
-        return self._process_list(next_items)
+        return self._process_list(next_items, data_type)
 
     def _next_page(self) -> list:
         try:
             wait = WebDriverWait(self.driver, 10, 1)
-            next_path = "//form[@id='pagerSubmitForm']/a[@class='next']"
-            wait.until(ec.presence_of_element_located((By.XPATH, next_path)))
-            btn = self.driver.find_element(By.XPATH, next_path)
+            try:
+                btn = self.driver.find_element(By.XPATH, "//form[@id='pagerSubmitForm']/a[@class='next']")
+            except NoSuchElementException:
+                self.logger.info(f"翻页结束 [{self._adapter_type}]")
+                return []
             btn.click()
             self.logger.info(f"跳转到下页: {self.driver.current_url}")
             wait.until(ec.presence_of_element_located((By.ID, "site-content")))
@@ -131,12 +133,11 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
                                          "//ul[@class='as-pager-body']/li/a")
             return items
         except NoSuchElementException as e:
-            raise Exception(f"翻页失败 [找不到元素]: {e}")
+            raise Exception(f"翻页失败 [{self._adapter_type}] [找不到元素]: {e}")
-        except TimeoutException:
-            self.logger.info("翻页结束")
-            return []
+        except TimeoutException as e:
+            raise Exception(f"翻页结束 [{self._adapter_type}] [超时]: {e}")
 
-    def _process_item(self, item):
+    def _process_item(self, item, data_type):
         main_handle = self.driver.current_window_handle
         close = True
         try:
@@ -153,22 +154,23 @@ class ChinabiddingDataCollectionAdapter(IDataCollectionAdapter):
                     self.driver.switch_to.window(handle)
                     break
             url = self.driver.current_url
-            self.logger.info(f"跳转详情")
-            wait.until(ec.presence_of_element_located((By.TAG_NAME, "body")))
-            content = self.driver.find_element(By.TAG_NAME, "body").text
+            # self.logger.info(f"跳转详情")
+            print(".",end="")
+            wait.until(ec.presence_of_element_located((By.CLASS_NAME, "content")))
+            content = self.driver.find_element(By.CLASS_NAME, "content").text
             if self._check_content(content):
-                self._save_db(url, content)
+                self._save_db(url, content, data_type)
             else:
-                self._save_db(url, content, is_invalid=True)
+                self._save_db(url, content, data_type, is_invalid=True)
 
         except TimeoutException as e:
             self.logger.error(
-                f"采集发生异常 Timeout: {self.driver.current_url}。Exception: {e}")
+                f"采集发生异常 [{self._adapter_type}] Timeout: {self.driver.current_url}。Exception: {e}")
             # raise Exception(f"采集失败 [超时]: {e}")
         except NoSuchElementException as e:
             self.logger.error(
-                f"采集发生异常 NoSuchElement: {self.driver.current_url}。Exception: {e}")
-            raise Exception(f"采集失败 [找不到元素]: {e}")
+                f"采集发生异常 [{self._adapter_type}] NoSuchElement: {self.driver.current_url}。Exception: {e}")
+            raise Exception(f"采集失败 [{self._adapter_type}] [找不到元素]: {e}")
         finally:
             if close:
                 sleep(2)
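`_search_by_type` above relies on a tab-handoff idiom: clicking the "more" link opens a second window, the original tab is closed, and the driver continues in the surviving one. As a standalone sketch (function name and timeout are illustrative):

    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as ec


    def handoff_to_new_tab(driver, timeout=10):
        # Wait until the click has actually spawned the second window.
        WebDriverWait(driver, timeout, 1).until(ec.number_of_windows_to_be(2))
        driver.close()  # closes the currently focused (original) tab
        # Only one handle remains, and it is the new tab.
        driver.switch_to.window(driver.window_handles[0])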

+ 8 - 3
SourceCode/TenderCrawler/app/adapters/data_collection_adapter_interface.py

@@ -8,7 +8,6 @@ from drivers.driver_creator import DriverCreator
 from utils.logger_helper import LoggerHelper
 from utils.config_helper import ConfigHelper
 from models.collect_data import CollectData
-from models.process_data import ProcessData
 
 
 class IDataCollectionAdapter(ABC):
@@ -148,13 +147,19 @@ class IDataCollectionAdapter(ABC):
 
 
 
-    def _save_db(self, url, content, attach_str = None,is_invalid=False):
+    def _save_db(self, url, content, data_type=0, attach_str=None, is_invalid=False):
         if not self.store:
             self.logger.info(f"DataStore 未指定: {url},关键字{self.keyword}")
             return False
         else:
             status = 2 if is_invalid else 0
-            data = CollectData(url, self.keyword, content, attach_str, status)
+            data = CollectData(
+                url=url,
+                keyword=self.keyword,
+                content=content,
+                data_type=data_type,
+                attach_path=attach_str,
+                status=status)
             self.store.insert_collect_data(data, self.config.get_bool(self.batch_save_key))
             return True
 

+ 2 - 1
SourceCode/TenderCrawler/app/config.yml

@@ -35,7 +35,8 @@ ai:
   model: qwen-plus
   max_tokens: 1024
   system_prompt: 请帮我分析以下文字,提取出关键信息,并以json格式字符串返回,如果部分信息为空,则该字段返回为空。
-  prompt_template: 在以上内容中提取信息:编号(no) 、标题(title)、在哪个城市招标(area)、开标的时间(date)、开标的地点(address)、发布时间(release_date)、150字左右的招标条件要求及联系方式等内容摘要(summary),设备(device)。提取出相关设备的名称信息,多个设备以逗号分割。返回包含no,title,area,date,address,release_date,summary,device字段的json格式字符串,没有找到或未提供的信息json字段为空。
+  prompt_template_1: 在以上内容中提取信息:编号(no) 、标题(title)、在哪个城市招标(area)、开标的时间(date)、开标的地点(address)、发布时间(release_date)、150字左右的招标条件要求及联系方式等内容摘要(summary),设备(devices)。提取出相关设备的名称信息,多个设备以逗号分割。返回包含no,title,area,date,address,release_date,summary,devices字段的json格式字符串,没有找到或未提供的信息json字段为空。
+  prompt_template_2: 在以上内容中提取信息:编号(no) 、标题(title)、公告时间(date)、标中的总价格(price)、标中的公司,多个以逗号分割(bidder)、150-300字的标的物说明,标的物价格,公司的明细等内容摘要(summary),设备(devices)。提取出相关设备的名称信息,多个设备以逗号分割。返回包含no,title,date,price,bidder,summary字段的json格式字符串,没有找到或未提供的信息json字段为空。
 email:
 #  smtp_server: smtp.exmail.qq.com
 #  smtp_port: 465
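The two templates define the JSON contract the AI reply must satisfy. Note that `prompt_template_2` asks for a `devices` extraction but omits `devices` from its listed return fields; as written, a conforming reply for template 2 parses to something like the following (values invented):

    import json

    reply = '''{"no": "ZB2025-001", "title": "某设备采购中标结果公告",
    "date": "2025-01-10", "price": "1200000", "bidder": "甲公司,乙公司",
    "summary": "……"}'''
    fields = json.loads(reply)  # keys: no, title, date, price, bidder, summary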

+ 1 - 1
SourceCode/TenderCrawler/app/drivers/driver_creator.py

@@ -14,7 +14,7 @@ class DriverCreator:
         # 设置Chrome选项
         options = webdriver.ChromeOptions()
 
-        # options.add_argument('--headless')  # 无头模式运行
+        options.add_argument('--headless')  # 无头模式运行
         options.add_argument('--no-sandbox')
         options.add_argument('--disable-dev-shm-usage')
         options.add_experimental_option('excludeSwitches',
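Enabling `--headless` matters here presumably because the crawler runs in a container (suggested by SELENIUM_CHROME_PORT in the .env above); the two companion flags are the usual Docker accompaniments. A minimal sketch of the same option set:

    from selenium import webdriver

    options = webdriver.ChromeOptions()
    options.add_argument('--headless')               # no visible browser window
    options.add_argument('--no-sandbox')             # commonly required when running as root in Docker
    options.add_argument('--disable-dev-shm-usage')  # avoid crashes from a small /dev/shm
    driver = webdriver.Chrome(options=options)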

+ 5 - 2
SourceCode/TenderCrawler/app/main.py

@@ -1,4 +1,5 @@
 import time
+import datetime
 import schedule
 
 from utils.config_helper import ConfigHelper
@@ -6,7 +7,7 @@ from utils.logger_helper import LoggerHelper
 from main.runner import Runner
 
 logger = LoggerHelper.get_logger()
-DEFAULT_USER_SLEEP_INTERVAL = 60 * 30  # 配置默认时间间隔30分钟
+DEFAULT_USER_SLEEP_INTERVAL = 10  # 配置默认时间间隔10秒
 
 runner = Runner()
 runner.run()
@@ -15,6 +16,8 @@ interval = ConfigHelper().get_int("schedule.sleep_interval",DEFAULT_USER_SLEEP_I
 
 if __name__ == '__main__':
     while True:
-        logger.info(f"等待下次检查执行... {interval}秒后")
+        now = datetime.datetime.now()
+        if now.minute == 0:
+            logger.info("程序运行中...")
         schedule.run_pending()
         time.sleep(interval)
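One caveat with the new heartbeat: at a 10-second interval the `now.minute == 0` guard is true for every iteration during the first minute of each hour, so the message is logged about six times per hour. A variant that fires once per hour (same `logger` and `interval` as above):

    last_heartbeat_hour = None
    while True:
        now = datetime.datetime.now()
        if now.minute == 0 and now.hour != last_heartbeat_hour:
            logger.info("程序运行中...")
            last_heartbeat_hour = now.hour
        schedule.run_pending()
        time.sleep(interval)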

+ 94 - 38
SourceCode/TenderCrawler/app/main/data_process.py

@@ -1,4 +1,6 @@
+from models.process_result_data import ProcessResultData
 from utils.logger_helper import LoggerHelper
+from utils.config_helper import ConfigHelper
 from utils.ai_helper import AiHelper
 from stores.data_store_interface import IDataStore
 from models.collect_data import CollectData
@@ -7,11 +9,26 @@ from models.process_data import ProcessData
 
 class DataProcess:
     logger = LoggerHelper.get_logger()
-
+    config = ConfigHelper()
     _store = None
 
+    DEFAULT_AI_SYSTEM_PROMPT = "请帮我分析以下文字,提取出关键信息,并以json格式字符串返回,如果部分信息为空,则该字段返回为空。"
+    DEFAULT_AI_PROMPT_TEMPLATE_1 = """在以上内容中提取信息:
+            编号(no) 、标题(title)、在哪个城市招标(area)、开标的时间(date)、开标的地点(address)、发布时间(release_date)、150字左右的招标条件要求及联系方式等内容摘要(summary), 设备(devices)。
+            提取出相关设备的名称信息, 多个设备以逗号分割。
+            返回包含no, title, area, date, address, release_date, summary, devices字段的json格式字符串,没有找到或未提供的信息json字段为空。
+            """
+    DEFAULT_AI_PROMPT_TEMPLATE_2 = """在以上内容中提取信息:
+            编号(no) 、标题(title)、公告时间(date)、标中的总价格(price)、标中的公司,多个以逗号分割(bidder)、150-300字的标的物说明,标的物价格,公司的明细等内容摘要(summary),设备(devices)。
+            提取出相关设备的名称信息,多个设备以逗号分割。返回包含no,title,date,price,bidder,summary字段的json格式字符串,没有找到或未提供的信息json字段为空。"""
+
     def __init__(self, store: IDataStore):
         self._store = store
+        self._ai_system_prompt = self.config.get("ai.system_prompt",
+                                                 self.DEFAULT_AI_SYSTEM_PROMPT)
+        self._ai_prompt_template_1 = self.config.get(
+            "ai.prompt_template_1", self.DEFAULT_AI_PROMPT_TEMPLATE_1)
+        self._ai_prompt_template_2 = self.config.get(
+            "ai.prompt_template_2", self.DEFAULT_AI_PROMPT_TEMPLATE_2)
 
     @property
     def store(self) -> IDataStore:
@@ -23,49 +40,88 @@ class DataProcess:
             for item in urls:
                 self._process_item(item)
             self.store.save_process_data(True)
+            self.store.save_process_result_data(True)
         except Exception as e:
-            self.logger.error(f"数据处理过程中发生异常: {e}")
+            self.logger.error(f"数据处理发生异常: {e}")
+            raise Exception(f"数据处理发生异常: {e}")
 
     def _process_item(self, url: str) -> None:
-        self.logger.info("START ==>" + url)
-        item = self.store.query_one_collect_by_url(url)
-        if not item:
-            self.logger.info("END: NOT FOUND URL==>" + url)
-            return
-        if item.status == 1:
-            self.logger.info("ALREADY URL==>" + url)
-            return
-        data = self._ai_process(item)
-        if data:
-            old = None
-            if data.no:
-                old = self.store.query_one_process_by_no(data.no)
-            if not old:
-                data.url = url
-                data.keyword = item.keyword
-                self.store.insert_process_data(data)
-            else:
-                if old.url != url:
-                    if old.other_urls:
-                        old.other_urls += f",{url}"
-                    else:
-                        old.other_urls = url
-                    self.store.set_process_other_urls(data.url, old.other_urls)
-                self.logger.info(f"ALREADY 编号: {data.no} URL:{old.other_urls}")
+        try:
+            self.logger.info(f"START ==> URL:{url}")
+            item = self.store.query_one_collect_by_url(url)
+            if not item:
+                self.logger.info(f"END==> NOT FOUND URL:{url}")
+                return
+            if item.status == 1:
+                self.logger.info(f"ALREADY1 URL:{url}")
+                return
+            data = self.store.query_one_process_by_url(url) if item.data_type == 0 else self.store.query_one_process_result_by_url(url)
+            if data:
+                self.logger.info(f"ALREADY2 [{item.data_type}] URL==> {url}")
+                return
+            data = self._ai_process_1(item) if item.data_type == 0 else self._ai_process_2(item)
+            if data:
+                old = None
+                if data.no:
+                    # 按编号查重:招标数据(data_type=0)查 t_data,中标结果数据查 t_data_result
+                    old = self.store.query_one_process_by_no(data.no) if item.data_type == 0 else self.store.query_one_process_result_by_no(data.no)
+                if not old:
+                    data.url = url
+                    data.keyword = item.keyword
+                    data.attach_path = item.attach_path
+                    if item.data_type == 0:
+                        self.store.insert_process_data(data)
+                    else:
+                        self.store.insert_process_result_data(data)
+                else:
+                    if old.url != url:
+                        if old.other_urls:
+                            old.other_urls += f",{url}"
+                        else:
+                            old.other_urls = url
+                        if item.data_type == 0:
+                            self.store.set_process_other_urls(old.url, old.other_urls)
+                        else:
+                            self.store.set_process_result_other_urls(old.url, old.other_urls)
+                    self.logger.info(f"ALREADY 编号: {data.no} URL:{old.other_urls}")
+
+            self.logger.info("END   ==>" + url)
+        except Exception as e:
+            self.logger.error(f"数据处理发生异常: {url} {e}")
+
+    def _ai_process_1(self, item: CollectData) -> ProcessData | None:
+        try:
+            data = AiHelper().call_openai(self._ai_system_prompt, f"{item.content} {self._ai_prompt_template_1}")
+            return ProcessData(no=data.get("no"),
+                               title=data.get("title"),
+                               date=data.get("date"),
+                               area=data.get("area"),
+                               address=data.get("address"),
+                               devices=data.get("devices"),
+                               summary=data.get("summary"),
+                               release_date=data.get("release_date"),
+                               prompt_tokens=data.get("prompt_tokens"),
+                               completion_tokens=data.get("completion_tokens"),
+                               total_tokens=data.get("total_tokens"),
+                               )
+        except Exception as e:
+            self.logger.error(f"AI 提取数据失败1: {item.url} {e}")
+            return None
 
-        self.logger.info("END   ==>" + url)
 
-    def _ai_process(self, item: CollectData) -> ProcessData | None:
+    def _ai_process_2(self, item: CollectData) -> ProcessResultData | None:
         try:
-            data = AiHelper().call_ai(item.content)
-            return data
+            data = AiHelper().call_openai(self._ai_system_prompt, f"{item.content} {self._ai_prompt_template_2}")
+            return ProcessResultData(no=data.get("no"),
+                               title=data.get("title"),
+                               date=data.get("date"),
+                               price=data.get("price"),
+                               bidder=data.get("bidder"),
+                               summary=data.get("summary"),
+                               prompt_tokens=data.get("prompt_tokens"),
+                               completion_tokens=data.get("completion_tokens"),
+                               total_tokens=data.get("total_tokens"),
+                               )
         except Exception as e:
-            self.logger.error(f"AI 提取数据失败: {item.url} {e}")
+            self.logger.error(f"AI 提取数据失败2: {item.url} {e}")
             return None
 
-    # def _generate_unique_id(self) -> str:
-    #     from datetime import datetime
-    #     current_time = datetime.now().strftime("%Y%m%d%H%M%S%f")
-    #     thread_id = threading.current_thread().ident
-    #     unique_id = f"{current_time}-{thread_id}"
-    #     return unique_id
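`_ai_process_1` and `_ai_process_2` read both the extracted fields and the token counters out of a single dict, so `AiHelper.call_openai` is assumed to merge the model's parsed JSON answer with the usage statistics. Roughly (shape inferred from the `data.get(...)` calls above; values invented):

    result = {
        # fields parsed from the model's JSON reply
        "no": "ZB2025-001",
        "title": "某设备采购招标公告",
        "date": "2025-01-20",
        "area": "上海",
        "address": "……",
        "release_date": "2025-01-10",
        "summary": "……",
        "devices": "CT,MRI",
        # usage statistics merged in by AiHelper
        "prompt_tokens": 812,
        "completion_tokens": 143,
        "total_tokens": 955,
    }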

+ 166 - 2
SourceCode/TenderCrawler/app/main/data_send.py

@@ -1,7 +1,11 @@
+from datetime import datetime, timedelta
+import calendar
+
 from utils.logger_helper import LoggerHelper
 from utils.email_helper import EmailHelper
 from stores.data_store_interface import IDataStore
 from models.process_data import ProcessData
+from models.process_result_data import ProcessResultData
 
 
 class DataSend:
@@ -24,6 +28,30 @@ class DataSend:
         if len(self._error_arr) > 0:
             self._send_email_no_found()
 
+
+    def send_report_current_month(self):
+        # 查询当月的数据
+        start_date, end_date = self._get_first_and_last_day_of_current_month()
+        self._send_reports(start_date, end_date)
+
+    def send_report_prev_month(self):
+        # 查询上月的数据
+        start_date, end_date = self._get_first_and_last_day_of_prev_month()
+        self._send_reports(start_date, end_date)
+
+    def _send_reports(self, start_date, end_date):
+        self.logger.info(f"开始发送中标报告邮件,开始日期:{start_date.strftime('%Y-%m-%d')},结束日期:{end_date.strftime('%Y-%m-%d')}")
+        email = self.store.query_master_email()
+        if not email:
+            self.logger.error("没有找到 master email")
+            return
+        items = self.store.query_to_report_by_date(start_date, end_date)
+        title = f"{start_date.month}月中标结果报告"
+        body = self._build_report_email_html(title, items)
+        flag = EmailHelper().send_email(email, title, body, True)
+        if flag:
+            self.logger.info("发送中标报告邮件成功")
+
     def _send_item(self, item: ProcessData) -> None:
         self.logger.info(f"开始发送邮件,地区为:{item.area} ,URL为 {item.url}")
         email = self.store.get_email_by_area(item.area)
@@ -32,13 +60,13 @@ class DataSend:
             if item.area not in self._error_arr:
                 self._error_arr.append(item.area)
             return
-        body = self._build_email_content(item)
+        body = self._build_email_html(item)
         flag = EmailHelper().send_email(email, item.title, body, True, item.attach_path)
         if flag:
             self.store.set_send(item.no)
 
     @staticmethod
-    def _build_email_content(item: ProcessData, other: str = "") -> str:
+    def _build_email_html(item: ProcessData, other: str = "") -> str:
         html_body = f"""
         <html>
         <head>
@@ -107,6 +135,114 @@ class DataSend:
         """
         return html_body
 
+    def _build_report_email_html(self, title, items) -> str:
+        body = ""
+        for item in items:
+            body += self._build_report_email_body(item)
+        html = f"""
+        <html>
+        <head>
+            <style>
+                body {{
+                    background-color: #f4f4f9;
+                    font-family: Arial, sans-serif;
+                    margin: 0;
+                    padding: 20px;
+                }}
+                h1 {{
+                    text-align: center;
+                    color: #333;
+                }}
+                .container {{
+                    max-width: 600px;
+                    margin: 0 auto;
+                    background-color: #fff;
+                    padding: 20px;
+                    border-radius: 8px;
+                    box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
+                }}
+                .button-container {{
+                    text-align: center;
+                    margin-top: 20px;
+                }}
+                .button {{
+                    display: inline-block;
+                    padding: 10px 20px;
+                    font-size: 16px;
+                    color: #fff!important;
+                    background-color: #007bff;
+                    text-decoration: none;
+                    border-radius: 5px;
+                    transition: background-color 0.3s;
+                }}
+                .button:hover {{
+                    background-color: #0056b3;
+                }}
+                .system {{
+                    color: #aaa;
+                }}
+                .card {{
+                    background-color: #ffffff;
+                    border: 1px solid #dddddd;
+                    border-radius: 8px;
+                    margin-bottom: 20px;
+                    padding: 20px;
+                    box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
+                }}
+                .card h2 {{
+                    margin-top: 0;
+                }}
+                .card p {{
+                    margin: 0;
+                }}
+                .button-container {{
+                    text-align: center;
+                    margin-top: 15px;
+                }}
+                .button {{
+                    display: inline-block;
+                    padding: 6px 15px;
+                    font-size: 14px;
+                    color: #fff!important;
+                    background-color: #007bff;
+                    text-decoration: none;
+                    border-radius: 3px;
+                    transition: background-color 0.3s;
+                }}
+                .button:hover {{
+                    background-color: #0056b3;
+                }}
+            </style>
+        </head>
+        <body>
+            <div class="container">
+                <h1>{title}</h1>
+                {body}
+                <p class="system">本邮件由系统自动发送,请勿回复。</p>
+            </div>
+        </body>
+        </html>
+        """
+        return html
+
+    @staticmethod
+    def _build_report_email_body(item: ProcessResultData) -> str:
+        body = f"""
+           <div class="card">
+               <h2>{item.title}</h2>
+               <p><strong>项目编号:</strong> {item.no}</p>
+               <p><strong>公告日期:</strong> {item.date}</p>
+               <p><strong>关键词:</strong> {item.keyword}</p>
+               <p><strong>价格:</strong> {item.price}</p>
+               <p><strong>中标人:</strong> {item.bidder}</p>
+               <p><strong>摘要:</strong> {item.summary}</p>
+               <div class="button-container">
+                <a href="{item.url}" class="button">查看详情</a>
+               </div>
+           </div>
+           """
+        return body
+
     def _send_email_no_found(self) -> None:
         email = EmailHelper().config.get("email.error_email")
         self.logger.info(f"开始发送区域邮箱未匹配邮件: {email}")
@@ -117,3 +253,31 @@ class DataSend:
         content += "、".join(self._error_arr)
         content += "\n\n请及时添加相关配置。"
         EmailHelper().send_email(email, title, content, False, None)
+
+    @staticmethod
+    def _get_first_and_last_day_of_current_month():
+        # 获取当前日期
+        today = datetime.today()
+        # 获取这个月的第一天
+        first_day_of_current_month = datetime(today.year, today.month, 1, 0, 0, 0)
+        # 获取这个月的最后一天
+        _, last_day = calendar.monthrange(today.year, today.month)
+        last_day_of_current_month = datetime(today.year, today.month, last_day, 23, 59, 59)
+        return first_day_of_current_month, last_day_of_current_month
+
+    @staticmethod
+    def _get_first_and_last_day_of_prev_month():
+        # 获取当前日期
+        today = datetime.today()
+        # 获取上个月的年份和月份
+        if today.month == 1:
+            prev_month_year = today.year - 1
+            prev_month = 12
+        else:
+            prev_month_year = today.year
+            prev_month = today.month - 1
+        # 获取上个月的第一天
+        first_day_prev_month = datetime(prev_month_year, prev_month, 1,0,0,0)
+        # 获取上个月的最后一天
+        _, last_day = calendar.monthrange(prev_month_year, prev_month)
+        last_day_of_prev_month = datetime(prev_month_year, prev_month, last_day, 23, 59, 59)
+        return first_day_prev_month, last_day_of_prev_month
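A quick sanity check of the month-boundary helpers, including the January wrap-around (hypothetical snippet; assumes `app/main/data_send.py` is importable as `main.data_send`):

    from main.data_send import DataSend

    start, end = DataSend._get_first_and_last_day_of_prev_month()
    # Run on e.g. 2025-01-15: start == 2024-12-01 00:00:00, end == 2024-12-31 23:59:59
    assert start.day == 1 and start.hour == 0
    assert (end.hour, end.minute, end.second) == (23, 59, 59)
    assert end >= start and end.month == start.month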

+ 71 - 43
SourceCode/TenderCrawler/app/main/runner.py

@@ -1,5 +1,6 @@
-from dateutil import parser
 import schedule
+from dateutil import parser
+from datetime import datetime
 
 from utils.logger_helper import LoggerHelper
 from utils.config_helper import ConfigHelper
@@ -9,7 +10,6 @@ from main.data_collector import DataCollector
 from main.data_process import DataProcess
 from main.data_send import DataSend
 from utils.email_helper import EmailHelper
-from utils.file_helper import FileHelper
 
 class Runner:
     logger = LoggerHelper.get_logger()
@@ -17,42 +17,50 @@ class Runner:
     store = MysqlDataStore()  # 复用 store 对象
 
     def run(self):
-        self.logger.info("应用程序已启动!")
-        urls = UrlSetting().fetch_all()
-        if not urls or len(urls) == 0:
-            self.logger.error("未找到任何 URL 设置")
-            return
-        self.logger.info(f"共找到 {len(urls)} 个 URL 设置")
-
-        collect_time = self.config.get("schedule.collect")
-        process_time = self.config.get("schedule.process")
-        send_email_time = self.config.get("schedule.send_email")
-
-        collect_times = self._validate_and_format_time(collect_time, ["06:00"])
-        for time in collect_times:
-            self.logger.info(f"{time} 执行 采集处理数据 任务")
-            schedule.every().day.at(time).do(self._collect_process_job)
-
-        process_times = self._validate_and_format_time(
-            process_time, ["10:00", "15:00", "19:00"])
-        for time in process_times:
-            self.logger.info(f"{time} 执行  AI处理数据  任务")
-            schedule.every().day.at(time).do(self._process_job)
-
-        send_email_times = self._validate_and_format_time(
-            send_email_time, ["08:20", "14:00"])
-        for time in send_email_times:
-            self.logger.info(f"{time} 执行   发送邮件   任务")
-            schedule.every().day.at(time).do(self._send_job)
-        if self.config.get_bool("schedule.run_now"):
-            self.logger.info("立即执行任务")
-            self._collect_process_job()
-            # self._send_job()
-            # self._process_job()
+        try:
+            self.logger.info("应用程序已启动!")
+            urls = UrlSetting().fetch_all()
+            if not urls or len(urls) == 0:
+                self.logger.error("未找到任何 URL 设置")
+                return
+            self.logger.info(f"共找到 {len(urls)} 个 URL 设置")
+
+            collect_time = self.config.get("schedule.collect")
+            process_time = self.config.get("schedule.process")
+            send_email_time = self.config.get("schedule.send_email")
+
+            collect_times = self._validate_and_format_time(collect_time, ["06:00"])
+            for time in collect_times:
+                self.logger.info(f"{time} 执行 采集处理数据 任务")
+                schedule.every().day.at(time).do(self._collect_process_job)
+
+            process_times = self._validate_and_format_time(
+                process_time, ["10:00", "15:00", "19:00"])
+            for time in process_times:
+                self.logger.info(f"{time} 执行 AI处理数据  任务")
+                schedule.every().day.at(time).do(self._process_job)
+
+            send_email_times = self._validate_and_format_time(
+                send_email_time, ["08:20", "14:00"])
+            for time in send_email_times:
+                self.logger.info(f"{time} 执行  发送邮件   任务")
+                schedule.every().day.at(time).do(self._send_job)
+
+            schedule.every().day.at("08:00").do(self._send_prev_month_report_job)
+
+            if self.config.get_bool("schedule.run_now"):
+                self.logger.info("立即执行任务")
+                self._collect_process_job()
+                self._send_job()
+                # self._process_job()
+
+        except Exception as e:
+            self.logger.error(f"应用程序停止: {e}")
+            raise e
 
     def _collect_process_job(self):
         try:
-            self.logger.info("开始执行数据采集处理任务")
+            self.logger.info("开始执行 数据采集处理 任务")
             url_setting = UrlSetting()
             for url_setting in url_setting.fetch_all():
                 data_collector =None
@@ -89,28 +97,48 @@ class Runner:
                     )
                     self.logger.error(f"AI处理发生异常: {e}")
                     break  # 中断当前 URL 设置的处理
-            self.logger.info("数据采集处理任务执行完毕")
+            self.logger.info("数据采集处理 任务执行完毕")
         except Exception as e:
-            self.logger.error(f"数据采集处理任务执行失败: {e}")
+            self.logger.error(f"数据采集处理 任务执行失败: {e}")
 
     def _process_job(self):
         try:
-            self.logger.info("开始AI处理数据执行任务")
+            self.logger.info("开始执行 AI处理数据 任务")
             data_process = DataProcess(self.store)
             data_process.process()
-            self.logger.info("AI处理数据任务执行完毕")
+            self.logger.info("AI处理数据 任务执行完毕")
         except Exception as e:
             self._send_error_email("AI数据处理", f"\n    错误: {str(e)}")
-            self.logger.error(f"AI任务执行失败: {e}")
+            self.logger.error(f"AI任务 执行失败: {e}")
 
     def _send_job(self):
         try:
-            self.logger.info("开始邮件发送执行任务")
+            self.logger.info("开始执行 邮件发送 任务")
             DataSend(self.store).send()
-            self.logger.info("邮件发送任务执行完毕")
+            self.logger.info("邮件发送 任务执行完毕")
+        except Exception as e:
+            self._send_error_email("邮件发送", f"\n    错误: {str(e)}")
+            self.logger.error(f"邮件发送 任务执行失败: {e}")
+
+    def _send_current_month_report_job(self):
+        try:
+            self.logger.info("开始执行 邮件发送当月报告 任务")
+            DataSend(self.store).send_report_current_month()
+            self.logger.info("邮件发送当月报告 任务执行完毕")
         except Exception as e:
             self._send_error_email("邮件发送", f"\n    错误: {str(e)}")
-            self.logger.error(f"邮件发送任务执行失败: {e}")
+            self.logger.error(f"邮件发送当月报告 任务执行失败: {e}")
+
+    def _send_prev_month_report_job(self):
+        try:
+            if datetime.today().day == 1:
+                self.logger.info("开始执行 邮件发送上月报告 任务")
+                DataSend(self.store).send_report_prev_month()
+                self.logger.info("邮件发送上月报告 任务执行完毕")
+        except Exception as e:
+            self._send_error_email("邮件发送", f"\n    错误: {str(e)}")
+            self.logger.error(f"邮件发送上月报告 任务执行失败: {e}")
+
 
     def _validate_and_format_time(self, time_str, default_time: list):
         """验证并格式化时间字符串"""

+ 8 - 0
SourceCode/TenderCrawler/app/models/area_email.py

@@ -36,6 +36,7 @@ class AreaEmail:
     #         db_helper.execute_non_query(query, params)
 
     _query = "SELECT name,area,email FROM t_area_email WHERE is_active = 1"
+    _query_master = "SELECT email FROM t_area_email WHERE name='master' AND is_active = 1"
     _query_by_area = "SELECT email FROM t_area_email WHERE CONCAT(area,',') like %s AND is_active = 1"
     # 查询 AreaEmail 数据
     def fetch_all(self):
@@ -51,3 +52,10 @@ class AreaEmail:
             if result is None:
                 return None
             return result["email"]
+
+    def fetch_master_email(self):
+        with MySQLHelper() as db_helper:
+            result = db_helper.fetch_one(self._query_master)
+            if result is None:
+                return None
+            return result["email"]

+ 49 - 37
SourceCode/TenderCrawler/app/models/collect_data.py

@@ -10,10 +10,14 @@ class CollectData:
     PROCESSED = 1
     INVALID = 2
 
+    DATA_TYPE_0 = 0       # 招标公告
+    DATA_TYPE_RESULT = 1  # 中标结果公告
+
     def __init__(self,
                  url=None,
                  keyword=None,
                  content=None,
+                 data_type=None,
                  attach_path=None,
                  status=UNPROCESSED,
                  create_time=None,
@@ -21,6 +25,7 @@ class CollectData:
         self.url = url
         self.keyword = keyword
         self.content = content
+        self.data_type = data_type
         self.attach_path = attach_path
         self.status = status
         self.create_time = create_time or datetime.now()
@@ -34,25 +39,27 @@ class CollectData:
         )
 
     _insert_query = """
-        INSERT IGNORE INTO t_collect_data (url, keyword, content, attach_path, status, create_time)
-        VALUES (%s, %s, %s, %s, %s, %s);
+        INSERT IGNORE INTO t_collect_data (url, keyword, content, data_type, attach_path, status, create_time)
+        VALUES (%s, %s, %s, %s, %s, %s, %s);
         """
     _insert_query_history = """
-         INSERT IGNORE INTO t_collect_data_history (url, keyword, content, attach_path, status, create_time)
-         VALUES (%s, %s, %s, %s, %s, %s);
+         INSERT IGNORE INTO t_collect_data_history (url, keyword, content, data_type, attach_path, status, create_time)
+         VALUES (%s, %s, %s, %s, %s, %s, %s);
          """
     _delete_query = """
          DELETE FROM t_collect_data
          WHERE url = %s;
          """
+
     def insert(self, collect_data):
         if not isinstance(collect_data, self.__class__):
             raise TypeError("collect_data 不是 CollectData 的实例")
         with MySQLHelper() as db_helper:
 
             params = (collect_data.url, collect_data.keyword,
-                       collect_data.content,collect_data.attach_path,
-                      collect_data.status, datetime.now())
+                      collect_data.content, collect_data.data_type,
+                      collect_data.attach_path, collect_data.status,
+                      datetime.now())
             if collect_data.status == self.INVALID:
                 db_helper.execute_non_query(self._insert_query_history, params)
             else:
@@ -69,32 +76,32 @@ class CollectData:
                 collect_data.url,
                 collect_data.keyword,
                 collect_data.content,
+                collect_data.data_type,
                 collect_data.attach_path,
                 collect_data.status,
                 datetime.now()  # 每次调用 datetime.now() 获取当前时间
-            ) for collect_data in collect_data_list
-            if collect_data.status != 2
+            ) for collect_data in collect_data_list if collect_data.status != 2
         ]
         params2 = [
             (
                 collect_data.url,
                 collect_data.keyword,
                 collect_data.content,
+                collect_data.data_type,
                 collect_data.attach_path,
                 collect_data.status,
                 datetime.now()  # 每次调用 datetime.now() 获取当前时间
-            ) for collect_data in collect_data_list
-            if collect_data.status == 2
+            ) for collect_data in collect_data_list if collect_data.status == 2
         ]
 
         with MySQLHelper() as db_helper:
             db_helper.execute_non_query(self._insert_query, params)
-            # 获取受影响的行数
-            affected_rows = db_helper.connection.affected_rows()
-
+            affected_rows1 = db_helper.connection.affected_rows()
+            self.logger.info(f"成功插入 {affected_rows1} 条有效数据")
             db_helper.execute_non_query(self._insert_query_history, params2)
-            self.logger.info(f"成功插入 {affected_rows} 条数据")
-            return affected_rows
+            affected_rows2 = db_helper.connection.affected_rows()
+            self.logger.info(f"成功插入 {affected_rows2} 条无效历史数据")
+            return affected_rows1 + affected_rows2
 
     # def insert_url(self, url: str, keyword: str, content: str):
     #     with MySQLHelper() as db_helper:
@@ -127,6 +134,7 @@ class CollectData:
             FROM t_collect_data
             WHERE status = 0
             """
+
             results = db_helper.execute_query(query)
             data = [result['url'] for result in results]
             return data
@@ -145,7 +153,7 @@ class CollectData:
     def fetch_one_collect_by_url(self, url: str):
         with MySQLHelper() as db_helper:
             query = """
-                SELECT url,keyword,content,status FROM t_collect_data WHERE url = %s  LIMIT 1
+                SELECT url,keyword,content,data_type,status FROM t_collect_data WHERE url = %s  LIMIT 1
             """
             result = db_helper.fetch_one(query, (url, ))
             if not result:
@@ -153,10 +161,11 @@ class CollectData:
             data = CollectData(url=result["url"],
                                keyword=result["keyword"],
                                content=result["content"],
+                               data_type=result["data_type"],
                                status=result["status"])
             return data
 
-    def set_process(self, url: str):
+    def set_process(self, url):
         # with MySQLHelper() as db_helper:
         #     query = """
         #     UPDATE t_collect_data
@@ -164,40 +173,43 @@ class CollectData:
         #     WHERE url = %s
         #     """
         #     db_helper.execute_non_query(query, (url))
-        self.move_to_history_and_delete(url)
+        urls = [url]
+        self.move_to_history_and_delete(urls)
 
+    def set_process_list(self, urls: list):
+        self.move_to_history_and_delete(urls)
 
-    def move_to_history_and_delete(self, url: str):
+    def move_to_history_and_delete(self, urls: list):
         with MySQLHelper() as db_helper:
             # 查询 t_collect_data 中的数据
-            query = """
-             SELECT url, keyword, content, attach_path, status, create_time, process_time
+            placeholders = ', '.join(['%s'] * len(urls))
+            query = f"""
+             SELECT url, keyword, content, data_type, attach_path, status, create_time, process_time
              FROM t_collect_data
-             WHERE url = %s
+             WHERE url IN  ({placeholders})
              """
-            result = db_helper.fetch_one(query, (url,))
-            if not result:
-                self.logger.warning(f"URL {url} 未在 t_collect_data 中找到,无法移动到历史表并删除。")
+            results = db_helper.execute_query(query, urls)
+            if not results:
+                self.logger.warning(
+                    f"URLs {urls} 未在 t_collect_data 中找到,无法移动到历史表并删除。")
                 return False
 
             # 将数据插入到 t_collect_data_history
             insert_query = self._insert_query_history
-            insert_params = (
-                result["url"],
-                result["keyword"],
-                result["content"],
-                result["attach_path"],
-                result["status"],
-                result["create_time"]
-            )
+            insert_params = [
+                (result["url"], result["keyword"], result["content"],
+                 result["data_type"], result["attach_path"], result["status"],
+                 result["create_time"]) for result in results
+            ]
             db_helper.execute_non_query(insert_query, insert_params)
 
             # 删除 t_collect_data 中的数据
-            delete_query = self._delete_query
-            delete_params = (url,)
-            db_helper.execute_non_query(delete_query, delete_params)
+            delete_query = f"DELETE FROM t_collect_data WHERE url IN ({placeholders})"
+            db_helper.execute_non_query(delete_query, urls)
 
-            self.logger.info(f"URL {url} 已从 t_collect_data 移动到 t_collect_data_history 并删除。")
+            self.logger.info(
+                f"URLs {urls} 已从 t_collect_data 移动到 t_collect_data_history 并删除。"
+            )
             return True
 
     def fetch_by_status(self, status=0):
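The batched `move_to_history_and_delete` builds its `IN` clause by repeating `%s` once per URL and passing the list positionally, e.g. (illustrative):

    urls = ["https://example.com/a", "https://example.com/b"]
    placeholders = ', '.join(['%s'] * len(urls))  # "%s, %s"
    delete_query = f"DELETE FROM t_collect_data WHERE url IN ({placeholders})"
    # db_helper.execute_non_query(delete_query, urls)  # urls bound to the two %s slots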

+ 42 - 15
SourceCode/TenderCrawler/app/models/process_data.py

@@ -24,6 +24,9 @@ class ProcessData:
                  create_time=None,
                  send_time=None,
                  other_urls=None,
+                 prompt_tokens=None,
+                 completion_tokens=None,
+                 total_tokens=None,
                  remark=None):
         self.no = no
         self.title = title
@@ -42,6 +45,9 @@ class ProcessData:
         self.create_time = create_time or datetime.now()
         self.send_time = send_time
         self.other_urls = other_urls
+        self.prompt_tokens = prompt_tokens
+        self.completion_tokens = completion_tokens
+        self.total_tokens = total_tokens
         self.remark = remark
 
     def __repr__(self):
@@ -52,12 +58,12 @@ class ProcessData:
             f"send_time={self.send_time}, remark={self.remark})")
 
     _insert_query = """
-              INSERT IGNORE INTO t_data (no, title, url, keyword, date, area, address, summary, release_date, devices, attach_path, status, create_time)
-              VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+              INSERT IGNORE INTO t_data (no, title, url, keyword, date, area, address, summary, release_date, devices, attach_path, status, create_time, prompt_tokens, completion_tokens, total_tokens)
+              VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
           """
-    _update_query = """
-                UPDATE t_collect_data SET status = 1 WHERE url = %s;
-            """
+    # _update_query = """
+    #             UPDATE t_collect_data SET status = 1 WHERE url = %s;
+    #         """
     def insert(self, process_data):
         if not isinstance(process_data, self.__class__):
             raise TypeError("process_data 不是 ProcessData 的实例")
@@ -74,13 +80,16 @@ class ProcessData:
                          process_data.devices,
                          process_data.attach_path,
                          0,
-                         datetime.now())
+                         datetime.now(),
+                         process_data.prompt_tokens,
+                         process_data.completion_tokens,
+                         process_data.total_tokens)
 
-        update_params = (process_data.url, )
+        # update_params = (process_data.url, )
 
         with MySQLHelper() as db_helper:
             db_helper.execute_non_query(self._insert_query, insert_params)
-            db_helper.execute_non_query(self._update_query, update_params)
+            # db_helper.execute_non_query(self._update_query, update_params)
 
     def insert_batch(self, process_data_list):
         if not all(
@@ -103,26 +112,44 @@ class ProcessData:
             process_data.attach_path,
             0,
             datetime.now(),
+            process_data.prompt_tokens,
+            process_data.completion_tokens,
+            process_data.total_tokens
         ) for process_data in process_data_list]
 
-        update_params = [(process_data.url, )
-                         for process_data in process_data_list]
+        # update_params = [(process_data.url, )
+        #                  for process_data in process_data_list]
 
         with MySQLHelper() as db_helper:
             db_helper.execute_non_query(self._insert_query, insert_params)
             affected_rows = db_helper.connection.affected_rows()
             self.logger.info(f"成功插入 {affected_rows} 条数据")
-            for param in update_params:
-                db_helper.execute_non_query(self._update_query, param)
+            # for param in update_params:
+            #     db_helper.execute_non_query(self._update_query, param)
             return affected_rows
 
-    _one_query = """
-                    SELECT url,no,other_urls,attach_path FROM t_data WHERE no = %s  LIMIT 1
+
+    _one_url_query = """
+                    SELECT url,no,other_urls,attach_path FROM t_data WHERE url = %s  LIMIT 1
                 """
+    def fetch_one_process_by_url(self, url: str):
+        with MySQLHelper() as db_helper:
+            result = db_helper.fetch_one(self._one_url_query, (url, ))
+            if not result:
+                return None
+            data = ProcessData(url=result["url"],
+                               no=result["no"],
+                               other_urls=result["other_urls"],
+                               attach_path=result["attach_path"])
+            return data
+
+    _one_no_query = """
+                      SELECT url,no,other_urls,attach_path FROM t_data WHERE no = %s  LIMIT 1
+                  """
     def fetch_one_process_by_no(self, no: str):
         with MySQLHelper() as db_helper:
 
-            result = db_helper.fetch_one(self._one_query, (no, ))
+            result = db_helper.fetch_one(self._one_no_query, (no, ))
             if not result:
                 return None
             data = ProcessData(url=result["url"],

+ 201 - 0
SourceCode/TenderCrawler/app/models/process_result_data.py

@@ -0,0 +1,201 @@
+from datetime import datetime
+from utils.mysql_helper import MySQLHelper
+from utils.logger_helper import LoggerHelper
+
+
+class ProcessResultData:
+
+    logger = LoggerHelper.get_logger()
+
+    def __init__(self,
+                 no=None,
+                 title=None,
+                 url=None,
+                 keyword=None,
+                 date=None,
+                 price=None,
+                 bidder=None,
+                 summary=None,
+                 attach_path=None,
+                 status=None,
+                 create_time=None,
+                 send_time=None,
+                 other_urls=None,
+                 prompt_tokens=None,
+                 completion_tokens=None,
+                 total_tokens=None,
+                 remark=None):
+        self.no = no
+        self.title = title
+        self.url = url
+        self.keyword = keyword
+        self.date = date
+        self.price = price
+        self.bidder = bidder
+        self.summary = summary
+        self.attach_path = attach_path
+        self.status = status
+        self.create_time = create_time or datetime.now()
+        self.send_time = send_time
+        self.other_urls = other_urls
+        self.prompt_tokens = prompt_tokens
+        self.completion_tokens = completion_tokens
+        self.total_tokens = total_tokens
+        self.remark = remark
+
+    def __repr__(self):
+        return (
+            f"ProcessResultData(no={self.no}, title={self.title}, date={self.date}, "
+            f"keyword={self.keyword}, price={self.price}, bidder={self.bidder}, summary={self.summary}, attach_path={self.attach_path}, "
+            f"status={self.status}, create_time={self.create_time}, "
+            f"send_time={self.send_time}, remark={self.remark})")
+
+    _insert_query = """
+              INSERT IGNORE INTO t_data_result (no, title, url, keyword, date, price, bidder, summary, attach_path, status, create_time, prompt_tokens, completion_tokens, total_tokens)
+              VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+          """
+    # _update_query = """
+    #             UPDATE t_collect_data SET status = 1 WHERE url = %s;
+    #         """
+
+    def insert(self, process_result_data):
+        if not isinstance(process_result_data, self.__class__):
+            raise TypeError("process_result_data 不是 ProcessResultData 的实例")
+
+        insert_params = (process_result_data.no,
+                         process_result_data.title,
+                         process_result_data.url,
+                         process_result_data.keyword,
+                         process_result_data.date,
+                         process_result_data.price,
+                         process_result_data.bidder,
+                         process_result_data.summary,
+                         process_result_data.attach_path,
+                         0,
+                         datetime.now(),
+                         process_result_data.prompt_tokens,
+                         process_result_data.completion_tokens,
+                         process_result_data.total_tokens)
+
+        # update_params = (process_result_data.url, )
+
+        with MySQLHelper() as db_helper:
+            db_helper.execute_non_query(self._insert_query, insert_params)
+            # db_helper.execute_non_query(self._update_query, update_params)
+
+    def insert_batch(self, process_result_data_list):
+        if not all(
+                isinstance(process_result_data, self.__class__)
+                for process_result_data in process_result_data_list):
+            raise TypeError("process_result_data_list 中的所有元素必须是 ProcessResultData 的实例")
+
+        insert_params = [(
+            process_result_data.no,
+            process_result_data.title,
+            process_result_data.url,
+            process_result_data.keyword,
+            process_result_data.date,
+            process_result_data.price,
+            process_result_data.bidder,
+            process_result_data.summary,
+            process_result_data.attach_path,
+            0,
+            datetime.now(),
+            process_result_data.prompt_tokens,
+            process_result_data.completion_tokens,
+            process_result_data.total_tokens
+        ) for process_result_data in process_result_data_list]
+
+        # update_params = [(process_result_data.url, )
+        #                  for process_result_data in process_result_data_list]
+
+        with MySQLHelper() as db_helper:
+            db_helper.execute_non_query(self._insert_query, insert_params)
+            affected_rows = db_helper.connection.affected_rows()
+            self.logger.info(f"成功插入 {affected_rows} 条数据")
+            # for param in update_params:
+            #     db_helper.execute_non_query(self._update_query, param)
+            return affected_rows
+
+    _one_url_query = """
+                      SELECT url, no, other_urls, attach_path FROM t_data_result WHERE url = %s LIMIT 1
+                  """
+
+    def fetch_one_process_by_url(self, url: str):
+        with MySQLHelper() as db_helper:
+            result = db_helper.fetch_one(self._one_url_query, (url,))
+            if not result:
+                return None
+            data = ProcessResultData(url=result["url"],
+                                     no=result["no"],
+                                     other_urls=result["other_urls"],
+                                     attach_path=result["attach_path"])
+            return data
+
+    _one_no_query = """
+                        SELECT url, no, other_urls, attach_path FROM t_data_result WHERE no = %s LIMIT 1
+                    """
+
+    def fetch_one_process_by_no(self, no: str):
+        with MySQLHelper() as db_helper:
+            result = db_helper.fetch_one(self._one_no_query, (no, ))
+            if not result:
+                return None
+            data = ProcessResultData(url=result["url"],
+                                     no=result["no"],
+                                     other_urls=result["other_urls"],
+                                     attach_path=result["attach_path"])
+            return data
+
+    def fetch_no_send(self):
+        with MySQLHelper() as db_helper:
+            query = "SELECT no, title, url, keyword, date, price, bidder, summary, attach_path, status, create_time, send_time FROM t_data_result WHERE status = 0"
+            results = db_helper.execute_query(query)
+            data = [ProcessResultData(**result) for result in results]
+            return data
+
+    def set_send(self, no):
+        with MySQLHelper() as db_helper:
+            query = """
+            UPDATE t_data_result
+            SET status = 1, send_time = %s
+            WHERE no = %s
+            """
+            params = (datetime.now(), no)
+            db_helper.execute_non_query(query, params)
+
+    def set_other_urls(self, url, other_urls):
+        with MySQLHelper() as db_helper:
+            query = """
+            UPDATE t_data_result
+            SET other_urls = %s
+            WHERE url = %s
+            """
+            update_query = """
+            UPDATE t_collect_data SET status = 1 WHERE url = %s;
+            """
+            params = (other_urls, url)
+            db_helper.execute_non_query(query, params)
+            db_helper.execute_non_query(update_query, (url, ))
+
+    def check_is_process_by_url(self, url):
+        with MySQLHelper() as db_helper:
+            query = "SELECT * FROM t_data_result WHERE url = %s"
+            params = (url, )
+            results = db_helper.execute_query(query, params)
+            return bool(results)
+
+    _query_report = """
+        SELECT * FROM t_data_result WHERE create_time BETWEEN %s AND %s
+    """
+    def fetch_to_report_by_date(self, start_date, end_date):
+        """
+        获取需要生成报表的数据
+        :param start_date: 统计开始时间
+        :param end_date: 统计结束时间
+        :return: ProcessResultData 列表
+        """
+        with MySQLHelper() as db_helper:
+            params = (start_date, end_date)
+            results = db_helper.execute_query(self._query_report, params)
+            data = [ProcessResultData(**result) for result in results]
+            return data
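
A minimal usage sketch of the new ProcessResultData model (all values below are illustrative, not from the commit):

    from models.process_result_data import ProcessResultData

    record = ProcessResultData(
        no="ZB2025-001",                         # hypothetical award number
        title="XX设备采购项目中标公告",
        url="https://example.com/notice/1",      # primary key of t_data_result
        keyword="设备",
        price="1,200,000",
        bidder="XX公司",
    )
    model = ProcessResultData()
    model.insert(record)                         # INSERT IGNORE: a duplicate url is skipped
    print(model.fetch_one_process_by_url("https://example.com/notice/1"))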

+ 36 - 0
SourceCode/TenderCrawler/app/stores/data_store_interface.py

@@ -1,6 +1,7 @@
 from abc import ABC, abstractmethod
 from models.collect_data import CollectData
 from models.process_data import ProcessData
+from models.process_result_data import ProcessResultData
 
 
 class IDataStore(ABC):
@@ -32,6 +33,10 @@ class IDataStore(ABC):
     def query_one_process_by_no(self, no):
         raise NotImplementedError("query_one_process_by_no 应由子类重写。")
 
+    @abstractmethod
+    def query_one_process_by_url(self, url):
+        raise NotImplementedError("query_one_process_by_url 应由子类重写。")
+
     @abstractmethod
     def insert_process_data(self, data: ProcessData):
         raise NotImplementedError("insert_process_data 应由子类重写。")
@@ -45,13 +50,44 @@ class IDataStore(ABC):
         raise NotImplementedError("save_process_data 应由子类重写。")
 
     @abstractmethod
+    def query_one_process_result_by_url(self, url):
+        raise NotImplementedError("query_one_process_result_by_url 应由子类重写。")
+
+    @abstractmethod
+    def query_one_process_result_by_no(self, no):
+        raise NotImplementedError("query_one_process_result_by_no 应由子类重写。")
+
+    @abstractmethod
+    def insert_process_result_data(self, data: ProcessResultData, is_batch=True):
+        raise NotImplementedError("insert_process_result_data 应由子类重写。")
+
+    @abstractmethod
+    def save_process_result_data(self, is_force=False):
+        raise NotImplementedError("save_process_result_data 应由子类重写。")
+
+    @abstractmethod
+    def set_process_result_other_urls(self, url, other_urls: str):
+        raise NotImplementedError("set_process_result_other_urls 应由子类重写。")
+
+    @abstractmethod
     def query_to_send(self):
         raise NotImplementedError("query_to_send 应由子类重写。")
 
+    @abstractmethod
+    def query_to_report_by_date(self, start_date, end_date):
+        raise NotImplementedError("query_to_report_by_date 应由子类重写。")
+
     @abstractmethod
     def set_send(self, no: str):
         raise NotImplementedError("set_send 应由子类重写。")
 
+    @abstractmethod
+    def get_emails(self) -> str:
+        raise NotImplementedError("get_emails 应由子类重写。")
+
+    @abstractmethod
+    def query_master_email(self) -> str:
+        raise NotImplementedError("get_master_email 应由子类重写。")
+
     @abstractmethod
     def get_email_by_area(self, area: str):
         raise NotImplementedError("get_email_by_area 应由子类重写。")

+ 35 - 3
SourceCode/TenderCrawler/app/stores/default_data_store.py

@@ -1,8 +1,13 @@
+from models.process_result_data import ProcessResultData
 from utils.logger_helper import LoggerHelper
 from stores.data_store_interface import IDataStore
 
 
 class DefaultDataStore(IDataStore):
 
 
 
@@ -12,9 +17,9 @@ class DefaultDataStore(IDataStore):
         pass
 
     def query_one_collect_url(self, url: str) :
-        self.logger.info(f"Default: fetch_one_url")
+        self.logger.info(f"Default: FETCH_ONE_URL")
     def insert_collect_data(self, data , is_batch=True):
-        self.logger.info(f"Default: insert_collect_data")
+        self.logger.info(f"Default: INSERT_COLLECT_DATA")
 
 
     def save_collect_data(self, is_force=False):
@@ -25,6 +30,8 @@ class DefaultDataStore(IDataStore):
 
     def query_one_collect_by_url(self, url):
         self.logger.info("Default: QUERY_ONE_PROCESS")
+
+    def query_one_process_by_url(self, url):
+        self.logger.info("Default: QUERY_ONE_PROCESS_BY_URL")
     def query_one_process_by_no(self, no):
         self.logger.info(f"Default: query_one_process_by_no")
     def insert_process_data(self, data):
@@ -36,11 +43,36 @@ class DefaultDataStore(IDataStore):
     def set_process_other_urls(self, url, other_urls: str):
         self.logger.info("Default: SET_PROCESS_OTHER_URLS")
 
+    def query_one_process_result_by_url(self, url):
+        self.logger.info("Default: QUERY_ONE_PROCESS_RESULT_BY_URL")
+
+    def query_one_process_result_by_no(self, no):
+        self.logger.info("Default: QUERY_ONE_PROCESS_RESULT_BY_NO")
+
+    def insert_process_result_data(self, data: ProcessResultData, is_batch=True):
+        self.logger.info("Default: INSERT_PROCESS_RESULT_DATA")
+
+    def save_process_result_data(self, is_force=False):
+        self.logger.info("Default: SAVE_PROCESS_RESULT_DATA")
+
+    def set_process_result_other_urls(self, url, other_urls: str):
+        self.logger.info("Default: SET_PROCESS_RESULT_OTHER_URLS")
+
     def query_to_send(self):
         self.logger.info("Default: QUERY_TO_SEND")
+
+    def query_to_report_by_date(self, start_date, end_date):
+        self.logger.info("Default: QUERY_TO_REPORT_BY_DATE")
 
     def set_send(self, no: str):
         self.logger.info("Default: SET_SEND")
 
+    def get_emails(self):
+        self.logger.info("Default: GET_EMAILS")
+
+    def query_master_email(self):
+        self.logger.info("Default: QUERY_MASTER_EMAIL")
+
     def get_email_by_area(self, area: str):
         self.logger.info("Default: GET_EMAIL_BY_AREA")

+ 48 - 3
SourceCode/TenderCrawler/app/stores/mysql_data_store.py

@@ -3,6 +3,7 @@ from utils.config_helper import ConfigHelper
 from stores.data_store_interface import IDataStore
 from models.collect_data import CollectData
 from models.process_data import ProcessData
+from models.process_result_data import ProcessResultData
 from models.area_email import AreaEmail
 
 
@@ -12,6 +13,7 @@ class MysqlDataStore(IDataStore):
     config = ConfigHelper()
     _collectData = CollectData()
     _processData = ProcessData()
+    _processResultData = ProcessResultData()
     _areaEmail = AreaEmail()
 
     def __init__(self):
@@ -19,19 +21,21 @@ class MysqlDataStore(IDataStore):
         self._collect_list = []
         self._process_size = self.config.get_int('save.process_batch_size',1)
         self._process_list = []
+        self._process_result_list = []
 
     def query_one_collect_url(self, url: str) -> str | None:
         return self._collectData.fetch_one_url(url)
     def insert_collect_data(self, data: CollectData, is_batch=True):
         if not is_batch:
             self._collectData.insert(data)
+            self.logger.info(f"保存 采集数据 到数据库: {data.url}" )
         else:
             self._collect_list.append(data)
             self.save_collect_data()
 
     def save_collect_data(self, is_force=False):
         if (is_force and len(self._collect_list)>0) or len(self._collect_list) >= self._collect_size:
-            self.logger.info("批量保存到数据库,数量: " + str(len(self._collect_list)))
+            self.logger.info("批量保存 采集数据 到数据库,数量: " + str(len(self._collect_list)))
             self._collectData.insert_batch(self._collect_list)
             self._collect_list = []
 
@@ -41,13 +45,17 @@ class MysqlDataStore(IDataStore):
     def query_one_collect_by_url(self, url):
         return self._collectData.fetch_one_collect_by_url(url)
 
+    def query_one_process_by_url(self, url):
+        return self._processData.fetch_one_process_by_url(url)
+
     def query_one_process_by_no(self, no):
         return self._processData.fetch_one_process_by_no(no)
 
     def insert_process_data(self, data: ProcessData, is_batch=True):
         if not is_batch:
             self._processData.insert(data)
-            self.logger.info(f"保存到数据库: {data.url}" )
+            self._collectData.set_process(data.url)
+            self.logger.info(f"保存 处理数据 到数据库: {data.url}" )
         else:
             self._process_list.append(data)
             self.save_process_data()
@@ -55,21 +63,58 @@ class MysqlDataStore(IDataStore):
     # 插入到数据库时会把CollectData设为已处理
     def save_process_data(self, is_force=False):
         if (is_force and len(self._process_list)>0) or len(self._process_list) >= self._process_size:
-            self.logger.info(f"批量保存到数据库,数量: {str(len(self._process_list))}")
+            self.logger.info(f"批量保存 处理数据 到数据库,数量: {str(len(self._process_list))}")
             self._processData.insert_batch(self._process_list)
+            urls = [item.url for item in self._process_list]
+            self._collectData.set_process_list(urls)
             self._process_list = []
 
     def set_process_other_urls(self, url, other_urls: str):
         return self._processData.set_other_urls(url, other_urls)
 
+    def query_one_process_result_by_url(self, url):
+        return self._processResultData.fetch_one_process_by_url(url)
+
+    def query_one_process_result_by_no(self, no):
+        return self._processResultData.fetch_one_process_by_no(no)
+
+    def insert_process_result_data(self, data: ProcessResultData, is_batch=True):
+        if not is_batch:
+            self._processResultData.insert(data)
+            self._collectData.set_process(data.url)
+            self.logger.info(f"保存 处理数据结果 到数据库: {data.url}" )
+        else:
+            self._process_result_list.append(data)
+            self.save_process_result_data()
+
+    def save_process_result_data(self, is_force=False):
+        if (is_force and len(self._process_result_list) > 0) or len(self._process_result_list) >= self._process_size:
+            self.logger.info(f"批量保存 处理数据结果 到数据库,数量: {len(self._process_result_list)}")
+            self._processResultData.insert_batch(self._process_result_list)
+            urls = [item.url for item in self._process_result_list]
+            self._collectData.set_process_list(urls)
+            self._process_result_list = []
+
+    def set_process_result_other_urls(self, url, other_urls: str):
+        return self._processResultData.set_other_urls(url, other_urls)
+
     def check_url_is_process(self, url: str) -> bool:
         return self._processData.check_is_process_by_url(url)
 
     def query_to_send(self):
         return self._processData.fetch_no_send()
 
+    def query_to_report_by_date(self, start_date, end_date):
+        return self._processResultData.fetch_to_report_by_date(start_date, end_date)
+
     def set_send(self, no: str):
         self._processData.set_send(no)
 
+    def get_emails(self) -> str:
+        return self._areaEmail.fetch_all()
+
     def get_email_by_area(self, area: str) -> str:
         return self._areaEmail.fetch_one_by_area(area)
+
+    def query_master_email(self) -> str:
+        return self._areaEmail.fetch_master_email()
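
Award results reuse the tender batch size (save.process_batch_size), so callers follow the same buffer-then-flush pattern; a sketch of the expected call sequence (parsed_results is a hypothetical list of ProcessResultData built by the processing step):

    from stores.mysql_data_store import MysqlDataStore

    parsed_results = []                              # hypothetical ProcessResultData items
    store = MysqlDataStore()
    for data in parsed_results:
        store.insert_process_result_data(data)       # buffered; flushes automatically at the batch size
    store.save_process_result_data(is_force=True)    # force-flush the tail and mark t_collect_data rows processed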

+ 19 - 75
SourceCode/TenderCrawler/app/utils/ai_helper.py

@@ -1,11 +1,9 @@
 import re
-import requests
 from openai import OpenAI
 import json
 
 from utils.logger_helper import LoggerHelper
 from utils.config_helper import ConfigHelper
-from models.process_data import ProcessData
 
 
 class AiHelper:
@@ -16,11 +14,6 @@ class AiHelper:
     _ai_api_key = None
     _ai_api_url = None
     _ai_max_tokens = 150
-    DEFAULT_AI_SYSTEM_PROMPT = "请帮我分析以下文字,提取出关键信息,并以json格式字符串返回,如果部分信息为空,则该字段返回为空。"
-    DEFAULT_AI_PROMPT_TEMPLATE = """在以上内容中提取信息:编号(no) 、标题(title)、在哪个城市招标(area)、开标的时间(date)、
-    开标的地点(address)、发布时间(release_date)、150字左右的招标条件要求及联系方式等内容摘要(summary),相关采购设备的名称信息,多个设备以逗号分割(device)。
-    返回包含no,title,area,date,address,release_date,summary,device字段的json格式字符串,没有找到或未提供的信息json字段为空。
-"""
 
     def __init__(self):
         self._ai_api_key = self.config.get("ai.key")
@@ -29,15 +22,8 @@ class AiHelper:
         max_tokens = self.config.get("ai.max_tokens")
         if max_tokens:
             self._ai_max_tokens = int(max_tokens)
-        self._ai_system_prompt = self.config.get("ai.system_prompt",
-                                                 self.DEFAULT_AI_SYSTEM_PROMPT)
-        self._ai_prompt_template = self.config.get(
-            "ai.prompt_template", self.DEFAULT_AI_PROMPT_TEMPLATE)
 
-
-    def call_ai(self, content: str) -> ProcessData:
-        # 截取前100个字符进行日志记录
-        # truncated_content = content[:100]
+    def call_openai(self, system_prompt: str, user_prompt: str) -> dict:
         self.logger.info("调用AI API")
         if self._ai_api_key is None:
             raise Exception("AI API key 没有配置")
@@ -50,21 +36,26 @@ class AiHelper:
             model=self._api_model,
             messages=[{
                 "role": "system",
-                "content": self._ai_system_prompt,
+                "content": system_prompt,
             }, {
                 "role": "user",
-                "content": f"{content}  {self._ai_prompt_template}",
+                "content": user_prompt,
             }],
             stream=False,
             temperature=0.7,
         )
-
-        self.logger.info(f"AI Response: {completion.model_dump_json()}")
-        response = json.loads(completion.model_dump_json())
-        #self.logger.info(f"AI Response: {response}")
         try:
-            res_str = self._extract_message_content(response)
-            return self._parse_response(res_str, True)
+            response = completion.model_dump_json()
+            self.logger.info(f"AI Response: {response}")
+            response_json = json.loads(response)
+            res_str = self._extract_message_content(response_json)
+            result = self._parse_response(res_str, True)
+            usage = response_json["usage"]
+            result["completion_tokens"] = usage.get("completion_tokens", 0)
+            result["prompt_tokens"] = usage.get("prompt_tokens", 0)
+            result["total_tokens"] = usage.get("total_tokens", 0)
+            self.logger.info(f"AI Process JSON: {result}")
+            return result
         except Exception as e:
             raise Exception(f"解析 AI 响应错误: {e}")
 
@@ -98,18 +89,12 @@ class AiHelper:
 
         return message_content
 
-    def _parse_response(self, response: str, first=True) -> ProcessData:
-        self.logger.info(f"AI Response JSON STR: {response}")
+    def _parse_response(self, response: str, first=True) -> dict:
+        # self.logger.info(f"AI Response JSON STR: {response}")
         try:
             data = json.loads(response)
-            return ProcessData(no=data.get("no"),
-                               title=data.get("title"),
-                               date=data.get("date"),
-                               area=data.get("area"),
-                               address=data.get("address"),
-                               devices=data.get("device"),
-                               summary=data.get("summary"),
-                               release_date=data.get("release_date"))
+            return data
         except json.JSONDecodeError as e:
             if first:
                 self.logger.error(f"JSON 解析错误,去除部分特殊字符重新解析一次: {e}")
@@ -118,45 +103,4 @@ class AiHelper:
                 message_content = re.sub(r'[‘’]', "", message_content)  # 替换单引号
                 return self._parse_response(message_content, False)
             else:
-                raise Exception(f"解析 AI 响应错误: {e}")
-
-
-    def call_ai_1(self, content: str) -> ProcessData:
-        # 截取前100个字符进行日志记录
-        # truncated_content = content[:100]
-        self.logger.info("调用AI API")
-        if self._ai_api_key is None:
-            raise Exception("AI API key 没有配置")
-        if self._ai_api_url is None:
-            raise Exception("AI API url 没有配置")
-        if self._api_model is None:
-            raise Exception("AI API model 没有配置")
-        headers = {
-            "Content-Type": "application/json",
-            "Authorization": f"Bearer {self._ai_api_key}"
-        }
-        messages = [{
-            "role": "system",
-            "content": self._ai_system_prompt
-        }, {
-            "role": "user",
-            "content": f"{content} {self._ai_prompt_template}"
-        }]
-
-        data = {
-            "model": self._api_model,
-            "messages": messages,
-            "stream": False,
-            "max_tokens": self._ai_max_tokens
-        }
-        response = requests.post(self._ai_api_url, headers=headers, json=data)
-        if response.status_code == 200:
-            try:
-                self.logger.info(f"AI Response: {response.text}")
-                res_str = self._extract_message_content(response.json())
-                return self._parse_response(res_str, True)
-            except Exception as e:
-                raise Exception(f"解析 AI 响应错误: {e}")
-        else:
-            raise Exception(
-                f"调用 AI 错误: {response.status_code} - {response.text}")
+                raise Exception(f"解析 AI 响应错误: {response} {e}")
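
After the refactor the prompts come from the caller, and the returned dict carries the parsed fields plus token usage; a hedged sketch (both prompts are illustrative, not the project's configured ones):

    from utils.ai_helper import AiHelper

    content = "（公告正文文本）"    # illustrative page text
    ai = AiHelper()
    fields = ai.call_openai(
        "请帮我分析以下文字,提取出关键信息,并以json格式字符串返回。",   # sample system prompt
        f"{content} 提取中标人(bidder)、中标金额(price)等字段",         # sample user prompt
    )
    print(fields["prompt_tokens"], fields["completion_tokens"], fields["total_tokens"])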

+ 1 - 1
SourceCode/TenderCrawler/app/utils/file_helper.py

@@ -20,7 +20,7 @@ class FileHelper:
         self.logger.info(f"下载远程文件: {file_url}  文件名:{file_name}")
         current_timestamp = datetime.now().strftime("%H%M%S%f")[:-3]  # 取前三位毫秒
         file_name = f"{current_timestamp}@{file_name}"
-        file_path = os.path.join(self._attach_file_path, f'{datetime.now().strftime("%Y-%m-%d")}')
+        file_path = os.path.join(self._attach_file_path, f'{datetime.now().strftime("%Y-%m/%d")}')
         if not os.path.exists(file_path):
             os.makedirs(file_path)
         path = os.path.join(file_path, file_name)
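
The changed format string contains a path separator, so attachments are now grouped by month and then by day instead of one flat per-date folder; a small sketch of the resulting layout (date and file name are hypothetical):

    import os
    from datetime import datetime

    # e.g. attaches/2025-01/15/103045123@contract.pdf
    file_path = os.path.join("attaches", datetime.now().strftime("%Y-%m/%d"))
    os.makedirs(file_path, exist_ok=True)    # creates the month level and the day level in one call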

+ 17 - 18
SourceCode/TenderCrawler/docker-compose.yml

@@ -12,16 +12,15 @@ services:
       - TZ=Asia/Shanghai
       # - MYSQL_DEFAULT_AUTHENTICATION_PLUGIN=mysql_native_password
     volumes:
-       - /home/docker/tender-crawler/mysql/log:/var/log/mysql
-       - /home/docker/tender-crawler/mysql/data:/var/lib/mysql
-       - /home/docker/tender-crawler/mysql/conf.d:/etc/mysql/conf.d
-       - /etc/localtime:/etc/localtime:ro
-       - /home/docker/tender-crawler/app/init.sql:/docker-entrypoint-initdb.d/init.sql # 挂载 init.sql 文件
+      - /home/docker/tender-crawler_v2/mysql/log:/var/log/mysql
+      - /home/docker/tender-crawler_v2/mysql/data:/var/lib/mysql
+      - /etc/localtime:/etc/localtime:ro
+      - /home/docker/tender-crawler_v2/app/init.sql:/docker-entrypoint-initdb.d/init.sql # 挂载 init.sql 文件
       # - ./.dev/mysql5.7/log:/var/log/mysql
       # - ./.dev/mysql5.7/data:/var/lib/mysql
-#      - ./.dev/mysql8.0.39/log:/var/log/mysql
-#      - ./.dev/mysql8.0.39/data:/var/lib/mysql
-#      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
+      # - ./.dev/mysql8.0.39/log:/var/log/mysql
+      # - ./.dev/mysql8.0.39/data:/var/lib/mysql
+      # - ./init.sql:/docker-entrypoint-initdb.d/init.sql
     ports:
       - '${MYSQL_PORT}:3306'
     networks:
@@ -42,7 +41,7 @@ services:
 
   crawler-app:
     build: .
-    image: y_tender-crawler-app:1.0.0
+    image: y_tender-crawler-app:2.0.0
     container_name: y_tender-crawler-app
     depends_on:
       - crawler-mysql
@@ -54,9 +53,9 @@ services:
       - APP_MYSQL__DB=${MYSQL_DATABASE}
       - APP_MYSQL__USER=${MYSQL_USER}
       - APP_MYSQL__PASSWORD=${MYSQL_PASSWORD}
-#      - APP_AI__KEY=
-#      - APP_AI__URL=http://192.168.0.109:7580/api/chat
-#      - APP_AI__MODEL=qwen2.5:7b
+      # - APP_AI__KEY=
+      # - APP_AI__URL=http://192.168.0.109:7580/api/chat
+      # - APP_AI__MODEL=qwen2.5:7b
       - APP_AI__MAX_TOKENS=1024
       - APP_SCHEDULE__SLEEP_INTERVAL=600 #单位:秒 10分钟检查一次
       - APP_SCHEDULE__COLLECT=20:00,12:00
@@ -65,12 +64,12 @@ services:
       - APP_SCHEDULE__RUN_NOW=1
       - APP_SELENIUM__REMOTE_DRIVER_URL=http://y_selenium:4444/wd/hub
     volumes:
-      - /home/docker/tender-crawler/app/config.yml:/app/config.yml
-      - /home/docker/tender-crawler/app/logs:/app/logs
-      - /home/docker/tender-crawler/app/attaches:/app/attaches
-#      - ./.dev/app/config.yml:/app/config.yml
-#      - ./.dev/app/logs:/app/logs
-#      - ./.dev/app/attaches:/app/attaches
+      - /home/docker/tender-crawler_v2/app/config.yml:/app/config.yml
+      - /home/docker/tender-crawler_v2/app/logs:/app/logs
+      - /home/docker/tender-crawler_v2/app/attaches:/app/attaches
+      # - ./.dev/app/config.yml:/app/config.yml
+      # - ./.dev/app/logs:/app/logs
+      # - ./.dev/app/attaches:/app/attaches
     networks:
       - crawler-net
     # 如果需要暴露端口

+ 30 - 2
SourceCode/TenderCrawler/init.sql

@@ -66,6 +66,7 @@ CREATE TABLE `t_collect_data`  (
   `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '页面详情URL',
   `keyword` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '检索到页面的关键字',
   `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '页面详情',
+  `data_type` int(4) NOT NULL DEFAULT 0 COMMENT '数据类型 0:招标 1:中标',
   `attach_path` varchar(1000) NULL DEFAULT NULL COMMENT '附件路径',
   `status` int(4) NOT NULL DEFAULT 0 COMMENT '状态 0:未处理 1:已处理',
   `create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
@@ -81,6 +82,7 @@ CREATE TABLE `t_collect_data_history`  (
   `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '页面详情URL',
   `keyword` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '检索到页面的关键字',
   `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '页面详情',
+  `data_type` int(4) NOT NULL DEFAULT 0 COMMENT '数据类型 0:招标 1:中标',
   `attach_path` varchar(1000) NULL DEFAULT NULL COMMENT '附件路径',
   `status` int(4) NOT NULL DEFAULT 0 COMMENT '状态 0:未处理 1:已处理',
   `create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
@@ -104,14 +106,40 @@ CREATE TABLE `t_data`  (
   `release_date` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '发布时间',
   `devices` varchar(1000) NULL DEFAULT NULL COMMENT '相关设备',
   `attach_path` varchar(2000) NULL DEFAULT NULL COMMENT '附件路径',
-  `status` int(4) NULL DEFAULT NULL COMMENT '状态 0:未推送 1:已推送',
+  `status` int(4) NULL DEFAULT 0 COMMENT '状态 0:未推送 1:已推送',
   `create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
   `send_time` datetime NULL DEFAULT NULL COMMENT '推送时间',
   `other_urls` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '其他连接(招标编号相同的多个链接)',
+  `prompt_tokens` int NULL DEFAULT 0 COMMENT '输入token数量',
+  `completion_tokens` int NULL DEFAULT 0 COMMENT '输出token数量',
+  `total_tokens` int NULL DEFAULT 0 COMMENT '总token数量',
   `remark` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '备注',
   PRIMARY KEY (`url`) USING BTREE
 ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
 
-
+-- ----------------------------
+-- Table structure for t_data_result
+-- ----------------------------
+DROP TABLE IF EXISTS `t_data_result`;
+CREATE TABLE `t_data_result`  (
+  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '详情链接',
+  `keyword` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '搜索关键字',
+  `no` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '招标编号',
+  `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '招标标题',
+  `date` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '公告时间',
+  `price` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '中标金额',
+  `bidder` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '中标人',
+  `summary` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '招标摘要',
+  `attach_path` varchar(2000) NULL DEFAULT NULL COMMENT '附件路径',
+  `status` int(4) NULL DEFAULT 0 COMMENT '状态 0:未推送 1:已推送',
+  `create_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
+  `send_time` datetime NULL DEFAULT NULL COMMENT '推送时间',
+  `other_urls` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '其他连接(招标编号相同的多个链接)',
+  `prompt_tokens` int NULL DEFAULT 0 COMMENT '输入token数量',
+  `completion_tokens` int NULL DEFAULT 0 COMMENT '输出token数量',
+  `total_tokens` int NULL DEFAULT 0 COMMENT '总token数量',
+  `remark` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '备注',
+  PRIMARY KEY (`url`) USING BTREE
+) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
 
 SET FOREIGN_KEY_CHECKS = 1;
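
Because url is the primary key in both t_data and t_data_result and the models write with INSERT IGNORE, a re-collected link is skipped silently rather than raising a duplicate-key error; a quick illustration using the project's MySQLHelper (values are hypothetical):

    from utils.mysql_helper import MySQLHelper

    q = ("INSERT IGNORE INTO t_data_result (url, title, status, create_time) "
         "VALUES (%s, %s, 0, NOW())")
    with MySQLHelper() as db_helper:
        db_helper.execute_non_query(q, ("https://example.com/a", "第一次插入"))
        db_helper.execute_non_query(q, ("https://example.com/a", "第二次插入"))   # ignored: same url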