123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- from sqlalchemy import and_, or_
- from datetime import datetime
- from typing import Optional
- import tools.db_helper as db_helper
- from core.dtos import ProjectDto
- from core.models.project import ProjectModel
- from core.models.team import TeamModel
- from core.user_session import UserSession
- class ProjectStore:
- def __init__(self):
- self._database= None
- def get_user_projects_paginated(
- self,
- page: int = 1,
- page_size: int = 10,
- keyword: Optional[str] = None,
- start_time: Optional[datetime] = None,
- end_time: Optional[datetime] = None,
- can_edit:Optional[int]=0,
- ):
- """
- 分页查询用户有权限的项目列表
-
- Args:
- page: 页码,从1开始
- page_size: 每页数量
- keyword: 关键字(模糊查询)
- start_time: 开始时间
- end_time: 结束时间
- can_edit: 是否只显示可编辑的项目
-
- Returns:
- Tuple[total_count, projects]
- """
- # 构建基础查询
- with db_helper.sqlserver_query_session(self._database) as db_session:
- query = (db_session.query(
- ProjectModel.project_id,
- ProjectModel.project_name,
- ProjectModel.project_manager,
- ProjectModel.design_stage,
- ProjectModel.project_description,
- ProjectModel.short_name,
- ProjectModel.project_version,
- ProjectModel.project_type,
- ProjectModel.unit,
- ProjectModel.create_time,
- ) .distinct())
- user = UserSession.get_current_user()
- if not user.is_admin:
- query = query.outerjoin(TeamModel, ProjectModel.project_id == TeamModel.project_id)
- if can_edit:
- query = query.filter(
- or_(ProjectModel.project_manager == user.username,
- and_(TeamModel.name == user.username, TeamModel.compilation_status == can_edit)
- )
- )
- else:
- query = query.filter(or_(ProjectModel.project_manager == user.username,TeamModel.name == user.username))
- # 添加过滤条件
- if keyword:
- query = query.filter(or_(
- ProjectModel.project_id.like(f'%{keyword}%'),
- ProjectModel.project_name.like(f'%{keyword}%'),
- ProjectModel.project_manager.like(f'%{keyword}%'),
- ProjectModel.project_description.like(f'%{keyword}%'),
- ProjectModel.short_name.like(f'%{keyword}%')
- ))
- if start_time:
- query = query.filter(ProjectModel.create_time >= start_time)
- if end_time:
- query = query.filter(ProjectModel.create_time < end_time)
- # 获取总记录数和数据
- total_count = query.count()
- projects = query.order_by(ProjectModel.create_time.desc())\
- .offset((page - 1) * page_size)\
- .limit(page_size)\
- .all()
- return {
- 'total': total_count,
- 'data': projects
- }
- def get_team_project_item_code(self,project_id:str, user_name:str):
- with db_helper.sqlserver_query_session(self._database) as session:
- db_session = session
- data = db_session.query(TeamModel.item_code).filter(and_(TeamModel.project_id == project_id,TeamModel.name == user_name)).first()
- return data[0] if data else None
- def get(self,project_id:str):
- with db_helper.sqlserver_query_session(self._database) as session:
- db_session = session
- data = db_session.query(ProjectModel).filter(ProjectModel.project_id == project_id).first()
- return ProjectDto.from_model(data).to_dict()
|