project.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. from sqlalchemy import and_, or_
  2. from datetime import datetime
  3. from typing import Optional
  4. import tools.db_helper as db_helper
  5. from core.dtos import ProjectDto
  6. from core.models.project import ProjectModel
  7. from core.models.team import TeamModel
  8. from core.user_session import UserSession
  9. class ProjectStore:
  10. def __init__(self):
  11. self._database= None
  12. def get_user_projects_paginated(
  13. self,
  14. page: int = 1,
  15. page_size: int = 10,
  16. keyword: Optional[str] = None,
  17. start_time: Optional[datetime] = None,
  18. end_time: Optional[datetime] = None,
  19. can_edit:Optional[int]=0,
  20. ):
  21. """
  22. 分页查询用户有权限的项目列表
  23. Args:
  24. page: 页码,从1开始
  25. page_size: 每页数量
  26. keyword: 关键字(模糊查询)
  27. start_time: 开始时间
  28. end_time: 结束时间
  29. can_edit: 是否只显示可编辑的项目
  30. Returns:
  31. Tuple[total_count, projects]
  32. """
  33. # 构建基础查询
  34. with db_helper.sqlserver_query_session(self._database) as db_session:
  35. query = (db_session.query(
  36. ProjectModel.project_id,
  37. ProjectModel.project_name,
  38. ProjectModel.project_manager,
  39. ProjectModel.design_stage,
  40. ProjectModel.project_description,
  41. ProjectModel.short_name,
  42. ProjectModel.project_version,
  43. ProjectModel.project_type,
  44. ProjectModel.unit,
  45. ProjectModel.create_time,
  46. ) .distinct())
  47. user = UserSession.get_current_user()
  48. if not user.is_admin:
  49. query = query.outerjoin(TeamModel, ProjectModel.project_id == TeamModel.project_id)
  50. if can_edit:
  51. query = query.filter(
  52. or_(ProjectModel.project_manager == user.username,
  53. and_(TeamModel.name == user.username, TeamModel.compilation_status == can_edit)
  54. )
  55. )
  56. else:
  57. query = query.filter(or_(ProjectModel.project_manager == user.username,TeamModel.name == user.username))
  58. # 添加过滤条件
  59. if keyword:
  60. query = query.filter(or_(
  61. ProjectModel.project_id.like(f'%{keyword}%'),
  62. ProjectModel.project_name.like(f'%{keyword}%'),
  63. ProjectModel.project_manager.like(f'%{keyword}%'),
  64. ProjectModel.project_description.like(f'%{keyword}%'),
  65. ProjectModel.short_name.like(f'%{keyword}%')
  66. ))
  67. if start_time:
  68. query = query.filter(ProjectModel.create_time >= start_time)
  69. if end_time:
  70. query = query.filter(ProjectModel.create_time < end_time)
  71. # 获取总记录数和数据
  72. total_count = query.count()
  73. projects = query.order_by(ProjectModel.create_time.desc())\
  74. .offset((page - 1) * page_size)\
  75. .limit(page_size)\
  76. .all()
  77. return {
  78. 'total': total_count,
  79. 'data': projects
  80. }
  81. def get_team_project_item_code(self,project_id:str, user_name:str):
  82. with db_helper.sqlserver_query_session(self._database) as session:
  83. db_session = session
  84. data = db_session.query(TeamModel.item_code).filter(and_(TeamModel.project_id == project_id,TeamModel.name == user_name)).first()
  85. return data[0] if data else None
  86. def get(self,project_id:str):
  87. with db_helper.sqlserver_query_session(self._database) as session:
  88. db_session = session
  89. data = db_session.query(ProjectModel).filter(ProjectModel.project_id == project_id).first()
  90. return ProjectDto.from_model(data).to_dict()