project.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. from sqlalchemy import and_, or_
  2. from datetime import datetime
  3. from typing import Optional
  4. import tools.db_helper as db_helper
  5. from core import configs
  6. from core.dtos import ProjectDto
  7. from core.models.project import ProjectModel
  8. from core.models.team import TeamModel
  9. from core.user_session import UserSession
  10. class ProjectStore:
  11. def __init__(self):
  12. self._database = (
  13. f"sqlserver_mian_{configs.app.version}"
  14. if configs.app.use_version
  15. else "sqlserver_mian"
  16. )
  17. def get_user_projects_paginated(
  18. self,
  19. page: int = 1,
  20. page_size: int = 10,
  21. keyword: Optional[str] = None,
  22. start_time: Optional[datetime] = None,
  23. end_time: Optional[datetime] = None,
  24. can_edit: Optional[int] = 0,
  25. ):
  26. """
  27. 分页查询用户有权限的项目列表
  28. Args:
  29. page: 页码,从1开始
  30. page_size: 每页数量
  31. keyword: 关键字(模糊查询)
  32. start_time: 开始时间
  33. end_time: 结束时间
  34. can_edit: 是否只显示可编辑的项目
  35. Returns:
  36. Tuple[total_count, projects]
  37. """
  38. # 构建基础查询
  39. with db_helper.sqlserver_query_session(self._database) as db_session:
  40. query = db_session.query(
  41. ProjectModel.project_id,
  42. ProjectModel.project_name,
  43. ProjectModel.project_manager,
  44. ProjectModel.design_stage,
  45. # ProjectModel.project_description,
  46. ProjectModel.short_name,
  47. ProjectModel.project_version,
  48. ProjectModel.project_type,
  49. ProjectModel.unit,
  50. ProjectModel.create_time,
  51. ).distinct()
  52. user = UserSession.get_current_user()
  53. if not user.is_admin:
  54. query = query.outerjoin(
  55. TeamModel, ProjectModel.project_id == TeamModel.project_id
  56. )
  57. if can_edit:
  58. query = query.filter(
  59. or_(
  60. ProjectModel.project_manager == user.username,
  61. and_(
  62. TeamModel.name == user.username,
  63. TeamModel.compilation_status == can_edit,
  64. ),
  65. )
  66. )
  67. else:
  68. query = query.filter(
  69. or_(
  70. ProjectModel.project_manager == user.username,
  71. TeamModel.name == user.username,
  72. )
  73. )
  74. # 添加过滤条件
  75. if keyword:
  76. query = query.filter(
  77. or_(
  78. ProjectModel.project_id.like(f"%{keyword}%"),
  79. ProjectModel.project_name.like(f"%{keyword}%"),
  80. ProjectModel.project_manager.like(f"%{keyword}%"),
  81. # ProjectModel.project_description.like(f'%{keyword}%'),
  82. ProjectModel.short_name.like(f"%{keyword}%"),
  83. )
  84. )
  85. if start_time:
  86. query = query.filter(ProjectModel.create_time >= start_time)
  87. if end_time:
  88. query = query.filter(ProjectModel.create_time < end_time)
  89. # 获取总记录数和数据
  90. total_count = query.count()
  91. projects = (
  92. query.order_by(ProjectModel.create_time.desc())
  93. .offset((page - 1) * page_size)
  94. .limit(page_size)
  95. .all()
  96. )
  97. return {"total": total_count, "data": projects}
  98. def get_team_project_item_code(self, project_id: str, user_name: str):
  99. with db_helper.sqlserver_query_session(self._database) as session:
  100. db_session = session
  101. data = (
  102. db_session.query(TeamModel.item_code)
  103. .filter(
  104. and_(
  105. TeamModel.project_id == project_id, TeamModel.name == user_name
  106. )
  107. )
  108. .first()
  109. )
  110. return data[0] if data else None
  111. def get(self, project_id: str):
  112. with db_helper.sqlserver_query_session(self._database) as session:
  113. db_session = session
  114. data = (
  115. db_session.query(ProjectModel)
  116. .filter(ProjectModel.project_id == project_id)
  117. .first()
  118. )
  119. return ProjectDto.from_model(data) if data else None