asyncio 同步原语
asyncio 同步原语被设计为与threading 模块类似,但有两个关键的注意事项:
- asyncio 同步原语并不是线程安全的,因此他们不能用于 OS 线程同步(此时应该使用 threading)
- 这些同步原语的方法都不接受 timeout 参数,需要使用
asyncio.wait_for()
函数来执行带有超时的操作
Lock
Event
Condition
Semaphore
信号量会管理一个计数器,该计数器会在每次调用 acquire()
时递减并随着每次调用 release()
时递增,计数器的值永远不会降到零以下,当 acquire()
为零时,他会保持阻塞直到某个任务调用了 release()
方法。
Semaphore 支持async with
语句:
# 支持传入 value 来作为内部计数器的初始值
sem = asyncio.Semaphore(10)
# ... later
async with sem:
# work with shared resource
# 他等价于
sem = asyncio.Semaphore(10)
# ... later
await sem.acquire()
try:
# work with shared resource
finally:
sem.release()
Semaphore
对象包含以下几个成员:
成员 | 说明 |
---|---|
coroutine acquire() |
如果内部计数器值大于 0 就减一并立即返回 True |
locked() |
如果信号量对象无法被立即释放则返回 True |
release() |
释放一个信号量对象,将内部计数器的值加 1,可以唤醒 acquire() |
示例
信号量对象主要是用于控制并发量,在使用的时候要注意,一定要在阻塞方法上使用信号量,并且他应当阻塞整个主线程才有意义,下面是一个错误的示范:
async def to_mongodb(data, executor, loop):
# 在这里限制了并发量
async with sem:
try:
result = await loop.run_in_executor(executor, parse_xml, data)
await col_result.insert_one(result)
print(f'成功写入{result["_id"]}')
except Exception as e:
print(e)
async def main():
with ProcessPoolExecutor(18) as executor:
loop = asyncio.get_event_loop()
async for doc in col_source.find({}, {"_id": 0}):
# 这里是非阻塞方法,他会一直执行下去
asyncio.create_task(to_mongodb(doc["data"], executor, loop))
asyncio.run(main())
上面的问题就是尽管在执行 to_mongodb 时阻塞了,但是他并没有阻塞主线程,会继续排产 to_mongodb
并发,尽管该方法不会被真正执行但是依然会占用大量的内存。所以如果使用信号量来限制并发最重要的是阻塞 async for doc in col_source.find({}, {"_id": 0}):
的继续执行,此时我们在看修改后的错误示范:
async def main():
with ProcessPoolExecutor(18) as executor:
loop = asyncio.get_event_loop()
async for doc in col_source.find({}, {"_id": 0}):
async with sem:
asyncio.create_task(to_mongodb(doc["data"], executor, loop))
这又是一个典型的错误,asyncio.create_task(to_mongodb(doc["data"], executor, loop))
是一个非阻塞方法,他完成不耗时间,这样信号量就没意义了。因此信号量一定要控制阻塞的方法(也就是里面有 await 语句)。因此真正限制并发的方法是:
async def main():
with ProcessPoolExecutor(18) as executor:
loop = asyncio.get_event_loop()
async for doc in col_source.find_raw_batch({}, {"_id": 0}):
docs = bson.decode_all(raw_docs)
# 这个是阻塞的,只有他运行完成才会继续
# 在这个阻塞方法中并发了一大堆的异步任务,而这其中的异步任务就可以通过信号量来限制并发
results = await asyncio.gather(
*[to_mongodb(doc["data"], executor, loop) for doc in docs]
)
这个例子尽管解释了整个信号量的使用,但是并不是很好,因为 find_raw_batch
返回的量天生就有限制,量不会太大,而且 loop.run_in_executor
本身就被 worker 控制也不需要信号量实现并发控制,并发通常用在全程 IO 操作的情形:
async def gather_task(arg):
async with sem:
asyncio.sleep(1)
return arg
async def main():
args = range(100000000)
results = asyncio.gather(*[gather_task(arg) for arg in args])
Tips
这也不是一个好的行为,gather、wait、as_commpled 都需要任务序列全部求解出才能够并发执行,这就导致如果 tasks 任务非常大的时候有问题。这个可以使用aiostream改写