database.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. import sqlite3
  2. import json
  3. from pathlib import Path
  4. from datetime import datetime
  5. from typing import List, Dict, Any, Optional, Tuple
  6. class Database:
  7. """数据库管理类,负责SQLite数据库的连接和操作"""
  8. def __init__(self):
  9. """初始化数据库连接"""
  10. self.db_path = Path(__file__).parent.parent.parent / 'db' / 'game_data.db'
  11. self.db_path.parent.mkdir(exist_ok=True)
  12. self.conn = None
  13. self.initialize_db()
  14. def get_connection(self):
  15. """获取数据库连接"""
  16. if self.conn is None:
  17. self.conn = sqlite3.connect(str(self.db_path))
  18. self.conn.row_factory = sqlite3.Row
  19. return self.conn
  20. def close_connection(self):
  21. """关闭数据库连接"""
  22. if self.conn:
  23. self.conn.close()
  24. self.conn = None
  25. def initialize_db(self):
  26. """初始化数据库表结构"""
  27. conn = self.get_connection()
  28. cursor = conn.cursor()
  29. # 创建游戏数据表
  30. cursor.execute("""
  31. CREATE TABLE IF NOT EXISTS game_data (
  32. id INTEGER PRIMARY KEY,
  33. num1 INTEGER NOT NULL,
  34. num2 INTEGER NOT NULL,
  35. num3 INTEGER NOT NULL,
  36. num4 INTEGER NOT NULL
  37. )
  38. """)
  39. # 创建解法表
  40. cursor.execute("""
  41. CREATE TABLE IF NOT EXISTS solutions (
  42. id INTEGER PRIMARY KEY AUTOINCREMENT,
  43. game_id INTEGER NOT NULL,
  44. game_num TEXT NOT NULL,
  45. expression TEXT NOT NULL,
  46. flag INTEGER NOT NULL,
  47. FOREIGN KEY (game_id) REFERENCES game_data (id)
  48. )
  49. """)
  50. # 创建历史记录表
  51. cursor.execute("""
  52. CREATE TABLE IF NOT EXISTS history (
  53. id INTEGER PRIMARY KEY AUTOINCREMENT,
  54. game_type TEXT NOT NULL,
  55. question TEXT NOT NULL,
  56. numbers TEXT NOT NULL,
  57. answers TEXT NOT NULL,
  58. is_favorite BOOLEAN DEFAULT 0,
  59. is_deleted BOOLEAN DEFAULT 0,
  60. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  61. )
  62. """)
  63. conn.commit()
  64. def import_json_data(self, json_path: Path):
  65. """从JSON文件导入数据到SQLite数据库"""
  66. if not json_path.exists():
  67. raise FileNotFoundError(f"数据文件未找到: {json_path}")
  68. conn = self.get_connection()
  69. cursor = conn.cursor()
  70. # 检查数据库是否已有数据
  71. cursor.execute("SELECT COUNT(*) FROM game_data")
  72. if cursor.fetchone()[0] > 0:
  73. print("数据库已有数据,跳过导入")
  74. return
  75. # 开始导入数据
  76. try:
  77. with open(json_path, encoding='utf-8') as f:
  78. # 开始事务
  79. conn.execute("BEGIN TRANSACTION")
  80. for line in f:
  81. data = json.loads(line)
  82. # 插入游戏数据
  83. cursor.execute(
  84. "INSERT INTO game_data (id, num1, num2, num3, num4) VALUES (?, ?, ?, ?, ?)",
  85. (data['id'], data['n1'], data['n2'], data['n3'], data['n4'])
  86. )
  87. # 插入解法数据
  88. for solution in data['s']:
  89. cursor.execute(
  90. "INSERT INTO solutions (game_id, game_num,expression, flag) VALUES (?, ?,?, ?)",
  91. (data['id'], f"{data['n1']},{data['n2']},{data['n3']},{data['n4']}",solution['c'], solution['f'])
  92. )
  93. # 提交事务
  94. conn.commit()
  95. print("数据导入成功")
  96. except Exception as e:
  97. conn.rollback()
  98. raise RuntimeError(f"数据导入失败: {e}") from e
  99. def add_history(self, history_dto):
  100. """添加历史记录
  101. Args:
  102. history_dto: HistoryDTO对象,包含历史记录的所有信息
  103. Returns:
  104. 新添加记录的ID
  105. """
  106. conn = self.get_connection()
  107. cursor = conn.cursor()
  108. # 转换DTO为数据库记录格式
  109. record = history_dto.to_db_record()
  110. current_time = datetime.now()
  111. cursor.execute(
  112. "INSERT INTO history (game_type, question, numbers, answers, created_at, is_favorite, is_deleted) VALUES (?, ?, ?, ?, ?, ?, ?)",
  113. (record['game_type'], record['question'], record['numbers'], record['answers'], current_time,
  114. record.get('is_favorite', False), record.get('is_deleted', False))
  115. )
  116. conn.commit()
  117. return cursor.lastrowid
  118. def get_history(self, game_type: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, page: int = 1, page_size: int = 10, only_favorite: bool = False, include_deleted: bool = False) -> Tuple[List[Dict[str, Any]], int]:
  119. """获取历史记录,支持分页查询
  120. Args:
  121. game_type: 类型
  122. start_date: 开始日期,格式为 YYYY-MM-DD
  123. end_date: 结束日期,格式为 YYYY-MM-DD
  124. page: 页码,从1开始
  125. page_size: 每页记录数
  126. only_favorite: 是否只查询收藏的记录
  127. include_deleted: 是否包含已删除的记录
  128. Returns:
  129. Tuple[List[Dict[str, Any]], int]: 历史记录列表和总记录数
  130. """
  131. conn = self.get_connection()
  132. cursor = conn.cursor()
  133. # 构建基础查询和条件
  134. base_query = "FROM history WHERE 1=1 "
  135. params = []
  136. if game_type:
  137. if game_type == '0':
  138. base_query += " AND is_deleted = 1"
  139. include_deleted=True
  140. else:
  141. base_query += " AND game_type = ?"
  142. params.append(game_type)
  143. # 添加日期过滤条件
  144. if start_date or end_date:
  145. if start_date:
  146. base_query += " AND date(created_at) >= ?"
  147. params.append(start_date)
  148. if end_date:
  149. base_query += " AND date(created_at) <= ?"
  150. params.append(end_date)
  151. # 添加收藏和删除过滤条件
  152. if only_favorite:
  153. base_query += " AND is_favorite = 1"
  154. if not include_deleted:
  155. base_query += " AND is_deleted = 0"
  156. # 先查询总记录数
  157. count_query = f"SELECT COUNT(*) {base_query}"
  158. cursor.execute(count_query, params)
  159. total_count = cursor.fetchone()[0]
  160. # 计算分页偏移量
  161. offset = (page - 1) * page_size
  162. # 查询当前页数据
  163. data_query = f"SELECT * {base_query} ORDER BY created_at DESC LIMIT ? OFFSET ?"
  164. cursor.execute(data_query, params + [page_size, offset])
  165. # 转换结果为字典列表
  166. results = []
  167. for row in cursor.fetchall():
  168. history_item = dict(row)
  169. # 确保字段名称与HistoryDTO兼容
  170. if 'game_type' in history_item and history_item['game_type'] not in ['A', 'B', 'C', 'D', 'E']:
  171. # 对于旧数据,将非标准类型转换为默认类型
  172. history_item['game_type'] = 'A'
  173. results.append(history_item)
  174. return results, total_count
  175. def get_game_data(self, nums: List[int]) -> List[Dict[str, Any]]:
  176. """根据数字组合获取游戏数据
  177. Args:
  178. nums: 4个数字的列表
  179. Returns:
  180. 匹配的游戏数据列表
  181. """
  182. if len(nums) != 4:
  183. raise ValueError("必须提供4个数字")
  184. # 对数字排序以匹配不同顺序的相同组合
  185. sorted_nums = sorted(nums)
  186. conn = self.get_connection()
  187. cursor = conn.cursor()
  188. # 查询匹配的游戏数据
  189. cursor.execute("""
  190. SELECT g.id, g.num1, g.num2, g.num3, g.num4, g.no
  191. FROM game_data g
  192. WHERE (g.num1, g.num2, g.num3, g.num4) IN (
  193. SELECT num1, num2, num3, num4 FROM game_data
  194. WHERE (num1 + num2 + num3 + num4) = ?
  195. )
  196. """, (sum(nums),))
  197. games = []
  198. for row in cursor.fetchall():
  199. game = dict(row)
  200. game_nums = [game['num1'], game['num2'], game['num3'], game['num4']]
  201. # 检查是否为相同的数字组合(不考虑顺序)
  202. if sorted(game_nums) == sorted_nums:
  203. # 获取该游戏的所有解法
  204. cursor.execute("""
  205. SELECT id, expression, flag
  206. FROM solutions
  207. WHERE game_id = ?
  208. """, (game['id'],))
  209. solutions = [dict(sol) for sol in cursor.fetchall()]
  210. game['solutions'] = solutions
  211. games.append(game)
  212. return games
  213. def get_flag_data(self, flag: int, min_num: Optional[str] = None, max_num: Optional[str] = None) -> List[Dict[str, Any]]:
  214. """获取指定flag的游戏数据
  215. Args:
  216. flag: 解法标志,1或2
  217. min_num: 最小数字过滤
  218. max_num: 最大数字过滤
  219. Returns:
  220. 匹配的游戏数据列表
  221. """
  222. conn = self.get_connection()
  223. cursor = conn.cursor()
  224. query = """
  225. SELECT DISTINCT g.id, g.num1, g.num2, g.num3, g.num4
  226. FROM game_data g
  227. JOIN solutions s ON g.id = s.game_id
  228. WHERE s.flag = ?
  229. """
  230. params = [flag]
  231. # 添加数字过滤条件(基于num1和num4)
  232. if min_num and min_num != "0":
  233. query += " AND g.num1 = ?"
  234. params.append(int(min_num))
  235. if max_num and max_num != "0":
  236. query += " AND g.num4 = ?"
  237. params.append(int(max_num))
  238. cursor.execute(query, params)
  239. results = []
  240. for row in cursor.fetchall():
  241. game = dict(row)
  242. # 获取该游戏的所有解法
  243. cursor.execute("""
  244. SELECT id, expression, flag
  245. FROM solutions
  246. WHERE game_id = ? AND flag = ?
  247. """, (game['id'], flag))
  248. solutions = [dict(sol) for sol in cursor.fetchall()]
  249. game['solutions'] = solutions
  250. results.append(game)
  251. return results
  252. def get_all_game_data(self) -> List[Dict[str, Any]]:
  253. """获取所有游戏数据
  254. Returns:
  255. 所有游戏数据列表
  256. """
  257. conn = self.get_connection()
  258. cursor = conn.cursor()
  259. # 查询所有游戏数据
  260. cursor.execute("SELECT id, num1, num2, num3, num4 FROM game_data")
  261. results = []
  262. for row in cursor.fetchall():
  263. game = dict(row)
  264. # 获取该游戏的所有解法
  265. cursor.execute("""
  266. SELECT id, expression, flag
  267. FROM solutions
  268. WHERE game_id = ?
  269. """, (game['id'],))
  270. solutions = [dict(sol) for sol in cursor.fetchall()]
  271. game['solutions'] = solutions
  272. results.append(game)
  273. return results
  274. def has_game_data(self) -> bool:
  275. """检查数据库是否已有游戏数据
  276. Returns:
  277. 如果数据库中已有游戏数据则返回True,否则返回False
  278. """
  279. conn = self.get_connection()
  280. cursor = conn.cursor()
  281. cursor.execute("SELECT COUNT(*) FROM game_data")
  282. count = cursor.fetchone()[0]
  283. return count > 0
  284. def toggle_favorite(self, history_id: int) -> bool:
  285. """切换历史记录的收藏状态
  286. Args:
  287. history_id: 历史记录ID
  288. Returns:
  289. bool: 操作是否成功
  290. """
  291. conn = self.get_connection()
  292. cursor = conn.cursor()
  293. try:
  294. # 先查询当前收藏状态
  295. cursor.execute("SELECT is_favorite FROM history WHERE id = ?", (history_id,))
  296. result = cursor.fetchone()
  297. if not result:
  298. return False # 记录不存在
  299. current_state = bool(result[0])
  300. new_state = not current_state
  301. # 更新收藏状态
  302. cursor.execute("UPDATE history SET is_favorite = ? WHERE id = ?", (int(new_state), history_id))
  303. conn.commit()
  304. return True
  305. except Exception as e:
  306. print(f"切换收藏状态失败: {e}")
  307. conn.rollback()
  308. return False
  309. def toggle_delete(self, history_id: int) -> bool:
  310. """切换历史记录的删除状态
  311. Args:
  312. history_id: 历史记录ID
  313. Returns:
  314. bool: 操作是否成功
  315. """
  316. conn = self.get_connection()
  317. cursor = conn.cursor()
  318. try:
  319. # 先查询当前收藏状态
  320. cursor.execute("SELECT is_deleted FROM history WHERE id = ?", (history_id,))
  321. result = cursor.fetchone()
  322. if not result:
  323. return False # 记录不存在
  324. current_state = bool(result[0])
  325. new_state = not current_state
  326. # 更新收藏状态
  327. cursor.execute("UPDATE history SET is_deleted = ? WHERE id = ?", (int(new_state), history_id))
  328. conn.commit()
  329. return True
  330. except Exception as e:
  331. print(f"切换删除状态失败: {e}")
  332. conn.rollback()
  333. return False
  334. def soft_delete(self, history_id: int) -> bool:
  335. """软删除历史记录
  336. Args:
  337. history_id: 历史记录ID
  338. Returns:
  339. bool: 操作是否成功
  340. """
  341. conn = self.get_connection()
  342. cursor = conn.cursor()
  343. try:
  344. # 更新删除状态
  345. cursor.execute("UPDATE history SET is_deleted = 1 WHERE id = ?", (history_id,))
  346. conn.commit()
  347. return cursor.rowcount > 0
  348. except Exception as e:
  349. print(f"软删除记录失败: {e}")
  350. conn.rollback()
  351. return False