project.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. from typing import Optional
  2. from datetime import datetime, timedelta
  3. from core.dtos import TotalBudgetInfoDto, TotalBudgetItemDto
  4. from core.dtos.project import ProjectDto
  5. from core.dtos.tree import TreeDto
  6. from core.user_session import UserSession
  7. from stores import ProjectStore,BudgetStore
  8. class ProjectService:
  9. def __init__(self):
  10. self._project_store = ProjectStore()
  11. self._budget_store = BudgetStore()
  12. def get_projects_paginated(self, page: int, page_size: int, keyword: Optional[str] = None,
  13. start_time: Optional[str] = None,
  14. end_time: Optional[str] = None, can_edit:Optional[int]=0,):
  15. # 处理开始时间和结束时间
  16. start_datetime = None
  17. end_datetime = None
  18. try:
  19. if start_time:
  20. start_datetime = datetime.strptime(start_time, '%Y-%m-%d').replace(hour=0, minute=0, second=0)
  21. if end_time:
  22. end_datetime = datetime.strptime(end_time, '%Y-%m-%d').replace(hour=0, minute=0, second=0)
  23. # 使用timedelta来处理结束时间加一天
  24. end_datetime = end_datetime + timedelta(days=1)
  25. except ValueError as e:
  26. raise ValueError(f"日期格式错误,请使用YYYY-MM-DD格式: {str(e)}")
  27. data = self._project_store.get_user_projects_paginated(page, page_size, keyword, start_datetime, end_datetime, can_edit)
  28. return [ProjectDto.from_model(item).to_dict() for item in data.get('data',[])],data.get('total',0)
  29. def get_budget_info(self, project_id: str):
  30. msg = self._check_project_db_exit(project_id)
  31. if msg:
  32. return None, msg
  33. data = self._budget_store.get_budget_info(project_id)
  34. return [TotalBudgetInfoDto.from_model(item).to_dict() for item in data],""
  35. def get_top_budget_items(self, budget_id: int, project_id: str):
  36. msg = self._check_project_db_exit(project_id)
  37. if msg:
  38. return None, msg
  39. items = self._budget_store.get_top_budget_items(project_id,budget_id)
  40. return [TotalBudgetItemDto.from_model(item).to_dict() for item in items],""
  41. def get_budget_items(self, budget_id: str, project_id: str, item_code:str):
  42. if not budget_id:
  43. return None,'budget_id不能为空'
  44. msg = self._check_project_db_exit(project_id)
  45. if msg:
  46. return None, msg
  47. data_list = []
  48. if not item_code:
  49. team_item_code = None
  50. current_user = UserSession.get_current_user()
  51. if not current_user.is_admin:
  52. team_item_code_str = self._project_store.get_team_project_item_code(project_id,current_user.username)
  53. if team_item_code_str:
  54. team_item_code = None if team_item_code_str == 'None' or team_item_code_str == '0' or team_item_code_str == '' else team_item_code_str.split(',')
  55. items = self._budget_store.get_top_budget_items(project_id,budget_id,team_item_code)
  56. else:
  57. items = self._budget_store.get_child_budget_items(project_id,budget_id,item_code)
  58. parent = "#"
  59. if item_code:
  60. item = self._budget_store.get_budget_item_by_item_code(project_id,budget_id,item_code)
  61. parent = item.item_id
  62. for item in items:
  63. text = f"第{item.chapter}章、{item.project_name}" if item.chapter else ( f"{item.section} {item.project_name}" if item.section else item.project_name)
  64. data_list.append(TreeDto(item.item_id,parent,text,item.children_count>0,item).to_dict())
  65. return data_list,""
  66. def _check_project_db_exit(self, project_id:str):
  67. if not project_id:
  68. return 'project_id不能为空'
  69. if not self._project_store.get(project_id):
  70. return '项目不存在'
  71. if not project_id.startswith('Reco'):
  72. return '项目id格式错误'
  73. return None