|
@@ -0,0 +1,450 @@
|
|
|
+from typing import List, Optional, Dict, Any
|
|
|
+from datetime import datetime
|
|
|
+from sqlalchemy.orm import Session, joinedload
|
|
|
+from sqlalchemy import or_
|
|
|
+from ..models.question import Question
|
|
|
+from ..models.question_type import QuestionType
|
|
|
+from ..models.question_difficulty import QuestionDifficulty
|
|
|
+from ..models.question_category import QuestionCategory
|
|
|
+from ..models.question_option import QuestionOption
|
|
|
+from ..models.question_analysis import QuestionAnalysis
|
|
|
+from ..models.question_feedback import QuestionFeedback
|
|
|
+from ..models.question_comment import QuestionComment
|
|
|
+from ..models.question_attachment import QuestionAttachment
|
|
|
+from ..models.question_version import QuestionVersion
|
|
|
+from ..models.question_relation import QuestionRelation
|
|
|
+from ..models.question_statistics import QuestionStatistics
|
|
|
+from ..models.question_bank import QuestionBank
|
|
|
+from ..models.user import User
|
|
|
+from ..models.user_statistics import UserStatistics
|
|
|
+from ..models.learning_progress import LearningProgress
|
|
|
+from ..models.practice_record import PracticeRecord
|
|
|
+from ..models.wrong_question import WrongQuestion
|
|
|
+from ..models.knowledge_point import KnowledgePoint
|
|
|
+from ..models.learning_path import LearningPath
|
|
|
+from ..models.question_collection import QuestionCollection
|
|
|
+from ..models.question_template import QuestionTemplate
|
|
|
+from ..models.question_tag import QuestionTag
|
|
|
+from ..models.question_history import QuestionHistory
|
|
|
+from ..models.question_knowledge_point import QuestionKnowledgePoint
|
|
|
+from ..common.exceptions import NotFoundException, PermissionDeniedException
|
|
|
+from ..common.constants import QuestionStatus, ReviewStatus
|
|
|
+
|
|
|
+
|
|
|
+class QuestionStore:
|
|
|
+
|
|
|
+ def __init__(self, db: Session):
|
|
|
+ self.db = db
|
|
|
+
|
|
|
+ def create_question(self, creator_id: int,
|
|
|
+ question_data: Dict[str, Any]) -> Question:
|
|
|
+ """创建题目"""
|
|
|
+ question = Question.create_with_options(
|
|
|
+ db=self.db,
|
|
|
+ title=question_data["title"],
|
|
|
+ content=question_data["content"],
|
|
|
+ type_id=question_data["type_id"],
|
|
|
+ difficulty_id=question_data["difficulty_id"],
|
|
|
+ options=question_data.get("options", []),
|
|
|
+ creator_id=creator_id,
|
|
|
+ category_id=question_data.get("category_id"),
|
|
|
+ tags=question_data.get("tags"),
|
|
|
+ answer=question_data.get("answer"),
|
|
|
+ explanation=question_data.get("explanation"),
|
|
|
+ source=question_data.get("source"),
|
|
|
+ extra_data=question_data.get("extra_data"))
|
|
|
+ return question
|
|
|
+
|
|
|
+ def get_question_by_id(self, question_id: int) -> Optional[Question]:
|
|
|
+ """根据ID获取题目"""
|
|
|
+ return self.db.query(Question).get(question_id)
|
|
|
+
|
|
|
+ def update_question(self, question_id: int,
|
|
|
+ update_data: Dict[str, Any]) -> Question:
|
|
|
+ """更新题目"""
|
|
|
+ question = self.get_question_by_id(question_id)
|
|
|
+ if not question:
|
|
|
+ raise NotFoundException("题目不存在")
|
|
|
+
|
|
|
+ # 更新基础信息
|
|
|
+ if "title" in update_data:
|
|
|
+ question.title = update_data["title"]
|
|
|
+ if "content" in update_data:
|
|
|
+ question.content = update_data["content"]
|
|
|
+ if "type_id" in update_data:
|
|
|
+ question.type_id = update_data["type_id"]
|
|
|
+ if "difficulty_id" in update_data:
|
|
|
+ question.difficulty_id = update_data["difficulty_id"]
|
|
|
+ if "category_id" in update_data:
|
|
|
+ question.category_id = update_data["category_id"]
|
|
|
+ if "answer" in update_data:
|
|
|
+ question.answer = update_data["answer"]
|
|
|
+ if "explanation" in update_data:
|
|
|
+ question.explanation = update_data["explanation"]
|
|
|
+ if "source" in update_data:
|
|
|
+ question.source = update_data["source"]
|
|
|
+ if "extra_data" in update_data:
|
|
|
+ question.extra_data = update_data["extra_data"]
|
|
|
+
|
|
|
+ # 更新选项
|
|
|
+ if "options" in update_data:
|
|
|
+ # 删除旧选项
|
|
|
+ self.db.query(QuestionOption).filter(
|
|
|
+ QuestionOption.question_id == question_id).delete()
|
|
|
+
|
|
|
+ # 添加新选项
|
|
|
+ for option_data in update_data["options"]:
|
|
|
+ QuestionOption.create(db=self.db,
|
|
|
+ question_id=question_id,
|
|
|
+ content=option_data["content"],
|
|
|
+ is_correct=option_data.get(
|
|
|
+ "is_correct", False),
|
|
|
+ order=option_data.get("order", 0),
|
|
|
+ extra_data=option_data.get("extra_data"))
|
|
|
+
|
|
|
+ # 更新标签
|
|
|
+ if "tags" in update_data:
|
|
|
+ # 删除旧标签
|
|
|
+ self.db.query(QuestionTag).filter(
|
|
|
+ QuestionTag.question_id == question_id).delete()
|
|
|
+
|
|
|
+ # 添加新标签
|
|
|
+ for tag_data in update_data["tags"]:
|
|
|
+ QuestionTag.create(db=self.db,
|
|
|
+ question_id=question_id,
|
|
|
+ name=tag_data["name"],
|
|
|
+ description=tag_data.get("description"),
|
|
|
+ extra_data=tag_data.get("extra_data"))
|
|
|
+
|
|
|
+ # 创建新版本
|
|
|
+ QuestionVersion.create(db=self.db,
|
|
|
+ question_id=question_id,
|
|
|
+ version_data={
|
|
|
+ "title":
|
|
|
+ question.title,
|
|
|
+ "content":
|
|
|
+ question.content,
|
|
|
+ "type_id":
|
|
|
+ question.type_id,
|
|
|
+ "difficulty_id":
|
|
|
+ question.difficulty_id,
|
|
|
+ "category_id":
|
|
|
+ question.category_id,
|
|
|
+ "answer":
|
|
|
+ question.answer,
|
|
|
+ "explanation":
|
|
|
+ question.explanation,
|
|
|
+ "source":
|
|
|
+ question.source,
|
|
|
+ "extra_data":
|
|
|
+ question.extra_data,
|
|
|
+ "options": [{
|
|
|
+ "content": o.content,
|
|
|
+ "is_correct": o.is_correct,
|
|
|
+ "order": o.order,
|
|
|
+ "extra_data": o.extra_data
|
|
|
+ } for o in question.options],
|
|
|
+ "tags": [{
|
|
|
+ "name": t.name,
|
|
|
+ "description": t.description,
|
|
|
+ "extra_data": t.extra_data
|
|
|
+ } for t in question.tags]
|
|
|
+ })
|
|
|
+
|
|
|
+ # 更新统计信息
|
|
|
+ stats = self.db.query(QuestionStatistics).filter(
|
|
|
+ QuestionStatistics.question_id == question_id).first()
|
|
|
+ if not stats:
|
|
|
+ stats = QuestionStatistics(question_id=question_id)
|
|
|
+ self.db.add(stats)
|
|
|
+
|
|
|
+ stats.update_count += 1
|
|
|
+ stats.last_update_time = datetime.now()
|
|
|
+
|
|
|
+ # 记录历史
|
|
|
+ QuestionHistory.create(db=self.db,
|
|
|
+ question_id=question_id,
|
|
|
+ action="update",
|
|
|
+ change_data={
|
|
|
+ "title": question.title,
|
|
|
+ "content": question.content,
|
|
|
+ "type_id": question.type_id,
|
|
|
+ "difficulty_id": question.difficulty_id,
|
|
|
+ "category_id": question.category_id,
|
|
|
+ "answer": question.answer,
|
|
|
+ "explanation": question.explanation,
|
|
|
+ "source": question.source,
|
|
|
+ "extra_data": question.extra_data
|
|
|
+ })
|
|
|
+
|
|
|
+ self.db.commit()
|
|
|
+ self.db.refresh(question)
|
|
|
+ return question
|
|
|
+
|
|
|
+ def delete_question(self, question_id: int) -> None:
|
|
|
+ """删除题目"""
|
|
|
+ question = self.get_question_by_id(question_id)
|
|
|
+ if not question:
|
|
|
+ raise NotFoundException("题目不存在")
|
|
|
+
|
|
|
+ # 记录删除历史
|
|
|
+ QuestionHistory.create(db=self.db,
|
|
|
+ question_id=question_id,
|
|
|
+ action="delete",
|
|
|
+ change_data={
|
|
|
+ "title": question.title,
|
|
|
+ "content": question.content,
|
|
|
+ "type_id": question.type_id,
|
|
|
+ "difficulty_id": question.difficulty_id,
|
|
|
+ "category_id": question.category_id,
|
|
|
+ "answer": question.answer,
|
|
|
+ "explanation": question.explanation,
|
|
|
+ "source": question.source,
|
|
|
+ "extra_data": question.extra_data
|
|
|
+ })
|
|
|
+
|
|
|
+ # 更新统计信息
|
|
|
+ stats = self.db.query(QuestionStatistics).filter(
|
|
|
+ QuestionStatistics.question_id == question_id).first()
|
|
|
+ if stats:
|
|
|
+ stats.delete_count += 1
|
|
|
+ stats.last_update_time = datetime.now()
|
|
|
+
|
|
|
+ # 删除题目
|
|
|
+ self.db.delete(question)
|
|
|
+ self.db.commit()
|
|
|
+
|
|
|
+ def get_question_detail(self, question_id: int) -> Question:
|
|
|
+ """获取题目详情"""
|
|
|
+ question = self.db.query(Question) \
|
|
|
+ .options(
|
|
|
+ joinedload(Question.options),
|
|
|
+ joinedload(Question.tags),
|
|
|
+ joinedload(Question.versions),
|
|
|
+ joinedload(Question.comments),
|
|
|
+ joinedload(Question.attachments),
|
|
|
+ joinedload(Question.knowledge_points)
|
|
|
+ ) \
|
|
|
+ .get(question_id)
|
|
|
+ if not question:
|
|
|
+ raise NotFoundException("题目不存在")
|
|
|
+ return question
|
|
|
+
|
|
|
+ def search_questions(self,
|
|
|
+ search_params: Dict[str, Any],
|
|
|
+ page: int = 1,
|
|
|
+ page_size: int = 20) -> Dict[str, Any]:
|
|
|
+ """搜索题目"""
|
|
|
+ query = self.db.query(Question)
|
|
|
+
|
|
|
+ # 应用过滤条件
|
|
|
+ if search_params.get("type_id"):
|
|
|
+ query = query.filter(Question.type_id == search_params["type_id"])
|
|
|
+ if search_params.get("difficulty_id"):
|
|
|
+ query = query.filter(
|
|
|
+ Question.difficulty_id == search_params["difficulty_id"])
|
|
|
+ if search_params.get("category_id"):
|
|
|
+ query = query.filter(
|
|
|
+ Question.category_id == search_params["category_id"])
|
|
|
+ if search_params.get("creator_id"):
|
|
|
+ query = query.filter(
|
|
|
+ Question.creator_id == search_params["creator_id"])
|
|
|
+ if search_params.get("bank_id"):
|
|
|
+ query = query.filter(Question.bank_id == search_params["bank_id"])
|
|
|
+ if search_params.get("status"):
|
|
|
+ query = query.filter(Question.status == search_params["status"])
|
|
|
+ if search_params.get("review_status"):
|
|
|
+ query = query.filter(
|
|
|
+ Question.review_status == search_params["review_status"])
|
|
|
+ if search_params.get("keyword"):
|
|
|
+ keyword = search_params["keyword"]
|
|
|
+ query = query.filter((Question.title.contains(keyword))
|
|
|
+ | (Question.content.contains(keyword))
|
|
|
+ | (Question.answer.contains(keyword))
|
|
|
+ | (Question.explanation.contains(keyword)))
|
|
|
+
|
|
|
+ # 时间范围过滤
|
|
|
+ if search_params.get("create_time_start"):
|
|
|
+ query = query.filter(
|
|
|
+ Question.create_time >= search_params["create_time_start"])
|
|
|
+ if search_params.get("create_time_end"):
|
|
|
+ query = query.filter(
|
|
|
+ Question.create_time <= search_params["create_time_end"])
|
|
|
+ if search_params.get("update_time_start"):
|
|
|
+ query = query.filter(
|
|
|
+ Question.update_time >= search_params["update_time_start"])
|
|
|
+ if search_params.get("update_time_end"):
|
|
|
+ query = query.filter(
|
|
|
+ Question.update_time <= search_params["update_time_end"])
|
|
|
+
|
|
|
+ # 排序
|
|
|
+ sort_field = search_params.get("sort_field", "create_time")
|
|
|
+ sort_order = search_params.get("sort_order", "desc")
|
|
|
+ if sort_order == "asc":
|
|
|
+ query = query.order_by(getattr(Question, sort_field).asc())
|
|
|
+ else:
|
|
|
+ query = query.order_by(getattr(Question, sort_field).desc())
|
|
|
+
|
|
|
+ # 计算总数
|
|
|
+ total = query.count()
|
|
|
+
|
|
|
+ # 分页查询
|
|
|
+ questions = query.offset((page - 1) * page_size) \
|
|
|
+ .limit(page_size) \
|
|
|
+ .all()
|
|
|
+
|
|
|
+ return {
|
|
|
+ "total": total,
|
|
|
+ "page": page,
|
|
|
+ "page_size": page_size,
|
|
|
+ "questions": questions
|
|
|
+ }
|
|
|
+
|
|
|
+ def delete_attachment(self, attachment_id: int, deleter_id: int) -> None:
|
|
|
+ """删除题目附件"""
|
|
|
+ attachment = self.db.query(QuestionAttachment).get(attachment_id)
|
|
|
+ if not attachment:
|
|
|
+ raise NotFoundException("附件不存在")
|
|
|
+
|
|
|
+ # 检查删除权限
|
|
|
+ if attachment.user_id != deleter_id:
|
|
|
+ raise PermissionDeniedException("无权限删除该附件")
|
|
|
+
|
|
|
+ # 更新统计信息
|
|
|
+ stats = self.db.query(QuestionStatistics).filter(
|
|
|
+ QuestionStatistics.question_id == attachment.question_id).first()
|
|
|
+ if stats:
|
|
|
+ stats.attachment_count -= 1
|
|
|
+
|
|
|
+ user_stats = self.db.query(UserStatistics).filter(
|
|
|
+ UserStatistics.user_id == attachment.user_id).first()
|
|
|
+ if user_stats:
|
|
|
+ user_stats.attachment_count -= 1
|
|
|
+
|
|
|
+ # 删除附件记录
|
|
|
+ self.db.delete(attachment)
|
|
|
+ self.db.commit()
|
|
|
+
|
|
|
+ def delete_comment(self, comment_id: int, deleter_id: int) -> None:
|
|
|
+ """删除题目评论"""
|
|
|
+ comment = self.db.query(QuestionComment).get(comment_id)
|
|
|
+ if not comment:
|
|
|
+ raise NotFoundException("评论不存在")
|
|
|
+
|
|
|
+ # 检查删除权限
|
|
|
+ if comment.user_id != deleter_id:
|
|
|
+ raise PermissionDeniedException("无权限删除该评论")
|
|
|
+
|
|
|
+ # 更新统计信息
|
|
|
+ stats = self.db.query(QuestionStatistics).filter(
|
|
|
+ QuestionStatistics.question_id == comment.question_id).first()
|
|
|
+ if stats:
|
|
|
+ stats.comment_count -= 1
|
|
|
+
|
|
|
+ user_stats = self.db.query(UserStatistics).filter(
|
|
|
+ UserStatistics.user_id == comment.user_id).first()
|
|
|
+ if user_stats:
|
|
|
+ user_stats.comment_count -= 1
|
|
|
+
|
|
|
+ # 删除评论
|
|
|
+ self.db.delete(comment)
|
|
|
+ self.db.commit()
|
|
|
+
|
|
|
+ def collect_question(self, question_id: int,
|
|
|
+ user_id: int) -> Dict[str, Any]:
|
|
|
+ """收藏题目"""
|
|
|
+ # 检查题目是否存在
|
|
|
+ question = self.db.query(Question).get(question_id)
|
|
|
+ if not question:
|
|
|
+ raise NotFoundException("题目不存在")
|
|
|
+
|
|
|
+ # 检查是否已经收藏
|
|
|
+ existing = self.db.query(QuestionCollection).filter(
|
|
|
+ QuestionCollection.question_id == question_id,
|
|
|
+ QuestionCollection.user_id == user_id).first()
|
|
|
+ if existing:
|
|
|
+ return {
|
|
|
+ "status": "already_collected",
|
|
|
+ "message": "题目已收藏",
|
|
|
+ "collect_count": existing.question.statistics.collect_count,
|
|
|
+ "hot_score": question.hot_score
|
|
|
+ }
|
|
|
+
|
|
|
+ # 创建收藏记录
|
|
|
+ QuestionCollection.create(db=self.db,
|
|
|
+ question_id=question_id,
|
|
|
+ user_id=user_id,
|
|
|
+ collect_time=datetime.now())
|
|
|
+
|
|
|
+ # 更新题目统计信息
|
|
|
+ stats = self.db.query(QuestionStatistics).filter(
|
|
|
+ QuestionStatistics.question_id == question_id).first()
|
|
|
+ if not stats:
|
|
|
+ stats = QuestionStatistics(question_id=question_id)
|
|
|
+ self.db.add(stats)
|
|
|
+
|
|
|
+ stats.collect_count += 1
|
|
|
+ stats.last_collect_time = datetime.now()
|
|
|
+ stats.last_collector_id = user_id
|
|
|
+
|
|
|
+ # 更新用户统计信息
|
|
|
+ user_stats = self.db.query(UserStatistics).filter(
|
|
|
+ UserStatistics.user_id == user_id).first()
|
|
|
+ if not user_stats:
|
|
|
+ user_stats = UserStatistics(user_id=user_id)
|
|
|
+ self.db.add(user_stats)
|
|
|
+
|
|
|
+ user_stats.collect_count += 1
|
|
|
+ user_stats.last_collect_time = datetime.now()
|
|
|
+
|
|
|
+ # 更新题目热度
|
|
|
+ question.hot_score = (question.hot_score or 0) + 1
|
|
|
+
|
|
|
+ self.db.commit()
|
|
|
+
|
|
|
+ return {
|
|
|
+ "status": "success",
|
|
|
+ "message": "收藏成功",
|
|
|
+ "collect_count": stats.collect_count,
|
|
|
+ "last_collect_time": stats.last_collect_time,
|
|
|
+ "hot_score": question.hot_score
|
|
|
+ }
|
|
|
+
|
|
|
+ def uncollect_question(self, question_id: int,
|
|
|
+ user_id: int) -> Dict[str, Any]:
|
|
|
+ """取消收藏题目"""
|
|
|
+ # 检查题目是否存在
|
|
|
+ question = self.db.query(Question).get(question_id)
|
|
|
+ if not question:
|
|
|
+ raise NotFoundException("题目不存在")
|
|
|
+
|
|
|
+ # 删除收藏记录
|
|
|
+ self.db.query(QuestionCollection).filter(
|
|
|
+ QuestionCollection.question_id == question_id,
|
|
|
+ QuestionCollection.user_id == user_id).delete()
|
|
|
+
|
|
|
+ # 更新题目统计信息
|
|
|
+ stats = self.db.query(QuestionStatistics).filter(
|
|
|
+ QuestionStatistics.question_id == question_id).first()
|
|
|
+ if stats:
|
|
|
+ stats.collect_count -= 1
|
|
|
+ stats.collect_count = max(stats.collect_count, 0)
|
|
|
+
|
|
|
+ # 更新用户统计信息
|
|
|
+ user_stats = self.db.query(UserStatistics).filter(
|
|
|
+ UserStatistics.user_id == user_id).first()
|
|
|
+ if user_stats:
|
|
|
+ user_stats.collect_count -= 1
|
|
|
+ user_stats.collect_count = max(user_stats.collect_count, 0)
|
|
|
+
|
|
|
+ # 更新题目热度
|
|
|
+ question.hot_score = max((question.hot_score or 0) - 1, 0)
|
|
|
+
|
|
|
+ self.db.commit()
|
|
|
+
|
|
|
+ return {
|
|
|
+ "status": "success",
|
|
|
+ "message": "取消收藏成功",
|
|
|
+ "collect_count": stats.collect_count if stats else 0,
|
|
|
+ "last_collect_time": stats.last_collect_time if stats else None
|
|
|
+ }
|