소스 검색

init 数据库连接

YueYunyun 3 달 전
부모
커밋
322d461a3f
8개의 변경된 파일254개의 추가작업 그리고 26개의 파일을 삭제
  1. 82 7
      README.md
  2. 2 3
      README_DEV.md
  3. 6 6
      SERVER/app/application-dev.yml
  4. 3 3
      SERVER/app/application.yml
  5. 6 0
      SERVER/app/database/__init__.py
  6. 147 0
      SERVER/app/database/database.py
  7. 5 7
      SERVER/app/main.py
  8. 3 0
      requirements.txt

+ 82 - 7
README.md

@@ -2,6 +2,81 @@
 
 配置模块提供了统一的配置管理接口,主要功能包括:
 
+### 数据库配置
+
+项目使用 SQLAlchemy ORM 框架连接 MySQL 数据库,主要配置项如下:
+
+```python
+class DatabaseConfig:
+    host: str = "localhost"  # 数据库主机
+    port: int = 3306         # 数据库端口
+    user: str = "root"       # 数据库用户名
+    password: str = ""       # 数据库密码
+    name: str = "question_bank"  # 数据库名称
+```
+
+使用示例:
+
+```python
+from app.database.database import Database
+
+# 初始化数据库连接
+Database.initialize()
+
+# 使用上下文管理器管理会话
+with db.session() as session:
+    # 执行查询
+    users = db.execute_query("SELECT * FROM users WHERE age > :age", {"age": 18})
+
+    # 执行非查询
+    affected_rows = db.execute_non_query(
+        "UPDATE users SET status = :status WHERE id = :id",
+        {"status": "active", "id": 1}
+    )
+
+    # 批量操作
+    user_data = [
+        {"name": "Alice", "age": 25},
+        {"name": "Bob", "age": 30}
+    ]
+    inserted_rows = db.batch_execute(
+        "INSERT INTO users (name, age) VALUES (:name, :age)",
+        user_data
+    )
+
+# 兼容旧版接口
+from app.database.db_mysql import get_db
+db = next(get_db())
+result = db.execute("SELECT * FROM users")
+db.commit()
+```
+
+新特性说明:
+
+1. 单例模式:确保全局只有一个数据库连接实例
+2. 上下文管理器:自动管理会话生命周期,异常时自动回滚
+3. 常用操作封装:
+   - execute_query:执行查询,返回字典列表
+   - execute_non_query:执行非查询,返回影响行数
+   - batch_execute:批量执行,返回总影响行数
+4. 新增 initialize()方法:显式初始化数据库连接
+5. 保持对旧版接口的兼容
+
+注意事项:
+
+1. 数据库连接字符串格式:mysql+pymysql://{user}:{password}@{host}:{port}/{name}?charset=utf8mb4
+2. 数据库连接池配置:
+   - pool_pre_ping: True # 每次使用连接前检查连接是否有效
+   - pool_recycle: 3600 # 连接池回收时间(秒)
+3. MySQL 驱动说明:
+   - PyMySQL:纯 Python 实现的 MySQL 驱动,兼容性好
+   - mysqlclient:C 扩展实现的 MySQL 驱动,性能更好
+   - 默认使用 PyMySQL,如需使用 mysqlclient:
+     - 安装 mysqlclient:pip install mysqlclient
+     - 修改连接字符串前缀为 mysql://
+
+配置模块提供了统一的配置管理接口,主要功能包括:
+
 - 获取配置实例
 - 获取配置项值
 - 注册配置变更回调
@@ -68,10 +143,10 @@ except RuntimeError as e:
 
 ```env
 # 开发环境
-APP_ENV=dev
+ENV=dev
 
 # 生产环境
-# APP_ENV=prod
+# ENV=prod
 ```
 
 注意:
@@ -86,10 +161,10 @@ APP_ENV=dev
 
 ```cmd
 :: 开发环境
-set APP_ENV=dev
+set ENV=dev
 
 :: 生产环境
-set APP_ENV=prod
+set ENV=prod
 ```
 
 2. 永久设置:
@@ -97,17 +172,17 @@ set APP_ENV=prod
 - 右键点击"此电脑" -> "属性" -> "高级系统设置"
 - 点击"环境变量"按钮
 - 在"系统变量"或"用户变量"中新建变量:
-  - 变量名:APP_ENV
+  - 变量名:ENV
   - 变量值:dev(开发环境)或 prod(生产环境)
 
 #### macOS/Linux 系统
 
 ```bash
 # 开发环境
-export APP_ENV=dev
+export ENV=dev
 
 # 生产环境
-export APP_ENV=prod
+export ENV=prod
 ```
 
 ### 各环境主要差异

+ 2 - 3
README_DEV.md

@@ -284,19 +284,18 @@ pip install -r requirements.txt
 2. 配置数据库
 
 - 创建 MySQL 数据库
-- 修改 src/common/config.py 中的数据库连接配置
 
 3. 运行系统
 
 ```bash
-python src/main.py
+python main.py
 ```
 
 ## 已实现功能
 
 ### 1. 图形用户界面
 
-- 使用 PyQt5 开发桌面应用界面
+- 使用 vue3 开发桌面应用界面
 - 提供友好的用户交互体验
 - 支持题目展示、答题、结果查看等功能
 

+ 6 - 6
SERVER/app/application-dev.yml

@@ -3,12 +3,12 @@ app:
   version: 1.0.1
   debug: true
 
-database:
-  host: 192.168.0.82
-  port: 3306
-  user: root
-  password: ''
-  name: question_bank_dev
+# database:
+#   host: 192.168.0.82
+#   port: 3306
+#   user: root
+#   password: ''
+#   name: question_bank_dev
 
 logging:
   path: logs

+ 3 - 3
SERVER/app/application.yml

@@ -4,11 +4,11 @@ app:
   debug: false
 
 database:
-  host: 192.168.0.81
+  host: 192.168.0.82
   port: 3306
   user: root
-  password: ''
-  name: question_bank
+  password: y123456
+  name: smart_question_bank_v1.0
 
 logging:
   level: INFO

+ 6 - 0
SERVER/app/database/__init__.py

@@ -0,0 +1,6 @@
+from .database import Database
+
+# 数据库初始化单例实例
+db = Database.initialize()
+
+__all__ = ['Database', 'db']

+ 147 - 0
SERVER/app/database/database.py

@@ -0,0 +1,147 @@
+from typing import Iterator, List, Optional, Any
+from contextlib import contextmanager
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import Session, sessionmaker
+from app.config import config
+from app.logger import logger
+
+
+class Database:
+    """数据库单例类"""
+    _instance = None
+
+    @staticmethod
+    def initialize():
+        """初始化数据库连接
+        """
+        # 调用单例实例会触发_init_db
+        return Database()
+
+    def __new__(cls):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+            cls._instance._init_db()
+        return cls._instance
+
+    def _init_db(self):
+        """初始化数据库连接"""
+        db_config = config.database
+        try:
+            self.engine = create_engine(self._get_sqlalchemy_uri(
+                host=db_config.host,
+                port=db_config.port,
+                user=db_config.user,
+                password=db_config.password,
+                name=db_config.name),
+                                        pool_pre_ping=True,
+                                        pool_recycle=3600)
+
+            # 测试连接
+            with self.engine.connect() as conn:
+                conn.execute(text("SELECT 1"))
+
+            logger.debug("数据库连接成功")
+        except Exception as e:
+            logger.warning(f"数据库连接失败: {e}")
+            raise
+
+        self.SessionLocal = sessionmaker(autocommit=False,
+                                         autoflush=False,
+                                         bind=self.engine)
+
+    @staticmethod
+    def _get_sqlalchemy_uri(host: str, port: int, user: str, password: str,
+                            name: str) -> str:
+        """生成SQLAlchemy连接字符串"""
+        sql = f"mysql+pymysql://{user}:{password}@{host}:{port}/{name}?charset=utf8mb4"
+        logger.debug(f"数据库连接字符串: {sql}")
+        return sql
+
+    @contextmanager
+    def session(self) -> Iterator[Session]:
+        """获取数据库会话"""
+        db = self.SessionLocal()
+        try:
+            yield db
+        except Exception:
+            db.rollback()
+            raise
+        finally:
+            db.close()
+
+    def query(self, sql: str, params: Optional[dict] = None) -> List[dict]:
+        """执行查询SQL"""
+        with self.session() as db:
+            try:
+                result = db.execute(text(sql), params or {})
+                return [dict(row) for row in result.mappings()]
+            except Exception as e:
+                logger.error(f"查询SQL出错: [{sql}] {e}")
+                raise
+
+    def execute(self, sql: str, params: Optional[dict] = None) -> int:
+        """执行非查询SQL"""
+        with self.session() as db:
+            try:
+                result = db.execute(text(sql), params or {})
+                db.commit()
+                return result.rowcount
+            except Exception as e:
+                logger.error(f"执行SQL出错: [{sql}] {e}")
+                raise
+
+    def batch_execute(self, sql: str, params_list: List[dict]) -> int:
+        """批量执行SQL
+        注意:所有操作在单个事务中执行,要么全部成功,要么全部回滚
+        """
+        if not params_list:
+            return 0
+
+        with self.session() as db:
+            try:
+                result = db.execute(text(sql), params_list)
+                db.commit()
+                return result.rowcount
+            except Exception as e:
+                logger.error(f"批量执行SQL出错: [{sql}] {e}")
+                raise
+
+    def query_one(self,
+                  sql: str,
+                  params: Optional[dict] = None) -> Optional[dict]:
+        """执行查询SQL并返回单条记录"""
+        with self.session() as db:
+            try:
+                result = db.execute(text(sql), params or {})
+                row = result.mappings().first()
+                return dict(row) if row else None
+            except Exception as e:
+                logger.error(f"查询SQL出错: [{sql}] {e}")
+                raise
+
+    def execute_procedure(self,
+                          procedure_name: str,
+                          params: Optional[dict] = None) -> Any:
+        """执行存储过程"""
+        with self.session() as db:
+            try:
+                result = db.execute(text(f"CALL {procedure_name}"), params
+                                    or {})
+                db.commit()
+                return result.fetchall()
+            except Exception as e:
+                logger.error(f"执行存储过程出错: [{procedure_name}] {e}")
+                raise
+
+    def execute_function(self,
+                         function_name: str,
+                         params: Optional[dict] = None) -> Any:
+        """执行数据库函数"""
+        with self.session() as db:
+            try:
+                result = db.execute(text(f"SELECT {function_name}"), params
+                                    or {})
+                return result.scalar()
+            except Exception as e:
+                logger.error(f"执行数据库函数出错: [{function_name}] {e}")
+                raise

+ 5 - 7
SERVER/app/main.py

@@ -1,13 +1,11 @@
-import sys
-import os
+import os, sys
 
 # 添加项目根目录到Python路径
-sys.path.append(
-    os.path.dirname(os.path.dirname(os.path.dirname(
-        os.path.abspath(__file__)))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
-from SERVER.app.config import config
-from SERVER.app.logger import logger
+from app.database import db
+from app.config import config
+from app.logger import logger
 
 
 def main():

+ 3 - 0
requirements.txt

@@ -4,3 +4,6 @@ pydantic-settings>=2.0.0
 watchdog>=3.0.0
 loguru>=0.7.0
 python-dotenv>=1.0.0
+SQLAlchemy>=2.0.0
+PyMySQL>=1.1.0
+mysqlclient>=2.2.0