Skip to content

asyncio

asyncio是 Python 官方提供的异步框架,他提供了特有的 async/await 语法来简化异步程序的编写。

运行器

运行器是构建事件循环之上的,他是 asyncio 提供的高层级代码,其目标是简化异步代码执行。

其中最为简单也是最推荐的做法就是 asyncio.run() 方法:

Python
asyncio.run(
  coro,             # 协程任务
  loop_factory=None # 事件循环,如果为 None 会创建事件循环
)

通常是创建一个 main() 方法来封装所有的协程任务来在 run 中运行:

Python
async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

协程任务

通过 async/await 语法来编写协程任务,编写协程需要注意几个核心要点:

  • 协程是在单线程中运行的因此要想执行效率就必须并行
  • 只有涉及 I/O 操作才能够使用协程来真正实现并行,因此 CPU 密集型任务不适合以协程任务出现
  • 协程任务要想并行处理,就必须是独立任务,例如烧水做饭就没办法并行(假设只有一个炉灶),当时烧水切菜就是一个并行任务可以封装为独立的协程任务
  • 只有涉及 I/O 的并行操作才能使用协程来加快速度

重点: 不应该直接调用阻塞(CPU 绑定)代码。例如,如果一个函数执行 1 秒的 CPU 密集型计算,那么所有并发异步任务和 IO 操作都将延迟 1 秒。这类任务应当以避免阻塞事件循环线程。

可等待对象

如果一个对象可以在 await 语句中使用,他就是一个可等待对象,asyncio 中包含了三种可等待对象:

  • coroutine(异步操作的定义): 由 async def 定义的函数被称为协程函数,他是异步操作的代码块。调用协程函数返回的对象是协程对象协程对象能够通过 await 关键字来等待其他异步操作的完成
  • Task(异步操作的执行): 他将协程与事件循环关联起来,并提供了一些管理和跟踪协程执行的方法。他被设计出来用于并发调度协程对象的。通过 asyncio.create_task() 来将一个协程对象封装为一个任务并自动被事件循环调度执行
  • Future(异步操作的结果): 他表示一个异步操作的预期最终结果,他还表示异步任务的当前状态(未完成、已完成或失败)。通过 Future 对象能够使用同步方式来编写异步程序

这三个对象之间相辅相成,协程(coroutine)定义异步操作逻辑,最终包装为任务(Task)来自动交给事件循环来执行,而 Future 表示异步操作结果,他能够在协程定义中使用。

Python
import asyncio


async def counter_loop(x, n):
    for i in range(1, n + 1):
        print(f"Counter {x}: {i}")
        await asyncio.sleep(0.5)
    return f"Finished {x} in {n}"


async def main():
    # 直接被 create_task() 意味着他在这就直接运行了
    slow_task = asyncio.create_task(counter_loop("Slow", 4))
    # 返回协程对象,注意他不会运行
    fast_coro = counter_loop("Fast", 2)

    print("Awaiting Fast")
    # await 后面更协程对象,会直接使用 create_task() 包装
    # 这意味着此时 fast_coro 才会运行
    fast_val = await fast_coro
    print("Finished Fast")

    print("Awaiting Slow")
    slow_val = await slow_task
    print("Finished Slow")

    print(f"{fast_val}, {slow_val}")


asyncio.run(main())

"""
Awaiting Fast
Counter Fast: 1     # fast 运行了
Counter Slow: 1     # Slow 提前通过 create_task 包装也运行了
Counter Fast: 2
Counter Slow: 2
Finished Fast
Awaiting Slow       # 因此尽管 Awaiting Slow 但是实际上 slow_task 早已经运行
Counter Slow: 3
Counter Slow: 4
Finished Slow
Finished Fast in 2, Finished Slow in 4
"""

# 假设我们将
# slow_task = asyncio.create_task(counter_loop("Slow", 4))
slow_coro = counter_loop("Slow", 4)
# 此时的运行结果
Awaiting Fast
Counter Fast: 1
Counter Fast: 2
Finished Fast
Awaiting Slow
Counter Slow: 1
Counter Slow: 2
Counter Slow: 3
Counter Slow: 4
Finished Slow
Finished Fast in 2, Finished Slow in 4

异步任务执行

执行异步任务最简单的方式就是直接通过 asyncio.create_task(coro) 来包装协程对象,他会直接在当前上下文的事件循环中执行,该方法是非阻塞的,如果需要他的结果可以使用 await task 来获取结果:

Python
async def counter_loop(x, n):
    for i in range(1, n + 1):
        print(f"Counter {x}: {i}")
        await asyncio.sleep(0.5)
    return f"Finished {x} in {n}"

async def main():
  slow_task = asyncio.create_task(counter_loop("Slow", 4))
  fast_task = asyncio.create_task(counter_loop("Fast", 2))
  fast_val = await fast_task
  print(fast_val)
  slow_val = await slow_task
  print(slow_val)

"""
Counter Slow: 1
Counter Fast: 1
Counter Slow: 2
Counter Fast: 2
Counter Slow: 3
Finished Fast in 2
Counter Slow: 4
Finished Slow in 4
"""

并发执行

如果存在多个异步任务,每个任务都进行一次包装以及 await task 就比较麻烦,asyncio 库提供 asyncio.gather() 来实现并发执行:

Python
awaitable asyncio.gather(
  *aws, # 可等待对象,注意不是列表,如果是列表必须 *[aws]
  return_exceptions=False # 如果出现异常直接抛出,可以为 True 来将异常一结果形式返回
) -> list
# 结果会以 aws 的顺序返回

在线程池或进程池中执行异步任务

可以使用 loop.run_in_executor(executor,func, * args) -> asyncio.Future 配合实现在一个不同的线程/进程中执行阻塞代码,他避免阻塞运行事件循环的主线程:

Python
import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    # 获取主线程事件循环du
    loop = asyncio.get_running_loop()

    # 1. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 2. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

if __name__ == '__main__':
    asyncio.run(main())

超时

由于异步操作的特殊性,它超时需要通过对比前后的定时器来实现。最早引入的超时方法是 asyncio.wait_for(aw, timeout) 方法:

Python
# 等待 aw(可等待对象) 完成,timeout 秒后超时
def asyncio.wait_for(aw, timeout) -> coroutine
    pass

在 python3.11 中加入了两个新的方法,他们支持上下文管理:

  • asyncio.timeout(delay)
  • asyncio.time_at(when): when 定义停止等待的绝对时间
Python
async def main():
    async with asyncio.timeout(10):
        await long_running_task()

# loop.time() 获取事件循环的计时器
async def main():
    loop = get_running_loop()
    deadline = loop.time() + 20
    try:
        async with asyncio.timeout_at(deadline):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

等待

要想限制异步并发就需要等待的支持,可以使用 asyncio.wait(aws, timeout=None, return_when=asyncio.ALL_COMPLETED) -> (done[], pending[]) 来实现。他的含义是并发运行 aws 可迭代对象中的 Future 或 Task 并进入阻塞状态直到满足 return_when 所指定的条件后返回具有两个值元组,第一个值是完成的任务集合,第二个是未完成任务集合,可用的条件包括:

  • asyncio.FIRST_COMPLETED: 函数在任意可等待对象结束或取消时返回
  • asyncio.FIRST_EXCEPTION: 函数在任意可等待对象引发异常而结束时返回,如果没有任何任务引发异常等价于 ALL_COMPLETED
  • asyncio.ALL_COMPLETED: 将在所有可等待对象结束获取取消时返回

这是一个比 gather 更加常用的方法,它能够更精细的控制任务并发。

ALL_COMPLETED 和 gather

它们都能够并发运行序列中的异步任务直至完成,但是行为和使用场景略微有些区别:

区别 ALL_COMPLETED gather
返回值 返回(done, pending)尽管 pending 通常为空 返回所有任务的结果列表
返回值顺序 顺序不定 顺序于传入的任务顺序一致
任务异常 需要手动检查每个任务的 task.exception() 默认直接返回,如果return_exceptions=True时异常将作为结果返回
取消的任务 任务被取消会放到 done 集合中 相当于抛出异常

FIRST_COMPLETED

该模式才是 wait 的核心,它能够实现对任务的动态调整。他总是在一个任务完成后返回,此时注意返回的 pending 即未完成的任务并没有被取消,而是继续支持,因此通过这个特性我们可以实现对任务的并发控制:

Python
## 注意这是一个伪代码逻辑
async def scheduler(queue, task_limit):
    running_tasks = set()

    while True:
        # 如果队列为空且没有运行中的任务,退出
        if queue.empty() and not running_tasks:
            break

        # 如果运行中的任务数小于 task_limit,从队列中取出新任务,确保运行的任务始终是 task_limit 个
        while len(running_tasks) < task_limit and not queue.empty():
            item = await queue.get()
            # 注意 runing_tasks 是并发运行的,这个是并发的核心
            task = asyncio.create_task(process_data(item))
            running_tasks.add(task)

        # 核心在这里,他会一个个的提出完成的任务
        done, running_tasks = await asyncio.wait(running_tasks, return_when=asyncio.FIRST_COMPLETED)

其他