Engine and Connection
任何 SQLAlchemy 应用程序的起点都是 Engine 对象,他还是与数据库进行链接通信的"大本营"。他本质上是一个连接池,其中包含了由 DBAPI 驱动提供的数据库连接。
create_engine(): 创建 Engine 对象
通过create_engine()工厂函数来构建 Engine 对象:
function sqlalchemy.create_engine(
url:str|URL, # 连接 URL
execution_options:dict=None, # 用于所有 Connection.execution_options()
pool: Pool=None, # Pool 即连接池,默认会自动创建
pool_size:int=5, # 连接池大小,0 表示无限制
pool_timeout:number=30, # 超时时间
max_overflow=10, # 连接池允许溢出的连接数
echo=False, # 如果为 True 向 stdout 输出日志内容,主要用于调试
) -> Engine:
pass
这其中的核心就是 url,他可以是字符串,URL 的典型形式如下:
dialect+driver://username:password@host:port/database
# dialect: SQLAlchemy dialect 的标识名称,例如 sqlite mysql postgresql oracle mssql
# driver: 对应 dialect 不同驱动的名称(全部使用小写字母),可以不指定此时将使用默认的驱动(同样需要安装)
他还可以是 sqlalchemy.URL
对象,通常用于 username/password 中存在特殊字符的情形,例如 @ /
这两个特殊字符需要将他转换为 %40 %2F
来传入,此时我们可以通过 URL 来构造:
from sqlalchemy import URL
url_object = URL.create(
drivername="postgresql+psycopg2",
username="dbuser",
password="kx@jj5/g", # plain (unescaped) text
host="pghost10",
database="appdb",
)
# 等价于
# "postgresql+pg8000://dbuser:kx%40jj5%2Fg@pghost10/appdb"
示例
以下是官方支持的五种 dialects 的连接方式:
# PostgreSQL
# default -> psycopg2
engine = create_engine("postgresql://scott:tiger@localhost/mydatabase")
# psycopg2
engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/mydatabase")
# pg8000
engine = create_engine("postgresql+pg8000://scott:tiger@localhost/mydatabase")
## MySQL
# default -> mysqlclient
engine = create_engine("mysql://scott:tiger@localhost/foo")
# mysqlclient (a maintained fork of MySQL-Python)
engine = create_engine("mysql+mysqldb://scott:tiger@localhost/foo")
# PyMySQL # 推荐使用这一个
engine = create_engine("mysql+pymysql://scott:tiger@localhost/foo")
## Oracle
# default -> cx_oracle
engine = create_engine("oracle://scott:tiger@127.0.0.1:1521/sidname")
engine = create_engine("oracle+cx_oracle://scott:tiger@tnsname")
## Microsoft SQL Server
# default -> pyodbc
engine = create_engine("mssql://scott:tiger@mydsn")
# pyodbc
engine = create_engine("mssql+pyodbc://scott:tiger@mydsn")
# pymssql
engine = create_engine("mssql+pymssql://scott:tiger@hostname:port/dbname")
## SQLite
# 使用 Python 内置模块
engine = create_engine("sqlite:///foo.db") # 相对路径
# Unix/Mac - 4 initial slashes in total
engine = create_engine("sqlite:////absolute/path/to/foo.db")
# Windows
engine = create_engine("sqlite:///C:\\path\\to\\foo.db")
# Windows alternative using raw string
engine = create_engine(r"sqlite:///C:\path\to\foo.db")
# :memory:
engine = create_engine("sqlite://")
Engine.Connection: 真正创建与数据库的连接
Engine 管理的是数据库的连接,他使用Connection对象表示,我们通过 Engine.connect()
方法来从 Engine 中获取连接,他支持 with 上下文管理:
from sqlalchemy import text
with engine.connect() as connection:
result = connection.execute(text("select username from users"))
for row in result:
print("username:", row.username)
engine.connect()
方法从 Engine 管理的连接池中获取一个表示与数据库连接的 Connection 对象,之后 connection.execute()
来执行 SQL 语句,当利用完 Connection 对象后需要将他们释放到连接池中。
Tips
with 结束后会调用 COnnection.close()
但是他并不会关闭连接而是释放到连接池。Connection 只有第一次执行 execute 方法时才会真正的与数据库创建连接。
使用原始 DBAPI Connection
如果我们想要使用 DBAPI 提供的 Connection,例如有些 SQLALchemy 并不支持的特性,或者仅仅就是不希望使用 text 来包裹 SQL 语句,可以使用:
with engine.connect() as conn:
# 直接使用驱动的 Connection 与数据库通信,此时不需要 text() 包裹 SQL 语句
conn.exec_driver_sql("SET param='bar'")
更甚至可以直接使用 Connection.connection
来直接返回 DBAPI 的 Connection 对象:
但是对他的操作就不再受到连接池的控制,此时可能会出现一些问题,因此 SQLALchemy 又引入了一个新的方式:
Tips
在 SQLALchemy 中使用 DBAPI 的 Connection 会引入更多的复杂性,通常不推荐使用。
Connection.execute: 执行语句以及事务管理
使用 Connection.execute()
来执行语句是最为典型的行为。他的核心要点就是对事务的管理。 SQLALchemy 中将他们分为了几种形式:
Commit As You Go
: 即随心所欲 Commit
with engine.connect() as connection:
connection.execute(text("<some statement>"))
connection.commit() # commits "some statement"
# new transaction starts
connection.execute(text("<some other statement>"))
connection.rollback() # rolls back "some other statement"
# new transaction starts
connection.execute(text("<a third statement>"))
connection.commit() # commits "a third statement"
即每一次执行 Connection.commit()
会结束一个事务块,之后的 Connection.execute()
会开启一个新的事务块。也就是事务的开启是自动的(autobegin),而结束需要我们手动 commit()
。
Tips
由于 DBAPI 并没有定义 begin 方法,因此 autobegin 和 Commit As You Go
是 DBAPI 事务管理的主要方式。
Tips
注意事务通常是针对于修改数据库的语句而言的,对于查询不需要什么事务,直接 execute 即可,不需要 commit
Begin Once
: 即使用 begin 显式开始一次事务
with engine.connect() as connection:
with connection.begin(): # 返回 Transaction 对象
connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})
connection.execute(
some_other_table.insert(), {"q": 8, "p": "this is some more data"}
)
# transaction is committed
同样支持 with 啥下文管理,而通过 Transaction 对象来显式定义事务块,他相对于 Commit As You Go
更加明确了事务的起点和结束点。他会在退出上下文管理后提交自身并且内部出现异常会自动回滚自身(Transaction.rollback()
)是我们开启事务的首选方式。
Connect Begin Once From the Engine
: 即直接从 Engine 中Begin Once
# 在 with 管理结束时会自动提交(commit),并释放 Connection 到连接池
with engine.begin() as connection:
connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})
connection.execute(
some_other_table.insert(), {"q": 8, "p": "this is some more data"}
)
这个本质上就是对 Begin Once
的封装,通常不推荐使用。
Mixing Styles
: 可以结合上面的三种形式来使用
with engine.connect() as connection:
with connection.begin():
# run statements in a "begin once" block
connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})
# transaction is committed
# run a new statement outside of a block. The connection
# autobegins
connection.execute(
some_other_table.insert(), {"q": 8, "p": "this is some more data"}
)
# commit explicitly
connection.commit()
# can use a "begin once" block here
with connection.begin():
# run more statements
connection.execute(...)
Tips
在使用混合模式时需要注意,Connection.begin()
前一定要 Connection.commit()
掉基于 antobegin 的语句否则会引发异常。
execution_options: 设置 execute 的行为
Connection.execute()
是 SQL 执行的核心方法,SQLALchemy 提供了几种方式来控制他的行为:
create_engine(execution_options)
: 全局设置,所有由Engine.connect()
创建的 Connection 都继承该选项Connection.execution_options() -> Connection
: 针对于当前 Connection 对象Connection.execute(execution_options)
: 仅仅针对于当前 execute 命令
Tips
一些很多精细的操作例如服务器游标、事务隔离等都是通过控制 execute 的行为实现的。因为 execute 是真正与数据库进行通信的地方。
isolation_level: 事务隔离级别
大多数 DBAPI 都支持配置事务类别,其中包括四个级别:
READ UNCOMMITTED
READ COMMITTED
REPEATABLE READ
SERIALIZABLE
要想设置事务类别最简单的就是通过 execution_options(isolation_level="READ COMMITTED")
来实现,也可以在引擎级别直接定义:
# 对于特定 Connection 来定义事务类别
with engine.connect().execution_options(
isolation_level="REPEATABLE READ"
) as connection:
with connection.begin():
connection.execute("<statement>")
# Engine 级别,他创建的 Connection 都默认继承
from sqlalchemy import create_engine
eng = create_engine(
"postgresql://scott:tiger@localhost/test", isolation_level="REPEATABLE READ"
)
stream_results: 服务器游标
设置 stream_result=True
来基于流传输,的而不是完全缓存到内存。通常这也意味着使用服务端游标。还需要配合 max_row_buffer
来设置缓冲区。
当直接迭代 Result 使用该选项交付的对象时(流式获取),默认会首先缓存很小几行然后每次提取时返回的行数不断变大直到达到 max_row_buffer
的设置值(默认1000):
with engine.connect() as conn:
conn = conn.execution_options(stream_results=True, max_row_buffer=100)
result = conn.execute(text("select * from table"))
for row in result:
print(f"{row}")
使用服务器游标时要避免全局启用,由于服务器游标的特殊性他会占用通信通道(因为需要保存状态),这就导致了其他操作无法进行:
with engine.connect() as conn:
conn = conn.execution_options(stream_results=True, max_row_buffer=100)
result = conn.execute(text("select * from table"))
for row in result:
print(f"{row}")
# 异常,因为 conn 使用服务端游标,不能执行其他语句
conn.execute('update 操作')
Tips
通常配合 `Result.partitions() 来实现快速迭代读取游标中的值
Pool
Engine 最核心的功能就是提供连接池Pool来管理连接。
当 Engine 调用 connect()
或直接调用 execute()
方法时将向连接池发起请求。连接池会根据需要创建与数据库的连接。默认大小(pool_size)为 5,并且默认允许溢出(max_overflow)到 10 个。连接池本质上就是保存所有连接的地方,因此每个应用程序中应当只为每个数据库保留一个连接池,而不是为每个连接都创建一个连接池。
Note
SQLite 的表现会不太一样,具体查看SQLite 连接池行为