sqlserver_helper.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. from typing import Dict, Optional, Any, List, Tuple
  2. from sqlalchemy import create_engine, text
  3. from sqlalchemy.engine import Engine
  4. from sqlalchemy.orm import sessionmaker
  5. from .base import DBHelper
  6. class SQLServerHelper(DBHelper):
  7. def __init__(self):
  8. super().__init__()
  9. self._engines: Dict[str, Engine] = {}
  10. self._session_makers: Dict[str, sessionmaker] = {}
  11. self._default_config = {
  12. 'driver': 'ODBC Driver 17 for SQL Server',
  13. 'server': 'localhost',
  14. 'username': '',
  15. 'password': '',
  16. 'trusted_connection': 'yes'
  17. }
  18. self._pool_config = {
  19. 'pool_size': 5, # 减少初始连接数以降低资源占用
  20. 'max_overflow': 10, # 适当减少最大溢出连接数
  21. 'pool_timeout': 60, # 增加池等待超时时间
  22. 'pool_recycle': 1800, # 每30分钟回收连接
  23. 'pool_pre_ping': True, # 启用连接健康检查
  24. 'connect_args': {
  25. 'timeout': 60, # 连接超时时间
  26. 'driver_connects_timeout': 60, # 驱动连接超时
  27. 'connect_timeout': 60, # ODBC连接超时
  28. 'connect_retries': 3, # 连接重试次数
  29. 'connect_retry_interval': 10, # 重试间隔增加到10秒
  30. 'connection_timeout': 60 # 额外的连接超时设置
  31. }
  32. }
  33. self._main_database_name = "sqlserver_mian_2024"
  34. def _build_connection_string(self, database: str, config: Optional[Dict[str, str]] = None) -> str:
  35. """构建连接字符串"""
  36. conn_config = self._default_config.copy()
  37. db_config = self.get_config_for_database(database)
  38. conn_config.update(db_config)
  39. if config:
  40. conn_config.update(config)
  41. # 构建认证字符串
  42. auth_params = []
  43. if conn_config.get('trusted_connection', True):
  44. auth_params.append("Trusted_Connection=yes")
  45. else:
  46. auth_params.extend([
  47. f"UID={conn_config['username']}",
  48. f"PWD={conn_config['password']}"
  49. ])
  50. # 构建ODBC连接字符串
  51. conn_parts = [
  52. f"DRIVER={conn_config['driver']}",
  53. f"SERVER={conn_config['server']}",
  54. f"DATABASE={conn_config['database'] if 'database' in conn_config else database}",
  55. "CHARSET=UTF-8"
  56. ]
  57. conn_parts.extend(auth_params)
  58. # 构建SQLAlchemy连接URL
  59. conn_str = ";".join(conn_parts)
  60. conn_url = f"mssql+pyodbc:///?odbc_connect={conn_str}"
  61. return conn_url
  62. def get_engine(self, database: str="", config: Optional[Dict[str, str]] = None) -> Engine:
  63. database = database or self._main_database_name
  64. """获取或创建数据库引擎"""
  65. if database not in self._engines:
  66. conn_str = self._build_connection_string(database, config)
  67. engine = create_engine(conn_str, **self._pool_config)
  68. # 预热连接池
  69. with engine.connect() as conn:
  70. conn.execute(text("SELECT 1"))
  71. self._engines[database] = engine
  72. return self._engines[database]
  73. def execute_query(self, database: str, query: str, params: Optional[Dict[str, Any]] = None) -> List[Tuple]:
  74. """执行查询并返回结果"""
  75. with self.session_scope(database) as session:
  76. result = session.execute(text(query), params or {})
  77. return [tuple(row) for row in result.fetchall()]
  78. def execute_non_query(self, database: str, query: str, params: Optional[Dict[str, Any]] = None) -> int:
  79. """执行非查询操作(如INSERT, UPDATE, DELETE)"""
  80. with self.session_scope(database) as session:
  81. result = session.execute(text(query), params or {})
  82. return result.rowcount
  83. def execute_scalar(self, database: str, query: str, params: Optional[Dict[str, Any]] = None) -> Any:
  84. """执行查询并返回第一行第一列的值"""
  85. with self.session_scope(database) as session:
  86. result = session.execute(text(query), params or {})
  87. row = result.fetchone()
  88. return row[0] if row else None
  89. def execute_procedure(self, database: str, procedure_name: str, params: Optional[Dict[str, Any]] = None) -> List[Tuple]:
  90. """执行存储过程"""
  91. params = params or {}
  92. param_str = ", ".join([f"@{key}=:{key}" for key in params.keys()])
  93. query = f"EXEC {procedure_name} {param_str}"
  94. with self.session_scope(database) as session:
  95. result = session.execute(text(query), params)
  96. return [tuple(row) for row in result.fetchall()]
  97. def dispose_all(self) -> None:
  98. """释放所有数据库引擎资源"""
  99. for engine in self._engines.values():
  100. engine.dispose()
  101. self._engines.clear()
  102. self._session_makers.clear()
  103. def __del__(self):
  104. """析构函数,确保所有引擎资源被释放"""
  105. self.dispose_all()