aiostream
aiostream 提供了一组流运算符,能够使用管道来组合各种功能。它可以看作是异步版本的itertools。所有 aiostream 提供的运算符都返回一个 Stream 对象,该对象是一个增强的异步迭代器,该迭代器提供了以下功能:
- 可以使用
|
来拼接不同的运算符 - 每次迭代都会创建不同的迭代器
- 能够使用
async with
来上下文管理 - 直接使用
await
就能够运行 - 支持
[]
切片 - 可以使用
+
来串联操作
流操作符
aiostream 提供了七种类型的流操作符:
类型 | 类型说明 | 操作符 |
---|---|---|
creation | 创建,流起点,为整个管道提供源参数 | iterate、preserve、just、call、empty、throw、never、repeat、count、range |
selection | 选择,对输入进行处理被选择的结果会被输出 | take、takelast、skip、skiplast、getitem、filter、until、takewhile、dropwhile |
transformation | 转换,都输入进行转换,这个通常是整个业务的核心 | map、enumerate、starmap、cycle、chunks |
combination | 组合,即将多个源组合成一个元组 | map、zip、merge、chain、ziplatest |
aggregation | 聚合,归一即将源序列合并为单值输出 | accumulate、reduce、list |
advanced | 高阶操作 | concat、flatten、switch,concatmap、flatmap、switchmap |
timing | 定时 | spaceout、timeout、delay |
miscellaneous | 其他,主要是最终输出,它通常作为流的终点 | action、print |
stream、pipe 和 |
除了 creation 操作符外,其他操作符都提供了 pipe.[oper]
形式的管道操作符,并且还允许使用 |
来连接他们,具体:
# xs 可以还是异步迭代器,或者 creation 操作符
# 下面三者严格等价
ys = stream.map(xs, lambda x: x**2)
ys = pipe.map(lambda x: x**2)(xs)
ys = xs | pipe.map(lambda x: x**2)
其中使用最多也最灵活的就是 pipe 的情形,当连接多个运算符时:
Create: 生成值
用于生成数据,他通常是流的源头。他们都没有等效的管道操作符因此通常也只能作为源头。
Tips
当然在 aiostream 中几乎所有源都是可读写的,都能够作为源,不过 create 相当于钦定的源。
他们之间也能够使用+
操作符连接:
iterate/preserve: 迭代
他们都接受迭代器来作为输入,之后不断从迭代器中获取值来输出,是典型的可读源,他们之间的区别在于 preserve 不会隐式关闭迭代器:
async def main() -> None:
async def agen() -> AsyncIterator[int]:
yield 1
yield 2
yield 3
# The xs stream does not preserve the generator
xs = stream.iterate(agen())
print(await xs[0]) # Print 1
print(await stream.list(xs)) # Print [] (2 and 3 have never yielded)
# The xs stream does preserve the generator
xs = stream.preserve(agen())
print(await xs[0]) # Print 1
print(await stream.list(xs)) # Print [2, 3]
# Transform agen into a stream operator
agen_stream = operator(agen)
xs = agen_stream() # agen is now reusable
print(await stream.list(xs)) # Print [1, 2, 3]
print(await stream.list(xs)) # Print [1, 2, 3]
需要注意只有异步迭代器才会被隐式关闭迭代器,因此 preserve 只能接受异步迭代器,而对于 iterate 就没有这个限制能够接受 AsyncIterable[T] | Iterable[T]
。
还有我们是可以在运行过程中不断向序列中添加任务的,例如:
a = [1]
await (
stream.iterate(a)
| pipe.action(lambda x: a.append(x + 1))
| pipe.print("{}")
)
repeat: 重复值
用于生成给定次数的相同值:
stream.repeat(
value: T, # 生成值
times: int|None = None, # 次数,默认 None 即无限次
interval: float=0.0, # 时间间隔,默认不间隔
)
range/count: 范围数字
他们都能够生成给定范围的数字,不同在于 count 能够无限生成:
stream.range(
*args, # 等价于内置的 range 方法,
interval: float=0.0, # 时间间隔
)
# 他大致等价于
(stream.iterate(range(*args)) | pipe.spaceout(interval))
stream.count(
start: int=0, # 起始值
step: int=1, # 步进
interval: float=0.0, # 时间间隔
)
just/call: 单值
他们都是生成单个值,因为有些地方必须提供异步迭代器,单值就没办法,因此就需要将她们包装为异步迭代器:
# 给定值返回值
stream.just(value: T)
# 调用给定函数返回值
stream.call(
func: SyncCallable[P, T] | AsyncCallable[P, T],
*args,
**kwargs
)
# 他们本质上等价于
async def just(n):
yield n
例如如果想要传入固定的参数就可以通过他来实现:
# url 是需要迭代的, 而 session 只需要一个
async def crawler(url, session):
pass
await (stream.zip(stream.iterate(urls), stream.cycle(stream.just(session)))
| pip.starmap(crawler)
| ....
)
empty/throw/never: 特殊值
还有三个非常特殊的值,不知道干啥用的,应该是内部使用的:
stream.empty
: 终止而不产生任何值stream.throw(Exception)
: 抛出异常而不生成任何值stream.never
: 永远阻塞,不产生任何值
Transformation: 转换值
转换操作,类似于 Nodejs 中的转换流(Transform),他获取输入和转换方法来生成输出。是最主要的业务逻辑定义处。通常位于管道的中间。
map/starmap: 批量处理
将给定函数应用于异步序列的元素:
stream.map(
source: AsyncIterable[T], # 异步迭代器,可由管道获取
func: MapCallable[T, U], # 函数,可以是异步的也可以是同步的
*more_sources, # 其他异步迭代器,他们会与 source 共同作为位置参数传递给 func
ordered: bool=True, # 排序,如果 func 是同步的一定是排序的该参数被省略
task_limit: int|None=None # 协程同时运行数量,同样对于同步函数来说被省略,或一个个执行
)
他和 pyhton 内置的 map 行为几乎相同。比较不好理解的就是 *more_sources
参数,他在源码中的表现为:
# 使用 stream.zip 包装以后
# 结果 *item 解包作为 func 的位置参数使用
stream = stream.zip(source, *more_sources)
async with streamcontext(stream) as streamer:
async for item in streamer:
yield func(*item)
Tips
具有 *more_sources
的 map 也被认为是 combination 操作
他是一个非常重要的方法,实际上任何接受函数的方法都很重要,因为他们能够承接业务逻辑,例如如果想要抓取数据并保存到数据库中可以:
await (stream.iterate(urls)
| pipe.map(aiohttp_get_url, task_limit = 10)
| pipe.map(to_mongodb)
| pipe.print("成功写入{}!")
)
而 starmap
比较特殊,他没有 *more_sources
参数,他要求 source 中的值必须是可迭代对象,之后他会解包这个对象来作为位置参数来执行函数:
await (
# combination 的 zip 拼接多个异步迭代器产生 (x, y)
# 他们被传入 starmap 参数并且会 *(x, y)
stream.zip(stream.range(10), stream.range(10))
| pipe.starmap(lambda x, y: x * y)
| pipe.print("{}")
)
Tips
map 需要返回一个值(即 return 语句),如果是 yield value
即迭代器方法,它返回的是 iterator ,此时后续的操作获取的是这个迭代器,而不是 yield 的值。这一点尤其需要注意。因此迭代器方法应当使用 stream.iterate
来实现 yield 值。
enumerate: 枚举值
从异步序列中生成 (index, value)
元组:
chunks: 收集
他将异步迭代整理成指定大小的列表输出:
他也是一个非常有用的方法,例如爬取数据时插入数据库,如果一条条插入性能会比较差,此时可以通过 chunks 来收集足够多在批量插入:
await (stream.iterate(urls)
| pipe.map(aiohttp_get_url, task_limit = 10)
| pipe.chunks(1000)
| pipe.map(to_mongodb) # 这里要用 insert_many 了
| pipe.print("成功写入{}!")
)
cycle: 循环
循环迭代异步序列,即到终点后重新迭代。注意该方法不提供任何缓冲和时间间隔,如果不想过快输出需要添加其他操作符:
Selection: 选择值
顾名思义用于选择特定的元素而抛弃其他的。
take: 前 n 个
选择前 n 个值,之后的抛弃,如果为负数直接终止:
tasklast: 后 n 个
选择后 n 个值,注意他会将迭代器执行到末尾才会返回第一个值,因此性能并不好:
skip: 跳过前 n 个
跳过前 n 个值,如果为负数不跳过任何值:
skiplast: 跳过后 n 个
跳过后 n 个值,同样他会将迭代器执行到末尾才会返回第一个值,因此性能并不好:
getitem: 索引、切片
可以指定整数值来作为索引选择指定值,也可以使用切片语法来获取一个区间:
filter: 过滤
他应该是最为有用的,运行指定一个 func 来过滤需要的值:
# 函数必须返回布尔值
stream.filter(
source: AsyncIterable[T],
func: Callable[[T], bool | Awaitable[bool]]
)
until: 条件满足终止
同样接受函数,如果条件满足时结束异步迭代器:
takewhile: 条件满足开始
until 的反操作,只有条件满足时才开始获取 item:
dropwhile: 条件满足丢弃
算是 filter 的反向操作,任何满足条件的值会被抛弃
Combination: 组合值
他接受多个序列,然后生成一个合并后的值。通常也是作为数据源使用的。
zip: 合并
类似 Pyhton 中的 zip,他会合并多个异步序列的元素为一个元组:
唯一需要注意的是会在最短序列用尽之后停止,也就是说最终序列的长度由最短的一个序列决定。他可以配合多个 Create 来返回序列,例如配合 repeat 来添加一个额外参数供 strmap 使用:
await (
stream.zip(
stream.iterator(iter_file()),
stream.repeat(save_dir)
)
| pipe.strmap(save_file)
)
merge: 合并
将多个异步序列合并在一起,实际上也可以使用 +
来实现。merge 的所有序列是同步迭代的,其中元素在可用时立即转发,直到所有异步序列执行完毕。这也意味着他是无序的:
chain: 串联
按照给定顺序串联异步序列,他与 merge 不同之处在于他是有顺序的,并且在上一个序列迭代未完成时后一个序列是等待的:
Aggregatation: 聚合值
所谓的聚合就是将序列归一,他通常也位于整个流的最后来统计一些数据。
reduce: 累加
将第一个方法的结果作为下一个方法的第一个参数来会实现聚合值,其中可以指定一个 func,该函数接受两个位置参数:
- 上一个 ccumulate 的结果
- 新的 item
stream.reduce(
source: AsyncIterable[T],
func: Callable[[T, T], Awaitable[T] | T],
initializer: T | None = None
)
Tips
可以通过 initializer 指定初始值。
accumulate: 累加
reduce 的特例,使用内置的加法作为默认的 func。这有一个小坑就是 initializer 是 None,因此整个结果都是 None,因此一定要指定初始值:
stream.accumulate(
source: AsyncIterable[T],
func: Callable[[T, T], Awaitable[T] | T] = <built-in function add>,
initializer: T | None = None
)
list: 列表
就是简单的将异步序列组装成列表:
Timing: 定时
大多数操作符都是立即生成的,定时提供了一些限制的方法。
spaceout: 时间间隔
确保异步序列的元素在时间上按照给定的间隔生成:
如果我们想要实现每秒一个请求,就可以通过这来实现:
await (stream.iterate(urls)
| pipe.spaceout(1)
| pipe.map(aiohttp_get_url)
| pipe.map(to_mongodb)
| pipe.print("成功写入{}!")
)
timeout: 超时
如果异步序列中的元素超过指定时间,这引发超时。注意他并不是全局的,而是只针对于传递到 timeout 管道的前后 item 的时间超过指定时间会引发:
delay: 延迟
aiostream 版本的 sleep,在遇到该操作符时会休眠指定时间:
Miscellaneous: 其他
这里的方法通常作为整个流的终点。他们不对数据进行任何处理而直接返回,其中可以包裹其他业务逻辑。
action: 触发特殊操作
他接受一个函数来对异步序列中的每个元素执行特定操作,但是无论函数返回什么值他依然返回原始值,通常用于打印输出、像 log 中添加内容等:
stream.action(
source: AsyncIterable[T],
func: Callable[[T], Awaitable[Any] | Any],
ordered: bool = True,
task_limit: int | None = None
)
一个简单的方法就是打印输出:
await (stream.iterate(urls)
| pipe.spaceout(1)
| pipe.map(aiohttp_get_url)
| pipe.map(to_mongodb)
| pipe.action(lambda x: f"成功写入{x}!")
)
有时候我们为了添加多个动作也可以使用 action,例如我们先要将结果写入不同的数据库,就可以使用 action 来链接多个写入动作,因为他输入输出是强一致的,不需要像 map 那样将结果重新抛出。
map 强调的是转换,即输入输出结果不一致。action 强调的是动作,即做了什么事情,而输入输出是强一致的。
print: 打印
打印异步序列中的值,不会对值进行修改,他可以看作是 action 的一个特例。注意他同样会原封不懂的输出值:
aiostream.stream.print(
source: AsyncIterable[T],
template: str = '{}',
sep: str = ' ',
end: str = 'n',
file: Any | None = None,
flush: bool = False
)
Advanced: 高阶操作
所谓的高阶操作在于他们的参数都是值为异步序列的异步序列。他们最终会执行对应的扁平化操作来获取元素:
concat|flatten|switch(
source: AsyncIterable[AsyncIterable[T]],
task_limit: int | None = None
)
这三个方法执行扁平化的方式:
concat
: 根据顺序扁平化所有序列的序列来生成元素flatten
: 不按照顺序谁先发生就生成该元素,最终依然会迭代序列中的序列的所有元素switch
: 这个比较特殊,首先谁先他会生成谁的元素,但是一旦有其他序列生成元素那么就直接中止当前序列(之后的不在生成),也就是序列之间有个开关,谁争取到该开关就是开始迭代,并且只有一次获取开关的机会
这些高阶方法也是的一种方式,官方也推荐使用这种方式甚至还提供了一个简单的变体:
concatmap|flattenmap|switchmap(
source: AsyncIterable[T],
func: combine.SmapCallable[T, AsyncIterable[U]],
*more_sources,
task_limit: int | None = None
)
即在展平后直接应用 map。
Tips
注意他们与有一个很大的不同在于他们只接受异步迭代器。