123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- from sqlalchemy import and_, or_
- from datetime import datetime
- from typing import Optional
- import tools.db_helper as db_helper
- from core import configs
- from core.dtos import ProjectDto
- from core.models.project import ProjectModel
- from core.models.team import TeamModel
- from core.user_session import UserSession
class ProjectStore:
    """Query helpers for projects and their team-membership rows.

    Every method opens a session through
    ``db_helper.sqlserver_query_session`` against the database key chosen
    in ``__init__``.
    """

    def __init__(self):
        # The database key gets the app version suffix only when
        # ``configs.app.use_version`` is truthy.
        self._database = (
            f"sqlserver_mian_{configs.app.version}"
            if configs.app.use_version
            else "sqlserver_mian"
        )

    @staticmethod
    def _page_window(page: int, page_size: int) -> tuple:
        """Return ``(offset, limit)`` for a 1-based page, clamping both inputs to >= 1."""
        # SQL Server's OFFSET/FETCH does not accept a negative offset or a
        # non-positive row count, so out-of-range input is clamped instead of
        # being sent to the database.
        limit = max(page_size, 1)
        offset = (max(page, 1) - 1) * limit
        return offset, limit

    def get_user_projects_paginated(
        self,
        page: int = 1,
        page_size: int = 10,
        keyword: Optional[str] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        can_edit: Optional[int] = 0,
    ) -> dict:
        """Return one page of the projects visible to the current user.

        Admin users see every project. Other users see projects they manage
        (``project_manager`` equals their username) plus projects that have a
        team row whose ``name`` equals their username.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; values below 1 are treated as 1.
            keyword: Optional substring matched (SQL ``LIKE``) against
                project id, project name, project manager and short name.
            start_time: Optional inclusive lower bound on ``create_time``.
            end_time: Optional exclusive upper bound on ``create_time``.
            can_edit: When truthy, a non-admin user's team projects are kept
                only if the team row's ``compilation_status`` equals this
                value. Projects the user manages are always kept. Ignored
                for admin users.

        Returns:
            ``{"total": <matching row count>, "data": <rows of this page>}``.
            Each row holds the selected project columns, newest first.
        """
        offset, limit = self._page_window(page, page_size)

        with db_helper.sqlserver_query_session(self._database) as db_session:
            # DISTINCT collapses the duplicates produced by the team outer
            # join below (one project can have several team rows).
            # NOTE: project_description was commented out of this column list
            # (and of the keyword search) in the original code.
            query = db_session.query(
                ProjectModel.project_id,
                ProjectModel.project_name,
                ProjectModel.project_manager,
                ProjectModel.design_stage,
                ProjectModel.short_name,
                ProjectModel.project_version,
                ProjectModel.project_type,
                ProjectModel.unit,
                ProjectModel.create_time,
            ).distinct()

            user = UserSession.get_current_user()
            if not user.is_admin:
                query = query.outerjoin(
                    TeamModel, ProjectModel.project_id == TeamModel.project_id
                )
                team_match = TeamModel.name == user.username
                if can_edit:
                    team_match = and_(
                        team_match,
                        TeamModel.compilation_status == can_edit,
                    )
                query = query.filter(
                    or_(
                        ProjectModel.project_manager == user.username,
                        team_match,
                    )
                )

            # Optional search filters.
            if keyword:
                pattern = f"%{keyword}%"
                query = query.filter(
                    or_(
                        ProjectModel.project_id.like(pattern),
                        ProjectModel.project_name.like(pattern),
                        ProjectModel.project_manager.like(pattern),
                        ProjectModel.short_name.like(pattern),
                    )
                )
            if start_time:
                query = query.filter(ProjectModel.create_time >= start_time)
            if end_time:
                query = query.filter(ProjectModel.create_time < end_time)

            # Count before paging so "total" reflects every matching row.
            total_count = query.count()
            projects = (
                query.order_by(
                    ProjectModel.create_time.desc(),
                    # Tiebreaker: keeps page contents stable when several
                    # projects share the same create_time. It is a selected
                    # column, so it is compatible with DISTINCT.
                    ProjectModel.project_id,
                )
                .offset(offset)
                .limit(limit)
                .all()
            )
            return {"total": total_count, "data": projects}

    def get_team_project_item_code(self, project_id: str, user_name: str):
        """Return the ``item_code`` of a user's team row in a project.

        Args:
            project_id: Project whose team rows are searched.
            user_name: Value compared against ``TeamModel.name``.

        Returns:
            The ``item_code`` of the first matching team row, or ``None``
            when no row matches.
        """
        with db_helper.sqlserver_query_session(self._database) as db_session:
            row = (
                db_session.query(TeamModel.item_code)
                .filter(
                    and_(
                        TeamModel.project_id == project_id,
                        TeamModel.name == user_name,
                    )
                )
                .first()
            )
            # A single-column query yields a one-element row; unwrap it.
            return row[0] if row else None

    def get(self, project_id: str):
        """Return the project with the given id as a ``ProjectDto``.

        Args:
            project_id: Value compared against ``ProjectModel.project_id``.

        Returns:
            ``ProjectDto.from_model(...)`` of the first matching project, or
            ``None`` when no project matches.
        """
        with db_helper.sqlserver_query_session(self._database) as db_session:
            model = (
                db_session.query(ProjectModel)
                .filter(ProjectModel.project_id == project_id)
                .first()
            )
            # Convert while the session is still open so the DTO can read
            # any attribute of the model without hitting a detached instance.
            return ProjectDto.from_model(model) if model else None
|