123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315 |
- from sqlalchemy import and_
- from datetime import datetime
- from typing import Optional
- import tools.db_helper as db_helper
- from core.dtos import ProjectTaskDto
- from core.enum import TaskStatusEnum
- from core.models import ProjectTaskModel
- from core.user_session import UserSession
class ProjectTaskStore:
    """Data-access layer for ``ProjectTaskModel`` rows.

    Read methods open a session with ``db_helper.sqlserver_query_session`` and
    write methods with ``db_helper.sqlserver_session``, both against the
    database named by ``self._database``. Deletion is soft: ``delete_task``
    sets ``is_del = 1`` and every read filters on ``is_del == 0``.

    Write methods first load the row through ``get_task`` (its own read
    session), modify the returned instance and then ``merge`` it into a write
    session.
    NOTE(review): the write session presumably commits on exit -- confirm in
    ``db_helper``.
    """

    def __init__(self):
        self._database = "Iwb_RailwayCosting"
        # Resolved lazily by the ``current_user`` property.
        self._current_user = None

    @property
    def current_user(self):
        """Return the current user, fetched from ``UserSession`` on first access and cached."""
        if self._current_user is None:
            self._current_user = UserSession.get_current_user()
        return self._current_user

    @staticmethod
    def _page_offset(page: int, page_size: int) -> int:
        """Return the row offset of a 1-based ``page``.

        Pages below 1 are treated as the first page so that a negative OFFSET
        is never sent to the database.
        """
        return (max(page, 1) - 1) * page_size

    def _count_and_page(self, query, page: int, page_size: int) -> dict:
        """Count ``query`` and fetch one page of it.

        Rows are ordered by ``task_sort`` descending, then ``created_at``
        descending, before the offset/limit is applied.

        Returns:
            ``{"total": <row count before paging>, "data": <list of models>}``
        """
        total_count = query.count()
        tasks = (
            query.order_by(
                ProjectTaskModel.task_sort.desc(),
                ProjectTaskModel.created_at.desc(),
            )
            .offset(self._page_offset(page, page_size))
            .limit(page_size)
            .all()
        )
        return {"total": total_count, "data": tasks}

    def _list_by_status(self, status: int, project_id: Optional[str] = None):
        """Return DTOs of non-deleted tasks with ``process_status == status``.

        Results are ordered by ``task_sort`` descending. When ``project_id``
        is truthy the result is restricted to that project.
        """
        with db_helper.sqlserver_query_session(self._database) as db_session:
            query = db_session.query(ProjectTaskModel).filter(
                and_(
                    ProjectTaskModel.is_del == 0,
                    ProjectTaskModel.process_status == status,
                )
            )
            if project_id:
                query = query.filter(ProjectTaskModel.project_id == project_id)
            # ``order_by`` returns a new query; the result must be used,
            # otherwise the ordering is silently dropped.
            tasks = query.order_by(ProjectTaskModel.task_sort.desc()).all()
            return [ProjectTaskDto.from_model(task) for task in tasks]

    def get_tasks_paginated(
        self,
        project_id: str,
        item_code: str,
        page: int = 1,
        page_size: int = 10,
        keyword: Optional[str] = None,
        process_status: Optional[int] = None,
        send_status: Optional[int] = None,
    ) -> dict:
        """Query one page of a project's tasks.

        Args:
            project_id: Project ID (exact match).
            item_code: Item code; matched as a prefix of ``item_code``.
            page: Page number, starting at 1 (smaller values mean page 1).
            page_size: Number of rows per page.
            keyword: Optional substring to match against ``task_name``.
            process_status: Optional process status to filter on.
            send_status: Optional send status to filter on.

        Returns:
            ``{"total": <matching row count>, "data": <list of ProjectTaskModel>}``
        """
        # Build the filter conditions.
        conditions = [
            ProjectTaskModel.is_del == 0,
            ProjectTaskModel.project_id == project_id,
            ProjectTaskModel.item_code.like(f"{item_code}%"),
        ]
        if keyword:
            conditions.append(ProjectTaskModel.task_name.like(f"%{keyword}%"))
        if process_status is not None:
            conditions.append(ProjectTaskModel.process_status == process_status)
        if send_status is not None:
            conditions.append(ProjectTaskModel.send_status == send_status)

        with db_helper.sqlserver_query_session(self._database) as db_session:
            query = db_session.query(ProjectTaskModel).filter(and_(*conditions))
            return self._count_and_page(query, page, page_size)

    def get_task_template_paginated(
        self,
        project_name: str,
        page: int = 1,
        page_size: int = 10,
        keyword: Optional[str] = None,
    ) -> dict:
        """Query one page of template tasks (``is_template == 1``).

        Args:
            project_name: Substring to match against ``project_name``.
            page: Page number, starting at 1 (smaller values mean page 1).
            page_size: Number of rows per page.
            keyword: Optional substring; when given it must match both
                ``task_name`` and ``created_by``.

        Returns:
            ``{"total": <matching row count>, "data": <list of ProjectTaskModel>}``
        """
        # Build the filter conditions.
        conditions = [
            ProjectTaskModel.is_del == 0,
            ProjectTaskModel.is_template == 1,
            ProjectTaskModel.project_name.like(f"%{project_name}%"),
        ]
        if keyword:
            # NOTE(review): both conditions are ANDed, so the keyword has to
            # appear in the task name AND the creator. If a match on either
            # field was intended this should be an OR -- confirm with callers.
            conditions.append(ProjectTaskModel.task_name.like(f"%{keyword}%"))
            conditions.append(ProjectTaskModel.created_by.like(f"%{keyword}%"))

        with db_helper.sqlserver_query_session(self._database) as db_session:
            query = db_session.query(ProjectTaskModel).filter(and_(*conditions))
            return self._count_and_page(query, page, page_size)

    def get_task(self, task_id: int) -> Optional["ProjectTaskModel"]:
        """Return the non-deleted task model with ``task_id``, or ``None`` if absent."""
        with db_helper.sqlserver_query_session(self._database) as db_session:
            return (
                db_session.query(ProjectTaskModel)
                .filter(
                    and_(ProjectTaskModel.id == task_id, ProjectTaskModel.is_del == 0)
                )
                .first()
            )

    def get_task_dto(self, task_id: int) -> Optional["ProjectTaskDto"]:
        """Look a task up by ID and return it as a DTO.

        Args:
            task_id: Task ID.

        Returns:
            The task as a ``ProjectTaskDto``, or ``None`` when no non-deleted
            task has that ID.
        """
        task = self.get_task(task_id)
        return ProjectTaskDto.from_model(task) if task else None

    def get_wait_tasks(self, project_id: Optional[str] = None):
        """Return the tasks waiting to be processed, ordered by ``task_sort`` descending.

        Args:
            project_id: Optional project ID; when truthy only that project's
                tasks are returned.

        Returns:
            A list of ``ProjectTaskDto`` whose ``process_status`` equals
            ``TaskStatusEnum.WAIT.value``.
        """
        return self._list_by_status(TaskStatusEnum.WAIT.value, project_id)

    def get_tasks_by_status(self, status: int):
        """Return all non-deleted tasks with the given ``process_status``.

        Args:
            status: Process status to match.

        Returns:
            A list of ``ProjectTaskDto`` ordered by ``task_sort`` descending.
        """
        return self._list_by_status(status)

    def create_task(self, task_dto: "ProjectTaskDto") -> "ProjectTaskDto":
        """Create a task from ``task_dto``.

        ``created_by`` is taken from the current user and ``created_at`` from
        the local clock; both values on the DTO are ignored.

        Args:
            task_dto: Task DTO carrying the fields to persist.

        Returns:
            A DTO built from the newly added row.
        """
        task = ProjectTaskModel(
            task_name=task_dto.task_name,
            task_desc=task_dto.task_desc,
            task_type=task_dto.task_type,
            is_template=task_dto.is_template,
            task_sort=task_dto.task_sort,
            project_id=task_dto.project_id,
            project_name=task_dto.project_name,
            budget_id=task_dto.budget_id,
            item_id=task_dto.item_id,
            item_code=task_dto.item_code,
            file_path=task_dto.file_path,
            created_by=self.current_user.username,
            created_at=datetime.now(),
        )
        with db_helper.sqlserver_session(self._database) as db_session:
            db_session.add(task)
            # Flush so the INSERT is emitted before the DTO is built.
            db_session.flush()
            return ProjectTaskDto.from_model(task)

    def update_task(self, task_dto: "ProjectTaskDto") -> Optional["ProjectTaskDto"]:
        """Update the editable fields of an existing task.

        Only ``task_name``, ``task_desc``, ``task_sort`` and ``budget_id`` are
        copied from the DTO. ``project_id``, ``item_id``, ``item_code`` and
        ``file_path`` are not modified here (``file_path`` is changed through
        ``update_task_files``).

        Args:
            task_dto: Task DTO; ``task_dto.id`` selects the row.

        Returns:
            The updated task as a DTO, or ``None`` when the task does not exist.
        """
        task = self.get_task(task_dto.id)
        if not task:
            return None
        with db_helper.sqlserver_session(self._database) as db_session:
            task.task_name = task_dto.task_name
            task.task_desc = task_dto.task_desc
            task.task_sort = task_dto.task_sort
            task.budget_id = task_dto.budget_id
            task.updated_by = self.current_user.username
            task.updated_at = datetime.now()
            task = db_session.merge(task)
            return ProjectTaskDto.from_model(task)

    def update_task_files(self, task_id: int, files: str) -> Optional["ProjectTaskDto"]:
        """Replace a task's ``file_path`` and mark its statuses accordingly.

        A ``process_status`` or ``send_status`` that is not 0 is set to 4.
        NOTE(review): 0 and 4 presumably correspond to ``TaskStatusEnum``
        members (initial / changed) -- confirm and replace the literals.

        Args:
            task_id: Task ID.
            files: New value for ``file_path``.

        Returns:
            The updated task as a DTO, or ``None`` when the task does not exist.
        """
        task = self.get_task(task_id)
        if not task:
            return None
        with db_helper.sqlserver_session(self._database) as db_session:
            task.file_path = files
            if task.process_status != 0:
                task.process_status = 4
            if task.send_status != 0:
                task.send_status = 4
            task.updated_by = self.current_user.username
            task.updated_at = datetime.now()
            task = db_session.merge(task)
            return ProjectTaskDto.from_model(task)

    def delete_task(self, task_id: int) -> bool:
        """Soft-delete a task.

        Sets ``is_del = 1`` and records the current user and time in
        ``deleted_by`` / ``deleted_at``.

        Args:
            task_id: Task ID.

        Returns:
            ``True`` when the task was found and flagged, ``False`` otherwise.
        """
        task = self.get_task(task_id)
        if not task:
            return False
        with db_helper.sqlserver_session(self._database) as db_session:
            task.is_del = 1
            task.deleted_by = self.current_user.username
            task.deleted_at = datetime.now()
            db_session.merge(task)
            return True

    def update_task_status(
        self, task_id: int, status: int, err: Optional[str] = None
    ) -> bool:
        """Set a task's process status; equivalent to ``update_process_status``.

        Args:
            task_id: Task ID.
            status: New ``process_status``.
            err: Optional error text stored in ``process_error`` when truthy.

        Returns:
            ``True`` when the task was found and updated, ``False`` otherwise.
        """
        return self.update_process_status(task_id, status, err)

    def update_process_status(
        self, task_id: int, status: int, err: Optional[str] = None
    ) -> bool:
        """Set a task's ``process_status`` and stamp ``process_time`` with the current time.

        Args:
            task_id: Task ID.
            status: New ``process_status``.
            err: Optional error text; stored in ``process_error`` when truthy,
                otherwise the existing ``process_error`` is left as is.

        Returns:
            ``True`` when the task was found and updated, ``False`` otherwise.
        """
        task = self.get_task(task_id)
        if not task:
            return False
        with db_helper.sqlserver_session(self._database) as db_session:
            task.process_status = status
            if err:
                task.process_error = err
            task.process_time = datetime.now()
            db_session.merge(task)
            return True

    def update_send_status(
        self, task_id: int, status: int, err: Optional[str] = None
    ) -> bool:
        """Set a task's ``send_status`` and stamp ``send_time`` with the current time.

        Args:
            task_id: Task ID.
            status: New ``send_status``.
            err: Optional error text; stored in ``send_error`` when truthy,
                otherwise the existing ``send_error`` is left as is.

        Returns:
            ``True`` when the task was found and updated, ``False`` otherwise.
        """
        task = self.get_task(task_id)
        if not task:
            return False
        with db_helper.sqlserver_session(self._database) as db_session:
            task.send_status = status
            if err:
                task.send_error = err
            task.send_time = datetime.now()
            db_session.merge(task)
            return True
|