Relational API
Relational API使用增量构造查询,他以 DuckDBPyRelation 作为核心对象。该对象可以看作 SQL 查询的符号表示形式,他不保存任何数据,也不会进行任何通信。直到显式的调用触发执行的方法。
他非常类似于数据库中的虚拟视图,我们在一个虚拟视图上构造下一个虚拟视图来获取最终的结果,他通常经历几个步骤:
- 通过
DuckDBPyConnection.sql(query)
来获取首个视图,也可以通过DuckDBPyConnection.read(from)_*()
来从各种源获取首个视图,首个虚拟视图的方法都是绑定在DB API的 DuckDBPyConnection 对象上的 - 通过各种类似 ORM 的方式来对首个视图进行操作,这些操作都是绑定在 DuckDBPyRelation 对象上的
- 执行 DuckDBPyRelation 对象上的输出方法来真正执行查询
Relational API 只是针对于 SELECT 语句
再次强调DuckDBPyRelation 的表现就是一个只读视图,能够用于 SQL 语句中
from duckdb import DuckDBPyRelation, DuckDBPyConnection
import pandas as pd
import pyarrow as pa
class DuckDBPyRelation:
"""他的表现就是一个只读视图,能够用于 SQL 语句中"""
# ================ 操作类方法 ====================
def aggregate(self, aggr_expr: str, group_expr: str = "") -> DuckDBPyRelation:
pass
def apply(
self,
function_name: str,
function_aggr: str,
group_expr: str = "",
function_parameter: str = "",
projected_columns: str = "",
) -> DuckDBPyRelation:
"""对关系上的单列和列列表执行函数"""
pass
def any_value(self, column: str) -> DuckDBPyRelation:
"""返回给定列(column)第一个非 null 值的行"""
pass
def count(self, columns: str) -> DuckDBPyRelation:
"""计算给定列中存在的元素数量"""
pass
def describe(self) -> DuckDBPyRelation:
"""给出多维度的基本统计信息,例如 min max 是否有 null 等"""
pass
def distinct(self) -> DuckDBPyRelation:
"""从关系对象中检索完全不同的行(去重)"""
pass
def filter(self, filter_expr) -> DuckDBPyRelation:
"""过滤关系对象"""
pass
def except_(self, other_rel: DuckDBPyRelation) -> DuckDBPyRelation:
"""选择未出现在 other_rel 中的所行(差集),关系必须具有相同数量的列"""
pass
def intersect(self, other_rel: DuckDBPyRelation) -> DuckDBPyRelation:
"""选择两个关系的交集,返回两个关系中的共有的行(交集),关系必须具有相同数量的列"""
pass
def union(self, other_rel: DuckDBPyRelation) -> DuckDBPyRelation:
"""合并连个关系(并集),关系必须具有相同数量的列"""
pass
def join(
self, other_rel: DuckDBPyRelation, condition, type="inner"
) -> DuckDBPyRelation:
"""根据提供的条件合并两个关系"""
pass
def limit(self, size: int, offset: int = 0) -> DuckDBPyRelation:
"""限制返回的数量,可以通过 offset 来指定偏移"""
pass
def order(self, expr) -> DuckDBPyRelation:
"""对关系进行排序"""
pass
def project(self, expr) -> DuckDBPyRelation:
"""将给定的表达式应用于关系中的每一行"""
pass
def select(self, expression) -> DuckDBPyRelation:
"""通过 expression 来筛选行"""
pass
# ================ 执行类方法 ====================
def show(self):
"""预览"""
pass
def fetchone(self) -> tuple | None:
pass
def fetchmany(self, size: int = 1) -> list:
pass
def fetchall(self) -> list:
pass
def fetch_arrow_table(self, rows_per_batch: int = 1000000) -> pa.lib.Table:
"""获取 Arrow Table数据,和 pyarrow 交互"""
pass
def fetch_record_batch(
self, row_per_batch: int = 1000000
) -> pa.lib.RecordBatchReader:
"""获取 Arrow RecordBatchReader 数据,和 pyarrow 交互"""
pass
def fetch_df(self, date_as_object: bool = False) -> pd.DataFrame:
"""获取 pandas DataFrame 数据,和 pandas 交互"""
pass
def fetch_df_chunk(
self, vectors_per_chunk: int = 1, date_as_object: bool = False
) -> pd.DataFrame:
pass
# ================ 持久化数据 =====================
def to_table(self, table_name: str) -> None:
"""使用关系对象中的视图创建一个 table_name 新表"""
pass
def insert_into(self, table_name: str) -> None:
"""将关系对象插入到名为 table_name 的现有表中"""
pass
def to_parquet(self, file_name: str) -> None:
"""持久化文件到 parquet 中"""
pass
def to_csv(self, file_name: str) -> None:
"""持久化文件到 csv 中"""
pass
# ================ 其他 ====================
def sql_query(self) -> str:
"""获取与关系对象等效 SQL 查询语句
说白了就是构造这个只读惰性视图的 SQL 语句
"""
pass
Relational API 核心在于构造惰性的只读的视图,因此他只能用于构造 SELECT 表达式,并且提供了类似 ORM 的编程方式,甚至 DuckDB 还提供了Expression API来配合 Relational API 来动态构造表达式。
SELECT 语句
SELECT 是 SQL 的最重要也是最复杂的语句,它包含 8 个子句:
子句 | 说明 | 执行顺序 |
---|---|---|
SELECT columns | 选择最终结果的列 | 5 |
FROM table | 要检索的表可以包含JOIN连接表 | 1 |
WHERE predicate on rows | 筛选 from 的结果 | 2 |
GROUP BY columns | 分组 | 3 |
HAVING predicate on groups | 筛选 group by 的结果 | 4 |
ORDER BY columns | 排序 | 6 |
OFFSET num | 指定跳过的行数 | 7 |
LIMIT num | 指定要返回的行数 | 8 |
多数据库关联
由于 Relational API 的所有操作都是惰性的,只有最后才会真正执行。因此他的所有表在最终执行的时候都必须是可以被访问的。而由于 duckdb 整个操作的灵活性会导致一些问题:
import duckdb
# 两个独立的数据库, 可以是内存也可以是实在的
conn1 = duckdb.connect()
conn2 = duckdb.connect()
# 插入两个表
conn1.execute("CREATE TABLE temp1 AS SELECT * FROM (VALUES (1, 'text2'), (2, 'text2'))")
conn2.execute("CREATE TABLE temp2 AS SELECT * FROM (VALUES (1, 'text2'), (2, 'text2'))")
temp1_table = conn1.sql("select * from temp1")
temp2_table = conn2.sql("select * from temp2")
# Invalid Input Error: Cannot combine LEFT and RIGHT relations of different connections!
# 报错,两个表不再同一个连接中
temp1_table.intersect(temp2_table).show()
---------------------------------------------------------------------------
InvalidInputException Traceback (most recent call last)
Cell In[4], line 4
1 temp1_table = conn1.sql("select * from temp1")
2 temp2_table = conn2.sql("select * from temp2")
----> 4 temp1_table.intersect(temp2_table).show()
InvalidInputException: Invalid Input Error: Cannot combine LEFT and RIGHT relations of different connections!
上面会报错,提示两个表不再同一个 Connection 对象中,但是 temp1_table
和 temp2_table
都是 DuckDBPyRelation 对象但是他们由于不在一个连接中所以无法操作。因此要解决这个问题只需要将需求的表弄到一个连接中即可,而为了实现这一点:
- 对于内存数据库,不要有多个内存数据库。将内存数据库看作是全局的表
- 对于内存数据库和文件数据库来说,内存数据库就没啥必要,直接在全局使用 DataFrame 等支持的格式即可,甚至直接读取 csv 等也可以
- 对于多个文件数据库,这也是最常见的情况就是直接通过
conn.exceute("ATTACH xxxx")
来在一个 Connection 中接入其他 Connection
Operation
DuckDBPyRelation 对象提供了一大堆方法来模拟 SELECT 子句。当然涉及到表达式可以直接传入字符串,也可以通过Expression API来构建。
select(Expression|str)
这个主要用于构造标准的 SELECT 子句:
import duckdb
import pandas as pd
df = pd.DataFrame(
{"a": [1, 2, 3, 4], "b": [True, None, False, True], "c": [42, 21, 13, 14]}
)
hello = duckdb.ConstantExpression("hello")
world = duckdb.ConstantExpression("world")
case = duckdb.CaseExpression(
condition=duckdb.ColumnExpression("b") == False, value=world
).otherwise(hello)
duckdb.from_df(df).select(case).sql_query()
"SELECT CASE WHEN ((b = false)) THEN ('world') ELSE 'hello' END FROM (SELECT * FROM pandas_scan(0x792190d30440)) AS df_48420d7303da29f9"
它还接受任何合法的用于 SELECT 子句的字符串:
duckdb.from_df(df).select("a").show()
┌───────┐
│ a │
│ int64 │
├───────┤
│ 1 │
│ 2 │
│ 3 │
│ 4 │
└───────┘
filter(Exception|str)
通过给定条件来过滤任何不满足该条件的行,实际上就是用于构造 WHERE 子句:
import duckdb
rel = duckdb.sql("SELECT * FROM range(1000000) AS tbl(id)")
rel.filter("id > 5").limit(3).show()
┌───────┐
│ id │
│ int64 │
├───────┤
│ 6 │
│ 7 │
│ 8 │
└───────┘
同样他也支持 Expression API 来构造:
┌───────┐
│ id │
│ int64 │
├───────┤
│ 6 │
│ 7 │
│ 8 │
└───────┘
join(rel:DuckDBPyRelation, condition: Expression|str, type ='inner')
用于构造 FROM 子句中的 JOIN 子句,注意 FROM 的第一个 table 是调用该方法的 DuckDBPyRelation,其中 condition 指定连接条件,type 指定连接类型:
import duckdb
r1 = duckdb.sql("SELECT * FROM range(5) AS tbl(id)").set_alias("r1")
r2 = duckdb.sql("SELECT * FROM range(10, 15) AS tbl(id)").set_alias("r2")
r1.join(r2, "r1.id + 10 = r2.id").show()
┌───────┬───────┐
│ id │ id │
│ int64 │ int64 │
├───────┼───────┤
│ 0 │ 10 │
│ 1 │ 11 │
│ 2 │ 12 │
│ 3 │ 13 │
│ 4 │ 14 │
└───────┴───────┘
set_alias(alias: str)
为关系设置别名,也就是AS alias
语句,最主要的用处是为子查询定义别名。这在有些时候是必须的,例如上面的需要在 join 中指定条件想等。
aggregate(aggr_expr:str, group_expr:str)
在关系上执行聚合,也就是模拟了 GROUP BY 子句,可扩展为SELECT aggr_expr FROM table GROUP BY group_expr
:
import duckdb
rel = duckdb.sql("SELECT * FROM range(1000000) AS tbl(id)")
rel.aggregate("id % 2 AS g, sum(id), min(id), max(id)")
┌───────┬──────────────┬─────────┬─────────┐
│ g │ sum(id) │ min(id) │ max(id) │
│ int64 │ int128 │ int64 │ int64 │
├───────┼──────────────┼─────────┼─────────┤
│ 0 │ 249999500000 │ 0 │ 999998 │
│ 1 │ 250000000000 │ 1 │ 999999 │
└───────┴──────────────┴─────────┴─────────┘
默认是GROUP BY ALL
,也可以指定其他 GROUP BY 字段:
rel.aggregate("id % 2 as g, id % 5 as g1, sum(id)", "g, g1")
┌───────┬───────┬─────────────┐
│ g │ g1 │ sum(id) │
│ int64 │ int64 │ int128 │
├───────┼───────┼─────────────┤
│ 0 │ 0 │ 49999500000 │
│ 1 │ 1 │ 49999600000 │
│ 0 │ 2 │ 49999700000 │
│ 1 │ 4 │ 50000400000 │
│ 1 │ 0 │ 50000000000 │
│ 0 │ 1 │ 50000100000 │
│ 0 │ 4 │ 49999900000 │
│ 1 │ 2 │ 50000200000 │
│ 0 │ 3 │ 50000300000 │
│ 1 │ 3 │ 49999800000 │
├───────┴───────┴─────────────┤
│ 10 rows 3 columns │
└─────────────────────────────┘
limit(n, offset=0)
选择返回 n 行,可选偏移一定的数量。
order(expr)
根据给定表达式对关系进行排序:
import duckdb
rel = duckdb.sql("SELECT * FROM range(1000000) AS tbl(id)")
rel.order("id DESC").limit(3).show()
┌────────┐
│ id │
│ int64 │
├────────┤
│ 999999 │
│ 999998 │
│ 999997 │
└────────┘
相同模式关系的集合运算
如果存在多个具有相同模式的视图,他们之间能够执行集合运算:
union(rel)
: 并集intersect(rel)
: 交集except_(rel)
: 差集
import duckdb
r1 = duckdb.sql("SELECT * FROM range(10) AS tbl(id)")
r2 = duckdb.sql("SELECT * FROM range(5) AS tbl(id)")
# 差集,即 r2 - r1
r1.except_(r2).show()
┌───────┐
│ id │
│ int64 │
├───────┤
│ 5 │
│ 9 │
│ 6 │
│ 7 │
│ 8 │
└───────┘
# 交集,即 r1 和 r2 中共有的
r1.intersect(r2).show()
┌───────┐
│ id │
│ int64 │
├───────┤
│ 4 │
│ 2 │
│ 1 │
│ 0 │
│ 3 │
└───────┘
# 合集,即 r1 和 r2 合并
r1.union(r2).show()
┌───────┐
│ id │
│ int64 │
├───────┤
│ 0 │
│ 1 │
│ 2 │
│ 3 │
│ 4 │
│ 5 │
│ 6 │
│ 7 │
│ 8 │
│ 9 │
│ 0 │
│ 1 │
│ 2 │
│ 3 │
│ 4 │
└───────┘