import sqlite3 import json from pathlib import Path from datetime import datetime from typing import List, Dict, Any, Optional, Tuple class Database: """数据库管理类,负责SQLite数据库的连接和操作""" def __init__(self): """初始化数据库连接""" self.db_path = Path(__file__).parent.parent.parent / 'db' / 'game_data.db' self.db_path.parent.mkdir(exist_ok=True) self.conn = None self.initialize_db() def get_connection(self): """获取数据库连接""" if self.conn is None: self.conn = sqlite3.connect(str(self.db_path)) self.conn.row_factory = sqlite3.Row return self.conn def close_connection(self): """关闭数据库连接""" if self.conn: self.conn.close() self.conn = None def initialize_db(self): """初始化数据库表结构""" conn = self.get_connection() cursor = conn.cursor() # 创建游戏数据表 cursor.execute(""" CREATE TABLE IF NOT EXISTS game_data ( id INTEGER PRIMARY KEY, num1 INTEGER NOT NULL, num2 INTEGER NOT NULL, num3 INTEGER NOT NULL, num4 INTEGER NOT NULL ) """) # 创建解法表 cursor.execute(""" CREATE TABLE IF NOT EXISTS solutions ( id INTEGER PRIMARY KEY AUTOINCREMENT, game_id INTEGER NOT NULL, game_num TEXT NOT NULL, expression TEXT NOT NULL, flag INTEGER NOT NULL, FOREIGN KEY (game_id) REFERENCES game_data (id) ) """) # 创建历史记录表 cursor.execute(""" CREATE TABLE IF NOT EXISTS history ( id INTEGER PRIMARY KEY AUTOINCREMENT, game_type TEXT NOT NULL, question TEXT NOT NULL, numbers TEXT NOT NULL, answers TEXT NOT NULL, is_favorite BOOLEAN DEFAULT 0, is_deleted BOOLEAN DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) conn.commit() def import_json_data(self, json_path: Path): """从JSON文件导入数据到SQLite数据库""" if not json_path.exists(): raise FileNotFoundError(f"数据文件未找到: {json_path}") conn = self.get_connection() cursor = conn.cursor() # 检查数据库是否已有数据 cursor.execute("SELECT COUNT(*) FROM game_data") if cursor.fetchone()[0] > 0: print("数据库已有数据,跳过导入") return # 开始导入数据 try: with open(json_path, encoding='utf-8') as f: # 开始事务 conn.execute("BEGIN TRANSACTION") for line in f: data = json.loads(line) # 插入游戏数据 cursor.execute( "INSERT INTO game_data (id, num1, num2, num3, num4) VALUES (?, ?, ?, ?, ?)", (data['id'], data['n1'], data['n2'], data['n3'], data['n4']) ) # 插入解法数据 for solution in data['s']: cursor.execute( "INSERT INTO solutions (game_id, game_num,expression, flag) VALUES (?, ?,?, ?)", (data['id'], f"{data['n1']},{data['n2']},{data['n3']},{data['n4']}",solution['c'], solution['f']) ) # 提交事务 conn.commit() print("数据导入成功") except Exception as e: conn.rollback() raise RuntimeError(f"数据导入失败: {e}") from e def add_history(self, history_dto): """添加历史记录 Args: history_dto: HistoryDTO对象,包含历史记录的所有信息 Returns: 新添加记录的ID """ conn = self.get_connection() cursor = conn.cursor() # 转换DTO为数据库记录格式 record = history_dto.to_db_record() current_time = datetime.now() cursor.execute( "INSERT INTO history (game_type, question, numbers, answers, created_at, is_favorite, is_deleted) VALUES (?, ?, ?, ?, ?, ?, ?)", (record['game_type'], record['question'], record['numbers'], record['answers'], current_time, record.get('is_favorite', False), record.get('is_deleted', False)) ) conn.commit() return cursor.lastrowid def get_history(self, game_type: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, page: int = 1, page_size: int = 10, only_favorite: bool = False, include_deleted: bool = False) -> Tuple[List[Dict[str, Any]], int]: """获取历史记录,支持分页查询 Args: game_type: 类型 start_date: 开始日期,格式为 YYYY-MM-DD end_date: 结束日期,格式为 YYYY-MM-DD page: 页码,从1开始 page_size: 每页记录数 only_favorite: 是否只查询收藏的记录 include_deleted: 是否包含已删除的记录 Returns: Tuple[List[Dict[str, Any]], int]: 历史记录列表和总记录数 """ conn = self.get_connection() cursor = conn.cursor() # 构建基础查询和条件 base_query = "FROM history WHERE 1=1 " params = [] if game_type: if game_type == '0': base_query += " AND is_deleted = 1" include_deleted=True else: base_query += " AND game_type = ?" params.append(game_type) # 添加日期过滤条件 if start_date or end_date: if start_date: base_query += " AND date(created_at) >= ?" params.append(start_date) if end_date: base_query += " AND date(created_at) <= ?" params.append(end_date) # 添加收藏和删除过滤条件 if only_favorite: base_query += " AND is_favorite = 1" if not include_deleted: base_query += " AND is_deleted = 0" # 先查询总记录数 count_query = f"SELECT COUNT(*) {base_query}" cursor.execute(count_query, params) total_count = cursor.fetchone()[0] # 计算分页偏移量 offset = (page - 1) * page_size # 查询当前页数据 data_query = f"SELECT * {base_query} ORDER BY created_at DESC LIMIT ? OFFSET ?" cursor.execute(data_query, params + [page_size, offset]) # 转换结果为字典列表 results = [] for row in cursor.fetchall(): history_item = dict(row) # 确保字段名称与HistoryDTO兼容 if 'game_type' in history_item and history_item['game_type'] not in ['A', 'B', 'C', 'D', 'E']: # 对于旧数据,将非标准类型转换为默认类型 history_item['game_type'] = 'A' results.append(history_item) return results, total_count def get_game_data(self, nums: List[int]) -> List[Dict[str, Any]]: """根据数字组合获取游戏数据 Args: nums: 4个数字的列表 Returns: 匹配的游戏数据列表 """ if len(nums) != 4: raise ValueError("必须提供4个数字") # 对数字排序以匹配不同顺序的相同组合 sorted_nums = sorted(nums) conn = self.get_connection() cursor = conn.cursor() # 查询匹配的游戏数据 cursor.execute(""" SELECT g.id, g.num1, g.num2, g.num3, g.num4, g.no FROM game_data g WHERE (g.num1, g.num2, g.num3, g.num4) IN ( SELECT num1, num2, num3, num4 FROM game_data WHERE (num1 + num2 + num3 + num4) = ? ) """, (sum(nums),)) games = [] for row in cursor.fetchall(): game = dict(row) game_nums = [game['num1'], game['num2'], game['num3'], game['num4']] # 检查是否为相同的数字组合(不考虑顺序) if sorted(game_nums) == sorted_nums: # 获取该游戏的所有解法 cursor.execute(""" SELECT id, expression, flag FROM solutions WHERE game_id = ? """, (game['id'],)) solutions = [dict(sol) for sol in cursor.fetchall()] game['solutions'] = solutions games.append(game) return games def get_flag_data(self, flag: int, min_num: Optional[str] = None, max_num: Optional[str] = None) -> List[Dict[str, Any]]: """获取指定flag的游戏数据 Args: flag: 解法标志,1或2 min_num: 最小数字过滤 max_num: 最大数字过滤 Returns: 匹配的游戏数据列表 """ conn = self.get_connection() cursor = conn.cursor() query = """ SELECT DISTINCT g.id, g.num1, g.num2, g.num3, g.num4 FROM game_data g JOIN solutions s ON g.id = s.game_id WHERE s.flag = ? """ params = [flag] # 添加数字过滤条件(基于num1和num4) if min_num and min_num != "0": query += " AND g.num1 = ?" params.append(int(min_num)) if max_num and max_num != "0": query += " AND g.num4 = ?" params.append(int(max_num)) cursor.execute(query, params) results = [] for row in cursor.fetchall(): game = dict(row) # 获取该游戏的所有解法 cursor.execute(""" SELECT id, expression, flag FROM solutions WHERE game_id = ? AND flag = ? """, (game['id'], flag)) solutions = [dict(sol) for sol in cursor.fetchall()] game['solutions'] = solutions results.append(game) return results def get_all_game_data(self) -> List[Dict[str, Any]]: """获取所有游戏数据 Returns: 所有游戏数据列表 """ conn = self.get_connection() cursor = conn.cursor() # 查询所有游戏数据 cursor.execute("SELECT id, num1, num2, num3, num4 FROM game_data") results = [] for row in cursor.fetchall(): game = dict(row) # 获取该游戏的所有解法 cursor.execute(""" SELECT id, expression, flag FROM solutions WHERE game_id = ? """, (game['id'],)) solutions = [dict(sol) for sol in cursor.fetchall()] game['solutions'] = solutions results.append(game) return results def has_game_data(self) -> bool: """检查数据库是否已有游戏数据 Returns: 如果数据库中已有游戏数据则返回True,否则返回False """ conn = self.get_connection() cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM game_data") count = cursor.fetchone()[0] return count > 0 def toggle_favorite(self, history_id: int) -> bool: """切换历史记录的收藏状态 Args: history_id: 历史记录ID Returns: bool: 操作是否成功 """ conn = self.get_connection() cursor = conn.cursor() try: # 先查询当前收藏状态 cursor.execute("SELECT is_favorite FROM history WHERE id = ?", (history_id,)) result = cursor.fetchone() if not result: return False # 记录不存在 current_state = bool(result[0]) new_state = not current_state # 更新收藏状态 cursor.execute("UPDATE history SET is_favorite = ? WHERE id = ?", (int(new_state), history_id)) conn.commit() return True except Exception as e: print(f"切换收藏状态失败: {e}") conn.rollback() return False def toggle_delete(self, history_id: int) -> bool: """切换历史记录的删除状态 Args: history_id: 历史记录ID Returns: bool: 操作是否成功 """ conn = self.get_connection() cursor = conn.cursor() try: # 先查询当前收藏状态 cursor.execute("SELECT is_deleted FROM history WHERE id = ?", (history_id,)) result = cursor.fetchone() if not result: return False # 记录不存在 current_state = bool(result[0]) new_state = not current_state # 更新收藏状态 cursor.execute("UPDATE history SET is_deleted = ? WHERE id = ?", (int(new_state), history_id)) conn.commit() return True except Exception as e: print(f"切换删除状态失败: {e}") conn.rollback() return False def soft_delete(self, history_id: int) -> bool: """软删除历史记录 Args: history_id: 历史记录ID Returns: bool: 操作是否成功 """ conn = self.get_connection() cursor = conn.cursor() try: # 更新删除状态 cursor.execute("UPDATE history SET is_deleted = 1 WHERE id = ?", (history_id,)) conn.commit() return cursor.rowcount > 0 except Exception as e: print(f"软删除记录失败: {e}") conn.rollback() return False