123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- from core.dtos import TotalBudgetInfoDto, ChapterDto
- import base64
class ExcelParseZgsDto:
    """Id/code pair describing one "zgs" (total budget estimate) entry."""

    def __init__(self, zgs_id: int, zgs_code: str):
        self.zgs_id, self.zgs_code = zgs_id, zgs_code

    @classmethod
    def from_dto(cls, dto: "TotalBudgetInfoDto"):
        """Create an instance from ``dto.budget_id`` and ``dto.budget_code``."""
        budget_id = dto.budget_id
        budget_code = dto.budget_code
        return cls(zgs_id=budget_id, zgs_code=budget_code)

    def to_dict(self):
        """Return the entry as a plain ``{"zgs_id", "zgs_code"}`` mapping."""
        return dict(zgs_id=self.zgs_id, zgs_code=self.zgs_code)
class ExcelParseItemDto:
    """Id, code and display name of a single item."""

    def __init__(self, item_id: int, item_code: str, item_name: str):
        self.item_id, self.item_code, self.item_name = item_id, item_code, item_name

    @classmethod
    def from_dto(cls, dto: "ChapterDto"):
        """Create an instance from a chapter DTO.

        ``item_id`` and ``item_code`` are copied as-is; ``item_name`` is taken
        from ``dto.project_name``.
        """
        fields = {
            "item_id": dto.item_id,
            "item_code": dto.item_code,
            "item_name": dto.project_name,
        }
        return cls(**fields)

    def to_dict(self):
        """Return the item as a plain mapping keyed by attribute name."""
        return dict(
            item_id=self.item_id,
            item_code=self.item_code,
            item_name=self.item_name,
        )
class ExcelParseFileDto:
    """A file handed to the Excel parser: its id (a path), type and content.

    Attributes:
        file_id: Path-like identifier of the file.
        file_type: Extension of the last path component of ``file_id``
            without the leading dot, or ``""`` when there is none.
        content: File content, stored and serialised unchanged.
    """

    def __init__(self, file_id: str, content: str):
        self.file_id = file_id
        # Derive the file type from the file_id path.
        self.file_type = self._extension_of(file_id)
        self.content = content

    @staticmethod
    def _extension_of(file_id: str) -> str:
        """Return the extension of the final path component, without the dot.

        Only the last component is inspected, so a dot inside a directory
        name (``"v1.2/report"``) is not mistaken for an extension, and an id
        without any dot yields ``""`` instead of the whole id.
        """
        # Accept both separator styles so the result does not depend on the
        # platform the id was produced on.
        basename = file_id.replace("\\", "/").rsplit("/", 1)[-1]
        _stem, dot, extension = basename.rpartition(".")
        return extension if dot else ""

    def to_dict(self):
        """Serialise the file; ``file_id`` is emitted base64-encoded (UTF-8)."""
        return {
            "file_id": base64.b64encode(self.file_id.encode()).decode(),
            "file_type": self.file_type,
            "content": self.content,
        }
class ExcelParseDto:
    """Parameters of one Excel-parse task, serialisable with ``to_dict``.

    A ``selected_zgs_id`` equal to ``0`` is stored as ``-1``; any other value
    is stored unchanged.
    """

    def __init__(
        self,
        task_id: int,
        version: str,
        project_id: str,
        project_name: str,
        project_stage: str,
        selected_zgs_id: int,
        zgs_list: "list[ExcelParseZgsDto]",
        selected_chapter: "ExcelParseItemDto",
        files: "list[ExcelParseFileDto]",
    ):
        # 0 is replaced by the -1 marker before being stored.
        if selected_zgs_id == 0:
            selected_zgs_id = -1

        self.task_id = task_id
        self.version = version
        self.project_id = project_id
        self.project_name = project_name
        self.project_stage = project_stage
        self.selected_zgs_id = selected_zgs_id
        self.zgs_list = zgs_list
        self.selected_chapter = selected_chapter
        self.files = files

    def to_dict(self):
        """Return the task as a plain dict, serialising nested DTOs in turn."""
        scalar_fields = (
            "task_id",
            "version",
            "project_id",
            "project_name",
            "project_stage",
            "selected_zgs_id",
        )
        payload = {name: getattr(self, name) for name in scalar_fields}
        payload["files"] = [entry.to_dict() for entry in self.files]
        payload["zgs_list"] = [entry.to_dict() for entry in self.zgs_list]
        payload["selected_chapter"] = self.selected_chapter.to_dict()
        return payload
class ExcelParseResultDataDto:
    """One row of an Excel-parse result.

    Shape of the dict accepted by ``from_dict``::

        {
            "zgs_id":       total budget estimate id (int),
            "zgs_code":     total budget estimate number,
            "item_id":      item sequence number (int),
            "item_code":    item code,
            "dinge_code":   quota number,
            "dinge_adjust": quota adjustment,
            "entry_name":   name of the works or cost item, from the quota table,
            "units":        unit,
            "amount":       quantity (number),
            "target_id":    id of the item in the user database, -1 if none (int),
            "ex_file_id":   Excel file id, base64-encoded (str),
            "ex_cell":      position of the quantity cell, e.g. "C17",
            "ex_row":       content of that row, cells joined with commas,
            "ex_unit":      unit given in the Excel file,
            "ex_amount":    quantity given in the Excel file (number),
        }
    """

    def __init__(
        self,
        zgs_id: int,  # total budget estimate id
        zgs_code: str,  # total budget estimate number
        item_id: int,  # item sequence number
        item_code: str,  # item code
        entry_name: str,  # name of the works or cost item, from the quota table
        dinge_code: str,  # quota number
        dinge_adjust: str,  # quota adjustment
        units: str,  # unit
        amount: float,  # quantity
        target_id: int,  # id of the item in the user database, -1 if none
        ex_file_id: str,  # Excel file id
        ex_cell: str,  # position of the quantity cell, e.g. "C17"
        ex_row: str,  # content of that row, cells joined with commas
        ex_unit: str,  # unit given in the Excel file
        ex_amount: float,  # quantity given in the Excel file
    ):
        # Budget / item identification.
        self.zgs_id, self.zgs_code = zgs_id, zgs_code
        self.item_id, self.item_code = item_id, item_code
        # Quota entry.
        self.entry_name = entry_name
        self.dinge_code, self.dinge_adjust = dinge_code, dinge_adjust
        self.units, self.amount = units, amount
        self.target_id = target_id
        # Excel provenance; ``ex_file`` holds the same value as ``ex_file_id``.
        self.ex_file = self.ex_file_id = ex_file_id
        self.ex_cell, self.ex_row = ex_cell, ex_row
        self.ex_unit, self.ex_amount = ex_unit, ex_amount

    @classmethod
    def from_dict(cls, data: dict):
        """Build a row from ``data``, filling defaults for missing keys.

        A non-empty ``"ex_file_id"`` is base64-decoded into text; a missing or
        falsy value becomes ``""``.
        """
        pick = data.get

        encoded_file_id = pick("ex_file_id", "")
        if encoded_file_id:
            file_id = base64.b64decode(encoded_file_id.encode()).decode()
        else:
            file_id = ""

        return cls(
            zgs_id=pick("zgs_id", 0),
            zgs_code=pick("zgs_code", ""),
            item_id=pick("item_id", 0),
            item_code=pick("item_code", ""),
            entry_name=pick("entry_name", ""),
            dinge_code=pick("dinge_code", ""),
            dinge_adjust=pick("dinge_adjust", ""),
            units=pick("units", ""),
            amount=pick("amount", 0.0),
            target_id=pick("target_id", -1),
            ex_file_id=file_id,
            ex_cell=pick("ex_cell", ""),
            ex_row=pick("ex_row", ""),
            ex_unit=pick("ex_unit", ""),
            ex_amount=pick("ex_amount", 0.0),
        )

    def to_dict(self):
        """Return the row as a plain dict.

        ``"ex_file"`` and ``"ex_file_id"`` both carry ``self.ex_file_id``,
        emitted as stored (not base64-encoded).
        """
        leading = (
            "zgs_id",
            "zgs_code",
            "item_id",
            "item_code",
            "entry_name",
            "dinge_code",
            "dinge_adjust",
            "target_id",
            "units",
            "amount",
        )
        trailing = ("ex_file_id", "ex_cell", "ex_row", "ex_unit", "ex_amount")

        row = {name: getattr(self, name) for name in leading}
        row["ex_file"] = self.ex_file_id
        for name in trailing:
            row[name] = getattr(self, name)
        return row
class ExcelParseResultDto:
    """Outcome of an Excel-parse task.

    Attributes:
        task_id: Identifier of the task.
        result: Status code: -1 failed, 0 running, 1 succeeded.
        reason: Accompanying message text.
        data: Parsed rows, or ``None`` when none were supplied.
    """

    def __init__(
        self,
        task_id: int,
        result: int = -1,
        reason: str = "",
        data: "list[ExcelParseResultDataDto] | None" = None,
    ):
        self.task_id = task_id
        self.result = result  # -1 failed; 0 running; 1 succeeded
        self.data = data
        self.reason = reason

    @classmethod
    def from_dict(cls, response: dict):
        """Build a result from a response dict.

        Missing keys fall back to ``task_id=0``, ``result=0``, ``reason=""``.
        A ``"data"`` entry that is absent, ``None`` or empty yields an empty
        row list.
        """
        # ``.get("data", [])`` would hand back None for a present-but-null
        # key and make the iteration below raise TypeError.
        raw_rows = response.get("data") or []
        data = [ExcelParseResultDataDto.from_dict(item) for item in raw_rows]
        return cls(
            task_id=response.get("task_id", 0),
            result=response.get("result", 0),
            reason=response.get("reason", ""),
            data=data,
        )

    def to_dict(self):
        """Return the result as a plain dict; missing rows become ``[]``."""
        return {
            "task_id": self.task_id,
            "result": self.result,
            "reason": self.reason,
            "data": [item.to_dict() for item in self.data] if self.data else [],
        }
|