|
@@ -0,0 +1,186 @@
|
|
|
+import os
|
|
|
+import shutil
|
|
|
+import tools.utils as utils
|
|
|
+from datetime import datetime, timedelta
|
|
|
+from urllib.parse import urlparse
|
|
|
+
|
|
|
+import pandas as pd
|
|
|
+import requests
|
|
|
+import mimetypes
|
|
|
+import base64
|
|
|
+
|
|
|
+
|
|
|
+class FileHelper:
|
|
|
+
|
|
|
+ DEFAULT_ATTACH_PATH = "./temp_files/attaches/"
|
|
|
+ DEFAULT_REPORT_PATH = "./temp_files/report/"
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ attach_path = utils.get_config_value(
|
|
|
+ "save.attach_file_path", self.DEFAULT_ATTACH_PATH
|
|
|
+ )
|
|
|
+ attach_path = attach_path.replace("\\", "/")
|
|
|
+ attach_path = attach_path.replace("//", "/")
|
|
|
+ self._attach_file_path = attach_path
|
|
|
+ report_path = utils.get_config_value(
|
|
|
+ "save.report_file_path", self.DEFAULT_REPORT_PATH
|
|
|
+ )
|
|
|
+ report_path = report_path.replace("\\", "/")
|
|
|
+ report_path = report_path.replace("//", "/")
|
|
|
+ self._report_file_path = report_path
|
|
|
+
|
|
|
+ def download_remote_file(self, file_url: str, file_name: str) -> str | None:
|
|
|
+ utils.get_logger().info(f"下载远程文件: {file_url} 文件名:{file_name}")
|
|
|
+ current_timestamp = datetime.now().strftime("%H%M%S%f")[:-3] # 取前三位毫秒
|
|
|
+ file_name = f"{current_timestamp}@{file_name}"
|
|
|
+ file_path = os.path.join(
|
|
|
+ self._attach_file_path, f'{datetime.now().strftime("%Y-%m-%d")}'
|
|
|
+ )
|
|
|
+ if not os.path.exists(file_path):
|
|
|
+ os.makedirs(file_path)
|
|
|
+ path = os.path.join(file_path, file_name)
|
|
|
+ path = path.replace("\\", "/")
|
|
|
+ path = path.replace("//", "/")
|
|
|
+ # 10个不同的 User-Agent
|
|
|
+ user_agents = [
|
|
|
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
|
|
|
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
|
|
|
+ "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
|
|
|
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
|
|
|
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/91.0.4472.124 Safari/605.1.15",
|
|
|
+ "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0",
|
|
|
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
|
|
|
+ "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Mobile/15E148 Safari/604.1",
|
|
|
+ "Mozilla/5.0 (iPad; CPU OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Mobile/15E148 Safari/604.1",
|
|
|
+ "Mozilla/5.0 (Linux; Android 11; SM-G973F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Mobile Safari/537.36",
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 根据文件名长度选择一个 User-Agent
|
|
|
+ ua_index = len(file_name) % len(user_agents)
|
|
|
+ # 解析 file_url 获取 Referer
|
|
|
+ parsed_url = urlparse(file_url)
|
|
|
+ referer = f"{parsed_url.scheme}://{parsed_url.netloc}/".replace(
|
|
|
+ "//download.", "//www."
|
|
|
+ )
|
|
|
+ headers = {
|
|
|
+ "User-Agent": user_agents[ua_index],
|
|
|
+ "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
|
|
|
+ "Accept-Encoding": "gzip, deflate, br",
|
|
|
+ "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,zh-TW;q=0.7",
|
|
|
+ "Referer": referer,
|
|
|
+ }
|
|
|
+
|
|
|
+ try:
|
|
|
+ response = requests.get(file_url, headers=headers, allow_redirects=True)
|
|
|
+ response.raise_for_status()
|
|
|
+ with open(path, "wb") as f:
|
|
|
+ f.write(response.content)
|
|
|
+ utils.get_logger().info(f"文件下载成功: {file_name}")
|
|
|
+ return path
|
|
|
+ except requests.exceptions.HTTPError as http_err:
|
|
|
+ utils.get_logger().error(f"HTTP 错误: {http_err}")
|
|
|
+ except Exception as e:
|
|
|
+ utils.get_logger().error(f"文件下载失败: {file_name}。Exception: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ def clean_attach_file(self, day: int) -> None:
|
|
|
+ try:
|
|
|
+ current_time = datetime.now()
|
|
|
+ cutoff_time = current_time - timedelta(days=day)
|
|
|
+ for root, dirs, _ in os.walk(self._attach_file_path):
|
|
|
+ for dir_name in dirs:
|
|
|
+ path = os.path.join(root, dir_name)
|
|
|
+ dir_path = (
|
|
|
+ str(path).replace(self._attach_file_path, "").replace("\\", "/")
|
|
|
+ )
|
|
|
+ if dir_path.count("/") > 0:
|
|
|
+ continue
|
|
|
+ try:
|
|
|
+ dir_date = datetime.strptime(dir_path, "%Y-%m-%d")
|
|
|
+ if dir_date < cutoff_time:
|
|
|
+ try:
|
|
|
+ shutil.rmtree(path)
|
|
|
+ utils.get_logger().info(
|
|
|
+ f" 删除目录及其内容: {dir_path}"
|
|
|
+ )
|
|
|
+ except PermissionError:
|
|
|
+ utils.get_logger().error(
|
|
|
+ f" 权限错误,无法删除目录: {dir_path}"
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ utils.get_logger().error(
|
|
|
+ f" 删除目录失败: {dir_path}。Exception: {e}"
|
|
|
+ )
|
|
|
+ except ValueError:
|
|
|
+ # 如果目录名称不符合 %Y-%m/%d 格式,跳过
|
|
|
+ continue
|
|
|
+ except Exception as e:
|
|
|
+ utils.get_logger().error(f"attach 文件清理失败。Exception: {e}")
|
|
|
+
|
|
|
+ def save_report_excel(self, data, file_name: str = None) -> str:
|
|
|
+ try:
|
|
|
+ df = pd.DataFrame(data)
|
|
|
+ file_path = os.path.join(
|
|
|
+ self._report_file_path, f'{datetime.now().strftime("%Y-%m-%d")}'
|
|
|
+ )
|
|
|
+ if not os.path.exists(file_path):
|
|
|
+ os.makedirs(file_path)
|
|
|
+ file_name = f"{file_name}_{datetime.now().strftime('%H%M%S')}.xlsx"
|
|
|
+ path = os.path.join(file_path, file_name)
|
|
|
+ path = path.replace("\\", "/")
|
|
|
+ path = path.replace("//", "/")
|
|
|
+ df.to_excel(path, index=False)
|
|
|
+ utils.get_logger().debug(f"Report报存成功: {file_name}")
|
|
|
+ return path
|
|
|
+ except Exception as e:
|
|
|
+ utils.get_logger().error(f"保存 Report Excel 文件失败。Exception: {e}")
|
|
|
+ return ""
|
|
|
+
|
|
|
+ def clean_report_file(self, day: int) -> None:
|
|
|
+ try:
|
|
|
+ current_time = datetime.now()
|
|
|
+ cutoff_time = current_time - timedelta(days=day)
|
|
|
+ for root, dirs, _ in os.walk(self._report_file_path):
|
|
|
+ for dir_name in dirs:
|
|
|
+ path = os.path.join(root, dir_name)
|
|
|
+ dir_path = (
|
|
|
+ str(path).replace(self._report_file_path, "").replace("\\", "/")
|
|
|
+ )
|
|
|
+ if dir_path.count("/") > 0:
|
|
|
+ continue
|
|
|
+ try:
|
|
|
+ dir_date = datetime.strptime(dir_path, "%Y-%m-%d")
|
|
|
+ if dir_date < cutoff_time:
|
|
|
+ try:
|
|
|
+ shutil.rmtree(path)
|
|
|
+ utils.get_logger().info(
|
|
|
+ f" Report 删除目录及其内容: {dir_path}"
|
|
|
+ )
|
|
|
+ except PermissionError:
|
|
|
+ utils.get_logger().error(
|
|
|
+ f" Report 权限错误,无法删除目录: {dir_path}"
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ utils.get_logger().error(
|
|
|
+ f" Report 删除目录失败: {dir_path}。Exception: {e}"
|
|
|
+ )
|
|
|
+ except ValueError:
|
|
|
+ # 如果目录名称不符合 %Y-%m/%d 格式,跳过
|
|
|
+ continue
|
|
|
+ except Exception as e:
|
|
|
+ utils.get_logger().error(f"Report 文件清理失败。Exception: {e}")
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def encode_file(file_path: str):
|
|
|
+ if not os.path.exists(file_path):
|
|
|
+ utils.get_logger().error(f"文件不存在: {file_path}")
|
|
|
+ raise FileNotFoundError(f"文件不存在: {file_path}")
|
|
|
+ # 根据文件扩展名获取 MIME 类型
|
|
|
+ mime_type, _ = mimetypes.guess_type(file_path)
|
|
|
+ if mime_type is None:
|
|
|
+ mime_type = 'image/jpeg' # 默认使用 jpeg 类型
|
|
|
+ # 将图片编码为 base64 字符串
|
|
|
+ with open(file_path, "rb") as image_file:
|
|
|
+ encoded_string = base64.b64encode(image_file.read())
|
|
|
+ base64_str = encoded_string.decode("utf-8")
|
|
|
+ return f"data:{mime_type};base64,{base64_str}"
|