Skip to content

aiostream

aiostream 提供了一组流运算符,能够使用管道来组合各种功能。它可以看作是异步版本的itertools。所有 aiostream 提供的运算符都返回一个 Stream 对象,该对象是一个增强的异步迭代器,该迭代器提供了以下功能:

  • 可以使用 | 来拼接不同的运算符
  • 每次迭代都会创建不同的迭代器
  • 能够使用 async with 来上下文管理
  • 直接使用 await 就能够运行
  • 支持 [] 切片
  • 可以使用 + 来串联操作

流操作符

aiostream 提供了七种类型的流操作符:

类型 类型说明 操作符
creation 创建,流起点,为整个管道提供源参数 iterate、preserve、just、call、empty、throw、never、repeat、count、range
selection 选择,对输入进行处理被选择的结果会被输出 take、takelast、skip、skiplast、getitem、filter、until、takewhile、dropwhile
transformation 转换,都输入进行转换,这个通常是整个业务的核心 map、enumerate、starmap、cycle、chunks
combination 组合,即将多个源组合成一个元组 map、zip、merge、chain、ziplatest
aggregation 聚合,归一即将源序列合并为单值输出 accumulate、reduce、list
advanced 高阶操作 concat、flatten、switch,concatmap、flatmap、switchmap
timing 定时 spaceout、timeout、delay
miscellaneous 其他,主要是最终输出,它通常作为流的终点 action、print

stream、pipe 和 |

除了 creation 操作符外,其他操作符都提供了 pipe.[oper] 形式的管道操作符,并且还允许使用 | 来连接他们,具体:

Python
# xs 可以还是异步迭代器,或者 creation 操作符
# 下面三者严格等价
ys = stream.map(xs, lambda x: x**2)
ys = pipe.map(lambda x: x**2)(xs)
ys = xs | pipe.map(lambda x: x**2)

其中使用最多也最灵活的就是 pipe 的情形,当连接多个运算符时:

Python
ys = (xs
    | pipe.operator1(*args1)
    | pipe.operator2(*args2)
    | pipe.operator3(*args3))

Create: 生成值

用于生成数据,他通常是流的源头。他们都没有等效的管道操作符因此通常也只能作为源头。

Tips

当然在 aiostream 中几乎所有源都是可读写的,都能够作为源,不过 create 相当于钦定的源。

他们之间也能够使用+操作符连接:

Python
stream.range(10) + stream.range(10, 20)

# 等价于
stream.range(20)

iterate/preserve: 迭代

他们都接受迭代器来作为输入,之后不断从迭代器中获取值来输出,是典型的可读源,他们之间的区别在于 preserve 不会隐式关闭迭代器:

Python
async def main() -> None:
    async def agen() -> AsyncIterator[int]:
        yield 1
        yield 2
        yield 3

    # The xs stream does not preserve the generator
    xs = stream.iterate(agen())
    print(await xs[0])  # Print 1
    print(await stream.list(xs))  # Print [] (2 and 3 have never yielded)

    # The xs stream does preserve the generator
    xs = stream.preserve(agen())
    print(await xs[0])  # Print 1
    print(await stream.list(xs))  # Print [2, 3]

    # Transform agen into a stream operator
    agen_stream = operator(agen)
    xs = agen_stream()  # agen is now reusable
    print(await stream.list(xs))  # Print [1, 2, 3]
    print(await stream.list(xs))  # Print [1, 2, 3]

需要注意只有异步迭代器才会被隐式关闭迭代器,因此 preserve 只能接受异步迭代器,而对于 iterate 就没有这个限制能够接受 AsyncIterable[T] | Iterable[T]

还有我们是可以在运行过程中不断向序列中添加任务的,例如:

Python
a = [1]
await (
    stream.iterate(a)
    | pipe.action(lambda x: a.append(x + 1))
    | pipe.print("{}")
)

repeat: 重复值

用于生成给定次数的相同值:

Python
stream.repeat(
  value: T, # 生成值
  times: int|None = None, # 次数,默认 None 即无限次
  interval: float=0.0,    # 时间间隔,默认不间隔
)

range/count: 范围数字

他们都能够生成给定范围的数字,不同在于 count 能够无限生成:

Python
stream.range(
  *args, # 等价于内置的 range 方法,
  interval: float=0.0, # 时间间隔
)
# 他大致等价于
(stream.iterate(range(*args)) | pipe.spaceout(interval))

stream.count(
  start: int=0,  # 起始值
  step: int=1,   # 步进
  interval: float=0.0, # 时间间隔
)

just/call: 单值

他们都是生成单个值,因为有些地方必须提供异步迭代器,单值就没办法,因此就需要将她们包装为异步迭代器:

Python
# 给定值返回值
stream.just(value: T)

# 调用给定函数返回值
stream.call(
  func: SyncCallable[P, T] | AsyncCallable[P, T],
  *args,
  **kwargs
)

# 他们本质上等价于
async def just(n):
  yield n

例如如果想要传入固定的参数就可以通过他来实现:

Python
# url 是需要迭代的, 而 session 只需要一个
async def crawler(url, session):
  pass

await (stream.zip(stream.iterate(urls), stream.cycle(stream.just(session)))
      | pip.starmap(crawler)
      | ....
      )

empty/throw/never: 特殊值

还有三个非常特殊的值,不知道干啥用的,应该是内部使用的:

  • stream.empty: 终止而不产生任何值
  • stream.throw(Exception): 抛出异常而不生成任何值
  • stream.never: 永远阻塞,不产生任何值

Transformation: 转换值

转换操作,类似于 Nodejs 中的转换流(Transform),他获取输入和转换方法来生成输出。是最主要的业务逻辑定义处。通常位于管道的中间。

map/starmap: 批量处理

将给定函数应用于异步序列的元素:

Python
stream.map(
  source: AsyncIterable[T],   # 异步迭代器,可由管道获取
  func: MapCallable[T, U],    # 函数,可以是异步的也可以是同步的
  *more_sources,              # 其他异步迭代器,他们会与 source 共同作为位置参数传递给 func
  ordered: bool=True,         # 排序,如果 func 是同步的一定是排序的该参数被省略
  task_limit: int|None=None   # 协程同时运行数量,同样对于同步函数来说被省略,或一个个执行
)

他和 pyhton 内置的 map 行为几乎相同。比较不好理解的就是 *more_sources 参数,他在源码中的表现为:

Python
# 使用 stream.zip 包装以后
# 结果 *item 解包作为 func 的位置参数使用
stream = stream.zip(source, *more_sources)
async with streamcontext(stream) as streamer:
        async for item in streamer:
            yield func(*item)

Tips

具有 *more_sources 的 map 也被认为是 combination 操作

他是一个非常重要的方法,实际上任何接受函数的方法都很重要,因为他们能够承接业务逻辑,例如如果想要抓取数据并保存到数据库中可以:

Python
await (stream.iterate(urls)
      | pipe.map(aiohttp_get_url, task_limit = 10)
      | pipe.map(to_mongodb)
      | pipe.print("成功写入{}!")
      )

starmap 比较特殊,他没有 *more_sources 参数,他要求 source 中的值必须是可迭代对象,之后他会解包这个对象来作为位置参数来执行函数:

Python
await (
    # combination 的 zip 拼接多个异步迭代器产生 (x, y)
    # 他们被传入 starmap 参数并且会 *(x, y)
    stream.zip(stream.range(10), stream.range(10))
    | pipe.starmap(lambda x, y: x * y)
    | pipe.print("{}")
)

Tips

map 需要返回一个值(即 return 语句),如果是 yield value 即迭代器方法,它返回的是 iterator ,此时后续的操作获取的是这个迭代器,而不是 yield 的值。这一点尤其需要注意。因此迭代器方法应当使用 stream.iterate 来实现 yield 值。

enumerate: 枚举值

从异步序列中生成 (index, value) 元组:

Python
stream.enumerate(
  source: AsyncIterable[T],
  start: int=0,   # 起始索引
  step: int=1     # 步进
)

chunks: 收集

他将异步迭代整理成指定大小的列表输出:

Python
stream.chunks(
  source: AsyncIterable[T],
  n: int # 指定大小
)

他也是一个非常有用的方法,例如爬取数据时插入数据库,如果一条条插入性能会比较差,此时可以通过 chunks 来收集足够多在批量插入:

Python
await (stream.iterate(urls)
      | pipe.map(aiohttp_get_url, task_limit = 10)
      | pipe.chunks(1000)
      | pipe.map(to_mongodb)   # 这里要用 insert_many 了
      | pipe.print("成功写入{}!")
      )

cycle: 循环

循环迭代异步序列,即到终点后重新迭代。注意该方法不提供任何缓冲和时间间隔,如果不想过快输出需要添加其他操作符:

Python
await (stream.cycle(stream.range(10))
       | pipe.spaceout(1)
       | pipe.print("{}")
      )

Selection: 选择值

顾名思义用于选择特定的元素而抛弃其他的。

take: 前 n 个

选择前 n 个值,之后的抛弃,如果为负数直接终止:

Python
stream.take(
  source: AsyncIterable[T],
  n: int  # 前 n 个值
)

tasklast: 后 n 个

选择后 n 个值,注意他会将迭代器执行到末尾才会返回第一个值,因此性能并不好:

Python
stream.takelast(
  source: AsyncIterable[T],
  n: int  # 后 n 个值
)

skip: 跳过前 n 个

跳过前 n 个值,如果为负数不跳过任何值:

Python
stream.skip(
  source: AsyncIterable[T],
  n: int  # 跳过前 n 个值
)

skiplast: 跳过后 n 个

跳过后 n 个值,同样他会将迭代器执行到末尾才会返回第一个值,因此性能并不好:

Python
stream.skiplast(
  source: AsyncIterable[T],
  n: int  # 后 n 个值
)

getitem: 索引、切片

可以指定整数值来作为索引选择指定值,也可以使用切片语法来获取一个区间:

Python
stream.getitem(
  source: AsyncIterable[T],
  index: int | builtins.slice  # 切片或索引
)

filter: 过滤

他应该是最为有用的,运行指定一个 func 来过滤需要的值:

Python
# 函数必须返回布尔值
stream.filter(
  source: AsyncIterable[T],
  func: Callable[[T], bool | Awaitable[bool]]
)

until: 条件满足终止

同样接受函数,如果条件满足时结束异步迭代器:

Python
stream.until(
  source: AsyncIterable[T],
  func: Callable[[T], bool | Awaitable[bool]]
)

takewhile: 条件满足开始

until 的反操作,只有条件满足时才开始获取 item:

Python
stream.takewhile(
  source: AsyncIterable[T],
  func: Callable[[T], bool | Awaitable[bool]]
)

dropwhile: 条件满足丢弃

算是 filter 的反向操作,任何满足条件的值会被抛弃

Python
stream.dropwhile(
  source: AsyncIterable[T],
  func: Callable[[T], bool | Awaitable[bool]]
)

Combination: 组合值

他接受多个序列,然后生成一个合并后的值。通常也是作为数据源使用的。

zip: 合并

类似 Pyhton 中的 zip,他会合并多个异步序列的元素为一个元组:

Python
stream.zip(
  source: AsyncIterable[T],
  *more_sources
)

唯一需要注意的是会在最短序列用尽之后停止,也就是说最终序列的长度由最短的一个序列决定。他可以配合多个 Create 来返回序列,例如配合 repeat 来添加一个额外参数供 strmap 使用:

Python
await (
  stream.zip(
    stream.iterator(iter_file()),
    stream.repeat(save_dir)
  )
  | pipe.strmap(save_file)
)

merge: 合并

将多个异步序列合并在一起,实际上也可以使用 + 来实现。merge 的所有序列是同步迭代的,其中元素在可用时立即转发,直到所有异步序列执行完毕。这也意味着他是无序的:

Python
stream.merge(
  source: AsyncIterable[T],
  *more_sources
)

chain: 串联

按照给定顺序串联异步序列,他与 merge 不同之处在于他是有顺序的,并且在上一个序列迭代未完成时后一个序列是等待的:

Python
stream.chain(
  source: AsyncIterable[T],
  *more_sources
)

Aggregatation: 聚合值

所谓的聚合就是将序列归一,他通常也位于整个流的最后来统计一些数据。

reduce: 累加

将第一个方法的结果作为下一个方法的第一个参数来会实现聚合值,其中可以指定一个 func,该函数接受两个位置参数:

  1. 上一个 ccumulate 的结果
  2. 新的 item
Python
stream.reduce(
  source: AsyncIterable[T],
  func: Callable[[T, T], Awaitable[T] | T],
  initializer: T | None = None
)

Tips

可以通过 initializer 指定初始值。

accumulate: 累加

reduce 的特例,使用内置的加法作为默认的 func。这有一个小坑就是 initializer 是 None,因此整个结果都是 None,因此一定要指定初始值:

Python
stream.accumulate(
  source: AsyncIterable[T],
  func: Callable[[T, T], Awaitable[T] | T] = <built-in function add>,
  initializer: T | None = None
)

list: 列表

就是简单的将异步序列组装成列表:

Python
stream.list(
  source: AsyncIterable[T]
)

Timing: 定时

大多数操作符都是立即生成的,定时提供了一些限制的方法。

spaceout: 时间间隔

确保异步序列的元素在时间上按照给定的间隔生成:

Python
stream.spaceout(
  source: AsyncIterable[T],
  interval: float  # 时间间隔
)

如果我们想要实现每秒一个请求,就可以通过这来实现:

Python
await (stream.iterate(urls)
      | pipe.spaceout(1)
      | pipe.map(aiohttp_get_url)
      | pipe.map(to_mongodb)
      | pipe.print("成功写入{}!")
      )

timeout: 超时

如果异步序列中的元素超过指定时间,这引发超时。注意他并不是全局的,而是只针对于传递到 timeout 管道的前后 item 的时间超过指定时间会引发:

Python
stream.timeout(
  source: AsyncIterable[T],
  timeout: float
)

delay: 延迟

aiostream 版本的 sleep,在遇到该操作符时会休眠指定时间:

Python
stream.delay(
  source: AsyncIterable[T],
  delay: float
)

Miscellaneous: 其他

这里的方法通常作为整个流的终点。他们不对数据进行任何处理而直接返回,其中可以包裹其他业务逻辑。

action: 触发特殊操作

他接受一个函数来对异步序列中的每个元素执行特定操作,但是无论函数返回什么值他依然返回原始值,通常用于打印输出、像 log 中添加内容等:

Python
stream.action(
  source: AsyncIterable[T],
  func: Callable[[T], Awaitable[Any] | Any],
  ordered: bool = True,
  task_limit: int | None = None
)

一个简单的方法就是打印输出:

Python
await (stream.iterate(urls)
      | pipe.spaceout(1)
      | pipe.map(aiohttp_get_url)
      | pipe.map(to_mongodb)
      | pipe.action(lambda x: f"成功写入{x}!")
      )

有时候我们为了添加多个动作也可以使用 action,例如我们先要将结果写入不同的数据库,就可以使用 action 来链接多个写入动作,因为他输入输出是强一致的,不需要像 map 那样将结果重新抛出。

map 强调的是转换,即输入输出结果不一致。action 强调的是动作,即做了什么事情,而输入输出是强一致的。

print: 打印

打印异步序列中的值,不会对值进行修改,他可以看作是 action 的一个特例。注意他同样会原封不懂的输出值:

Python
aiostream.stream.print(
  source: AsyncIterable[T],
  template: str = '{}',
  sep: str = ' ',
  end: str = 'n',
  file: Any | None = None,
  flush: bool = False
)

Advanced: 高阶操作

所谓的高阶操作在于他们的参数都是值为异步序列的异步序列。他们最终会执行对应的扁平化操作来获取元素:

Python
concat|flatten|switch(
  source: AsyncIterable[AsyncIterable[T]],
  task_limit: int | None = None
)

这三个方法执行扁平化的方式:

  • concat: 根据顺序扁平化所有序列的序列来生成元素
  • flatten: 不按照顺序谁先发生就生成该元素,最终依然会迭代序列中的序列的所有元素
  • switch: 这个比较特殊,首先谁先他会生成谁的元素,但是一旦有其他序列生成元素那么就直接中止当前序列(之后的不在生成),也就是序列之间有个开关,谁争取到该开关就是开始迭代,并且只有一次获取开关的机会

这些高阶方法也是的一种方式,官方也推荐使用这种方式甚至还提供了一个简单的变体:

Python
concatmap|flattenmap|switchmap(
  source: AsyncIterable[T],
  func: combine.SmapCallable[T, AsyncIterable[U]],
  *more_sources,
  task_limit: int | None = None
)

即在展平后直接应用 map。

Tips

注意他们与有一个很大的不同在于他们只接受异步迭代器。