project_quota.py 11 KB


  1. from sqlalchemy import and_, or_
  2. from datetime import datetime
  3. from typing import Optional
  4. import tools.db_helper as db_helper
  5. from core.dtos import ProjectQuotaDto
  6. from core.enum import SendStatusEnum
  7. from core.models import ProjectQuotaModel
  8. from core.user_session import UserSession
  9. class ProjectQuotaStore:
  10. def __init__(self):
  11. # self._database= None
  12. self._database = "iwb_railway_costing_v1"
  13. self._current_user = None
  14. @property
  15. def current_user(self):
  16. if self._current_user is None:
  17. self._current_user = UserSession.get_current_user()
  18. return self._current_user
  19. def get_quotas_paginated(
  20. self,
  21. budget_id: int,
  22. project_id: str,
  23. item_code: str,
  24. page: int = 1,
  25. page_size: int = 10,
  26. keyword: Optional[str] = None,
  27. send_status: Optional[int] = None,
  28. ):
  29. """分页查询定额列表
  30. Args:
  31. page: 页码,从1开始
  32. page_size: 每页数量
  33. project_id: 项目编号
  34. budget_id: 概算序号
  35. item_code: 条目序号
  36. keyword: 关键字
  37. send_status: 发送状态
  38. Returns:
  39. Tuple[total_count, quotas]
  40. """
  41. with db_helper.mysql_query_session(self._database) as db_session:
  42. query = db_session.query(ProjectQuotaModel)
  43. # 构建查询条件
  44. conditions = [
  45. ProjectQuotaModel.is_del == 0,
  46. ProjectQuotaModel.project_id == project_id,
  47. ProjectQuotaModel.budget_id == budget_id,
  48. ProjectQuotaModel.item_code.like(f"{item_code}%"),
  49. ]
  50. if send_status is not None:
  51. conditions.append(ProjectQuotaModel.send_status == send_status)
  52. if keyword:
  53. conditions.append(
  54. or_(
  55. ProjectQuotaModel.quota_code.like(f"%{keyword}%"),
  56. ProjectQuotaModel.entry_name.like(f"%{keyword}%"),
  57. )
  58. )
  59. query = query.filter(and_(*conditions))
  60. # 计算总数
  61. total_count = query.count()
  62. # 分页
  63. query = (
  64. query.order_by(ProjectQuotaModel.created_at.desc())
  65. .offset((page - 1) * page_size)
  66. .limit(page_size)
  67. )
  68. quotas = query.all()
  69. return {"total": total_count, "data": quotas}
  70. def get_quotas_by_task_paginated(
  71. self,
  72. task_id: int,
  73. budget_id,
  74. project_id,
  75. item_code,
  76. page: int = 1,
  77. page_size: int = 10,
  78. keyword: Optional[str] = None,
  79. send_status: Optional[int] = None,
  80. ):
  81. with db_helper.mysql_query_session(self._database) as db_session:
  82. query = db_session.query(ProjectQuotaModel).filter(
  83. and_(
  84. ProjectQuotaModel.task_id == task_id,
  85. ProjectQuotaModel.budget_id == budget_id,
  86. ProjectQuotaModel.project_id == project_id,
  87. ProjectQuotaModel.item_code.like(f"{item_code}%"),
  88. ProjectQuotaModel.is_del == 0,
  89. )
  90. )
  91. if keyword:
  92. query = query.filter(
  93. or_(
  94. ProjectQuotaModel.quota_code.like(f"%{keyword}%"),
  95. ProjectQuotaModel.entry_name.like(f"%{keyword}%"),
  96. )
  97. )
  98. if send_status is not None:
  99. query = query.filter(ProjectQuotaModel.send_status == send_status)
  100. # 计算总数
  101. total_count = query.count()
  102. # 分页
  103. query = (
  104. query.order_by(ProjectQuotaModel.id.desc())
  105. .offset((page - 1) * page_size)
  106. .limit(page_size)
  107. )
  108. quotas = query.all()
  109. return {"total": total_count, "data": quotas}
  110. def get_quotas_by_task_id(self, task_id: int, with_quota_code: bool = False):
  111. with db_helper.mysql_query_session(self._database) as db_session:
  112. query = db_session.query(ProjectQuotaModel).filter(
  113. and_(
  114. ProjectQuotaModel.task_id == task_id, ProjectQuotaModel.is_del == 0
  115. )
  116. )
  117. if with_quota_code:
  118. query = query.filter(
  119. and_(
  120. ProjectQuotaModel.quota_code != None,
  121. ProjectQuotaModel.quota_code != "",
  122. )
  123. )
  124. quotas = query.all()
  125. return quotas
  126. def get_quota(self, quota_id: int) -> Optional[ProjectQuotaModel]:
  127. """根据ID查询定额
  128. Args:
  129. quota_id: 定额ID
  130. Returns:
  131. Optional[ProjectQuotaDto]
  132. """
  133. with db_helper.mysql_query_session(self._database) as db_session:
  134. quota = (
  135. db_session.query(ProjectQuotaModel)
  136. .filter(
  137. and_(
  138. ProjectQuotaModel.id == quota_id, ProjectQuotaModel.is_del == 0
  139. )
  140. )
  141. .first()
  142. )
  143. return quota
  144. def get_quota_dto(self, quota_id: int) -> Optional[ProjectQuotaDto]:
  145. """根据ID查询定额
  146. Args:
  147. quota_id: 定额ID
  148. Returns:
  149. Optional[ProjectQuotaDto]
  150. """
  151. quota = self.get_quota(quota_id)
  152. return ProjectQuotaDto.from_model(quota) if quota else None
  153. def get_quota_by_quota_input(
  154. self, project_id: str, budget_id: int, quota_input_id: int
  155. ):
  156. with db_helper.mysql_query_session(self._database) as db_session:
  157. quota = (
  158. db_session.query(ProjectQuotaModel)
  159. .filter(
  160. and_(
  161. ProjectQuotaModel.project_id == project_id,
  162. ProjectQuotaModel.budget_id == budget_id,
  163. ProjectQuotaModel.quota_id == quota_input_id,
  164. ProjectQuotaModel.is_del == 0,
  165. )
  166. )
  167. .first()
  168. )
  169. return ProjectQuotaDto.from_model(quota) if quota else None
  170. def create_quota(self, quota_dto: ProjectQuotaDto) -> ProjectQuotaDto:
  171. """创建定额
  172. Args:
  173. quota_dto: 定额DTO
  174. Returns:
  175. ProjectQuotaDto
  176. """
  177. with db_helper.mysql_session(self._database) as db_session:
  178. quota = ProjectQuotaModel(
  179. project_id=quota_dto.project_id,
  180. budget_id=quota_dto.budget_id,
  181. task_id=quota_dto.task_id,
  182. item_id=quota_dto.item_id,
  183. item_code=quota_dto.item_code,
  184. quota_id=quota_dto.quota_id,
  185. quota_code=quota_dto.quota_code,
  186. entry_name=quota_dto.entry_name,
  187. units=quota_dto.units,
  188. amount=quota_dto.amount,
  189. ex_file=quota_dto.ex_file,
  190. ex_cell=quota_dto.ex_cell,
  191. ex_row=quota_dto.ex_row,
  192. ex_unit=quota_dto.ex_unit,
  193. ex_amount=quota_dto.ex_amount,
  194. send_status=SendStatusEnum.NEW.value,
  195. send_time=quota_dto.send_time,
  196. send_error=quota_dto.send_error,
  197. created_by=quota_dto.created_by or self.current_user.username,
  198. created_at=datetime.now(),
  199. )
  200. db_session.add(quota)
  201. db_session.flush()
  202. return ProjectQuotaDto.from_model(quota)
  203. def update_quota(self, quota_dto: ProjectQuotaDto) -> Optional[ProjectQuotaDto]:
  204. """更新定额
  205. Args:
  206. quota_dto: 定额DTO
  207. Returns:
  208. Optional[ProjectQuotaDto]
  209. """
  210. quota = self.get_quota(quota_dto.id)
  211. if not quota:
  212. return None
  213. with db_helper.mysql_session(self._database) as db_session:
  214. quota.quota_id = quota_dto.quota_id
  215. quota.quota_code = quota_dto.quota_code
  216. quota.entry_name = quota_dto.entry_name
  217. quota.units = quota_dto.units
  218. quota.amount = quota_dto.amount
  219. quota.ex_file = quota_dto.ex_file
  220. quota.ex_cell = quota_dto.ex_cell
  221. quota.ex_row = quota_dto.ex_row
  222. quota.ex_unit = quota_dto.ex_unit
  223. quota.ex_amount = quota_dto.ex_amount
  224. quota.send_status = quota_dto.send_status
  225. quota.send_error = None
  226. quota.updated_by = self.current_user.username
  227. quota.updated_at = datetime.now()
  228. quota = db_session.merge(quota)
  229. return ProjectQuotaDto.from_model(quota)
  230. def update_quota_chapter(
  231. self, quota_id: int, item_id: int, item_code: str
  232. ) -> Optional[ProjectQuotaDto]:
  233. quota = self.get_quota(quota_id)
  234. if not quota:
  235. raise Exception(f"定额条目[{quota_id}]不存在")
  236. with db_helper.mysql_session(self._database) as db_session:
  237. quota.item_id = item_id
  238. quota.item_code = item_code
  239. quota.updated_by = self.current_user.username
  240. quota.updated_at = datetime.now()
  241. quota = db_session.merge(quota)
  242. return ProjectQuotaDto.from_model(quota)
  243. def delete_quota(self, quota_id: int) -> bool:
  244. """删除定额
  245. Args:
  246. quota_id: 定额ID
  247. Returns:
  248. bool
  249. """
  250. quota = self.get_quota(quota_id)
  251. if not quota:
  252. return False
  253. with db_helper.mysql_session(self._database) as db_session:
  254. quota.is_del = 1
  255. quota.deleted_by = self.current_user.username
  256. quota.deleted_at = datetime.now()
  257. db_session.merge(quota)
  258. return True
  259. def update_send_status(self, quota_id: int, status: int, err: str = None) -> bool:
  260. """
  261. 更新发送状态
  262. Args:
  263. quota_id: int
  264. status: int
  265. err: str
  266. Returns:
  267. bool
  268. """
  269. quota = self.get_quota(quota_id)
  270. if not quota:
  271. return False
  272. with db_helper.mysql_session(self._database) as db_session:
  273. quota.send_status = status
  274. quota.send_error = err
  275. quota.send_time = datetime.now()
  276. db_session.merge(quota)
  277. return True
  278. def update_quota_code(self, quota_dto: ProjectQuotaDto):
  279. """更新定额编号
  280. Args:
  281. quota_dto: 定额DTO
  282. Returns:
  283. bool
  284. """
  285. quota = self.get_quota(quota_dto.id)
  286. if not quota:
  287. return False
  288. with db_helper.mysql_session(self._database) as db_session:
  289. quota.quota_code = quota_dto.quota_code
  290. quota.unit = quota_dto.unit
  291. quota.unit_price = quota_dto.unit_price
  292. quota.total_price = quota_dto.total_price
  293. quota.updated_by = self.current_user.username
  294. quota.updated_at = datetime.now()
  295. db_session.merge(quota)
  296. return True