123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- import json
- import utils
- from datetime import datetime
- from utils.mysql_helper import MySQLHelper
class InstrumentData:
    """One instrument line item taken from a winning-bid announcement."""

    def __init__(
        self,
        company: str,
        name: str,
        manufacturer: str,
        model: str,
        quantity: int,
        unit_price: float,
    ):
        # What was bought: instrument name (e.g. an infrared spectrometer),
        # its vendor (e.g. Thermo Fisher, Bruker) and model/spec (e.g. NIR25S).
        self.name = name
        self.manufacturer = manufacturer
        self.model = model

        # Who won: the company that took part in the tender and won the bid.
        self.company = company

        # How many units were awarded (e.g. 2) and the price of one unit.
        # NOTE(review): the currency unit of unit_price is not stated in this
        # file -- confirm with the producer of these records.
        self.quantity = quantity
        self.unit_price = unit_price

    def to_str(self) -> str:
        """Return the bracketed, comma-separated one-line summary of this item."""
        return (
            f"[名称:{self.name},"
            f"中标单位:{self.company},"
            f"仪器厂商:{self.manufacturer},"
            f"仪器规格:{self.model},"
            f"数量:{self.quantity},"
            f"单价:{self.unit_price}]"
        )
class ProcessResultData:
    """A processed bid-result record and the queries that persist it.

    An instance holds the fields of one row of ``t_data_result``. The same
    class also carries the SQL statements and the methods that read and write
    that table through ``MySQLHelper``; those methods only use ``self`` to
    reach the class-level query strings.

    The attribute/column name ``provice`` (sic) is kept as spelled because it
    is used in the SQL below.
    """

    def __init__(
        self,
        no=None,
        title=None,
        url=None,
        keyword=None,
        provice=None,
        city=None,
        date=None,
        instruments=None,
        instruments_o=None,
        summary=None,
        attach_path=None,
        status=None,
        create_time=None,
        send_time=None,
        other_urls=None,
        prompt_tokens=None,
        completion_tokens=None,
        total_tokens=None,
        remark=None,
    ):
        """Build a record.

        :param instruments: JSON text describing the instruments; when
            non-empty it wins over ``instruments_o``.
        :param instruments_o: already-built instrument list (dicts or objects
            with attributes), used only when ``instruments`` is empty.
        :param create_time: defaults to the current time when falsy.

        ``provice`` and ``city`` are normalised by removing the administrative
        suffix characters; if both end up equal the province is blanked.
        """
        self.no = no
        self.title = title
        self.url = url
        self.keyword = keyword
        self.date = date

        self.instruments_str = ""
        self.instruments = []
        self.set_instruments(instruments, instruments_o)

        self.provice = provice.replace("省", "").replace("市", "") if provice else ""
        self.city = (
            city.replace("市", "").replace("区", "").replace("县", "") if city else ""
        )
        # Same value in both fields: keep the city only.
        if self.provice == self.city:
            self.provice = ""

        self.summary = summary
        self.attach_path = attach_path
        self.status = status
        self.create_time = create_time or datetime.now()
        self.send_time = send_time
        self.other_urls = other_urls
        self.prompt_tokens = prompt_tokens
        self.completion_tokens = completion_tokens
        self.total_tokens = total_tokens
        self.remark = remark

    def __repr__(self):
        return (
            f"ProcessResultData(no={self.no}, title={self.title}, date={self.date}, "
            f"keyword={self.keyword}, provice={self.provice},city={self.city},instruments={self.instruments_str} summary={self.summary}, attach_path={self.attach_path}, "
            f"status={self.status}, create_time={self.create_time}, "
            f"send_time={self.send_time}, remark={self.remark})"
        )

    def set_instruments(self, instruments_str: str, instruments):
        """Populate ``instruments`` and ``instruments_str`` from either form.

        :param instruments_str: JSON array text; when non-empty each element
            is expanded into ``InstrumentData(**element)``.
        :param instruments: list used when ``instruments_str`` is empty; it is
            stored as given and serialised to JSON (empty list -> ``""``).
        :raises ValueError: if ``instruments_str`` is not valid JSON
            (``json.JSONDecodeError``).
        :raises TypeError: if a JSON element does not match the
            ``InstrumentData`` constructor, or a list element is neither
            JSON-serialisable nor an object with a ``__dict__``.
        """
        if instruments_str:
            self.instruments_str = instruments_str
            self.instruments = [
                InstrumentData(**fields) for fields in json.loads(instruments_str)
            ]
            return

        if instruments is None:
            instruments = []
        self.instruments = instruments or []
        # ``default=vars`` lets attribute-bearing objects (e.g. InstrumentData)
        # be written as their attribute dict instead of raising TypeError;
        # plain dicts never reach the fallback and serialise as before.
        self.instruments_str = (
            json.dumps(instruments, ensure_ascii=False, default=vars)
            if len(instruments) > 0
            else ""
        )

    _insert_query = """
        INSERT IGNORE INTO t_data_result (no, title, url, keyword, date, provice, city, instruments, summary, attach_path, status, create_time, prompt_tokens, completion_tokens, total_tokens)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """
    # NOTE: a follow-up "UPDATE t_collect_data SET status = 1 WHERE url = %s"
    # used to be sketched next to the inserts; it is not executed.

    @staticmethod
    def _insert_params(record):
        """Return the parameter tuple for ``_insert_query`` for one record.

        ``status`` is always written as 0 and ``create_time`` as the current
        time, regardless of the values held by ``record``.
        """
        return (
            record.no,
            record.title,
            record.url,
            record.keyword,
            record.date,
            record.provice,
            record.city,
            record.instruments_str,
            record.summary,
            record.attach_path,
            0,
            datetime.now(),
            record.prompt_tokens,
            record.completion_tokens,
            record.total_tokens,
        )

    def insert(self, process_result_data):
        """Insert one record (``INSERT IGNORE``).

        :raises TypeError: if the argument is not an instance of this class.
        """
        if not isinstance(process_result_data, self.__class__):
            raise TypeError("process_result_data 不是 ProcessResultData 的实例")

        insert_params = self._insert_params(process_result_data)
        with MySQLHelper() as db_helper:
            db_helper.execute_non_query(self._insert_query, insert_params)
        utils.get_logger().info("共插入 1 条结果处理数据")

    def insert_batch(self, process_result_data_list):
        """Insert several records and return the affected-row count.

        The input is materialised once, so any iterable (including a
        generator) is accepted. An empty batch returns 0 without opening a
        database connection.

        :raises TypeError: if any element is not an instance of this class.
        """
        records = list(process_result_data_list)
        if not all(isinstance(record, self.__class__) for record in records):
            raise TypeError(
                "process_result_data_list 中的所有元素必须是 ProcessResultData 的实例"
            )
        if not records:
            return 0

        # NOTE(review): a list of tuples is handed to execute_non_query as in
        # the original code; presumably the helper runs it as a batch --
        # confirm against MySQLHelper.
        insert_params = [self._insert_params(record) for record in records]
        with MySQLHelper() as db_helper:
            db_helper.execute_non_query(self._insert_query, insert_params)
            affected_rows = db_helper.connection.affected_rows()
            utils.get_logger().info(f"共插入 {affected_rows} 条结果处理数据")
        return affected_rows

    _one_url_query = """
        SELECT url,no,other_urls,attach_path FROM t_data_result WHERE url = %s LIMIT 1
    """

    _one_no_query = """
        SELECT url,no,other_urls,attach_path FROM t_data_result WHERE no = %s LIMIT 1
    """

    @staticmethod
    def _fetch_one_partial(query, key):
        """Run a single-row lookup and wrap the row, or return None if absent.

        Only ``no``, ``url``, ``attach_path`` and ``other_urls`` are filled in
        on the returned record.
        """
        with MySQLHelper() as db_helper:
            row = db_helper.fetch_one(query, (key,))
        if not row:
            return None
        return ProcessResultData(
            no=row["no"],
            url=row["url"],
            attach_path=row["attach_path"],
            other_urls=row["other_urls"],
        )

    def fetch_one_process_by_url(self, url: str):
        """Return the (partially filled) record with this ``url``, or None."""
        return self._fetch_one_partial(self._one_url_query, url)

    def fetch_one_process_by_no(self, no: str):
        """Return the (partially filled) record with this ``no``, or None."""
        return self._fetch_one_partial(self._one_no_query, no)

    _not_send_query = "SELECT no, title, url, keyword, date, provice, city, instruments, summary, attach_path, status, create_time, send_time FROM t_data_result WHERE status = 0"

    def fetch_not_send(self):
        """Return every record whose ``status`` is 0."""
        with MySQLHelper() as db_helper:
            rows = db_helper.execute_query(self._not_send_query)
        return [ProcessResultData(**row) for row in rows]

    _update_send_status_query = """
        UPDATE t_data_result SET status = 1, send_time = %s WHERE no = %s
    """

    def set_send(self, no):
        """Set ``status`` to 1 and ``send_time`` to now for the row with this ``no``."""
        with MySQLHelper() as db_helper:
            params = (datetime.now(), no)
            db_helper.execute_non_query(self._update_send_status_query, params)

    _update_other_urls_query = "UPDATE t_data_result SET other_urls = %s WHERE url = %s"

    def set_other_urls(self, url, other_urls):
        """Overwrite ``other_urls`` on the row identified by ``url``."""
        with MySQLHelper() as db_helper:
            params = (other_urls, url)
            db_helper.execute_non_query(self._update_other_urls_query, params)

    # NOTE(review): rows from "select *" are expanded as keyword arguments, so
    # every column of t_data_result must match an __init__ parameter --
    # confirm against the table schema.
    _query_report = (
        "select * from t_data_result where create_time between %s and %s ORDER BY date"
    )

    def fetch_to_report_by_date(self, start_date, end_date):
        """Return the records to include in a report.

        :param start_date: lower bound for ``create_time`` (SQL ``BETWEEN``).
        :param end_date: upper bound for ``create_time`` (SQL ``BETWEEN``).
        :return: list of records ordered by ``date``.
        """
        with MySQLHelper() as db_helper:
            params = (start_date, end_date)
            rows = db_helper.execute_query(self._query_report, params)
        return [ProcessResultData(**row) for row in rows]

    _delete_before_date_query = "DELETE FROM t_data_result WHERE create_time < %s"

    def delete_before_date(self, date: str):
        """Delete the rows whose ``create_time`` is earlier than ``date``.

        :param date: cut-off value compared against ``create_time``.
        :return: number of rows affected, as reported by the connection.
        """
        with MySQLHelper() as db_helper:
            params = (date,)
            db_helper.execute_non_query(self._delete_before_date_query, params)
            affected_rows = db_helper.connection.affected_rows()
        utils.get_logger().info(
            f"删除 {date} 之前共 {affected_rows} 条 中标处理记录。"
        )
        return affected_rows
|