123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433 |
- from sqlalchemy import and_, or_, func, Integer
- from datetime import datetime
- from typing import Optional
- import tools.db_helper as db_helper
- from core.dtos import ProjectQuotaDto
- from core.enum import SendStatusEnum
- from core.models import ProjectQuotaModel
- from core.user_session import UserSession
- class ProjectQuotaStore:
- def __init__(self):
- # self._database= None
- self._database = "iwb_railway_costing_v1"
- self._current_user = None
- @property
- def current_user(self):
- if self._current_user is None:
- self._current_user = UserSession.get_current_user()
- return self._current_user
- def get_quotas_paginated(
- self,
- budget_id: int,
- project_id: str,
- item_code: str,
- page: int = 1,
- page_size: int = 10,
- keyword: Optional[str] = None,
- send_status: Optional[int] = None,
- ):
- """分页查询定额列表
- Args:
- page: 页码,从1开始
- page_size: 每页数量
- project_id: 项目编号
- budget_id: 概算序号
- item_code: 条目序号
- keyword: 关键字
- send_status: 发送状态
- Returns:
- Tuple[total_count, quotas]
- """
- with db_helper.mysql_query_session(self._database) as db_session:
- query = db_session.query(ProjectQuotaModel)
- # 构建查询条件
- conditions = [
- ProjectQuotaModel.is_del == 0,
- ProjectQuotaModel.project_id == project_id,
- ProjectQuotaModel.budget_id == budget_id,
- ProjectQuotaModel.item_code.like(f"{item_code}%"),
- ]
- if send_status is not None:
- conditions.append(ProjectQuotaModel.send_status == send_status)
- if keyword:
- conditions.append(
- or_(
- ProjectQuotaModel.quota_code.like(f"%{keyword}%"),
- ProjectQuotaModel.entry_name.like(f"%{keyword}%"),
- )
- )
- query = query.filter(and_(*conditions))
- # 计算总数
- total_count = query.count()
- # 分页
- query = (
- query.order_by(ProjectQuotaModel.created_at.desc())
- .offset((page - 1) * page_size)
- .limit(page_size)
- )
- quotas = query.all()
- return {"total": total_count, "data": quotas}
- def get_quotas_by_task_paginated(
- self,
- task_id: int,
- budget_id,
- project_id,
- item_code,
- page: int = 1,
- page_size: int = 10,
- keyword: Optional[str] = None,
- send_status: Optional[int] = None,
- ):
- with db_helper.mysql_query_session(self._database) as db_session:
- query = db_session.query(ProjectQuotaModel).filter(
- and_(
- ProjectQuotaModel.task_id == task_id,
- ProjectQuotaModel.budget_id == budget_id,
- ProjectQuotaModel.project_id == project_id,
- ProjectQuotaModel.item_code.like(f"{item_code}%"),
- ProjectQuotaModel.is_del == 0,
- )
- )
- if keyword:
- query = query.filter(
- or_(
- ProjectQuotaModel.quota_code.like(f"%{keyword}%"),
- ProjectQuotaModel.entry_name.like(f"%{keyword}%"),
- ProjectQuotaModel.item_code.like(f"%{keyword}%"),
- )
- )
- if send_status is not None:
- query = query.filter(ProjectQuotaModel.send_status == send_status)
- # 计算总数
- total_count = query.count()
- # 分页
- query = (
- query.order_by(
- func.cast(func.substr(ProjectQuotaModel.ex_cell, 2), Integer).asc(),
- ProjectQuotaModel.sort.asc(),
- )
- .offset((page - 1) * page_size)
- .limit(page_size)
- )
- quotas = query.all()
- return {"total": total_count, "data": quotas}
- def get_quotas_by_task_id(self, task_id: int, with_quota_code: bool = False):
- with db_helper.mysql_query_session(self._database) as db_session:
- query = db_session.query(ProjectQuotaModel).filter(
- and_(
- ProjectQuotaModel.task_id == task_id, ProjectQuotaModel.is_del == 0
- )
- )
- if with_quota_code:
- query = query.filter(
- and_(
- ProjectQuotaModel.quota_code != None,
- ProjectQuotaModel.quota_code != "",
- )
- )
- quotas = query.all()
- return quotas
- def get_quota(self, quota_id: int) -> Optional[ProjectQuotaModel]:
- """根据ID查询定额
- Args:
- quota_id: 定额ID
- Returns:
- Optional[ProjectQuotaDto]
- """
- with db_helper.mysql_query_session(self._database) as db_session:
- quota = (
- db_session.query(ProjectQuotaModel)
- .filter(
- and_(
- ProjectQuotaModel.id == quota_id, ProjectQuotaModel.is_del == 0
- )
- )
- .first()
- )
- return quota
- def get_quota_dto(self, quota_id: int) -> Optional[ProjectQuotaDto]:
- """根据ID查询定额
- Args:
- quota_id: 定额ID
- Returns:
- Optional[ProjectQuotaDto]
- """
- quota = self.get_quota(quota_id)
- return ProjectQuotaDto.from_model(quota) if quota else None
- def get_quota_by_quota_input(
- self, project_id: str, budget_id: int, quota_input_id: int
- ):
- with db_helper.mysql_query_session(self._database) as db_session:
- quota = (
- db_session.query(ProjectQuotaModel)
- .filter(
- and_(
- ProjectQuotaModel.project_id == project_id,
- ProjectQuotaModel.budget_id == budget_id,
- ProjectQuotaModel.quota_id == quota_input_id,
- ProjectQuotaModel.is_del == 0,
- )
- )
- .first()
- )
- return ProjectQuotaDto.from_model(quota) if quota else None
- def create_quota(self, quota_dto: ProjectQuotaDto) -> ProjectQuotaDto:
- """创建定额
- Args:
- quota_dto: 定额DTO
- Returns:
- ProjectQuotaDto
- """
- with db_helper.mysql_session(self._database) as db_session:
- quota = ProjectQuotaModel(
- project_id=quota_dto.project_id,
- budget_id=quota_dto.budget_id,
- budget_code=quota_dto.budget_code,
- task_id=quota_dto.task_id,
- item_id=quota_dto.item_id,
- item_code=quota_dto.item_code,
- sort=quota_dto.sort,
- quota_id=quota_dto.quota_id,
- quota_code=quota_dto.quota_code,
- copy_quota_id=quota_dto.copy_quota_id,
- quota_adjustment=quota_dto.quota_adjustment,
- entry_name=quota_dto.entry_name,
- units=quota_dto.units,
- amount=quota_dto.amount,
- ex_file=quota_dto.ex_file,
- ex_cell=quota_dto.ex_cell,
- ex_row=quota_dto.ex_row,
- ex_unit=quota_dto.ex_unit,
- ex_amount=quota_dto.ex_amount,
- note=quota_dto.note,
- is_change=quota_dto.is_change,
- send_status=SendStatusEnum.NEW.value,
- send_time=quota_dto.send_time,
- send_error=quota_dto.send_error,
- created_by=quota_dto.created_by or self.current_user.username,
- created_at=datetime.now(),
- )
- db_session.add(quota)
- db_session.flush()
- return ProjectQuotaDto.from_model(quota)
- def update_quota(self, quota_dto: ProjectQuotaDto) -> Optional[ProjectQuotaDto]:
- """更新定额
- Args:
- quota_dto: 定额DTO
- Returns:
- Optional[ProjectQuotaDto]
- """
- quota = self.get_quota(quota_dto.id)
- if not quota:
- return None
- with db_helper.mysql_session(self._database) as db_session:
- quota = self._map_quota_dto(quota, quota_dto)
- quota = db_session.merge(quota)
- return ProjectQuotaDto.from_model(quota)
- def update_quota_all(self, quota_dto: ProjectQuotaDto) -> Optional[ProjectQuotaDto]:
- """更新定额
- Args:
- quota_dto: 定额DTO
- Returns:
- Optional[ProjectQuotaDto]
- """
- quota = self.get_quota(quota_dto.id)
- if not quota:
- return None
- with db_helper.mysql_session(self._database) as db_session:
- quota = self._map_quota_dto(quota, quota_dto)
- quota.item_code = quota_dto.item_code
- quota.item_id = quota_dto.item_id
- quota = db_session.merge(quota)
- return ProjectQuotaDto.from_model(quota)
- def _map_quota_dto(self, quota, quota_dto):
- quota.quota_id = quota_dto.quota_id
- quota.quota_code = quota_dto.quota_code
- quota.quota_adjustment = quota_dto.quota_adjustment
- quota.entry_name = quota_dto.entry_name
- quota.units = quota_dto.units
- quota.sort = quota_dto.sort
- quota.amount = quota_dto.amount
- quota.ex_file = quota_dto.ex_file
- quota.ex_cell = quota_dto.ex_cell
- quota.ex_row = quota_dto.ex_row
- quota.ex_unit = quota_dto.ex_unit
- quota.ex_amount = quota_dto.ex_amount
- quota.note = quota_dto.note
- quota.is_change = quota_dto.is_change
- quota.send_status = quota_dto.send_status
- quota.send_error = None
- quota.updated_by = self.current_user.username
- quota.updated_at = datetime.now()
- return quota
- def update_quota_id(self, id: int, quota_id: int):
- quota = self.get_quota(id)
- if not quota:
- raise Exception(f"定额条目[{id}]不存在")
- with db_helper.mysql_session(self._database) as db_session:
- quota.quota_id = quota_id
- quota.is_change = 0
- quota.copy_quota_id = -1 if quota.copy_quota_id > 0 else quota.copy_quota_id
- quota = db_session.merge(quota)
- return ProjectQuotaDto.from_model(quota)
- def update_quota_change(self, quota_id: int, is_change: bool):
- quota = self.get_quota(quota_id)
- if not quota:
- raise Exception(f"定额条目[{quota_id}]不存在")
- with db_helper.mysql_session(self._database) as db_session:
- if is_change:
- quota.is_change = 1
- quota.send_status = (
- SendStatusEnum.CHANGE.value
- if quota.send_status == SendStatusEnum.SUCCESS.value
- else quota.send_status
- )
- else:
- quota.is_change = 0
- quota = db_session.merge(quota)
- return ProjectQuotaDto.from_model(quota)
- def update_quota_chapter(
- self, quota_id: int, item_id: int, item_code: str
- ) -> Optional[ProjectQuotaDto]:
- quota = self.get_quota(quota_id)
- if not quota:
- raise Exception(f"定额条目[{quota_id}]不存在")
- with db_helper.mysql_session(self._database) as db_session:
- sort = self.get_next_sort(quota.project_id, quota.budget_id, item_id)
- quota.item_id = item_id
- quota.item_code = item_code
- quota.is_change = 2
- quota.sort = sort
- quota.send_status = (
- SendStatusEnum.CHANGE.value
- if quota.send_status == SendStatusEnum.SUCCESS.value
- else quota.send_status
- )
- quota.updated_by = self.current_user.username
- quota.updated_at = datetime.now()
- quota = db_session.merge(quota)
- return ProjectQuotaDto.from_model(quota)
- def delete_quota(self, quota_id: int) -> bool:
- """删除定额
- Args:
- quota_id: 定额ID
- Returns:
- bool
- """
- quota = self.get_quota(quota_id)
- if not quota:
- return False
- with db_helper.mysql_session(self._database) as db_session:
- quota.is_del = 1
- quota.deleted_by = self.current_user.username
- quota.deleted_at = datetime.now()
- db_session.merge(quota)
- return True
- def update_send_status(self, quota_id: int, status: int, err: str = None) -> bool:
- """
- 更新发送状态
- Args:
- quota_id: int
- status: int
- err: str
- Returns:
- bool
- """
- quota = self.get_quota(quota_id)
- if not quota:
- return False
- with db_helper.mysql_session(self._database) as db_session:
- quota.send_status = status
- quota.send_error = err
- quota.send_time = datetime.now()
- db_session.merge(quota)
- return True
- def update_quota_code(self, quota_dto: ProjectQuotaDto):
- """更新定额编号
- Args:
- quota_dto: 定额DTO
- Returns:
- bool
- """
- quota = self.get_quota(quota_dto.id)
- if not quota:
- return False
- with db_helper.mysql_session(self._database) as db_session:
- quota.quota_code = quota_dto.quota_code
- quota.unit = quota_dto.unit
- quota.unit_price = quota_dto.unit_price
- quota.total_price = quota_dto.total_price
- quota.updated_by = self.current_user.username
- quota.updated_at = datetime.now()
- db_session.merge(quota)
- return True
- def get_next_sort(self, project_id: str, budget_id: int, item_id: int):
- with db_helper.mysql_query_session(self._database) as db_session:
- num = (
- db_session.query(ProjectQuotaModel.sort)
- .filter(
- ProjectQuotaModel.project_id == project_id,
- ProjectQuotaModel.budget_id == budget_id,
- ProjectQuotaModel.item_id == item_id,
- )
- .order_by(ProjectQuotaModel.sort.desc())
- .first()
- )
- return num[0] + 1 if num and num[0] else 1
- def update_ge_sort(self, project_id: str, budget_id: int, item_id: int, sort: int):
- with db_helper.mysql_session(self._database) as db_session:
- db_session.query(ProjectQuotaModel).filter(
- ProjectQuotaModel.project_id == project_id,
- ProjectQuotaModel.budget_id == budget_id,
- ProjectQuotaModel.item_id == item_id,
- ProjectQuotaModel.sort >= sort,
- ).update({ProjectQuotaModel.sort: ProjectQuotaModel.sort + 1})
|