123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- import utils
- from datetime import datetime
- from utils.mysql_helper import MySQLHelper
class ProcessData:
    """A processed record stored in the ``t_data`` table, plus its data access.

    The class plays two roles: an instance carries the column values of one
    row, and the methods run the SQL statements against ``t_data`` through
    ``MySQLHelper``. The data-access methods do not read the row attributes of
    ``self``; they only use the class-level query strings, so any instance
    (including an empty ``ProcessData()``) can be used to call them.
    """

    def __init__(
        self,
        no=None,
        title=None,
        url=None,
        keyword=None,
        date=None,
        province=None,
        city=None,
        address=None,
        budget=None,
        summary=None,
        release_date=None,
        devices=None,
        attach_path=None,
        status=None,
        create_time=None,
        send_time=None,
        other_urls=None,
        prompt_tokens=None,
        completion_tokens=None,
        total_tokens=None,
        remark=None,
    ):
        """Store the given column values, normalising ``province`` and ``city``.

        ``province`` has every "省" and "市" removed and ``city`` has every
        "市", "区" and "县" removed; a missing value becomes ``""``. When the
        two normalised names are equal, ``province`` is cleared so the same
        name is not kept twice. ``create_time`` defaults to the current local
        time when it is not supplied (or is falsy).
        """
        self.no = no
        self.title = title
        self.url = url
        self.date = date
        self.province = province.replace("省", "").replace("市", "") if province else ""
        self.city = (
            city.replace("市", "").replace("区", "").replace("县", "") if city else ""
        )
        if self.province == self.city:
            self.province = ""
        self.keyword = keyword
        self.budget = budget
        self.address = address
        self.summary = summary
        self.release_date = release_date
        self.devices = devices
        self.attach_path = attach_path
        self.status = status
        self.create_time = create_time or datetime.now()
        self.send_time = send_time
        self.other_urls = other_urls
        self.prompt_tokens = prompt_tokens
        self.completion_tokens = completion_tokens
        self.total_tokens = total_tokens
        self.remark = remark

    def __repr__(self):
        """Return a debugging summary of the main fields."""
        return (
            f"ProcessData(no={self.no}, title={self.title}, date={self.date}, "
            f"province={self.province},city={self.city}, address={self.address}, summary={self.summary}, "
            f"status={self.status}, create_time={self.create_time}, "
            f"send_time={self.send_time}, remark={self.remark})"
        )

    # INSERT IGNORE: the statement is written so that a row MySQL rejects as a
    # duplicate is skipped rather than raising.
    # NOTE: an earlier revision also ran
    # "UPDATE t_collect_data SET status = 1 WHERE url = %s" after inserting;
    # that step is currently disabled.
    _insert_query = """
        INSERT IGNORE INTO t_data (no, title, url, keyword, date, province, city, address, budget, summary, release_date, devices, attach_path, status, create_time, prompt_tokens, completion_tokens, total_tokens)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """

    @staticmethod
    def _insert_params(process_data):
        """Build the parameter tuple for ``_insert_query`` from one record.

        The order matches the column list of ``_insert_query``. ``status`` is
        always written as 0 and ``create_time`` as the current local time; the
        record's own ``status`` and ``create_time`` attributes are not used.
        """
        return (
            process_data.no,
            process_data.title,
            process_data.url,
            process_data.keyword,
            process_data.date,
            process_data.province,
            process_data.city,
            process_data.address,
            process_data.budget,
            process_data.summary,
            process_data.release_date,
            process_data.devices,
            process_data.attach_path,
            0,
            datetime.now(),
            process_data.prompt_tokens,
            process_data.completion_tokens,
            process_data.total_tokens,
        )

    def insert(self, process_data):
        """Insert a single record into ``t_data``.

        :param process_data: the record to store
        :raises TypeError: if ``process_data`` is not an instance of this class
        """
        if not isinstance(process_data, self.__class__):
            raise TypeError("process_data 不是 ProcessData 的实例")

        with MySQLHelper() as db_helper:
            db_helper.execute_non_query(
                self._insert_query, self._insert_params(process_data)
            )
        utils.get_logger().info("共插入 1 条处理数据")

    def insert_batch(self, process_data_list):
        """Insert several records into ``t_data`` with one helper call.

        :param process_data_list: iterable of records to store
        :return: the affected-row count reported by the connection, or 0 when
            the iterable is empty (the database is not contacted in that case)
        :raises TypeError: if any element is not an instance of this class
        """
        # Materialise once: the input is walked twice (validation, then
        # parameter building), which would silently drop every row of a
        # one-shot iterator such as a generator.
        records = list(process_data_list)
        if not all(isinstance(record, self.__class__) for record in records):
            raise TypeError("process_data_list 中的所有元素必须是 ProcessData 的实例")
        if not records:
            return 0

        insert_params = [self._insert_params(record) for record in records]
        with MySQLHelper() as db_helper:
            db_helper.execute_non_query(self._insert_query, insert_params)
            affected_rows = db_helper.connection.affected_rows()
            utils.get_logger().info(f"共插入 {affected_rows} 条处理数据")
        return affected_rows

    _one_url_query = (
        "SELECT url,no,other_urls,attach_path FROM t_data WHERE url = %s LIMIT 1"
    )

    _one_no_query = (
        "SELECT url,no,other_urls,attach_path FROM t_data WHERE no = %s LIMIT 1"
    )

    def _fetch_one_brief(self, query, key):
        """Run a single-row lookup and wrap the row in a ``ProcessData``.

        Only ``url``, ``no``, ``other_urls`` and ``attach_path`` are selected
        by the lookup queries, so only those fields are populated on the
        returned object. Returns ``None`` when the helper yields no row.
        """
        with MySQLHelper() as db_helper:
            result = db_helper.fetch_one(query, (key,))
        if not result:
            return None
        return ProcessData(
            url=result["url"],
            no=result["no"],
            other_urls=result["other_urls"],
            attach_path=result["attach_path"],
        )

    def fetch_one_process_by_url(self, url: str):
        """Return the first ``t_data`` row whose ``url`` matches, or ``None``."""
        return self._fetch_one_brief(self._one_url_query, url)

    def fetch_one_process_by_no(self, no: str):
        """Return the first ``t_data`` row whose ``no`` matches, or ``None``."""
        return self._fetch_one_brief(self._one_no_query, no)

    _not_send_query = "SELECT no, title, url, keyword, devices,date, city, address, budget, summary, attach_path, release_date FROM t_data WHERE status = 0"

    def fetch_not_send(self):
        """Return every row with ``status = 0`` as a list of ``ProcessData``.

        Each result row is expanded as keyword arguments, so the selected
        column names must match ``__init__`` parameter names. A falsy result
        from the helper yields an empty list.
        """
        with MySQLHelper() as db_helper:
            results = db_helper.execute_query(self._not_send_query)
        return [ProcessData(**result) for result in results or []]

    _set_send_query = "UPDATE t_data SET status = 1, send_time = %s WHERE no = %s"

    def set_send(self, no):
        """Set ``status = 1`` and ``send_time`` to now for the row with this ``no``."""
        with MySQLHelper() as db_helper:
            params = (datetime.now(), no)
            db_helper.execute_non_query(self._set_send_query, params)

    _update_other_urls_query = "UPDATE t_data SET other_urls = %s WHERE url = %s"

    def set_other_urls(self, url, other_urls):
        """Overwrite ``other_urls`` on the row(s) whose ``url`` matches."""
        with MySQLHelper() as db_helper:
            params = (other_urls, url)
            db_helper.execute_non_query(self._update_other_urls_query, params)

    _delete_before_date_query = "DELETE FROM t_data WHERE date < %s"

    def delete_before_date(self, date: str):
        """Delete the rows whose ``date`` is earlier than the given date.

        :param date: date string in ``YYYY-MM-DD`` format
        :return: number of deleted rows as reported by the connection
        """
        with MySQLHelper() as db_helper:
            params = (date,)
            db_helper.execute_non_query(self._delete_before_date_query, params)
            affected_rows = db_helper.connection.affected_rows()
            utils.get_logger().info(
                f"删除 {date} 之前共 {affected_rows} 条 招标处理记录。"
            )
        return affected_rows
|