database.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. import sqlite3
  2. import json
  3. from pathlib import Path
  4. from datetime import datetime
  5. from typing import List, Dict, Any, Optional, Tuple
  6. class Database:
  7. """数据库管理类,负责SQLite数据库的连接和操作"""
  8. def __init__(self):
  9. """初始化数据库连接"""
  10. self.db_path = Path(__file__).parent.parent.parent / 'db' / 'game_data.db'
  11. self.db_path.parent.mkdir(exist_ok=True)
  12. self.conn = None
  13. self.initialize_db()
  14. def get_connection(self):
  15. """获取数据库连接"""
  16. if self.conn is None:
  17. self.conn = sqlite3.connect(str(self.db_path))
  18. self.conn.row_factory = sqlite3.Row
  19. return self.conn
  20. def close_connection(self):
  21. """关闭数据库连接"""
  22. if self.conn:
  23. self.conn.close()
  24. self.conn = None
  25. def initialize_db(self):
  26. """初始化数据库表结构"""
  27. conn = self.get_connection()
  28. cursor = conn.cursor()
  29. # 创建游戏数据表
  30. cursor.execute("""
  31. CREATE TABLE IF NOT EXISTS game_data (
  32. id INTEGER PRIMARY KEY,
  33. num1 INTEGER NOT NULL,
  34. num2 INTEGER NOT NULL,
  35. num3 INTEGER NOT NULL,
  36. num4 INTEGER NOT NULL
  37. )
  38. """)
  39. # 创建解法表
  40. cursor.execute("""
  41. CREATE TABLE IF NOT EXISTS solutions (
  42. id INTEGER PRIMARY KEY AUTOINCREMENT,
  43. game_id INTEGER NOT NULL,
  44. game_num TEXT NOT NULL,
  45. expression TEXT NOT NULL,
  46. flag INTEGER NOT NULL,
  47. FOREIGN KEY (game_id) REFERENCES game_data (id)
  48. )
  49. """)
  50. # 创建历史记录表
  51. cursor.execute("""
  52. CREATE TABLE IF NOT EXISTS history (
  53. id INTEGER PRIMARY KEY AUTOINCREMENT,
  54. game_type TEXT NOT NULL,
  55. question TEXT NOT NULL,
  56. numbers TEXT NOT NULL,
  57. answers TEXT NOT NULL,
  58. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  59. )
  60. """)
  61. conn.commit()
  62. def import_json_data(self, json_path: Path):
  63. """从JSON文件导入数据到SQLite数据库"""
  64. if not json_path.exists():
  65. raise FileNotFoundError(f"数据文件未找到: {json_path}")
  66. conn = self.get_connection()
  67. cursor = conn.cursor()
  68. # 检查数据库是否已有数据
  69. cursor.execute("SELECT COUNT(*) FROM game_data")
  70. if cursor.fetchone()[0] > 0:
  71. print("数据库已有数据,跳过导入")
  72. return
  73. # 开始导入数据
  74. try:
  75. with open(json_path, encoding='utf-8') as f:
  76. # 开始事务
  77. conn.execute("BEGIN TRANSACTION")
  78. for line in f:
  79. data = json.loads(line)
  80. # 插入游戏数据
  81. cursor.execute(
  82. "INSERT INTO game_data (id, num1, num2, num3, num4) VALUES (?, ?, ?, ?, ?)",
  83. (data['id'], data['n1'], data['n2'], data['n3'], data['n4'])
  84. )
  85. # 插入解法数据
  86. for solution in data['s']:
  87. cursor.execute(
  88. "INSERT INTO solutions (game_id, game_num,expression, flag) VALUES (?, ?,?, ?)",
  89. (data['id'], f"{data['n1']},{data['n2']},{data['n3']},{data['n4']}",solution['c'], solution['f'])
  90. )
  91. # 提交事务
  92. conn.commit()
  93. print("数据导入成功")
  94. except Exception as e:
  95. conn.rollback()
  96. raise RuntimeError(f"数据导入失败: {e}") from e
  97. def add_history(self, history_dto):
  98. """添加历史记录
  99. Args:
  100. history_dto: HistoryDTO对象,包含历史记录的所有信息
  101. Returns:
  102. 新添加记录的ID
  103. """
  104. conn = self.get_connection()
  105. cursor = conn.cursor()
  106. # 转换DTO为数据库记录格式
  107. record = history_dto.to_db_record()
  108. current_time = datetime.now()
  109. cursor.execute(
  110. "INSERT INTO history (game_type, question, numbers, answers, created_at) VALUES (?, ?, ?, ?, ?)",
  111. (record['game_type'], record['question'], record['numbers'], record['answers'],current_time)
  112. )
  113. conn.commit()
  114. return cursor.lastrowid
  115. def get_history(self, game_type: Optional[str] = None,start_date: Optional[str] = None, end_date: Optional[str] = None, page: int = 1, page_size: int = 10) -> Tuple[List[Dict[str, Any]], int]:
  116. """获取历史记录,支持分页查询
  117. Args:
  118. game_type: 类型
  119. start_date: 开始日期,格式为 YYYY-MM-DD
  120. end_date: 结束日期,格式为 YYYY-MM-DD
  121. page: 页码,从1开始
  122. page_size: 每页记录数
  123. Returns:
  124. Tuple[List[Dict[str, Any]], int]: 历史记录列表和总记录数
  125. """
  126. conn = self.get_connection()
  127. cursor = conn.cursor()
  128. # 构建基础查询和条件
  129. base_query = "FROM history WHERE 1=1 "
  130. params = []
  131. if game_type:
  132. base_query += " AND game_type = ?"
  133. params.append(game_type)
  134. # 添加日期过滤条件
  135. if start_date or end_date:
  136. if start_date:
  137. base_query += " AND date(created_at) >= ?"
  138. params.append(start_date)
  139. if end_date:
  140. base_query += " AND date(created_at) <= ?"
  141. params.append(end_date)
  142. # 先查询总记录数
  143. count_query = f"SELECT COUNT(*) {base_query}"
  144. cursor.execute(count_query, params)
  145. total_count = cursor.fetchone()[0]
  146. # 计算分页偏移量
  147. offset = (page - 1) * page_size
  148. # 查询当前页数据
  149. data_query = f"SELECT * {base_query} ORDER BY created_at DESC LIMIT ? OFFSET ?"
  150. cursor.execute(data_query, params + [page_size, offset])
  151. # 转换结果为字典列表
  152. results = []
  153. for row in cursor.fetchall():
  154. history_item = dict(row)
  155. # 确保字段名称与HistoryDTO兼容
  156. if 'game_type' in history_item and history_item['game_type'] not in ['A', 'B', 'C', 'D', 'E']:
  157. # 对于旧数据,将非标准类型转换为默认类型
  158. history_item['game_type'] = 'A'
  159. results.append(history_item)
  160. return results, total_count
  161. def get_game_data(self, nums: List[int]) -> List[Dict[str, Any]]:
  162. """根据数字组合获取游戏数据
  163. Args:
  164. nums: 4个数字的列表
  165. Returns:
  166. 匹配的游戏数据列表
  167. """
  168. if len(nums) != 4:
  169. raise ValueError("必须提供4个数字")
  170. # 对数字排序以匹配不同顺序的相同组合
  171. sorted_nums = sorted(nums)
  172. conn = self.get_connection()
  173. cursor = conn.cursor()
  174. # 查询匹配的游戏数据
  175. cursor.execute("""
  176. SELECT g.id, g.num1, g.num2, g.num3, g.num4, g.no
  177. FROM game_data g
  178. WHERE (g.num1, g.num2, g.num3, g.num4) IN (
  179. SELECT num1, num2, num3, num4 FROM game_data
  180. WHERE (num1 + num2 + num3 + num4) = ?
  181. )
  182. """, (sum(nums),))
  183. games = []
  184. for row in cursor.fetchall():
  185. game = dict(row)
  186. game_nums = [game['num1'], game['num2'], game['num3'], game['num4']]
  187. # 检查是否为相同的数字组合(不考虑顺序)
  188. if sorted(game_nums) == sorted_nums:
  189. # 获取该游戏的所有解法
  190. cursor.execute("""
  191. SELECT id, expression, flag
  192. FROM solutions
  193. WHERE game_id = ?
  194. """, (game['id'],))
  195. solutions = [dict(sol) for sol in cursor.fetchall()]
  196. game['solutions'] = solutions
  197. games.append(game)
  198. return games
  199. def get_flag_data(self, flag: int, min_num: Optional[str] = None, max_num: Optional[str] = None) -> List[Dict[str, Any]]:
  200. """获取指定flag的游戏数据
  201. Args:
  202. flag: 解法标志,1或2
  203. min_num: 最小数字过滤
  204. max_num: 最大数字过滤
  205. Returns:
  206. 匹配的游戏数据列表
  207. """
  208. conn = self.get_connection()
  209. cursor = conn.cursor()
  210. query = """
  211. SELECT DISTINCT g.id, g.num1, g.num2, g.num3, g.num4
  212. FROM game_data g
  213. JOIN solutions s ON g.id = s.game_id
  214. WHERE s.flag = ?
  215. """
  216. params = [flag]
  217. # 添加数字过滤条件(基于num1和num4)
  218. if min_num and min_num != "0":
  219. query += " AND g.num1 = ?"
  220. params.append(int(min_num))
  221. if max_num and max_num != "0":
  222. query += " AND g.num4 = ?"
  223. params.append(int(max_num))
  224. cursor.execute(query, params)
  225. results = []
  226. for row in cursor.fetchall():
  227. game = dict(row)
  228. # 获取该游戏的所有解法
  229. cursor.execute("""
  230. SELECT id, expression, flag
  231. FROM solutions
  232. WHERE game_id = ? AND flag = ?
  233. """, (game['id'], flag))
  234. solutions = [dict(sol) for sol in cursor.fetchall()]
  235. game['solutions'] = solutions
  236. results.append(game)
  237. return results
  238. def get_all_game_data(self) -> List[Dict[str, Any]]:
  239. """获取所有游戏数据
  240. Returns:
  241. 所有游戏数据列表
  242. """
  243. conn = self.get_connection()
  244. cursor = conn.cursor()
  245. # 查询所有游戏数据
  246. cursor.execute("SELECT id, num1, num2, num3, num4 FROM game_data")
  247. results = []
  248. for row in cursor.fetchall():
  249. game = dict(row)
  250. # 获取该游戏的所有解法
  251. cursor.execute("""
  252. SELECT id, expression, flag
  253. FROM solutions
  254. WHERE game_id = ?
  255. """, (game['id'],))
  256. solutions = [dict(sol) for sol in cursor.fetchall()]
  257. game['solutions'] = solutions
  258. results.append(game)
  259. return results
  260. def has_game_data(self) -> bool:
  261. """检查数据库是否已有游戏数据
  262. Returns:
  263. 如果数据库中已有游戏数据则返回True,否则返回False
  264. """
  265. conn = self.get_connection()
  266. cursor = conn.cursor()
  267. cursor.execute("SELECT COUNT(*) FROM game_data")
  268. count = cursor.fetchone()[0]
  269. return count > 0