asyncio
asyncio是 Python 官方提供的异步框架,他提供了特有的 async/await 语法来简化异步程序的编写。
运行器
运行器是构建事件循环之上的,他是 asyncio 提供的高层级代码,其目标是简化异步代码执行。
其中最为简单也是最推荐的做法就是 asyncio.run()
方法:
通常是创建一个 main()
方法来封装所有的协程任务来在 run 中运行:
协程任务
通过 async/await 语法来编写协程任务,编写协程需要注意几个核心要点:
- 协程是在单线程中运行的因此要想执行效率就必须并行
- 只有涉及 I/O 操作才能够使用协程来真正实现并行,因此 CPU 密集型任务不适合以协程任务出现
- 协程任务要想并行处理,就必须是独立任务,例如烧水做饭就没办法并行(假设只有一个炉灶),当时烧水切菜就是一个并行任务可以封装为独立的协程任务
- 只有涉及 I/O 的并行操作才能使用协程来加快速度
重点: 不应该直接调用阻塞(CPU 绑定)代码。例如,如果一个函数执行 1 秒的 CPU 密集型计算,那么所有并发异步任务和 IO 操作都将延迟 1 秒。这类任务应当以避免阻塞事件循环线程。
可等待对象
如果一个对象可以在 await 语句中使用,他就是一个可等待对象,asyncio 中包含了三种可等待对象:
- coroutine(异步操作的定义): 由
async def
定义的函数被称为协程函数,他是异步操作的代码块。调用协程函数返回的对象是协程对象。协程对象能够通过 await 关键字来等待其他异步操作的完成 - Task(异步操作的执行): 他将协程与事件循环关联起来,并提供了一些管理和跟踪协程执行的方法。他被设计出来用于并发调度协程对象的。通过
asyncio.create_task()
来将一个协程对象封装为一个任务并自动被事件循环调度执行 - Future(异步操作的结果): 他表示一个异步操作的预期最终结果,他还表示异步任务的当前状态(未完成、已完成或失败)。通过 Future 对象能够使用同步方式来编写异步程序
这三个对象之间相辅相成,协程(coroutine)定义异步操作逻辑,最终包装为任务(Task)来自动交给事件循环来执行,而 Future 表示异步操作结果,他能够在协程定义中使用。
import asyncio
async def counter_loop(x, n):
for i in range(1, n + 1):
print(f"Counter {x}: {i}")
await asyncio.sleep(0.5)
return f"Finished {x} in {n}"
async def main():
# 直接被 create_task() 意味着他在这就直接运行了
slow_task = asyncio.create_task(counter_loop("Slow", 4))
# 返回协程对象,注意他不会运行
fast_coro = counter_loop("Fast", 2)
print("Awaiting Fast")
# await 后面更协程对象,会直接使用 create_task() 包装
# 这意味着此时 fast_coro 才会运行
fast_val = await fast_coro
print("Finished Fast")
print("Awaiting Slow")
slow_val = await slow_task
print("Finished Slow")
print(f"{fast_val}, {slow_val}")
asyncio.run(main())
"""
Awaiting Fast
Counter Fast: 1 # fast 运行了
Counter Slow: 1 # Slow 提前通过 create_task 包装也运行了
Counter Fast: 2
Counter Slow: 2
Finished Fast
Awaiting Slow # 因此尽管 Awaiting Slow 但是实际上 slow_task 早已经运行
Counter Slow: 3
Counter Slow: 4
Finished Slow
Finished Fast in 2, Finished Slow in 4
"""
# 假设我们将
# slow_task = asyncio.create_task(counter_loop("Slow", 4))
slow_coro = counter_loop("Slow", 4)
# 此时的运行结果
Awaiting Fast
Counter Fast: 1
Counter Fast: 2
Finished Fast
Awaiting Slow
Counter Slow: 1
Counter Slow: 2
Counter Slow: 3
Counter Slow: 4
Finished Slow
Finished Fast in 2, Finished Slow in 4
异步任务执行
执行异步任务最简单的方式就是直接通过 asyncio.create_task(coro)
来包装协程对象,他会直接在当前上下文的事件循环中执行,该方法是非阻塞的,如果需要他的结果可以使用 await task
来获取结果:
async def counter_loop(x, n):
for i in range(1, n + 1):
print(f"Counter {x}: {i}")
await asyncio.sleep(0.5)
return f"Finished {x} in {n}"
async def main():
slow_task = asyncio.create_task(counter_loop("Slow", 4))
fast_task = asyncio.create_task(counter_loop("Fast", 2))
fast_val = await fast_task
print(fast_val)
slow_val = await slow_task
print(slow_val)
"""
Counter Slow: 1
Counter Fast: 1
Counter Slow: 2
Counter Fast: 2
Counter Slow: 3
Finished Fast in 2
Counter Slow: 4
Finished Slow in 4
"""
并发执行
如果存在多个异步任务,每个任务都进行一次包装以及 await task
就比较麻烦,asyncio 库提供 asyncio.gather()
来实现并发执行:
awaitable asyncio.gather(
*aws, # 可等待对象,注意不是列表,如果是列表必须 *[aws]
return_exceptions=False # 如果出现异常直接抛出,可以为 True 来将异常一结果形式返回
) -> list
# 结果会以 aws 的顺序返回
在线程池或进程池中执行异步任务
可以使用 loop.run_in_executor(executor,func, * args) -> asyncio.Future
配合实现在一个不同的线程/进程中执行阻塞代码,他避免阻塞运行事件循环的主线程:
import asyncio
import concurrent.futures
def blocking_io():
# File operations (such as logging) can block the
# event loop: run them in a thread pool.
with open('/dev/urandom', 'rb') as f:
return f.read(100)
def cpu_bound():
# CPU-bound operations will block the event loop:
# in general it is preferable to run them in a
# process pool.
return sum(i * i for i in range(10 ** 7))
async def main():
# 获取主线程事件循环du
loop = asyncio.get_running_loop()
# 1. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_io)
print('custom thread pool', result)
# 2. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)
if __name__ == '__main__':
asyncio.run(main())
超时
由于异步操作的特殊性,它超时需要通过对比前后的定时器来实现。最早引入的超时方法是 asyncio.wait_for(aw, timeout)
方法:
在 python3.11 中加入了两个新的方法,他们支持上下文管理:
asyncio.timeout(delay)
asyncio.time_at(when)
: when 定义停止等待的绝对时间
async def main():
async with asyncio.timeout(10):
await long_running_task()
# loop.time() 获取事件循环的计时器
async def main():
loop = get_running_loop()
deadline = loop.time() + 20
try:
async with asyncio.timeout_at(deadline):
await long_running_task()
except TimeoutError:
print("The long operation timed out, but we've handled it.")
print("This statement will run regardless.")
等待
要想限制异步并发就需要等待的支持,可以使用 asyncio.wait(aws, timeout=None, return_when=asyncio.ALL_COMPLETED) -> (done[], pending[])
来实现。他的含义是并发运行 aws 可迭代对象中的 Future 或 Task 并进入阻塞状态直到满足 return_when 所指定的条件后返回具有两个值元组,第一个值是完成的任务集合,第二个是未完成任务集合,可用的条件包括:
asyncio.FIRST_COMPLETED
: 函数在任意可等待对象结束或取消时返回asyncio.FIRST_EXCEPTION
: 函数在任意可等待对象引发异常而结束时返回,如果没有任何任务引发异常等价于ALL_COMPLETED
asyncio.ALL_COMPLETED
: 将在所有可等待对象结束获取取消时返回
这是一个比 gather 更加常用的方法,它能够更精细的控制任务并发。
ALL_COMPLETED 和 gather
它们都能够并发运行序列中的异步任务直至完成,但是行为和使用场景略微有些区别:
区别 | ALL_COMPLETED | gather |
---|---|---|
返回值 | 返回(done, pending) 尽管 pending 通常为空 |
返回所有任务的结果列表 |
返回值顺序 | 顺序不定 | 顺序于传入的任务顺序一致 |
任务异常 | 需要手动检查每个任务的 task.exception() |
默认直接返回,如果return_exceptions=True 时异常将作为结果返回 |
取消的任务 | 任务被取消会放到 done 集合中 | 相当于抛出异常 |
FIRST_COMPLETED
该模式才是 wait 的核心,它能够实现对任务的动态调整。他总是在一个任务完成后返回,此时注意返回的 pending 即未完成的任务并没有被取消,而是继续支持,因此通过这个特性我们可以实现对任务的并发控制:
## 注意这是一个伪代码逻辑
async def scheduler(queue, task_limit):
running_tasks = set()
while True:
# 如果队列为空且没有运行中的任务,退出
if queue.empty() and not running_tasks:
break
# 如果运行中的任务数小于 task_limit,从队列中取出新任务,确保运行的任务始终是 task_limit 个
while len(running_tasks) < task_limit and not queue.empty():
item = await queue.get()
# 注意 runing_tasks 是并发运行的,这个是并发的核心
task = asyncio.create_task(process_data(item))
running_tasks.add(task)
# 核心在这里,他会一个个的提出完成的任务
done, running_tasks = await asyncio.wait(running_tasks, return_when=asyncio.FIRST_COMPLETED)