| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- from datetime import datetime
- from enum import Enum
- from typing import (
- Optional,
- TypeVar,
- Type,
- Dict,
- Any,
- List,
- Union,
- ClassVar,
- get_type_hints,
- get_origin,
- get_args,
- Generic,
- )
- from pydantic import BaseModel
- ModelType = TypeVar("ModelType")
- class DtoBase(BaseModel):
- @classmethod
- def from_model(cls, model):
- """从数据库模型转换为DTO实例(兼容所有ORM模型)"""
- # 优先使用模型自带的to_dict方法(SQLAlchemy模型)
- if hasattr(model, "to_dict"):
- return model.to_dict()
- # 其次尝试Pydantic模型的model_dump
- elif hasattr(model, "model_dump"):
- return model.model_dump()
- # 最终回退使用实例字典
- return model.__dict__
- """基础DTO类,提供通用的转换方法和验证方法"""
- __abstract__ = True
- id: Optional[int] = None
- # 需要格式化的日期时间字段列表
- datetime_fields: ClassVar[List[str]] = []
- # 日期时间格式
- datetime_format: ClassVar[str] = "%Y-%m-%d %H:%M:%S"
- @classmethod
- def from_dict(cls: Type[ModelType], data: Dict[str, Any]) -> ModelType:
- """从字典创建DTO对象,统一的字典转DTO方法"""
- # 预处理枚举类型
- processed_data = cls.preprocess_enum_fields(data)
- return cls(**processed_data)
- def to_dict(self) -> Dict[str, Any]:
- """转换为字典格式,自动处理日期时间字段"""
- data = self.model_dump()
- # 处理所有日期时间字段
- for field in self.datetime_fields:
- if field in data and data[field] is not None:
- if isinstance(data[field], datetime):
- data[field] = data[field].strftime(self.datetime_format)
- # 处理枚举类型字段
- for field_name, field_value in data.items():
- if isinstance(field_value, Enum):
- data[field_name] = field_value.value
- return data
- @classmethod
- def validate_field_length(
- cls, field_name: str, value: str, max_length: int
- ) -> None:
- """验证字段长度"""
- if value and len(value) > max_length:
- raise ValueError(f"{field_name}长度不能超过{max_length}个字符")
- @classmethod
- def validate_required_field(cls, field_name: str, value: Any) -> None:
- """验证必填字段"""
- if value is None:
- raise ValueError(f"{field_name}不能为空")
- @classmethod
- def convert_to_enum(cls, enum_class: Type[Enum], value: Any) -> Enum | None:
- """将值转换为枚举类型"""
- if value is None:
- return None
- if isinstance(value, enum_class):
- return value
- # 尝试使用to_enum方法转换
- if hasattr(enum_class, "to_enum"):
- return enum_class.to_enum(value)
- # 标准转换方式
- try:
- return enum_class(value)
- except ValueError:
- # 获取枚举的第一个值作为默认值
- return next(iter(enum_class))
- @classmethod
- def preprocess_enum_fields(cls, data: Dict[str, Any]) -> Dict[str, Any]:
- """预处理字典中的枚举类型字段"""
- if not data:
- return data
- result = data.copy()
- hints = get_type_hints(cls)
- for field_name, field_type in hints.items():
- # 跳过不在数据中的字段
- if field_name not in result:
- continue
- # 处理Optional类型
- origin = get_origin(field_type)
- if origin is Union:
- args = get_args(field_type)
- for arg in args:
- if isinstance(arg, type) and issubclass(arg, Enum):
- result[field_name] = cls.convert_to_enum(
- arg, result[field_name]
- )
- break
- # 直接处理Enum类型
- elif isinstance(field_type, type) and issubclass(field_type, Enum):
- result[field_name] = cls.convert_to_enum(field_type, result[field_name])
- return result
- class Config:
- from_attributes = True
- class CreateDtoBase(DtoBase):
- __abstract__ = True
- created_by: Optional[str] = None
- created_time: Optional[datetime] = None
- datetime_fields: ClassVar[List[str]] = ["created_time"]
- class UpdateDtoBase(CreateDtoBase):
- __abstract__ = True
- updated_by: Optional[str] = None
- updated_time: Optional[datetime] = None
- datetime_fields: ClassVar[List[str]] = ["created_time", "updated_time"]
- class FullDtoBase(UpdateDtoBase):
- """完整DTO类,包含所有基础字段"""
- __abstract__ = True
- is_del: Optional[int] = 0
- deleted_by: Optional[str] = None
- deleted_time: Optional[datetime] = None
- # 默认需要格式化的日期时间字段
- datetime_fields: ClassVar[List[str]] = [
- "created_time",
- "updated_time",
- "deleted_time",
- ]
- class PageDto(BaseModel):
- """分页查询DTO"""
- page_num: int = 1
- page_size: int = 10
- order: Optional[str] = None
- filters: Optional[Dict[str, Any]] = None
- filter_conditions: Optional[Dict[str, Any]] = None
- search: Optional[str] = None
- @property
- def offset(self):
- return (self.page_num - 1) * self.page_size
- @property
- def limit(self):
- return self.page_size
- GetSchemaType = TypeVar("GetSchemaType", bound=DtoBase)
- class PageResultDto(BaseModel, Generic[GetSchemaType]):
- """分页查询结果DTO"""
- total: int
- rows: List[GetSchemaType]
- page_size: Optional[int]
|