Ver Fonte

从sqlserver数据库中生成定额库csv文件

klzhangweiya há 1 mês atrás
pai
commit
3e0f0a4fda

+ 3 - 1
SourceCode/DataMiddleware/requirements.txt

@@ -16,4 +16,6 @@ tabulate==0.9.0
 pyxnat~=1.6.3
 httplib2~=0.22.0
 bcrypt~=4.3.0
-Flask-Login~=0.6.3
+Flask-Login~=0.6.3
+SQLAlchemy~=2.0.23
+pyodbc==5.2.0

+ 23 - 0
SourceCode/DataMiddleware/tools/config.yml

@@ -22,3 +22,26 @@ ai:
 fastgpt:
   api_url: http://192.168.0.104:8020/api
   api_key: fastgpt-rSTjfs9BPv6KHUnqtz9vWHNSiPqJneOeBYDtMxqgsu6JgW2trJC7
+db:
+  # SQL Server 配置
+  # SQL Server 2008:'{SQL Server}' 或 '{SQL Server Native Client 10.0}'
+  # SQL Server 2016:'{ODBC Driver 13 for SQL Server}'
+  # SQL Server 2020:'{ODBC Driver 17 for SQL Server}'
+  # SQL Server 2022:'{ODBC Driver 18 for SQL Server}'
+  # 在Windows系统的ODBC数据源管理器中查看已安装的驱动程序,选择相应的驱动名称
+  # 在开始菜单的列表里面找到"Windows管理工具"打开, 然后点开里面的"ODBC数据源"。
+  # 打开以后,点开上方"驱动程序"。 就可以看到系统所安装的ODBC驱动程序
+  sqlserver_mian_2024:
+    driver: '{ODBC Driver 17 for SQL Server}'
+    server: shvber.com,50535
+    username: sa
+    password: Iwb2017
+    database: Iwb_RecoData2024
+    trusted_connection: false
+  sqlserver_mian_2020:
+    driver: '{ODBC Driver 17 for SQL Server}'
+    server: shvber.com,50535
+    username: sa
+    password: Iwb2017
+    database: Iwb_RecoData2020
+    trusted_connection: false

+ 48 - 1
SourceCode/DataMiddleware/tools/image_extract/extract.py

@@ -3,13 +3,15 @@ from time import sleep
 
 from tools.stores.mysql_store import MysqlStore
 from tools.models.standard_model import StandardModel
+from tools.stores.sqlserver_store import SqlServerStore
 
 
 class ImageExtractor:
     def __init__(self):
         self._logger = utils.get_logger()
         self._db_store = MysqlStore()
-        self._base_path = "./temp_files/images/output"
+        self._ms_db_store = SqlServerStore()
+        self._base_path = "./temp_files/csv/output"
         self._complete_path=""
         self._ai = utils.AiHelper()
         self._err_files=[]
@@ -139,6 +141,51 @@ class ImageExtractor:
         except Exception as e:
             self._logger.error(f"导出数据库数据失败: {e}")
 
+    def export_csv_from_sqldb(self):
+        try:
+            self._logger.info(f"开始导出数据库数据")
+            data = self._ms_db_store.group_by_book_no()
+            # 确保目录存在
+            dir_name = f"{self._base_path}"
+            os.makedirs(os.path.dirname(dir_name), exist_ok=True)
+            for k in data:
+                # 数据保存为 csv
+                csv_file = f"{dir_name}/{k}.csv"
+                v = self._ms_db_store.query_ding_e_ku(k)
+                print(v[0])
+                # 新增 CSV 写入逻辑
+                with open(csv_file, 'w', newline='', encoding='utf-8-sig') as f:
+                    writer = csv.writer(f)
+                    # 写入表头
+                    writer.writerow(['书号', '定额编号', '定额名称', '单位',
+                                     '工作内容', '基本定额', '基价', '工费', '料费',
+                                     '机费', '单重', '节号', '流水号', 'LOCK', '使用',
+                                     '条目序号', '导入时间'])
+                    # 写入数据行
+                    for record in v:
+                        writer.writerow([
+                            record["shuhao"],
+                            record["dingebianhao"],
+                            record["dingemingcheng"],
+                            record["danwei"],
+                            record["gongzuoneirong"],
+                            record["jibendinge"],
+                            record["jijia"],
+                            record["gongfei"],
+                            record["liaofei"],
+                            record["jifei"],
+                            record["danzhong"],
+                            record["jiehao"],
+                            record["liushuihao"],
+                            record["lock"],
+                            record["shiyong"],
+                            record["tiaoxuhao"],
+                            record["daorushijian"].strftime('%Y-%m-%d %H:%M:%S') if record["daorushijian"] else ''
+                        ])
+            self._logger.info(f"成功导出数据库数据")
+        except Exception as e:
+            self._logger.error(f"导出数据库数据失败: {e}")
+
     def import_form_json(self,path:str):
         try:
             if not os.path.exists(path):

+ 1 - 1
SourceCode/DataMiddleware/tools/img_spilt/run.py

@@ -13,4 +13,4 @@ if __name__ == '__main__':
     # ImageSpilt(21).split("06")
     # ImageExtractor().extract("06")
     # ImageExtractor().import_form_json("./temp_files/QY_2024.json")
-    ImageExtractor().export()
+    ImageExtractor().export_csv_from_sqldb()

+ 6 - 6
SourceCode/DataMiddleware/tools/main.py

@@ -2,12 +2,12 @@ from pdf_split.processor import PDFProcessor
 import tools.utils as utils
 from tools.pdf_split.mysql_store import MysqlStore
 
-@app.route('/standards')
-def show_standards():
-    store = MysqlStore()
-    standards = store.get_all_standards()
-    records = store.get_all_pdf_records()
-    return render_template('standards.html', standards=standards, records=records)
+# @app.route('/standards')
+# def show_standards():
+#     store = MysqlStore()
+#     standards = store.get_all_standards()
+#     records = store.get_all_pdf_records()
+#     return render_template('standards.html', standards=standards, records=records)
 def main():
     log = utils.get_logger()
     #"""PDF拆分工具的主入口函数"""

+ 27 - 0
SourceCode/DataMiddleware/tools/models/dinge_model.py

@@ -0,0 +1,27 @@
+from sqlalchemy import Column, String, Float, Integer, Text, Boolean, DateTime
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+
+class DingEKu(Base):
+    __tablename__ = '定额库'
+
+    shuhao = Column('书号', String(50), primary_key=True)
+    dingebianhao = Column('定额编号', String(50), primary_key=True)
+    dingemingcheng = Column('定额名称', String(255))
+    danwei = Column('单位', String(20))
+    xiaohao = Column('消耗', Text)
+    gongzuoneirong = Column('工作内容', Text)
+    jibendinge = Column('基本定额', String(255))
+    jijia = Column('基价', Float)
+    gongfei = Column('工费', Float)
+    liaofei = Column('料费', Float)
+    jifei = Column('机费', Float)
+    danzhong = Column('单重', Float)
+    jiehao = Column('节号', String(50))
+    liushuihao = Column('流水号', Integer)
+    lock = Column('LOCK', Boolean)
+    shiyong = Column('使用', Boolean)
+    tiaoxuhao = Column('条目序号', Integer)
+    daorushijian = Column('导入时间', DateTime)

+ 83 - 0
SourceCode/DataMiddleware/tools/stores/sqlserver_store.py

@@ -0,0 +1,83 @@
+from typing import List, Any, Sequence
+
+from tools.utils.db_helper import sqlserver_helper
+import tools.utils as utils
+from sqlalchemy.orm import Session
+from tools.models.dinge_model import DingEKu
+from sqlalchemy import select, Row
+from collections import defaultdict
+class SqlServerStore:
+    def __init__(self):
+        self._db_helper = sqlserver_helper.SQLServerHelper()
+        self._logger = utils.get_logger()
+
+    def group_by_book_no(self) -> List[str]:
+        try:
+            with Session(self._db_helper.get_engine()) as session:
+                stmt = select(DingEKu.shuhao).group_by(DingEKu.shuhao).order_by(DingEKu.shuhao.asc())
+                result = session.execute(stmt)
+                return [row[0] for row in result]
+        except Exception as e:
+            self._logger.error(f"查询定额库失败: {str(e)}")
+            raise
+
+
+
+
+    def query_ding_e_ku(self, shuhao: str = None) -> List[dict]:
+        """
+        查询定额库表
+        参数:
+        - shuhao: 书号
+        """
+        try:
+            with Session(self._db_helper.get_engine()) as session:
+                # 构建查询条件
+                conditions = []
+                if shuhao:
+                   conditions.append(DingEKu.shuhao == shuhao)
+
+                # 分页查询
+                stmt = select(DingEKu.shuhao,
+                    DingEKu.dingebianhao,
+                    DingEKu.dingemingcheng,
+                    DingEKu.danwei,
+                    DingEKu.gongzuoneirong,
+                    DingEKu.jibendinge,
+                    DingEKu.jijia,
+                    DingEKu.gongfei,
+                    DingEKu.liaofei,
+                    DingEKu.jifei,
+                    DingEKu.danzhong,
+                    DingEKu.jiehao,
+                    DingEKu.liushuihao,
+                    DingEKu.lock,
+                    DingEKu.shiyong,
+                    DingEKu.tiaoxuhao,
+                    DingEKu.daorushijian).where(*conditions).order_by(DingEKu.shuhao.asc())
+
+                result = session.execute(stmt)
+                # 动态生成字段映射(排除消耗字段)
+                return [{
+                    'shuhao': row.shuhao,
+                    'dingebianhao': row.dingebianhao,
+                    'dingemingcheng': row.dingemingcheng,
+                    'danwei': row.danwei,
+                    'gongzuoneirong': row.gongzuoneirong,
+                    'jibendinge': row.jibendinge,
+                    'jijia': row.jijia,
+                    'gongfei': row.gongfei,
+                    'liaofei': row.liaofei,
+                    'jifei': row.jifei,
+                    'danzhong': row.danzhong,
+                    'jiehao': row.jiehao,
+                    'liushuihao': row.liushuihao,
+                    'lock': row.lock,
+                    'shiyong': row.shiyong,
+                    'tiaoxuhao': row.tiaoxuhao,
+                    'daorushijian': row.daorushijian
+                } for row in result]
+
+        except Exception as e:
+            self._logger.error(f"查询定额库失败: {str(e)}")
+            raise

+ 12 - 0
SourceCode/DataMiddleware/tools/utils/config_helper.py

@@ -73,6 +73,18 @@ class ConfigHelper:
         except ValueError:
             return default
 
+    def get_object(self, key: str, default: dict = None):
+        val = self.get(key)
+        if not val:
+            return default
+        if isinstance(val, dict):
+            return val
+        try:
+            return yaml.safe_load(val)
+        except yaml.YAMLError as e:
+            print(f"Error loading YAML object: {e}")
+            return default
+
     def get_all(self):
         if self._config is None:
             self.load_config(self._path)

+ 18 - 0
SourceCode/DataMiddleware/tools/utils/db_helper/__init__.py

@@ -0,0 +1,18 @@
+from sqlalchemy.orm import Session
+
+from .mysql_helper import MySQLHelper
+from .sqlserver_helper import SQLServerHelper
+
+
+def create_sqlServer_session(database:str=None)->Session:
+    return SQLServerHelper().get_session_maker(database)()
+
+def create_mysql_session(database:str=None)->Session:
+    return MySQLHelper().get_session_maker(database)()
+
+__all__ = [
+    'MySQLHelper',
+    'SQLServerHelper',
+    'create_sqlServer_session',
+    'create_mysql_session'
+]

+ 199 - 0
SourceCode/DataMiddleware/tools/utils/db_helper/base.py

@@ -0,0 +1,199 @@
+from typing import Dict, Optional, Any, List, Tuple, Generator
+from contextlib import contextmanager
+import threading
+from tools.utils.config_helper import ConfigHelper
+from sqlalchemy.orm import sessionmaker, declarative_base
+
+# 创建基础模型类
+Base = declarative_base()
+config = ConfigHelper()
+
+class DBHelper:
+    _instance = None
+    _lock = threading.Lock()
+    _main_database_name = ""
+    def __new__(cls, *args, **kwargs):
+        with cls._lock:
+            if cls._instance is None:
+                cls._instance = super(DBHelper, cls).__new__(cls)
+                cls._instance._initialized = False
+            return cls._instance
+    
+    def __init__(self):
+        if self._initialized:
+            return
+            
+        self._config_cache: Dict[str, Dict[str, str]] = {}
+        self._engines: Dict[str, Any] = {}
+        self._sessions: Dict[str, sessionmaker] = {}
+        self._default_config = {
+            'pool_size': 5,
+            'max_overflow': 10,
+            'pool_timeout': 30,
+            'pool_recycle': 3600
+        }
+        self._initialized = True
+        
+    def set_default_config(self, config: Dict[str, str]) -> None:
+        """设置默认连接配置"""
+        self._default_config.update(config)
+    
+    def get_config_for_database(self, database: str=_main_database_name) -> Dict[str, str]:
+        """获取数据库的连接配置
+        
+        按照以下顺序查找配置:
+        1. 配置缓存
+        2. 配置文件中特定数据库的配置
+        3. 配置文件中的main_config配置
+        4. 默认配置
+        
+        Args:
+            database: 数据库名称
+            
+        Returns:
+            数据库连接配置
+            
+        Raises:
+            Exception: 当找不到特定数据库配置且main_config配置也不存在时
+        """
+        if database in self._config_cache:
+            return self._config_cache[database]
+
+        db_config = config.get_object("db")
+        if database in db_config:
+            self._config_cache[database] = db_config[database]
+            return db_config[database]
+        main_config = db_config[self._main_database_name]
+        if not main_config:
+            raise Exception(f"未找到数据库 {database} 的配置,且main_config配置不存在")
+        # main_config['database'] = database
+        # main_config['db'] = database
+        self._config_cache[database] = main_config
+        return main_config
+
+    
+    def execute_query(self, database: str, query: str, params: Optional[Any] = None) -> List[Tuple]:
+        """执行查询并返回结果
+        
+        Args:
+            database: 数据库名称
+            query: SQL查询语句
+            params: 查询参数
+            
+        Returns:
+            查询结果列表
+        """
+        raise NotImplementedError("子类必须实现execute_query方法")
+    
+    def execute_non_query(self, database: str, query: str, params: Optional[Any] = None) -> int:
+        """执行非查询操作(如INSERT, UPDATE, DELETE)
+        
+        Args:
+            database: 数据库名称
+            query: SQL语句
+            params: 查询参数
+            
+        Returns:
+            受影响的行数
+        """
+        raise NotImplementedError("子类必须实现execute_non_query方法")
+    
+    def execute_scalar(self, database: str, query: str, params: Optional[Any] = None) -> Any:
+        """执行查询并返回第一行第一列的值
+        
+        Args:
+            database: 数据库名称
+            query: SQL查询语句
+            params: 查询参数
+            
+        Returns:
+            查询结果的第一行第一列的值
+        """
+        raise NotImplementedError("子类必须实现execute_scalar方法")
+    
+    def execute_procedure(self, database: str, procedure_name: str, params: Optional[Dict[str, Any]] = None) -> List[Tuple]:
+        """执行存储过程
+        
+        Args:
+            database: 数据库名称
+            procedure_name: 存储过程名称
+            params: 存储过程参数
+            
+        Returns:
+            存储过程执行结果
+        """
+        raise NotImplementedError("子类必须实现execute_procedure方法")
+    
+    def get_engine(self, database: str, config: Optional[Dict[str, Any]] = None) -> Any:
+        """获取或创建数据库引擎
+        
+        Args:
+            database: 数据库名称
+            config: 数据库配置信息
+            
+        Returns:
+            SQLAlchemy引擎实例
+        """
+        raise NotImplementedError("子类必须实现get_engine方法")
+    
+    def get_session_maker(self, database: str=None, config: Optional[Dict[str, Any]] = None) -> sessionmaker:
+        """获取或创建会话工厂
+        
+        Args:
+            database: 数据库名称
+            config: 数据库配置信息
+            
+        Returns:
+            会话工厂实例
+        """
+        database = database or self._main_database_name
+        if database in self._sessions:
+            return self._sessions[database]
+        
+        engine = self.get_engine(database, config)
+        session = sessionmaker(bind=engine)
+        self._sessions[database] = session
+        return session
+    
+    @contextmanager
+    def session_scope(self, database: str, config: Optional[Dict[str, Any]] = None) -> Generator:
+        """创建会话的上下文管理器
+        
+        Args:
+            database: 数据库名称
+            config: 数据库配置信息
+            
+        Yields:
+            数据库会话
+        """
+        session = self.get_session_maker(database, config)
+        session = session()
+        try:
+            yield session
+            session.commit()
+        except:
+            session.rollback()
+            raise
+        finally:
+            session.close()
+    
+    def dispose_engine(self, database: str) -> None:
+        """释放指定数据库的引擎资源
+        
+        Args:
+            database: 数据库名称
+        """
+        if database in self._engines:
+            self._engines[database].dispose()
+            del self._engines[database]
+        if database in self._sessions:
+            del self._sessions[database]
+    
+    def dispose_all(self) -> None:
+        """释放所有数据库资源"""
+        for database in list(self._engines.keys()):
+            self.dispose_engine(database)
+    
+    def __del__(self):
+        """析构函数,确保所有资源被释放"""
+        self.dispose_all()

+ 182 - 0
SourceCode/DataMiddleware/tools/utils/db_helper/mysql_helper.py

@@ -0,0 +1,182 @@
+import pymysql
+from typing import Dict, Optional, Any, List, Tuple
+from contextlib import contextmanager
+from sqlalchemy import create_engine
+from sqlalchemy.engine import Engine
+from .base import DBHelper
+from pymysql import Error
+
+class MySQLHelper(DBHelper):
+    def __init__(self):
+        super().__init__()
+        self._connections: Dict[str, pymysql.Connection] = {}
+        self._default_config = {
+            'db': '',
+            'host': 'localhost',
+            'port': 3306,
+            'user': '',
+            'password': '',
+            'charset': 'utf8mb4',
+            'pool_size': 5,
+            'max_overflow': 10,
+            'pool_timeout': 30,
+            'pool_recycle': 3600
+        }
+        self._main_database_name = "mysql_main"
+
+    def get_engine(self, database: str, config: Optional[Dict[str, Any]] = None) -> Engine:
+        """获取或创建数据库引擎
+        
+        Args:
+            database: 数据库名称
+            config: 可选的连接配置
+            
+        Returns:
+            SQLAlchemy引擎实例
+        """
+        if database in self._engines:
+            return self._engines[database]
+        
+        conn_config = self._default_config.copy()
+        db_config = self.get_config_for_database(database)
+        conn_config.update(db_config)
+        if config:
+            conn_config.update(config)
+        
+        if 'db' not in conn_config or not conn_config['db']:
+            conn_config['db'] = database
+        
+        url = f"mysql+pymysql://{conn_config['user']}:{conn_config['password']}@{conn_config['host']}:{conn_config['port']}/{conn_config['db']}"
+        self._engines[database] = create_engine(
+            url,
+            pool_size=self._default_config['pool_size'],
+            max_overflow=self._default_config['max_overflow'],
+            pool_timeout=self._default_config['pool_timeout'],
+            pool_recycle=self._default_config['pool_recycle']
+        )
+        return self._engines[database]
+    
+    def connect(self, database: str, config: Optional[Dict[str, str]] = None) -> pymysql.Connection:
+        """连接到指定数据库
+        
+        Args:
+            database: 数据库名称
+            config: 可选的连接配置,如果不提供则使用默认配置
+            
+        Returns:
+            数据库连接对象
+        """
+        if database in self._connections:
+            try:
+                self._connections[database].ping(reconnect=True)
+                return self._connections[database]
+            except Error:
+                try:
+                    self._connections[database].close()
+                except Error:
+                    pass
+                del self._connections[database]
+        
+        conn_config = self._default_config.copy()
+        db_config = self.get_config_for_database(database)
+        conn_config.update(db_config)
+        if config:
+            conn_config.update(config)
+        
+        if 'db' not in conn_config or not conn_config['db']:
+            conn_config['db'] = database
+        
+        connection = pymysql.connect(**conn_config)
+        self._connections[database] = connection
+        return connection
+    
+    @contextmanager
+    def connection(self, database: str):
+        """获取数据库连接的上下文管理器
+        
+        Args:
+            database: 数据库名称
+            
+        Yields:
+            数据库连接对象
+        """
+        connection = self.connect(database)
+        try:
+            yield connection
+        finally:
+            pass
+    
+    def execute_query(self, database: str, query: str, params: Optional[Tuple] = None) -> List[Tuple]:
+        connection = self.connect(database)
+        cursor = connection.cursor()
+        
+        try:
+            if params:
+                cursor.execute(query, params)
+            else:
+                cursor.execute(query)
+            
+            results = cursor.fetchall()
+            return [tuple(row) for row in results]
+        finally:
+            cursor.close()
+    
+    def execute_non_query(self, database: str, query: str, params: Optional[Tuple] = None) -> int:
+        connection = self.connect(database)
+        cursor = connection.cursor()
+        
+        try:
+            if params:
+                cursor.execute(query, params)
+            else:
+                cursor.execute(query)
+            
+            connection.commit()
+            return cursor.rowcount
+        finally:
+            cursor.close()
+    
+    def execute_scalar(self, database: str, query: str, params: Optional[Tuple] = None) -> Any:
+        connection = self.connect(database)
+        cursor = connection.cursor()
+        
+        try:
+            if params:
+                cursor.execute(query, params)
+            else:
+                cursor.execute(query)
+            
+            row = cursor.fetchone()
+            return row[0] if row else None
+        finally:
+            cursor.close()
+    
+    def execute_procedure(self, database: str, procedure_name: str, params: Optional[Dict[str, Any]] = None) -> List[Tuple]:
+        connection = self.connect(database)
+        cursor = connection.cursor()
+        
+        try:
+            if params:
+                param_placeholders = ", ".join([f"%s" for _ in params.keys()])
+                query = f"CALL {procedure_name}({param_placeholders})"
+                cursor.execute(query, list(params.values()))
+            else:
+                cursor.execute(f"CALL {procedure_name}()")
+            
+            results = cursor.fetchall()
+            return [tuple(row) for row in results]
+        finally:
+            cursor.close()
+    
+    def dispose_all(self) -> None:
+        """释放所有数据库连接"""
+        for conn in self._connections.values():
+            try:
+                conn.close()
+            except Error:
+                pass
+        self._connections.clear()
+    
+    def __del__(self):
+        """析构函数,确保所有连接被关闭"""
+        self.dispose_all()

+ 122 - 0
SourceCode/DataMiddleware/tools/utils/db_helper/sqlserver_helper.py

@@ -0,0 +1,122 @@
+from typing import Dict, Optional, Any, List, Tuple
+
+from sqlalchemy import create_engine, text
+from sqlalchemy.engine import Engine
+from sqlalchemy.orm import sessionmaker
+
+from .base import DBHelper
+
+
+class SQLServerHelper(DBHelper):
+    def __init__(self):
+        super().__init__()
+        self._engines: Dict[str, Engine] = {}
+        self._session_makers: Dict[str, sessionmaker] = {}
+        self._default_config = {
+            'driver': 'ODBC Driver 17 for SQL Server',
+            'server': 'localhost',
+            'username': '',
+            'password': '',
+            'trusted_connection': 'yes'
+        }
+        self._pool_config = {
+            'pool_size': 5,  # 减少初始连接数以降低资源占用
+            'max_overflow': 10,  # 适当减少最大溢出连接数
+            'pool_timeout': 60,  # 增加池等待超时时间
+            'pool_recycle': 1800,  # 每30分钟回收连接
+            'pool_pre_ping': True,  # 启用连接健康检查
+            'connect_args': {
+                'timeout': 60,  # 连接超时时间
+                'driver_connects_timeout': 60,  # 驱动连接超时
+                'connect_timeout': 60,  # ODBC连接超时
+                'connect_retries': 3,  # 连接重试次数
+                'connect_retry_interval': 10,  # 重试间隔增加到10秒
+                'connection_timeout': 60  # 额外的连接超时设置
+            }
+        }
+
+        self._main_database_name = "sqlserver_mian_2024"
+    def _build_connection_string(self, database: str, config: Optional[Dict[str, str]] = None) -> str:
+        """构建连接字符串"""
+        conn_config = self._default_config.copy()
+        db_config = self.get_config_for_database(database)
+        conn_config.update(db_config)
+        if config:
+            conn_config.update(config)
+
+        # 构建认证字符串
+        auth_params = []
+        if conn_config.get('trusted_connection', True):
+            auth_params.append("Trusted_Connection=yes")
+        else:
+            auth_params.extend([
+                f"UID={conn_config['username']}",
+                f"PWD={conn_config['password']}"
+            ])
+
+        # 构建ODBC连接字符串
+        conn_parts = [
+            f"DRIVER={conn_config['driver']}",
+            f"SERVER={conn_config['server']}",
+            f"DATABASE={conn_config['database'] if 'database' in conn_config else database}",
+            "CHARSET=UTF-8"
+        ]
+        conn_parts.extend(auth_params)
+        
+        # 构建SQLAlchemy连接URL
+        conn_str = ";".join(conn_parts)
+        conn_url = f"mssql+pyodbc:///?odbc_connect={conn_str}"
+
+        return conn_url
+
+    def get_engine(self, database: str="", config: Optional[Dict[str, str]] = None) -> Engine:
+        database = database or self._main_database_name
+        """获取或创建数据库引擎"""
+        if database not in self._engines:
+            conn_str = self._build_connection_string(database, config)
+            engine = create_engine(conn_str, **self._pool_config)
+            # 预热连接池
+            with engine.connect() as conn:
+                conn.execute(text("SELECT 1"))
+            self._engines[database] = engine
+        return self._engines[database]
+
+    def execute_query(self, database: str, query: str, params: Optional[Dict[str, Any]] = None) -> List[Tuple]:
+        """执行查询并返回结果"""
+        with self.session_scope(database) as session:
+            result = session.execute(text(query), params or {})
+            return [tuple(row) for row in result.fetchall()]
+
+    def execute_non_query(self, database: str, query: str, params: Optional[Dict[str, Any]] = None) -> int:
+        """执行非查询操作(如INSERT, UPDATE, DELETE)"""
+        with self.session_scope(database) as session:
+            result = session.execute(text(query), params or {})
+            return result.rowcount
+
+    def execute_scalar(self, database: str, query: str, params: Optional[Dict[str, Any]] = None) -> Any:
+        """执行查询并返回第一行第一列的值"""
+        with self.session_scope(database) as session:
+            result = session.execute(text(query), params or {})
+            row = result.fetchone()
+            return row[0] if row else None
+
+    def execute_procedure(self, database: str, procedure_name: str, params: Optional[Dict[str, Any]] = None) -> List[Tuple]:
+        """执行存储过程"""
+        params = params or {}
+        param_str = ", ".join([f"@{key}=:{key}" for key in params.keys()])
+        query = f"EXEC {procedure_name} {param_str}"
+        
+        with self.session_scope(database) as session:
+            result = session.execute(text(query), params)
+            return [tuple(row) for row in result.fetchall()]
+
+    def dispose_all(self) -> None:
+        """释放所有数据库引擎资源"""
+        for engine in self._engines.values():
+            engine.dispose()
+        self._engines.clear()
+        self._session_makers.clear()
+
+    def __del__(self):
+        """析构函数,确保所有引擎资源被释放"""
+        self.dispose_all()