Skip to content

Engine and Connection

任何 SQLAlchemy 应用程序的起点都是 Engine 对象,他还是与数据库进行链接通信的"大本营"。他本质上是一个连接池,其中包含了由 DBAPI 驱动提供的数据库连接。

engine arch

create_engine(): 创建 Engine 对象

通过create_engine()工厂函数来构建 Engine 对象:

Python
function sqlalchemy.create_engine(
    url:str|URL,                # 连接 URL
    execution_options:dict=None,    # 用于所有 Connection.execution_options()
    pool: Pool=None,                # Pool 即连接池,默认会自动创建
    pool_size:int=5,                # 连接池大小,0 表示无限制
    pool_timeout:number=30,         # 超时时间
    max_overflow=10,                # 连接池允许溢出的连接数
    echo=False,                     # 如果为 True 向 stdout 输出日志内容,主要用于调试
    ) -> Engine:
        pass

这其中的核心就是 url,他可以是字符串,URL 的典型形式如下:

Bash
dialect+driver://username:password@host:port/database

# dialect: SQLAlchemy dialect 的标识名称,例如 sqlite mysql postgresql oracle mssql
# driver: 对应 dialect 不同驱动的名称(全部使用小写字母),可以不指定此时将使用默认的驱动(同样需要安装)

他还可以是 sqlalchemy.URL 对象,通常用于 username/password 中存在特殊字符的情形,例如 @ / 这两个特殊字符需要将他转换为 %40 %2F 来传入,此时我们可以通过 URL 来构造:

Python
from sqlalchemy import URL

url_object = URL.create(
    drivername="postgresql+psycopg2",
    username="dbuser",
    password="kx@jj5/g",  # plain (unescaped) text
    host="pghost10",
    database="appdb",
)
# 等价于
# "postgresql+pg8000://dbuser:kx%40jj5%2Fg@pghost10/appdb"

示例

以下是官方支持的五种 dialects 的连接方式:

Python
# PostgreSQL
# default -> psycopg2
engine = create_engine("postgresql://scott:tiger@localhost/mydatabase")
# psycopg2
engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/mydatabase")
# pg8000
engine = create_engine("postgresql+pg8000://scott:tiger@localhost/mydatabase")

## MySQL
# default -> mysqlclient
engine = create_engine("mysql://scott:tiger@localhost/foo")
# mysqlclient (a maintained fork of MySQL-Python)
engine = create_engine("mysql+mysqldb://scott:tiger@localhost/foo")
# PyMySQL # 推荐使用这一个
engine = create_engine("mysql+pymysql://scott:tiger@localhost/foo")

## Oracle
# default -> cx_oracle
engine = create_engine("oracle://scott:tiger@127.0.0.1:1521/sidname")
engine = create_engine("oracle+cx_oracle://scott:tiger@tnsname")

## Microsoft SQL Server
# default -> pyodbc
engine = create_engine("mssql://scott:tiger@mydsn")
# pyodbc
engine = create_engine("mssql+pyodbc://scott:tiger@mydsn")
# pymssql
engine = create_engine("mssql+pymssql://scott:tiger@hostname:port/dbname")

## SQLite
# 使用 Python 内置模块
engine = create_engine("sqlite:///foo.db") # 相对路径
# Unix/Mac - 4 initial slashes in total
engine = create_engine("sqlite:////absolute/path/to/foo.db")
# Windows
engine = create_engine("sqlite:///C:\\path\\to\\foo.db")
# Windows alternative using raw string
engine = create_engine(r"sqlite:///C:\path\to\foo.db")
# :memory:
engine = create_engine("sqlite://")

Engine.Connection: 真正创建与数据库的连接

Engine 管理的是数据库的连接,他使用Connection对象表示,我们通过 Engine.connect() 方法来从 Engine 中获取连接,他支持 with 上下文管理:

Python
from sqlalchemy import text

with engine.connect() as connection:
    result = connection.execute(text("select username from users"))
    for row in result:
        print("username:", row.username)

engine.connect() 方法从 Engine 管理的连接池中获取一个表示与数据库连接的 Connection 对象,之后 connection.execute() 来执行 SQL 语句,当利用完 Connection 对象后需要将他们释放到连接池中。

Tips

with 结束后会调用 COnnection.close() 但是他并不会关闭连接而是释放到连接池。Connection 只有第一次执行 execute 方法时才会真正的与数据库创建连接。

使用原始 DBAPI Connection

如果我们想要使用 DBAPI 提供的 Connection,例如有些 SQLALchemy 并不支持的特性,或者仅仅就是不希望使用 text 来包裹 SQL 语句,可以使用:

Python
with engine.connect() as conn:
    # 直接使用驱动的 Connection 与数据库通信,此时不需要 text() 包裹 SQL 语句
    conn.exec_driver_sql("SET param='bar'")

更甚至可以直接使用 Connection.connection 来直接返回 DBAPI 的 Connection 对象:

Python
connection = engine.connect()
dbapi_conn = connection.connection

但是对他的操作就不再受到连接池的控制,此时可能会出现一些问题,因此 SQLALchemy 又引入了一个新的方式:

Python
dbapi_conn = engine.raw_connection()

# 他会直接释放到连接池而不是关闭连接
dbapi_conn.close()

Tips

在 SQLALchemy 中使用 DBAPI 的 Connection 会引入更多的复杂性,通常不推荐使用。

Connection.execute: 执行语句以及事务管理

使用 Connection.execute() 来执行语句是最为典型的行为。他的核心要点就是对事务的管理。 SQLALchemy 中将他们分为了几种形式:

  1. Commit As You Go: 即随心所欲 Commit
Python
with engine.connect() as connection:
    connection.execute(text("<some statement>"))
    connection.commit()  # commits "some statement"

    # new transaction starts
    connection.execute(text("<some other statement>"))
    connection.rollback()  # rolls back "some other statement"

    # new transaction starts
    connection.execute(text("<a third statement>"))
    connection.commit()  # commits "a third statement"

即每一次执行 Connection.commit() 会结束一个事务块,之后的 Connection.execute() 会开启一个新的事务块。也就是事务的开启是自动的(autobegin),而结束需要我们手动 commit()

Tips

由于 DBAPI 并没有定义 begin 方法,因此 autobegin 和 Commit As You Go 是 DBAPI 事务管理的主要方式。

Tips

注意事务通常是针对于修改数据库的语句而言的,对于查询不需要什么事务,直接 execute 即可,不需要 commit

  1. Begin Once: 即使用 begin 显式开始一次事务
Python
with engine.connect() as connection:
    with connection.begin(): # 返回 Transaction 对象
        connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})
        connection.execute(
            some_other_table.insert(), {"q": 8, "p": "this is some more data"}
        )

    # transaction is committed

同样支持 with 啥下文管理,而通过 Transaction 对象来显式定义事务块,他相对于 Commit As You Go 更加明确了事务的起点和结束点。他会在退出上下文管理后提交自身并且内部出现异常会自动回滚自身(Transaction.rollback())是我们开启事务的首选方式。

  1. Connect Begin Once From the Engine: 即直接从 Engine 中 Begin Once
Python
# 在 with 管理结束时会自动提交(commit),并释放 Connection 到连接池
with engine.begin() as connection:
    connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})
    connection.execute(
        some_other_table.insert(), {"q": 8, "p": "this is some more data"}
    )

这个本质上就是对 Begin Once 的封装,通常不推荐使用。

  1. Mixing Styles: 可以结合上面的三种形式来使用
Python
with engine.connect() as connection:
    with connection.begin():
        # run statements in a "begin once" block
        connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})

    # transaction is committed

    # run a new statement outside of a block. The connection
    # autobegins
    connection.execute(
        some_other_table.insert(), {"q": 8, "p": "this is some more data"}
    )

    # commit explicitly
    connection.commit()

    # can use a "begin once" block here
    with connection.begin():
        # run more statements
        connection.execute(...)

Tips

在使用混合模式时需要注意,Connection.begin() 前一定要 Connection.commit() 掉基于 antobegin 的语句否则会引发异常。

execution_options: 设置 execute 的行为

Connection.execute() 是 SQL 执行的核心方法,SQLALchemy 提供了几种方式来控制他的行为:

  • create_engine(execution_options): 全局设置,所有由 Engine.connect() 创建的 Connection 都继承该选项
  • Connection.execution_options() -> Connection: 针对于当前 Connection 对象
  • Connection.execute(execution_options): 仅仅针对于当前 execute 命令

Tips

一些很多精细的操作例如服务器游标、事务隔离等都是通过控制 execute 的行为实现的。因为 execute 是真正与数据库进行通信的地方。

isolation_level: 事务隔离级别

大多数 DBAPI 都支持配置事务类别,其中包括四个级别:

  • READ UNCOMMITTED
  • READ COMMITTED
  • REPEATABLE READ
  • SERIALIZABLE

要想设置事务类别最简单的就是通过 execution_options(isolation_level="READ COMMITTED") 来实现,也可以在引擎级别直接定义:

Python
# 对于特定 Connection 来定义事务类别
with engine.connect().execution_options(
    isolation_level="REPEATABLE READ"
) as connection:
    with connection.begin():
        connection.execute("<statement>")

# Engine 级别,他创建的 Connection 都默认继承
from sqlalchemy import create_engine

eng = create_engine(
    "postgresql://scott:tiger@localhost/test", isolation_level="REPEATABLE READ"
)

stream_results: 服务器游标

设置 stream_result=True 来基于流传输,的而不是完全缓存到内存。通常这也意味着使用服务端游标。还需要配合 max_row_buffer 来设置缓冲区。

当直接迭代 Result 使用该选项交付的对象时(流式获取),默认会首先缓存很小几行然后每次提取时返回的行数不断变大直到达到 max_row_buffer 的设置值(默认1000):

Python
with engine.connect() as conn:
    conn = conn.execution_options(stream_results=True, max_row_buffer=100)
    result = conn.execute(text("select * from table"))
    for row in result:
        print(f"{row}")

使用服务器游标时要避免全局启用,由于服务器游标的特殊性他会占用通信通道(因为需要保存状态),这就导致了其他操作无法进行:

Python
with engine.connect() as conn:
    conn = conn.execution_options(stream_results=True, max_row_buffer=100)
    result = conn.execute(text("select * from table"))
    for row in result:
        print(f"{row}")
        # 异常,因为 conn 使用服务端游标,不能执行其他语句
        conn.execute('update 操作')

Tips

通常配合 `Result.partitions() 来实现快速迭代读取游标中的值

Pool

Engine 最核心的功能就是提供连接池Pool来管理连接。

当 Engine 调用 connect() 或直接调用 execute() 方法时将向连接池发起请求。连接池会根据需要创建与数据库的连接。默认大小(pool_size)为 5,并且默认允许溢出(max_overflow)到 10 个。连接池本质上就是保存所有连接的地方,因此每个应用程序中应当只为每个数据库保留一个连接池,而不是为每个连接都创建一个连接池

Note

SQLite 的表现会不太一样,具体查看SQLite 连接池行为