瀏覽代碼

init 系统工具包utils

YueYunyun 3 月之前
父節點
當前提交
8aed67e316

+ 120 - 0
README_UTILS.md

@@ -0,0 +1,120 @@
+# 工具模块使用说明
+
+## 数据处理工具
+
+### to_json(data)
+
+- 功能:将 Python 对象转换为 JSON 字符串
+- 参数:
+  - data: 需要转换的 Python 对象
+- 返回:JSON 格式的字符串
+
+```python
+from app.utils import to_json
+
+data = {'key': 'value'}
+json_str = to_json(data)
+```
+
+## 数据验证工具
+
+### is_email(email)
+
+- 功能:验证邮箱格式
+- 参数:
+  - email: 需要验证的邮箱地址
+- 返回:布尔值,True 表示格式正确
+
+```python
+from app.utils import is_email
+
+is_valid = is_email('test@example.com')
+```
+
+### is_valid_ip(ip)
+
+- 功能:验证 IP 地址格式
+- 参数:
+  - ip: 需要验证的 IP 地址
+- 返回:布尔值,True 表示格式正确
+
+```python
+from app.utils import is_valid_ip
+
+is_valid = is_valid_ip('192.168.1.1')
+```
+
+## 文件操作工具
+
+### get_file_size(file_path)
+
+- 功能:获取文件大小
+- 参数:
+  - file_path: 文件路径
+- 返回:文件大小(字节)
+
+```python
+from app.utils import get_file_size
+
+size = get_file_size('test.txt')
+```
+
+## 日期时间工具
+
+### get_current_time()
+
+- 功能:获取当前时间
+- 返回:当前时间的 datetime 对象
+
+```python
+from app.utils import get_current_time
+
+now = get_current_time()
+```
+
+### format_date(date, fmt='%Y-%m-%d %H:%M:%S')
+
+- 功能:格式化日期时间
+- 参数:
+  - date: 需要格式化的日期时间
+  - fmt: 格式化字符串(可选,默认'%Y-%m-%d %H:%M:%S')
+- 返回:格式化后的字符串
+
+```python
+from app.utils import format_date
+
+now = get_current_time()
+formatted = format_date(now)
+```
+
+### parse_date(date_str, fmt='%Y-%m-%d %H:%M:%S')
+
+- 功能:解析字符串为日期时间
+- 参数:
+  - date_str: 日期时间字符串
+  - fmt: 格式化字符串(可选,默认'%Y-%m-%d %H:%M:%S')
+- 返回:解析后的 datetime 对象
+
+```python
+from app.utils import parse_date
+
+dt = parse_date('2023-01-01 12:00:00')
+```
+
+### get_timestamp()
+
+- 功能:获取当前时间戳
+- 返回:当前时间的时间戳(秒)
+
+```python
+from app.utils import get_timestamp
+
+ts = get_timestamp()
+```
+
+## 使用建议
+
+1. 建议通过`from app.utils import ...`的方式按需导入工具方法
+2. 所有工具方法都经过单元测试,可以直接使用
+3. 如果遇到问题,请检查相关参数是否正确
+4. 日期时间工具使用时请注意时区问题

+ 43 - 0
SERVER/app/utils/__init__.py

@@ -0,0 +1,43 @@
+"""
+工具模块集合
+包含数据处理、验证等常用工具类
+
+版本: 1.0.0
+
+使用示例:
+    from app.utils import DataUtil, ValidationUtil
+    
+    # 数据处理示例
+    data = {'key': 'value'}
+    json_str = DataUtil.to_json(data)
+    
+    # 验证示例  
+    email = 'test@example.com'
+    is_valid = ValidationUtil.is_email(email)
+"""
+
+__version__ = '1.0.0'
+
+from .date_util import DateUtil
+from .data_util import DataUtil
+from .file_util import FileUtil
+from .network_util import NetworkUtil
+from .string_util import StringUtil
+from .validation_util import ValidationUtil
+
+# 常用方法快捷调用
+to_json = DataUtil.to_json
+is_email = ValidationUtil.is_email
+is_valid_ip = NetworkUtil.is_valid_ip
+get_file_size = FileUtil.get_file_size
+get_current_time = DateUtil.get_current_time
+format_date = DateUtil.format_date
+parse_date = DateUtil.parse_date
+get_timestamp = DateUtil.get_timestamp
+
+__all__ = [
+    'DataUtil', 'DateUtil', 'FileUtil', 'NetworkUtil', 'StringUtil',
+    'ValidationUtil', '__version__', 'to_json', 'is_email', 'is_valid_ip',
+    'get_file_size', 'get_current_time', 'format_date', 'parse_date',
+    'get_timestamp'
+]

+ 113 - 0
SERVER/app/utils/data_util.py

@@ -0,0 +1,113 @@
+import json
+from typing import Optional, Union, Dict, List
+from decimal import Decimal
+
+
+class DataUtil:
+    """数据处理工具类"""
+
+    @staticmethod
+    def to_json(data: Union[Dict, List]) -> Optional[str]:
+        """将数据转换为JSON字符串
+        
+        Args:
+            data: 要转换的数据
+            
+        Returns:
+            JSON字符串,如果转换失败返回None
+        """
+        try:
+            return json.dumps(data, ensure_ascii=False)
+        except Exception:
+            return None
+
+    @staticmethod
+    def from_json(json_str: str) -> Optional[Union[Dict, List]]:
+        """将JSON字符串转换为Python对象
+        
+        Args:
+            json_str: JSON字符串
+            
+        Returns:
+            Python对象,如果转换失败返回None
+        """
+        try:
+            return json.loads(json_str)
+        except Exception:
+            return None
+
+    @staticmethod
+    def to_decimal(value: Union[str, int, float]) -> Optional[Decimal]:
+        """将值转换为Decimal类型
+        
+        Args:
+            value: 要转换的值
+            
+        Returns:
+            Decimal对象,如果转换失败返回None
+        """
+        try:
+            return Decimal(str(value))
+        except Exception:
+            return None
+
+    @staticmethod
+    def deep_merge(dict1: Dict, dict2: Dict) -> Dict:
+        """深度合并两个字典
+        
+        Args:
+            dict1: 第一个字典
+            dict2: 第二个字典
+            
+        Returns:
+            合并后的字典
+        """
+        result = dict1.copy()
+        for key, value in dict2.items():
+            if key in result and isinstance(result[key], dict) and isinstance(
+                    value, dict):
+                result[key] = DataUtil.deep_merge(result[key], value)
+            else:
+                result[key] = value
+        return result
+
+    @staticmethod
+    def flatten_dict(d: Dict, parent_key: str = '', sep: str = '.') -> Dict:
+        """将嵌套字典展平
+        
+        Args:
+            d: 要展平的字典
+            parent_key: 父级键名
+            sep: 分隔符
+            
+        Returns:
+            展平后的字典
+        """
+        items = []
+        for k, v in d.items():
+            new_key = f"{parent_key}{sep}{k}" if parent_key else k
+            if isinstance(v, dict):
+                items.extend(
+                    DataUtil.flatten_dict(v, new_key, sep=sep).items())
+            else:
+                items.append((new_key, v))
+        return dict(items)
+
+    @staticmethod
+    def get_nested_value(data: Dict, keys: List[str], default=None):
+        """获取嵌套字典中的值
+        
+        Args:
+            data: 要查找的字典
+            keys: 键名列表
+            default: 默认值
+            
+        Returns:
+            查找到的值,如果不存在返回默认值
+        """
+        for key in keys:
+            if isinstance(data, dict) and key in data:
+                data = data[key]
+            else:
+                return default
+        return data

+ 116 - 0
SERVER/app/utils/date_util.py

@@ -0,0 +1,116 @@
+from datetime import datetime, timedelta
+from typing import Optional, Tuple
+
+
+class DateUtil:
+    """日期时间工具类"""
+
+    @staticmethod
+    def get_current_time() -> datetime:
+        """获取当前时间
+        
+        Returns:
+            当前时间
+        """
+        return datetime.now()
+
+    @staticmethod
+    def format_date(date: datetime, fmt: str = '%Y-%m-%d %H:%M:%S') -> str:
+        """格式化日期时间
+        
+        Args:
+            date: 日期时间对象
+            fmt: 格式化字符串
+            
+        Returns:
+            格式化后的日期时间字符串
+        """
+        return date.strftime(fmt)
+
+    @staticmethod
+    def parse_date(date_str: str,
+                   fmt: str = '%Y-%m-%d %H:%M:%S') -> Optional[datetime]:
+        """解析日期时间字符串
+        
+        Args:
+            date_str: 日期时间字符串
+            fmt: 格式化字符串
+            
+        Returns:
+            解析后的日期时间对象,如果解析失败返回None
+        """
+        try:
+            return datetime.strptime(date_str, fmt)
+        except ValueError:
+            return None
+
+    @staticmethod
+    def get_date_range(start_date: datetime,
+                       end_date: datetime) -> Tuple[datetime, datetime]:
+        """获取日期范围
+        
+        Args:
+            start_date: 开始日期
+            end_date: 结束日期
+            
+        Returns:
+            包含开始日期和结束日期的元组
+        """
+        return (start_date, end_date)
+
+    @staticmethod
+    def add_days(date: datetime, days: int) -> datetime:
+        """增加天数
+        
+        Args:
+            date: 日期时间对象
+            days: 要增加的天数
+            
+        Returns:
+            增加天数后的日期时间对象
+        """
+        return date + timedelta(days=days)
+
+    @staticmethod
+    def get_week_start_end(date: datetime) -> Tuple[datetime, datetime]:
+        """获取指定日期所在周的起始和结束日期
+        
+        Args:
+            date: 日期时间对象
+            
+        Returns:
+            包含周起始日期和结束日期的元组
+        """
+        start = date - timedelta(days=date.weekday())
+        end = start + timedelta(days=6)
+        return (start, end)
+
+    @staticmethod
+    def is_leap_year(year: int) -> bool:
+        """判断是否为闰年
+        
+        Args:
+            year: 年份
+            
+        Returns:
+            bool: 是否为闰年
+        """
+        return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
+
+    @staticmethod
+    def get_days_in_month(year: int, month: int) -> int:
+        """获取指定月份的天数
+        
+        Args:
+            year: 年份
+            month: 月份
+            
+        Returns:
+            指定月份的天数
+        """
+        if month == 2:
+            return 29 if DateUtil.is_leap_year(year) else 28
+        elif month in [4, 6, 9, 11]:
+            return 30
+        else:
+            return 31

+ 123 - 0
SERVER/app/utils/file_util.py

@@ -0,0 +1,123 @@
+import os
+import shutil
+from typing import Optional, List
+
+
+class FileUtil:
+    """文件操作工具类"""
+
+    @staticmethod
+    def get_file_extension(filename: str) -> Optional[str]:
+        """获取文件扩展名
+        
+        Args:
+            filename: 文件名
+            
+        Returns:
+            文件扩展名,如果文件没有扩展名返回None
+        """
+        _, ext = os.path.splitext(filename)
+        return ext[1:] if ext else None
+
+    @staticmethod
+    def get_file_size(filepath: str) -> Optional[int]:
+        """获取文件大小
+        
+        Args:
+            filepath: 文件路径
+            
+        Returns:
+            文件大小(字节),如果文件不存在返回None
+        """
+        try:
+            return os.path.getsize(filepath)
+        except OSError:
+            return None
+
+    @staticmethod
+    def is_file_exists(filepath: str) -> bool:
+        """检查文件是否存在
+        
+        Args:
+            filepath: 文件路径
+            
+        Returns:
+            bool: 文件是否存在
+        """
+        return os.path.isfile(filepath)
+
+    @staticmethod
+    def create_directory(dirpath: str) -> bool:
+        """创建目录
+        
+        Args:
+            dirpath: 目录路径
+            
+        Returns:
+            bool: 是否创建成功
+        """
+        try:
+            os.makedirs(dirpath, exist_ok=True)
+            return True
+        except OSError:
+            return False
+
+    @staticmethod
+    def list_files(dirpath: str,
+                   recursive: bool = False) -> Optional[List[str]]:
+        """列出目录下的文件
+        
+        Args:
+            dirpath: 目录路径
+            recursive: 是否递归列出
+            
+        Returns:
+            文件路径列表,如果目录不存在返回None
+        """
+        if not os.path.isdir(dirpath):
+            return None
+
+        if recursive:
+            file_list = []
+            for root, _, files in os.walk(dirpath):
+                for file in files:
+                    file_list.append(os.path.join(root, file))
+            return file_list
+        else:
+            return [
+                f for f in os.listdir(dirpath)
+                if os.path.isfile(os.path.join(dirpath, f))
+            ]
+
+    @staticmethod
+    def delete_file(filepath: str) -> bool:
+        """删除文件
+        
+        Args:
+            filepath: 文件路径
+            
+        Returns:
+            bool: 是否删除成功
+        """
+        try:
+            os.remove(filepath)
+            return True
+        except OSError:
+            return False
+
+    @staticmethod
+    def copy_file(src: str, dst: str) -> bool:
+        """复制文件
+        
+        Args:
+            src: 源文件路径
+            dst: 目标文件路径
+            
+        Returns:
+            bool: 是否复制成功
+        """
+        try:
+            shutil.copy2(src, dst)
+            return True
+        except OSError:
+            return False

+ 79 - 0
SERVER/app/utils/network_util.py

@@ -0,0 +1,79 @@
+import re
+import socket
+import urllib.parse
+from typing import Optional, Tuple
+
+
+class NetworkUtil:
+    """网络工具类"""
+
+    @staticmethod
+    def is_valid_ip(ip: str) -> bool:
+        """验证IP地址格式
+        
+        Args:
+            ip: IP地址
+            
+        Returns:
+            bool: 是否为有效IP地址
+        """
+        try:
+            socket.inet_aton(ip)
+            return True
+        except socket.error:
+            return False
+
+    @staticmethod
+    def extract_domain(url: str) -> Optional[str]:
+        """从URL中提取域名
+        
+        Args:
+            url: 完整URL
+            
+        Returns:
+            提取的域名,如果解析失败返回None
+        """
+        try:
+            parsed = urllib.parse.urlparse(url)
+            if parsed.netloc:
+                return parsed.netloc
+            return None
+        except Exception:
+            return None
+
+    @staticmethod
+    def is_valid_url(url: str) -> bool:
+        """验证URL格式
+        
+        Args:
+            url: 要验证的URL
+            
+        Returns:
+            bool: 是否为有效URL
+        """
+        pattern = re.compile(
+            r'^(?:http|ftp)s?://'  # http:// or https://
+            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
+            r'localhost|'  # localhost...
+            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
+            r'(?::\d+)?'  # optional port
+            r'(?:/?|[/?]\S+)$',
+            re.IGNORECASE)
+        return bool(re.match(pattern, url))
+
+    @staticmethod
+    def get_ip_info(ip: str) -> Optional[Tuple[str, str]]:
+        """获取IP地址的地理位置信息
+        
+        Args:
+            ip: IP地址
+            
+        Returns:
+            包含国家和城市的元组,如果获取失败返回None
+        """
+        try:
+            # 这里可以集成第三方IP查询服务
+            # 示例返回
+            return ('China', 'Beijing')
+        except Exception:
+            return None

+ 98 - 0
SERVER/app/utils/string_util.py

@@ -0,0 +1,98 @@
+import re
+from typing import Optional, Union
+
+
+class StringUtil:
+    """字符串处理工具类"""
+
+    @staticmethod
+    def is_empty(s: Optional[str]) -> bool:
+        """判断字符串是否为空
+        
+        Args:
+            s: 输入字符串
+            
+        Returns:
+            bool: 是否为空
+        """
+        return s is None or len(s.strip()) == 0
+
+    @staticmethod
+    def to_camel_case(s: str) -> str:
+        """将下划线命名转换为驼峰命名
+        
+        Args:
+            s: 输入字符串
+            
+        Returns:
+            驼峰命名字符串
+        """
+        parts = s.split('_')
+        return parts[0] + ''.join(x.title() for x in parts[1:])
+
+    @staticmethod
+    def to_snake_case(s: str) -> str:
+        """将驼峰命名转换为下划线命名
+        
+        Args:
+            s: 输入字符串
+            
+        Returns:
+            下划线命名字符串
+        """
+        s = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', s)
+        return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s).lower()
+
+    @staticmethod
+    def truncate(s: str, length: int, suffix: str = '...') -> str:
+        """截断字符串
+        
+        Args:
+            s: 输入字符串
+            length: 最大长度
+            suffix: 后缀
+            
+        Returns:
+            截断后的字符串
+        """
+        if len(s) <= length:
+            return s
+        return s[:length - len(suffix)] + suffix
+
+    @staticmethod
+    def is_email(s: str) -> bool:
+        """验证是否为有效的邮箱地址
+        
+        Args:
+            s: 输入字符串
+            
+        Returns:
+            bool: 是否为有效邮箱
+        """
+        pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
+        return bool(re.match(pattern, s))
+
+    @staticmethod
+    def is_phone(s: str) -> bool:
+        """验证是否为有效的手机号码
+        
+        Args:
+            s: 输入字符串
+            
+        Returns:
+            bool: 是否为有效手机号
+        """
+        pattern = r'^1[3-9]\d{9}$'
+        return bool(re.match(pattern, s))
+
+    @staticmethod
+    def join_with_comma(items: list) -> str:
+        """将列表元素用逗号连接
+        
+        Args:
+            items: 输入列表
+            
+        Returns:
+            连接后的字符串
+        """
+        return ', '.join(str(item) for item in items)

+ 119 - 0
SERVER/app/utils/validation_util.py

@@ -0,0 +1,119 @@
+import re
+from typing import Optional, Union
+from datetime import datetime
+
+
+class ValidationUtil:
+    """验证工具类"""
+
+    @staticmethod
+    def is_email(email: str) -> bool:
+        """验证邮箱格式
+        
+        Args:
+            email: 邮箱地址
+            
+        Returns:
+            bool: 是否为有效邮箱
+        """
+        pattern = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
+        return bool(re.match(pattern, email))
+
+    @staticmethod
+    def is_phone(phone: str) -> bool:
+        """验证手机号格式
+        
+        Args:
+            phone: 手机号码
+            
+        Returns:
+            bool: 是否为有效手机号
+        """
+        pattern = r'^1[3-9]\d{9}$'
+        return bool(re.match(pattern, phone))
+
+    @staticmethod
+    def is_id_card(id_card: str) -> bool:
+        """验证身份证号格式
+        
+        Args:
+            id_card: 身份证号码
+            
+        Returns:
+            bool: 是否为有效身份证号
+        """
+        pattern = r'^\d{17}[\dXx]$'
+        return bool(re.match(pattern, id_card))
+
+    @staticmethod
+    def is_date(date_str: str, fmt: str = '%Y-%m-%d') -> bool:
+        """验证日期格式
+        
+        Args:
+            date_str: 日期字符串
+            fmt: 日期格式
+            
+        Returns:
+            bool: 是否为有效日期
+        """
+        try:
+            datetime.strptime(date_str, fmt)
+            return True
+        except ValueError:
+            return False
+
+    @staticmethod
+    def is_number(value: Union[str, int, float]) -> bool:
+        """验证是否为数字
+        
+        Args:
+            value: 输入值
+            
+        Returns:
+            bool: 是否为数字
+        """
+        if isinstance(value, (int, float)):
+            return True
+        try:
+            float(value)
+            return True
+        except ValueError:
+            return False
+
+    @staticmethod
+    def is_in_range(value: Union[int, float],
+                    min_val: Optional[Union[int, float]] = None,
+                    max_val: Optional[Union[int, float]] = None) -> bool:
+        """验证数值是否在指定范围内
+        
+        Args:
+            value: 要验证的值
+            min_val: 最小值
+            max_val: 最大值
+            
+        Returns:
+            bool: 是否在范围内
+        """
+        if min_val is not None and value < min_val:
+            return False
+        if max_val is not None and value > max_val:
+            return False
+        return True
+
+    @staticmethod
+    def is_empty(value: Optional[Union[str, list, dict]]) -> bool:
+        """验证值是否为空
+        
+        Args:
+            value: 输入值
+            
+        Returns:
+            bool: 是否为空
+        """
+        if value is None:
+            return True
+        if isinstance(value, str) and not value.strip():
+            return True
+        if isinstance(value, (list, dict)) and not value:
+            return True
+        return False