123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- from typing import Optional
- from datetime import datetime, timedelta
- from core.dtos import TotalBudgetInfoDto, TotalBudgetItemDto
- from core.dtos.project import ProjectDto
- from core.dtos.tree import TreeDto
- from core.user_session import UserSession
- from stores import ProjectStore,BudgetStore
- class ProjectService:
- def __init__(self):
- self._project_store = ProjectStore()
- self._budget_store = BudgetStore()
- def get_projects_paginated(self, page: int, page_size: int, keyword: Optional[str] = None,
- start_time: Optional[str] = None,
- end_time: Optional[str] = None, can_edit:Optional[int]=0,):
-
- # 处理开始时间和结束时间
- start_datetime = None
- end_datetime = None
- try:
- if start_time:
- start_datetime = datetime.strptime(start_time, '%Y-%m-%d').replace(hour=0, minute=0, second=0)
- if end_time:
- end_datetime = datetime.strptime(end_time, '%Y-%m-%d').replace(hour=0, minute=0, second=0)
- # 使用timedelta来处理结束时间加一天
- end_datetime = end_datetime + timedelta(days=1)
- except ValueError as e:
- raise ValueError(f"日期格式错误,请使用YYYY-MM-DD格式: {str(e)}")
- data = self._project_store.get_user_projects_paginated(page, page_size, keyword, start_datetime, end_datetime, can_edit)
- return [ProjectDto.from_model(item).to_dict() for item in data.get('data',[])],data.get('total',0)
- def get_budget_info(self, project_id: str):
- msg = self._check_project_db_exit(project_id)
- if msg:
- return None, msg
- data = self._budget_store.get_budget_info(project_id)
- return [TotalBudgetInfoDto.from_model(item).to_dict() for item in data],""
- def get_top_budget_items(self, budget_id: int, project_id: str):
- msg = self._check_project_db_exit(project_id)
- if msg:
- return None, msg
- items = self._budget_store.get_top_budget_items(project_id,budget_id)
- return [TotalBudgetItemDto.from_model(item).to_dict() for item in items],""
- def get_budget_items(self, budget_id: str, project_id: str, item_code:str):
- if not budget_id:
- return None,'budget_id不能为空'
- msg = self._check_project_db_exit(project_id)
- if msg:
- return None, msg
- data_list = []
- if not item_code:
- team_item_code = None
- current_user = UserSession.get_current_user()
- if not current_user.is_admin:
- team_item_code_str = self._project_store.get_team_project_item_code(project_id,current_user.username)
- if team_item_code_str:
- team_item_code = None if team_item_code_str == 'None' or team_item_code_str == '0' or team_item_code_str == '' else team_item_code_str.split(',')
- items = self._budget_store.get_top_budget_items(project_id,budget_id,team_item_code)
- else:
- items = self._budget_store.get_child_budget_items(project_id,budget_id,item_code)
- parent = "#"
- if item_code:
- item = self._budget_store.get_budget_item_by_item_code(project_id,budget_id,item_code)
- parent = item.item_id
- for item in items:
- text = f"第{item.chapter}章、{item.project_name}" if item.chapter else ( f"{item.section} {item.project_name}" if item.section else item.project_name)
- data_list.append(TreeDto(item.item_id,parent,text,item.children_count>0,item).to_dict())
- return data_list,""
- def _check_project_db_exit(self, project_id:str):
- if not project_id:
- return 'project_id不能为空'
- if not self._project_store.get(project_id):
- return '项目不存在'
- if not project_id.startswith('Reco'):
- return '项目id格式错误'
- return None
|