common_util.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. import io
  2. import os
  3. from functools import lru_cache
  4. from typing import List
  5. import pandas as pd
  6. import requests
  7. from openpyxl import Workbook
  8. from openpyxl.styles import Alignment, PatternFill
  9. from openpyxl.utils import get_column_letter
  10. from openpyxl.worksheet.datavalidation import DataValidation
  11. from core.settings import cache_settings
  12. class CommonUtil:
  13. @staticmethod
  14. @lru_cache()
  15. def get_ip_location(oper_ip: str):
  16. """
  17. 查询ip归属区域
  18. :param oper_ip: 需要查询的ip
  19. :return: ip归属区域
  20. """
  21. oper_location = "内网IP"
  22. try:
  23. if oper_ip != "127.0.0.1" and oper_ip != "localhost":
  24. oper_location = "未知"
  25. ip_result = requests.get(
  26. f"https://qifu-api.baidubce.com/ip/geo/v1/district?ip={oper_ip}"
  27. )
  28. if ip_result.status_code == 200:
  29. prov = ip_result.json().get("data").get("prov")
  30. city = ip_result.json().get("data").get("city")
  31. if prov or city:
  32. oper_location = f"{prov}-{city}"
  33. except Exception as e:
  34. oper_location = "未知"
  35. print(e)
  36. return oper_location
  37. @staticmethod
  38. def bytes2human(n, format_str="%(value).1f%(symbol)s"):
  39. """Used by various scripts. See:
  40. http://goo.gl/zeJZl
  41. >>> bytes2human(10000)
  42. '9.8K'
  43. >>> bytes2human(100001221)
  44. '95.4M'
  45. """
  46. symbols = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
  47. prefix = {}
  48. for i, s in enumerate(symbols[1:]):
  49. prefix[s] = 1 << (i + 1) * 10
  50. for symbol in reversed(symbols[1:]):
  51. if n >= prefix[symbol]:
  52. value = float(n) / prefix[symbol]
  53. return format_str % locals()
  54. return format_str % dict(symbol=symbols[0], value=n)
  55. @staticmethod
  56. def bytes2file_response(bytes_info):
  57. yield bytes_info
  58. @staticmethod
  59. def export_list2excel(list_data: List):
  60. """
  61. 工具方法:将需要导出的list数据转化为对应excel的二进制数据
  62. :param list_data: 数据列表
  63. :return: 字典信息对应excel的二进制数据
  64. """
  65. df = pd.DataFrame(list_data)
  66. binary_data = io.BytesIO()
  67. # 使用 ExcelWriter 明确指定引擎和 buffer
  68. with pd.ExcelWriter(binary_data, engine="openpyxl") as writer:
  69. df.to_excel(writer, index=False)
  70. # 返回 BytesIO 中的二进制数据
  71. return binary_data.getvalue()
  72. @staticmethod
  73. def get_excel_template(
  74. header_list: List, selector_header_list: List, option_list: List[dict]
  75. ):
  76. """
  77. 工具方法:将需要导出的list数据转化为对应excel的二进制数据
  78. :param header_list: 表头数据列表
  79. :param selector_header_list: 需要设置为选择器格式的表头数据列表
  80. :param option_list: 选择器格式的表头预设的选项列表
  81. :return: 模板excel的二进制数据
  82. """
  83. # 创建Excel工作簿
  84. wb = Workbook()
  85. # 选择默认的活动工作表
  86. ws = wb.active
  87. # 设置表头文字
  88. headers = header_list
  89. # 设置表头背景样式为灰色,前景色为白色
  90. header_fill = PatternFill(
  91. start_color="ababab", end_color="ababab", fill_type="solid"
  92. )
  93. # 将表头写入第一行
  94. for col_num, header in enumerate(headers, 1):
  95. cell = ws.cell(row=1, column=col_num)
  96. cell.value = header
  97. cell.fill = header_fill
  98. # 设置列宽度为16
  99. ws.column_dimensions[chr(64 + col_num)].width = 12
  100. # 设置水平居中对齐
  101. cell.alignment = Alignment(horizontal="center")
  102. # 设置选择器的预设选项
  103. options = option_list
  104. # 获取selector_header的字母索引
  105. for selector_header in selector_header_list:
  106. column_selector_header_index = headers.index(selector_header) + 1
  107. # 创建数据有效性规则
  108. header_option = []
  109. for option in options:
  110. if option.get(selector_header):
  111. header_option = option.get(selector_header)
  112. dv = DataValidation(type="list", formula1=f'"{",".join(header_option)}"')
  113. # 设置数据有效性规则的起始单元格和结束单元格
  114. dv.add(
  115. f"{get_column_letter(column_selector_header_index)}2:{get_column_letter(column_selector_header_index)}1048576"
  116. )
  117. # 添加数据有效性规则到工作表
  118. ws.add_data_validation(dv)
  119. # 保存Excel文件为字节类型的数据
  120. file = io.BytesIO()
  121. wb.save(file)
  122. file.seek(0)
  123. # 读取字节数据
  124. excel_data = file.getvalue()
  125. return excel_data
  126. @staticmethod
  127. def get_filepath_from_url(url: str):
  128. """
  129. 工具方法:根据请求参数获取文件路径
  130. :param url: 请求参数中的url参数
  131. :return: 文件路径
  132. """
  133. file_info = url.split("?")[1].split("&")
  134. task_id = file_info[0].split("=")[1]
  135. file_name = file_info[1].split("=")[1]
  136. task_path = file_info[2].split("=")[1]
  137. filepath = os.path.join(cache_settings.PATH, task_path, task_id, file_name)
  138. return filepath
  139. @staticmethod
  140. def worship():
  141. print(
  142. """
  143. ////////////////////////////////////////////////////////////////////
  144. // _ooOoo_ //
  145. // o8888888o //
  146. // 88" . "88 //
  147. // (| ^_^ |) //
  148. // O\ = /O //
  149. // ____/`---'\____ //
  150. // .' \\| |// `. //
  151. // / \\||| : |||// \ //
  152. // / _||||| -:- |||||- \ //
  153. // | | \\\ - /// | | //
  154. // | \_| ''\---/'' | | //
  155. // \ .-\__ `-` ___/-. / //
  156. // ___`. .' /--.--\ `. . ___ //
  157. // ."" '< `.___\_<|>_/___.' >'"". //
  158. // | | : `- \`.;`\ _ /`;.`/ - ` : | | //
  159. // \ \ `-. \_ __\ /__ _/ .-` / / //
  160. // ========`-.____`-.___\_____/___.-`____.-'======== //
  161. // `=---=' //
  162. // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ //
  163. // 佛祖保佑 永不宕机 永无BUG //
  164. ////////////////////////////////////////////////////////////////////
  165. """
  166. )