123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- from sqlalchemy.orm import aliased
- from sqlalchemy import and_, or_, func
- from core.models import TotalBudgetInfoModel, TotalBudgetItemModel, ChapterModel
- from tools import db_helper
- class BudgetStore:
- def __init__(self):
- self._database = None
- self._db_session = None
- pass
- def get_budget_info(self, project_id: str):
- self._database = project_id
- with db_helper.sqlserver_query_session(self._database) as db_session:
- budgets = db_session.query(TotalBudgetInfoModel).all()
- if budgets is None:
- return None
- return budgets
- def get_budget_items(self, project_id: str, budget_id: int):
- self._database = project_id
- with db_helper.sqlserver_query_session(self._database) as db_session:
- if budget_id != 0:
- budget_items = db_session.query(TotalBudgetItemModel).all()
- else:
- budget_items = (
- db_session.query(TotalBudgetItemModel)
- .filter(TotalBudgetItemModel.budget_id == budget_id)
- .all()
- )
- if budget_items is None:
- return None
- return budget_items
- def get_budget_item_by_item_code(
- self, project_id: str, budget_id: int, item_code: str
- ):
- self._database = project_id
- with db_helper.sqlserver_query_session(self._database) as db_session:
- budget = (
- db_session.query(
- TotalBudgetItemModel.budget_id,
- TotalBudgetItemModel.item_id,
- ChapterModel.item_code,
- ChapterModel.chapter,
- ChapterModel.section,
- ChapterModel.project_name,
- ChapterModel.item_type,
- ChapterModel.unit,
- )
- .join(
- ChapterModel, ChapterModel.item_id == TotalBudgetItemModel.item_id
- )
- .filter(
- and_(
- TotalBudgetItemModel.budget_id == budget_id,
- ChapterModel.item_code == item_code,
- )
- )
- .first()
- )
- if budget is None:
- return None
- return budget
- def _build_children_count_subquery(self, model_class):
- # 创建父节点和子节点的别名
- parent = aliased(model_class, name="parent")
- child = aliased(model_class, name="child")
- # 子查询:计算每个节点的直接子节点数量
- return (
- self.db_session.query(
- parent.item_code.label("parent_code"),
- func.count(child.item_code).label("child_count"),
- )
- .outerjoin(
- child,
- or_(
- # 匹配形如01-01的格式
- child.item_code.like(parent.item_code + "-__"),
- # 匹配形如0101的格式
- child.item_code.like(parent.item_code + "__"),
- ),
- )
- .group_by(parent.item_code)
- .subquery()
- )
- def _build_budget_items_query(self, budget_id: int):
- # 子查询:计算每个节点的直接子节点数量
- children_count = self._build_children_count_subquery(ChapterModel)
- return (
- self.db_session.query(
- TotalBudgetItemModel.budget_id,
- TotalBudgetItemModel.item_id,
- ChapterModel.item_code,
- ChapterModel.chapter,
- ChapterModel.section,
- ChapterModel.project_name,
- ChapterModel.item_type,
- ChapterModel.unit,
- func.coalesce(children_count.c.child_count, 0).label("children_count"),
- )
- .join(ChapterModel, ChapterModel.item_id == TotalBudgetItemModel.item_id)
- .outerjoin(
- children_count, children_count.c.parent_code == ChapterModel.item_code
- )
- .filter(TotalBudgetItemModel.budget_id == budget_id)
- .distinct()
- )
- def get_top_budget_items(
- self, project_id: str, budget_id: int, item_code: list[str] = None
- ):
- self._database = project_id
- with db_helper.sqlserver_query_session(self._database) as self.db_session:
- query = self._build_budget_items_query(budget_id)
- if item_code:
- query = query.filter(ChapterModel.item_code.in_(item_code))
- else:
- query = query.filter(ChapterModel.item_code.like("__")).filter(
- ChapterModel.chapter.is_not(None)
- )
- query = query.order_by(ChapterModel.item_code)
- items = query.all()
- return items
- def get_child_budget_items(
- self, project_id: str, budget_id: int, parent_item_code: str
- ):
- # 构建子节点的模式:支持两种格式
- # 1. 父级编号后跟-和两位数字(如:01-01)
- # 2. 父级编号直接跟两位数字(如:0101)
- pattern_with_dash = f"{parent_item_code}-__"
- pattern_without_dash = f"{parent_item_code}__"
- self._database = project_id
- with db_helper.sqlserver_query_session(self._database) as self.db_session:
- query = (
- self._build_budget_items_query(budget_id)
- .filter(
- or_(
- ChapterModel.item_code.like(pattern_with_dash),
- ChapterModel.item_code.like(pattern_without_dash),
- )
- )
- .order_by(ChapterModel.item_code)
- )
- items = query.all()
- return items
- def get_all_budget_items_not_children(
- self, project_id: str, budget_id: int, item_code: str
- ):
- self._database = project_id
- with db_helper.sqlserver_query_session(self._database) as self.db_session:
- # 添加叶子节点过滤条件,使用复用的子查询方法
- children_count = self._build_children_count_subquery(ChapterModel)
- query = (
- self.db_session.query(
- TotalBudgetItemModel.budget_id,
- TotalBudgetItemModel.item_id,
- ChapterModel.item_code,
- ChapterModel.chapter,
- ChapterModel.section,
- ChapterModel.project_name,
- ChapterModel.item_type,
- ChapterModel.unit,
- func.coalesce(children_count.c.child_count, 0).label(
- "children_count"
- ),
- )
- .join(
- ChapterModel, ChapterModel.item_id == TotalBudgetItemModel.item_id
- )
- .outerjoin(
- children_count,
- children_count.c.parent_code == ChapterModel.item_code,
- )
- .filter(TotalBudgetItemModel.budget_id == budget_id)
- .distinct()
- )
- query = query.filter(func.coalesce(children_count.c.child_count, 0) == 0)
- # 如果指定了item_code,添加前缀匹配条件
- if item_code:
- pattern_with_dash = f"{item_code}-%"
- query = query.filter(ChapterModel.item_code.like(pattern_with_dash))
- query = query.order_by(ChapterModel.item_code)
- items = query.all()
- return items
|