| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441 |
- import sqlite3
- import json
- from pathlib import Path
- from datetime import datetime
- from typing import List, Dict, Any, Optional, Tuple
- class Database:
- """数据库管理类,负责SQLite数据库的连接和操作"""
-
- def __init__(self):
- """初始化数据库连接"""
- self.db_path = Path(__file__).parent.parent.parent / 'db' / 'game_data.db'
- self.db_path.parent.mkdir(exist_ok=True)
- self.conn = None
- self.initialize_db()
-
- def get_connection(self):
- """获取数据库连接"""
- if self.conn is None:
- self.conn = sqlite3.connect(str(self.db_path))
- self.conn.row_factory = sqlite3.Row
- return self.conn
-
- def close_connection(self):
- """关闭数据库连接"""
- if self.conn:
- self.conn.close()
- self.conn = None
-
- def initialize_db(self):
- """初始化数据库表结构"""
- conn = self.get_connection()
- cursor = conn.cursor()
-
- # 创建游戏数据表
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS game_data (
- id INTEGER PRIMARY KEY,
- num1 INTEGER NOT NULL,
- num2 INTEGER NOT NULL,
- num3 INTEGER NOT NULL,
- num4 INTEGER NOT NULL
- )
- """)
-
- # 创建解法表
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS solutions (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- game_id INTEGER NOT NULL,
- game_num TEXT NOT NULL,
- expression TEXT NOT NULL,
- flag INTEGER NOT NULL,
- FOREIGN KEY (game_id) REFERENCES game_data (id)
- )
- """)
-
- # 创建历史记录表
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS history (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- game_type TEXT NOT NULL,
- question TEXT NOT NULL,
- numbers TEXT NOT NULL,
- answers TEXT NOT NULL,
- is_favorite BOOLEAN DEFAULT 0,
- is_deleted BOOLEAN DEFAULT 0,
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
- )
- """)
-
- conn.commit()
-
- def import_json_data(self, json_path: Path):
- """从JSON文件导入数据到SQLite数据库"""
- if not json_path.exists():
- raise FileNotFoundError(f"数据文件未找到: {json_path}")
-
- conn = self.get_connection()
- cursor = conn.cursor()
-
- # 检查数据库是否已有数据
- cursor.execute("SELECT COUNT(*) FROM game_data")
- if cursor.fetchone()[0] > 0:
- print("数据库已有数据,跳过导入")
- return
-
- # 开始导入数据
- try:
- with open(json_path, encoding='utf-8') as f:
- # 开始事务
- conn.execute("BEGIN TRANSACTION")
-
- for line in f:
- data = json.loads(line)
-
- # 插入游戏数据
- cursor.execute(
- "INSERT INTO game_data (id, num1, num2, num3, num4) VALUES (?, ?, ?, ?, ?)",
- (data['id'], data['n1'], data['n2'], data['n3'], data['n4'])
- )
- # 插入解法数据
- for solution in data['s']:
- cursor.execute(
- "INSERT INTO solutions (game_id, game_num,expression, flag) VALUES (?, ?,?, ?)",
- (data['id'], f"{data['n1']},{data['n2']},{data['n3']},{data['n4']}",solution['c'], solution['f'])
- )
-
- # 提交事务
- conn.commit()
- print("数据导入成功")
-
- except Exception as e:
- conn.rollback()
- raise RuntimeError(f"数据导入失败: {e}") from e
-
- def add_history(self, history_dto):
- """添加历史记录
-
- Args:
- history_dto: HistoryDTO对象,包含历史记录的所有信息
-
- Returns:
- 新添加记录的ID
- """
- conn = self.get_connection()
- cursor = conn.cursor()
-
- # 转换DTO为数据库记录格式
- record = history_dto.to_db_record()
- current_time = datetime.now()
- cursor.execute(
- "INSERT INTO history (game_type, question, numbers, answers, created_at, is_favorite, is_deleted) VALUES (?, ?, ?, ?, ?, ?, ?)",
- (record['game_type'], record['question'], record['numbers'], record['answers'], current_time,
- record.get('is_favorite', False), record.get('is_deleted', False))
- )
-
- conn.commit()
- return cursor.lastrowid
-
- def get_history(self, game_type: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, page: int = 1, page_size: int = 10, only_favorite: bool = False, include_deleted: bool = False) -> Tuple[List[Dict[str, Any]], int]:
- """获取历史记录,支持分页查询
-
- Args:
- game_type: 类型
- start_date: 开始日期,格式为 YYYY-MM-DD
- end_date: 结束日期,格式为 YYYY-MM-DD
- page: 页码,从1开始
- page_size: 每页记录数
- only_favorite: 是否只查询收藏的记录
- include_deleted: 是否包含已删除的记录
-
- Returns:
- Tuple[List[Dict[str, Any]], int]: 历史记录列表和总记录数
- """
- conn = self.get_connection()
- cursor = conn.cursor()
-
- # 构建基础查询和条件
- base_query = "FROM history WHERE 1=1 "
- params = []
- if game_type:
- if game_type == '0':
- base_query += " AND is_deleted = 1"
- include_deleted=True
- else:
- base_query += " AND game_type = ?"
- params.append(game_type)
-
- # 添加日期过滤条件
- if start_date or end_date:
- if start_date:
- base_query += " AND date(created_at) >= ?"
- params.append(start_date)
- if end_date:
- base_query += " AND date(created_at) <= ?"
- params.append(end_date)
-
- # 添加收藏和删除过滤条件
- if only_favorite:
- base_query += " AND is_favorite = 1"
-
- if not include_deleted:
- base_query += " AND is_deleted = 0"
-
- # 先查询总记录数
- count_query = f"SELECT COUNT(*) {base_query}"
- cursor.execute(count_query, params)
- total_count = cursor.fetchone()[0]
-
- # 计算分页偏移量
- offset = (page - 1) * page_size
-
- # 查询当前页数据
- data_query = f"SELECT * {base_query} ORDER BY created_at DESC LIMIT ? OFFSET ?"
- cursor.execute(data_query, params + [page_size, offset])
-
- # 转换结果为字典列表
- results = []
- for row in cursor.fetchall():
- history_item = dict(row)
- # 确保字段名称与HistoryDTO兼容
- if 'game_type' in history_item and history_item['game_type'] not in ['A', 'B', 'C', 'D', 'E']:
- # 对于旧数据,将非标准类型转换为默认类型
- history_item['game_type'] = 'A'
- results.append(history_item)
-
- return results, total_count
-
- def get_game_data(self, nums: List[int]) -> List[Dict[str, Any]]:
- """根据数字组合获取游戏数据
-
- Args:
- nums: 4个数字的列表
-
- Returns:
- 匹配的游戏数据列表
- """
- if len(nums) != 4:
- raise ValueError("必须提供4个数字")
-
- # 对数字排序以匹配不同顺序的相同组合
- sorted_nums = sorted(nums)
-
- conn = self.get_connection()
- cursor = conn.cursor()
-
- # 查询匹配的游戏数据
- cursor.execute("""
- SELECT g.id, g.num1, g.num2, g.num3, g.num4, g.no
- FROM game_data g
- WHERE (g.num1, g.num2, g.num3, g.num4) IN (
- SELECT num1, num2, num3, num4 FROM game_data
- WHERE (num1 + num2 + num3 + num4) = ?
- )
- """, (sum(nums),))
-
- games = []
- for row in cursor.fetchall():
- game = dict(row)
- game_nums = [game['num1'], game['num2'], game['num3'], game['num4']]
-
- # 检查是否为相同的数字组合(不考虑顺序)
- if sorted(game_nums) == sorted_nums:
- # 获取该游戏的所有解法
- cursor.execute("""
- SELECT id, expression, flag
- FROM solutions
- WHERE game_id = ?
- """, (game['id'],))
-
- solutions = [dict(sol) for sol in cursor.fetchall()]
- game['solutions'] = solutions
- games.append(game)
-
- return games
-
- def get_flag_data(self, flag: int, min_num: Optional[str] = None, max_num: Optional[str] = None) -> List[Dict[str, Any]]:
- """获取指定flag的游戏数据
-
- Args:
- flag: 解法标志,1或2
- min_num: 最小数字过滤
- max_num: 最大数字过滤
-
- Returns:
- 匹配的游戏数据列表
- """
- conn = self.get_connection()
- cursor = conn.cursor()
-
- query = """
- SELECT DISTINCT g.id, g.num1, g.num2, g.num3, g.num4
- FROM game_data g
- JOIN solutions s ON g.id = s.game_id
- WHERE s.flag = ?
- """
-
- params = [flag]
-
- # 添加数字过滤条件(基于num1和num4)
- if min_num and min_num != "0":
- query += " AND g.num1 = ?"
- params.append(int(min_num))
-
- if max_num and max_num != "0":
- query += " AND g.num4 = ?"
- params.append(int(max_num))
-
- cursor.execute(query, params)
-
- results = []
- for row in cursor.fetchall():
- game = dict(row)
-
- # 获取该游戏的所有解法
- cursor.execute("""
- SELECT id, expression, flag
- FROM solutions
- WHERE game_id = ? AND flag = ?
- """, (game['id'], flag))
-
- solutions = [dict(sol) for sol in cursor.fetchall()]
- game['solutions'] = solutions
- results.append(game)
-
- return results
-
- def get_all_game_data(self) -> List[Dict[str, Any]]:
- """获取所有游戏数据
-
- Returns:
- 所有游戏数据列表
- """
- conn = self.get_connection()
- cursor = conn.cursor()
-
- # 查询所有游戏数据
- cursor.execute("SELECT id, num1, num2, num3, num4 FROM game_data")
-
- results = []
- for row in cursor.fetchall():
- game = dict(row)
-
- # 获取该游戏的所有解法
- cursor.execute("""
- SELECT id, expression, flag
- FROM solutions
- WHERE game_id = ?
- """, (game['id'],))
-
- solutions = [dict(sol) for sol in cursor.fetchall()]
- game['solutions'] = solutions
- results.append(game)
-
- return results
-
- def has_game_data(self) -> bool:
- """检查数据库是否已有游戏数据
-
- Returns:
- 如果数据库中已有游戏数据则返回True,否则返回False
- """
- conn = self.get_connection()
- cursor = conn.cursor()
-
- cursor.execute("SELECT COUNT(*) FROM game_data")
- count = cursor.fetchone()[0]
-
- return count > 0
- def toggle_favorite(self, history_id: int) -> bool:
- """切换历史记录的收藏状态
-
- Args:
- history_id: 历史记录ID
-
- Returns:
- bool: 操作是否成功
- """
- conn = self.get_connection()
- cursor = conn.cursor()
-
- try:
- # 先查询当前收藏状态
- cursor.execute("SELECT is_favorite FROM history WHERE id = ?", (history_id,))
- result = cursor.fetchone()
-
- if not result:
- return False # 记录不存在
-
- current_state = bool(result[0])
- new_state = not current_state
-
- # 更新收藏状态
- cursor.execute("UPDATE history SET is_favorite = ? WHERE id = ?", (int(new_state), history_id))
- conn.commit()
-
- return True
- except Exception as e:
- print(f"切换收藏状态失败: {e}")
- conn.rollback()
- return False
- def toggle_delete(self, history_id: int) -> bool:
- """切换历史记录的删除状态
- Args:
- history_id: 历史记录ID
- Returns:
- bool: 操作是否成功
- """
- conn = self.get_connection()
- cursor = conn.cursor()
- try:
- # 先查询当前收藏状态
- cursor.execute("SELECT is_deleted FROM history WHERE id = ?", (history_id,))
- result = cursor.fetchone()
- if not result:
- return False # 记录不存在
- current_state = bool(result[0])
- new_state = not current_state
- # 更新收藏状态
- cursor.execute("UPDATE history SET is_deleted = ? WHERE id = ?", (int(new_state), history_id))
- conn.commit()
- return True
- except Exception as e:
- print(f"切换删除状态失败: {e}")
- conn.rollback()
- return False
-
- def soft_delete(self, history_id: int) -> bool:
- """软删除历史记录
-
- Args:
- history_id: 历史记录ID
-
- Returns:
- bool: 操作是否成功
- """
- conn = self.get_connection()
- cursor = conn.cursor()
-
- try:
- # 更新删除状态
- cursor.execute("UPDATE history SET is_deleted = 1 WHERE id = ?", (history_id,))
- conn.commit()
-
- return cursor.rowcount > 0
- except Exception as e:
- print(f"软删除记录失败: {e}")
- conn.rollback()
- return False
|