budget.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. from sqlalchemy.orm import aliased
  2. from sqlalchemy import and_, or_, func
  3. from core.models import TotalBudgetInfoModel, TotalBudgetItemModel, ChapterModel
  4. from tools import db_helper
  5. class BudgetStore:
  6. def __init__(self):
  7. self._database = None
  8. self._db_session = None
  9. pass
  10. def get_budget_info(self, project_id: str):
  11. self._database = project_id
  12. with db_helper.sqlserver_query_session(self._database) as db_session:
  13. budgets = db_session.query(TotalBudgetInfoModel).all()
  14. if budgets is None:
  15. return None
  16. return budgets
  17. def get_budget_items(self, project_id: str, budget_id: int):
  18. self._database = project_id
  19. with db_helper.sqlserver_query_session(self._database) as db_session:
  20. if budget_id != 0:
  21. budget_items = db_session.query(TotalBudgetItemModel).all()
  22. else:
  23. budget_items = (
  24. db_session.query(TotalBudgetItemModel)
  25. .filter(TotalBudgetItemModel.budget_id == budget_id)
  26. .all()
  27. )
  28. if budget_items is None:
  29. return None
  30. return budget_items
  31. def get_budget_item_by_item_code(
  32. self, project_id: str, budget_id: int, item_code: str
  33. ):
  34. self._database = project_id
  35. with db_helper.sqlserver_query_session(self._database) as db_session:
  36. budget = (
  37. db_session.query(
  38. TotalBudgetItemModel.budget_id,
  39. TotalBudgetItemModel.item_id,
  40. ChapterModel.item_code,
  41. ChapterModel.chapter,
  42. ChapterModel.section,
  43. ChapterModel.project_name,
  44. ChapterModel.item_type,
  45. ChapterModel.unit,
  46. )
  47. .join(
  48. ChapterModel, ChapterModel.item_id == TotalBudgetItemModel.item_id
  49. )
  50. .filter(
  51. and_(
  52. TotalBudgetItemModel.budget_id == budget_id,
  53. ChapterModel.item_code == item_code,
  54. )
  55. )
  56. .first()
  57. )
  58. if budget is None:
  59. return None
  60. return budget
  61. def _build_children_count_subquery(self, model_class):
  62. # 创建父节点和子节点的别名
  63. parent = aliased(model_class, name="parent")
  64. child = aliased(model_class, name="child")
  65. # 子查询:计算每个节点的直接子节点数量
  66. return (
  67. self.db_session.query(
  68. parent.item_code.label("parent_code"),
  69. func.count(child.item_code).label("child_count"),
  70. )
  71. .outerjoin(
  72. child,
  73. or_(
  74. # 匹配形如01-01的格式
  75. child.item_code.like(parent.item_code + "-__"),
  76. # 匹配形如0101的格式
  77. child.item_code.like(parent.item_code + "__"),
  78. ),
  79. )
  80. .group_by(parent.item_code)
  81. .subquery()
  82. )
  83. def _build_budget_items_query(self, budget_id: int):
  84. # 子查询:计算每个节点的直接子节点数量
  85. children_count = self._build_children_count_subquery(ChapterModel)
  86. return (
  87. self.db_session.query(
  88. TotalBudgetItemModel.budget_id,
  89. TotalBudgetItemModel.item_id,
  90. ChapterModel.item_code,
  91. ChapterModel.chapter,
  92. ChapterModel.section,
  93. ChapterModel.project_name,
  94. ChapterModel.item_type,
  95. ChapterModel.unit,
  96. func.coalesce(children_count.c.child_count, 0).label("children_count"),
  97. )
  98. .join(ChapterModel, ChapterModel.item_id == TotalBudgetItemModel.item_id)
  99. .outerjoin(
  100. children_count, children_count.c.parent_code == ChapterModel.item_code
  101. )
  102. .filter(TotalBudgetItemModel.budget_id == budget_id)
  103. .distinct()
  104. )
  105. def get_top_budget_items(
  106. self, project_id: str, budget_id: int, item_code: list[str] = None
  107. ):
  108. self._database = project_id
  109. with db_helper.sqlserver_query_session(self._database) as self.db_session:
  110. query = self._build_budget_items_query(budget_id)
  111. if item_code:
  112. query = query.filter(ChapterModel.item_code.in_(item_code))
  113. else:
  114. query = query.filter(ChapterModel.item_code.like("__")).filter(
  115. ChapterModel.chapter.is_not(None)
  116. )
  117. query = query.order_by(ChapterModel.item_code)
  118. items = query.all()
  119. return items
  120. def get_child_budget_items(
  121. self, project_id: str, budget_id: int, parent_item_code: str
  122. ):
  123. # 构建子节点的模式:支持两种格式
  124. # 1. 父级编号后跟-和两位数字(如:01-01)
  125. # 2. 父级编号直接跟两位数字(如:0101)
  126. pattern_with_dash = f"{parent_item_code}-__"
  127. pattern_without_dash = f"{parent_item_code}__"
  128. self._database = project_id
  129. with db_helper.sqlserver_query_session(self._database) as self.db_session:
  130. query = (
  131. self._build_budget_items_query(budget_id)
  132. .filter(
  133. or_(
  134. ChapterModel.item_code.like(pattern_with_dash),
  135. ChapterModel.item_code.like(pattern_without_dash),
  136. )
  137. )
  138. .order_by(ChapterModel.item_code)
  139. )
  140. items = query.all()
  141. return items
  142. def get_all_budget_items_not_children(
  143. self, project_id: str, budget_id: int, item_code: str
  144. ):
  145. self._database = project_id
  146. with db_helper.sqlserver_query_session(self._database) as self.db_session:
  147. # 添加叶子节点过滤条件,使用复用的子查询方法
  148. children_count = self._build_children_count_subquery(ChapterModel)
  149. query = (
  150. self.db_session.query(
  151. TotalBudgetItemModel.budget_id,
  152. TotalBudgetItemModel.item_id,
  153. ChapterModel.item_code,
  154. ChapterModel.chapter,
  155. ChapterModel.section,
  156. ChapterModel.project_name,
  157. ChapterModel.item_type,
  158. ChapterModel.unit,
  159. func.coalesce(children_count.c.child_count, 0).label(
  160. "children_count"
  161. ),
  162. )
  163. .join(
  164. ChapterModel, ChapterModel.item_id == TotalBudgetItemModel.item_id
  165. )
  166. .outerjoin(
  167. children_count,
  168. children_count.c.parent_code == ChapterModel.item_code,
  169. )
  170. .filter(TotalBudgetItemModel.budget_id == budget_id)
  171. .distinct()
  172. )
  173. query = query.filter(func.coalesce(children_count.c.child_count, 0) == 0)
  174. # 如果指定了item_code,添加前缀匹配条件
  175. if item_code:
  176. pattern_with_dash = f"{item_code}-%"
  177. query = query.filter(ChapterModel.item_code.like(pattern_with_dash))
  178. query = query.order_by(ChapterModel.item_code)
  179. items = query.all()
  180. return items