| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- import io
- import os
- from functools import lru_cache
- from typing import List
- import pandas as pd
- import requests
- from openpyxl import Workbook
- from openpyxl.styles import Alignment, PatternFill
- from openpyxl.utils import get_column_letter
- from openpyxl.worksheet.datavalidation import DataValidation
- from core.settings import cache_settings
- class CommonUtil:
- @staticmethod
- @lru_cache()
- def get_ip_location(oper_ip: str):
- """
- 查询ip归属区域
- :param oper_ip: 需要查询的ip
- :return: ip归属区域
- """
- oper_location = "内网IP"
- try:
- if oper_ip != "127.0.0.1" and oper_ip != "localhost":
- oper_location = "未知"
- ip_result = requests.get(
- f"https://qifu-api.baidubce.com/ip/geo/v1/district?ip={oper_ip}"
- )
- if ip_result.status_code == 200:
- prov = ip_result.json().get("data").get("prov")
- city = ip_result.json().get("data").get("city")
- if prov or city:
- oper_location = f"{prov}-{city}"
- except Exception as e:
- oper_location = "未知"
- print(e)
- return oper_location
- @staticmethod
- def bytes2human(n, format_str="%(value).1f%(symbol)s"):
- """Used by various scripts. See:
- http://goo.gl/zeJZl
- >>> bytes2human(10000)
- '9.8K'
- >>> bytes2human(100001221)
- '95.4M'
- """
- symbols = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
- prefix = {}
- for i, s in enumerate(symbols[1:]):
- prefix[s] = 1 << (i + 1) * 10
- for symbol in reversed(symbols[1:]):
- if n >= prefix[symbol]:
- value = float(n) / prefix[symbol]
- return format_str % locals()
- return format_str % dict(symbol=symbols[0], value=n)
- @staticmethod
- def bytes2file_response(bytes_info):
- yield bytes_info
- @staticmethod
- def export_list2excel(list_data: List):
- """
- 工具方法:将需要导出的list数据转化为对应excel的二进制数据
- :param list_data: 数据列表
- :return: 字典信息对应excel的二进制数据
- """
- df = pd.DataFrame(list_data)
- binary_data = io.BytesIO()
- # 使用 ExcelWriter 明确指定引擎和 buffer
- with pd.ExcelWriter(binary_data, engine="openpyxl") as writer:
- df.to_excel(writer, index=False)
- # 返回 BytesIO 中的二进制数据
- return binary_data.getvalue()
- @staticmethod
- def get_excel_template(
- header_list: List, selector_header_list: List, option_list: List[dict]
- ):
- """
- 工具方法:将需要导出的list数据转化为对应excel的二进制数据
- :param header_list: 表头数据列表
- :param selector_header_list: 需要设置为选择器格式的表头数据列表
- :param option_list: 选择器格式的表头预设的选项列表
- :return: 模板excel的二进制数据
- """
- # 创建Excel工作簿
- wb = Workbook()
- # 选择默认的活动工作表
- ws = wb.active
- # 设置表头文字
- headers = header_list
- # 设置表头背景样式为灰色,前景色为白色
- header_fill = PatternFill(
- start_color="ababab", end_color="ababab", fill_type="solid"
- )
- # 将表头写入第一行
- for col_num, header in enumerate(headers, 1):
- cell = ws.cell(row=1, column=col_num)
- cell.value = header
- cell.fill = header_fill
- # 设置列宽度为16
- ws.column_dimensions[chr(64 + col_num)].width = 12
- # 设置水平居中对齐
- cell.alignment = Alignment(horizontal="center")
- # 设置选择器的预设选项
- options = option_list
- # 获取selector_header的字母索引
- for selector_header in selector_header_list:
- column_selector_header_index = headers.index(selector_header) + 1
- # 创建数据有效性规则
- header_option = []
- for option in options:
- if option.get(selector_header):
- header_option = option.get(selector_header)
- dv = DataValidation(type="list", formula1=f'"{",".join(header_option)}"')
- # 设置数据有效性规则的起始单元格和结束单元格
- dv.add(
- f"{get_column_letter(column_selector_header_index)}2:{get_column_letter(column_selector_header_index)}1048576"
- )
- # 添加数据有效性规则到工作表
- ws.add_data_validation(dv)
- # 保存Excel文件为字节类型的数据
- file = io.BytesIO()
- wb.save(file)
- file.seek(0)
- # 读取字节数据
- excel_data = file.getvalue()
- return excel_data
- @staticmethod
- def get_filepath_from_url(url: str):
- """
- 工具方法:根据请求参数获取文件路径
- :param url: 请求参数中的url参数
- :return: 文件路径
- """
- file_info = url.split("?")[1].split("&")
- task_id = file_info[0].split("=")[1]
- file_name = file_info[1].split("=")[1]
- task_path = file_info[2].split("=")[1]
- filepath = os.path.join(cache_settings.PATH, task_path, task_id, file_name)
- return filepath
- @staticmethod
- def worship():
- print(
- """
- ////////////////////////////////////////////////////////////////////
- // _ooOoo_ //
- // o8888888o //
- // 88" . "88 //
- // (| ^_^ |) //
- // O\ = /O //
- // ____/`---'\____ //
- // .' \\| |// `. //
- // / \\||| : |||// \ //
- // / _||||| -:- |||||- \ //
- // | | \\\ - /// | | //
- // | \_| ''\---/'' | | //
- // \ .-\__ `-` ___/-. / //
- // ___`. .' /--.--\ `. . ___ //
- // ."" '< `.___\_<|>_/___.' >'"". //
- // | | : `- \`.;`\ _ /`;.`/ - ` : | | //
- // \ \ `-. \_ __\ /__ _/ .-` / / //
- // ========`-.____`-.___\_____/___.-`____.-'======== //
- // `=---=' //
- // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ //
- // 佛祖保佑 永不宕机 永无BUG //
- ////////////////////////////////////////////////////////////////////
- """
- )
|