Skip to content

motor

motor 是 Python 下的 MongoDB 数据的异步访问库。他提供了 Tornado 和 asyncio 两种异步框架的支持。

安装

Bash
pip install motor

Tips

他依赖于 pymongo 实际上他是通过 loop.run_in_executor 配合线程池(pymongo)来实现的异步。

对象层次结构(asyncio版本)

  • AsyncIOMotorClient: 客户端对象
  • AsyncIOMotorDatabase: 数据库对象
  • AsyncIOMotorCollection: 数据库中的集合
  • AsyncIOMotorCursor: 游标,AsyncIOMotorCollection.find() 会返回该对象,他表示查询的结果数据集

Tips

motor 提供了对 asyncio 和 tornado 两种协程框架的支持,由于 asyncio 已经内置因此推荐使用他。他们分别绑定在 motor.motor_asyncio 和 motor.motor_tornado 命名空间中

基本使用

对于层次结构中的说明,首先我们需要构建前三个对象, 而面向用户的命令通常都是绑定到 AsyncIOMotorCollection 对象上,而整个 Motor 的基本使用流程就是更具步骤构造上面四个对象:

Python
from motor.motor_asyncio import AsyncIOMotorClient

#1. 创建客户端连接
client = AsyncIOMotorClient("mongodb://localhost:27017")
#2. 获取数据库
db = client['test_database']
#3. 获取集合
collection = db['test_collection']

前两个步骤都是同步操作(即不需要与数据库通信),只有开始真正的 CRUD 操作才会与数据库通信,此时就需要 await 来获取结果了:

Python
# CRUD
# create
async def do_insert_one():
    document = {"key": "value"}
    result = await collection.insert_one(document)
    print("result %s" % repr(result.inserted_id))
async def do_insert_many():
    result = await collection.insert_many([{"i": i} for i in range(2000)])
    print("inserted %d docs" % (len(result.inserted_ids),))

# Read
async def do_find_one():
    document = await db.test_collection.find_one({"i": {"$lt": 1}})
    pprint.pprint(document)

# find 有两种形式
# 他实际上就是将所有数据返回,注意如果量大会导致内存溢出
async def do_find():
    cursor = db.test_collection.find({"i": {"$lt": 5}}).sort("i")
    for document in await cursor.to_list(length=100):
        pprint.pprint(document)
# 真正的基于服务器游标的流式传输
async def do_find():
    c = db.test_collection
    async for document in c.find({"i": {"$lt": 2}}):
        pprint.pprint(document)
# 构建 Cursor 本身并不需要异步只有真正的迭代或者 to_list() 等方法调用才会真正执行
async def do_find():
    # curosr 构造以及更改都不需要 await
    cursor = db.test_collection.find({"i": {"$lt": 4}})
    cursor.sort("i", -1).skip(1).limit(2)
    async for document in cursor:
        print(document)

# update 是更新即在旧的基础上操作
async def do_update():
    # await coll.update_many({'i': {'$gt': 100}}, {'$set': {'key': 'value'}})
    result = await collection.update_one({"i": 51}, {"$set": {"key": "value"}})
    print("updated %s document" % result.modified_count)
    new_document = await collection.find_one({"i": 51})
    print("document is now %s" % pprint.pformat(new_document))

# delete 删除
async def do_delete_one():
    n = await collection.count_documents({})
    print("%s documents before calling delete_one()" % n)
    result = await collection.delete_one({"i": {"$gte": 1000}})
    print("%s documents after" % (await collection.count_documents({})))

async def do_delete_many():
    n = await collection.count_documents({})
    print("%s documents before calling delete_many()" % n)
    result = await collection.delete_many({"i": {"$gte": 1000}})
    print("%s documents after" % (await collection.count_documents({})))

Tips

上面的操作基本上就能够满足日常使用了,更多的参数可以参考 pymongo

AsyncIOMotorCursor

游标,AsyncIOMotorCollection.find() 会返回该对象,他表示查询的结果数据集。要注意游标本身并不是可等待对象:

Python
# 会报错
for doc in await col.find():
    pass

# 必须调用 col.find().to_list() 这样的可等待语句
cursor = col.find()
for doc in await cursor.to_list(length=100):
    pass

# 要么就老老实实异步迭代
async for doc in col.find():
    pass

参考