Skip to content

asyncio 同步原语

asyncio 同步原语被设计为与threading 模块类似,但有两个关键的注意事项:

  • asyncio 同步原语并不是线程安全的,因此他们不能用于 OS 线程同步(此时应该使用 threading)
  • 这些同步原语的方法都不接受 timeout 参数,需要使用 asyncio.wait_for() 函数来执行带有超时的操作

Lock

Event

Condition

Semaphore

信号量会管理一个计数器,该计数器会在每次调用 acquire() 时递减并随着每次调用 release() 时递增,计数器的值永远不会降到零以下,当 acquire() 为零时,他会保持阻塞直到某个任务调用了 release() 方法。

Semaphore 支持async with语句:

Python
# 支持传入 value 来作为内部计数器的初始值
sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

# 他等价于
sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()

Semaphore对象包含以下几个成员:

成员 说明
coroutine acquire() 如果内部计数器值大于 0 就减一并立即返回 True
locked() 如果信号量对象无法被立即释放则返回 True
release() 释放一个信号量对象,将内部计数器的值加 1,可以唤醒 acquire()

示例

信号量对象主要是用于控制并发量,在使用的时候要注意,一定要在阻塞方法上使用信号量,并且他应当阻塞整个主线程才有意义,下面是一个错误的示范:

Python
async def to_mongodb(data, executor, loop):
  # 在这里限制了并发量
  async with sem:
    try:
      result = await loop.run_in_executor(executor, parse_xml, data)
      await col_result.insert_one(result)
      print(f'成功写入{result["_id"]}')
    except Exception as e:
      print(e)

async def main():
  with ProcessPoolExecutor(18) as executor:
    loop = asyncio.get_event_loop()
    async for doc in col_source.find({}, {"_id": 0}):
      # 这里是非阻塞方法,他会一直执行下去
      asyncio.create_task(to_mongodb(doc["data"], executor, loop))

asyncio.run(main())

上面的问题就是尽管在执行 to_mongodb 时阻塞了,但是他并没有阻塞主线程,会继续排产 to_mongodb 并发,尽管该方法不会被真正执行但是依然会占用大量的内存。所以如果使用信号量来限制并发最重要的是阻塞 async for doc in col_source.find({}, {"_id": 0}): 的继续执行,此时我们在看修改后的错误示范:

Python
async def main():
  with ProcessPoolExecutor(18) as executor:
    loop = asyncio.get_event_loop()
    async for doc in col_source.find({}, {"_id": 0}):
      async with sem:
        asyncio.create_task(to_mongodb(doc["data"], executor, loop))

这又是一个典型的错误,asyncio.create_task(to_mongodb(doc["data"], executor, loop)) 是一个非阻塞方法,他完成不耗时间,这样信号量就没意义了。因此信号量一定要控制阻塞的方法(也就是里面有 await 语句)。因此真正限制并发的方法是:

Python
async def main():
    with ProcessPoolExecutor(18) as executor:
    loop = asyncio.get_event_loop()
    async for doc in col_source.find_raw_batch({}, {"_id": 0}):
      docs = bson.decode_all(raw_docs)
      # 这个是阻塞的,只有他运行完成才会继续
      # 在这个阻塞方法中并发了一大堆的异步任务,而这其中的异步任务就可以通过信号量来限制并发
      results = await asyncio.gather(
        *[to_mongodb(doc["data"], executor, loop) for doc in docs]
      )

这个例子尽管解释了整个信号量的使用,但是并不是很好,因为 find_raw_batch 返回的量天生就有限制,量不会太大,而且 loop.run_in_executor 本身就被 worker 控制也不需要信号量实现并发控制,并发通常用在全程 IO 操作的情形:

Python
async def gather_task(arg):
  async with sem:
    asyncio.sleep(1)
    return arg

async def main():
  args = range(100000000)
  results = asyncio.gather(*[gather_task(arg) for arg in args])

Tips

这也不是一个好的行为,gather、wait、as_commpled 都需要任务序列全部求解出才能够并发执行,这就导致如果 tasks 任务非常大的时候有问题。这个可以使用aiostream改写