project_task.py 7.3 KB


  1. from sqlalchemy import and_
  2. from datetime import datetime
  3. from typing import Optional
  4. import tools.db_helper as db_helper
  5. from core.dtos import ProjectTaskDto
  6. from core.models import ProjectTaskModel
  7. from core.user_session import UserSession
  8. class ProjectTaskStore:
  9. def __init__(self):
  10. self._current_user = None
  11. self._database=None
  12. @property
  13. def current_user(self):
  14. if self._current_user is None:
  15. self._current_user = UserSession.get_current_user()
  16. return self._current_user
  17. def get_tasks_paginated(
  18. self,
  19. budget_id: int,
  20. project_id: str,
  21. item_code: str,
  22. page: int = 1,
  23. page_size: int = 10,
  24. keyword: Optional[str] = None,
  25. collect_status: Optional[int] = None,
  26. process_status: Optional[int] = None,
  27. send_status: Optional[int] = None,
  28. ) :
  29. """分页查询任务列表
  30. Args:
  31. page: 页码,从1开始
  32. page_size: 每页数量
  33. project_id: 项目编号
  34. budget_id: 概算序号
  35. item_code: 条目编号
  36. keyword: 关键字
  37. collect_status: 采集状态
  38. process_status: 处理状态
  39. send_status: 发送状态
  40. Returns:
  41. """
  42. with db_helper.mysql_query_session(self._database) as db_session:
  43. query = db_session.query(ProjectTaskModel)
  44. # 构建查询条件
  45. conditions = [
  46. ProjectTaskModel.is_del == 0,
  47. ProjectTaskModel.project_id == project_id,
  48. ProjectTaskModel.budget_id == budget_id,
  49. ProjectTaskModel.item_code.like(f"{item_code}%")
  50. ]
  51. if keyword:
  52. conditions.append(ProjectTaskModel.task_name.like(f'%{keyword}%'))
  53. if collect_status is not None:
  54. conditions.append(ProjectTaskModel.collect_status == collect_status)
  55. if process_status is not None:
  56. conditions.append(ProjectTaskModel.process_status == process_status)
  57. if send_status is not None:
  58. conditions.append(ProjectTaskModel.send_status == send_status)
  59. query = query.filter(and_(*conditions))
  60. # 计算总数
  61. total_count = query.count()
  62. # 分页
  63. query = query.offset((page - 1) * page_size).limit(page_size)
  64. tasks = query.all()
  65. return {
  66. 'total': total_count,
  67. 'data': tasks
  68. }
  69. def get_task(self, task_id: int) -> Optional[ProjectTaskModel]:
  70. with db_helper.mysql_query_session(self._database) as db_session:
  71. task = db_session.query(ProjectTaskModel).filter(
  72. and_(
  73. ProjectTaskModel.id == task_id,
  74. ProjectTaskModel.is_del == 0
  75. )).first()
  76. return task
  77. def get_task_dto(self, task_id: int) -> Optional[ProjectTaskDto]:
  78. """根据ID查询任务
  79. Args:
  80. task_id: 任务ID
  81. Returns:
  82. Optional[ProjectTaskDto]
  83. """
  84. task_dto = self.get_task(task_id)
  85. return ProjectTaskDto.from_model(task_dto) if task_dto else None
  86. def create_task(self, task_dto: ProjectTaskDto) -> ProjectTaskDto:
  87. """创建任务
  88. Args:
  89. task_dto: 任务DTO
  90. Returns:
  91. ProjectTaskDto
  92. """
  93. task = ProjectTaskModel(
  94. task_name=task_dto.task_name,
  95. task_desc=task_dto.task_desc,
  96. project_id=task_dto.project_id,
  97. budget_id=task_dto.budget_id,
  98. item_id=task_dto.item_id,
  99. item_code=task_dto.item_code,
  100. file_path=task_dto.file_path,
  101. created_by=self.current_user.username,
  102. created_at=datetime.now(),
  103. )
  104. with db_helper.mysql_session(self._database) as db_session:
  105. db_session.add(task)
  106. db_session.flush()
  107. return ProjectTaskDto.from_model(task)
  108. def update_task(self, task_dto: ProjectTaskDto) -> Optional[ProjectTaskDto]:
  109. """更新任务
  110. Args:
  111. task_dto: 任务DTO
  112. Returns:
  113. Optional[ProjectTaskDto]
  114. """
  115. task = self.get_task(task_dto.id)
  116. if not task:
  117. return None
  118. with db_helper.mysql_session(self._database) as db_session:
  119. task.task_name = task_dto.task_name
  120. task.task_desc = task_dto.task_desc
  121. # task.project_id = task_dto.project_id
  122. # task.budget_id = task_dto.budget_id
  123. # task.item_id = task_dto.item_id
  124. # task.item_code = task_dto.item_code
  125. # task.file_path = task_dto.file_path
  126. task.updated_by=self.current_user.username
  127. task.updated_at=datetime.now()
  128. task = db_session.merge(task)
  129. return ProjectTaskDto.from_model(task)
  130. def update_task_files(self, task_id: int,files: str):
  131. task = self.get_task(task_id)
  132. if not task:
  133. return None
  134. with db_helper.mysql_session(self._database) as db_session:
  135. task.file_path = files
  136. if task.collect_status != 0:
  137. task.collect_status = 4
  138. if task.process_status != 0:
  139. task.process_status = 4
  140. if task.send_status != 0:
  141. task.send_status = 4
  142. task.updated_by=self.current_user.username
  143. task.updated_at=datetime.now()
  144. task = db_session.merge(task)
  145. return ProjectTaskDto.from_model(task)
  146. def delete_task(self, task_id: int) -> bool:
  147. """删除任务
  148. Args:
  149. task_id: 任务ID
  150. Returns:
  151. bool
  152. """
  153. task = self.get_task(task_id)
  154. if not task:
  155. return False
  156. with db_helper.mysql_session(self._database) as db_session:
  157. task.is_del = 1
  158. task.deleted_by = self.current_user.username
  159. task.deleted_at = datetime.now()
  160. task = db_session.merge(task)
  161. return True
  162. def update_collect_status(self,task_id:int, status:int, err:str = None):
  163. task = self.get_task(task_id)
  164. if not task:
  165. return False
  166. with db_helper.mysql_session(self._database) as db_session:
  167. task.collect_status = status
  168. if err:
  169. task.collect_error = err
  170. task.collect_time = datetime.now()
  171. task = db_session.merge(task)
  172. return True
  173. def update_process_status(self,task_id:int, status:int, err:str = None):
  174. task = self.get_task(task_id)
  175. if not task:
  176. return False
  177. with db_helper.mysql_session(self._database) as db_session:
  178. task.process_status = status
  179. if err:
  180. task.process_error = err
  181. task.process_time = datetime.now()
  182. task = db_session.merge(task)
  183. return True
  184. def update_send_status(self,task_id:int, status:int, err:str = None):
  185. task = self.get_task(task_id)
  186. if not task:
  187. return False
  188. with db_helper.mysql_session(self._database) as db_session:
  189. task.send_status = status
  190. if err:
  191. task.send_error = err
  192. task.send_time = datetime.now()
  193. task = db_session.merge(task)
  194. return True