123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- from datetime import datetime
- from utils.mysql_helper import MySQLHelper
- from utils.config_helper import ConfigHelper
- from utils.logger_helper import LoggerHelper
- class ProcessData:
- logger = LoggerHelper.get_logger()
- def __init__(self,
- no=None,
- title=None,
- url=None,
- keyword=None,
- date=None,
- area=None,
- address=None,
- summary=None,
- release_date=None,
- devices=None,
- attach_path=None,
- status=None,
- create_time=None,
- send_time=None,
- other_urls=None,
- remark=None):
- self.no = no
- self.title = title
- self.url = url
- self.date = date
- if not area:
- area = ConfigHelper().get("default_area", "全国")
- self.area = area.replace(" ", "")
- self.keyword = keyword
- self.address = address
- self.summary = summary
- self.release_date = release_date
- self.devices = devices
- self.attach_path = attach_path
- self.status = status
- self.create_time = create_time or datetime.now()
- self.send_time = send_time
- self.other_urls = other_urls
- self.remark = remark
- def __repr__(self):
- return (
- f"ProcessData(no={self.no}, title={self.title}, date={self.date}, "
- f"area={self.area}, address={self.address}, summary={self.summary}, "
- f"status={self.status}, create_time={self.create_time}, "
- f"send_time={self.send_time}, remark={self.remark})")
- _insert_query = """
- INSERT IGNORE INTO t_data (no, title, url, keyword, date, area, address, summary, release_date, devices, attach_path, status, create_time)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
- """
- _update_query = """
- UPDATE t_collect_data SET status = 1 WHERE url = %s;
- """
- def insert(self, process_data):
- if not isinstance(process_data, self.__class__):
- raise TypeError("process_data 不是 ProcessData 的实例")
- insert_params = (process_data.no,
- process_data.title,
- process_data.url,
- process_data.keyword,
- process_data.date,
- process_data.area,
- process_data.address,
- process_data.summary,
- process_data.release_date,
- process_data.devices,
- process_data.attach_path,
- 0,
- datetime.now())
- update_params = (process_data.url, )
- with MySQLHelper() as db_helper:
- db_helper.execute_non_query(self._insert_query, insert_params)
- db_helper.execute_non_query(self._update_query, update_params)
- def insert_batch(self, process_data_list):
- if not all(
- isinstance(process_data, self.__class__)
- for process_data in process_data_list):
- raise TypeError("process_data_list 中的所有元素必须是 ProcessData 的实例")
- insert_params = [(
- process_data.no,
- process_data.title,
- process_data.url,
- process_data.keyword,
- process_data.date,
- process_data.area,
- process_data.address,
- process_data.summary,
- process_data.release_date,
- process_data.devices,
- process_data.attach_path,
- 0,
- datetime.now(),
- ) for process_data in process_data_list]
- update_params = [(process_data.url, )
- for process_data in process_data_list]
- with MySQLHelper() as db_helper:
- db_helper.execute_non_query(self._insert_query, insert_params)
- affected_rows = db_helper.connection.affected_rows()
- self.logger.info(f"成功插入 {affected_rows} 条数据")
- for param in update_params:
- db_helper.execute_non_query(self._update_query, param)
- return affected_rows
- _one_query = """
- SELECT url,no,other_urls,attach_path FROM t_data WHERE no = %s LIMIT 1
- """
- def fetch_one_process_by_no(self, no: str):
- with MySQLHelper() as db_helper:
- result = db_helper.fetch_one(self._one_query, (no, ))
- if not result:
- return None
- data = ProcessData(url=result["url"],
- no=result["no"],
- other_urls=result["other_urls"],
- attach_path=result["attach_path"])
- return data
- def fetch_no_send(self):
- with MySQLHelper() as db_helper:
- query = "SELECT no, title, url, keyword, date, area, address, summary, attach_path, release_date FROM t_data WHERE status = 0"
- results = db_helper.execute_query(query)
- data = [ProcessData(**result) for result in results]
- return data
- def set_send(self, no):
- with MySQLHelper() as db_helper:
- query = """
- UPDATE t_data
- SET status = 1, send_time = %s
- WHERE no = %s
- """
- params = (datetime.now(), no)
- db_helper.execute_non_query(query, params)
- def set_other_urls(self, url, other_urls):
- with MySQLHelper() as db_helper:
- query = """
- UPDATE t_data
- SET other_urls = %s
- WHERE url = %s
- """
- update_query = """
- UPDATE t_collect_data SET status = 1 WHERE url = %s;
- """
- params = (other_urls, url)
- db_helper.execute_non_query(query, params)
- db_helper.execute_non_query(update_query, (url, ))
- def check_is_process_by_url(self, url):
- with MySQLHelper() as db_helper:
- query = "SELECT * FROM t_data WHERE url = %s"
- params = (url, )
- results = db_helper.execute_query(query, params)
- return True if results else False
|