サムネがコーヒーの記事は書きかけです。

BeanieによるMongoDBの非同期CRUD操作【Python】

PythonからNoSQLデータベースであるMongoDBに接続できるドライバーBeanieを使用して、CRUD操作を行います。

MongoDBに直接アクセスする場合は、以下のサイトからMongoDB Compassをダウンロードしておきます。

https://www.mongodb.com/products/compass

必要モジュール

MondoDBドライバー、非同期処理ライブラリをインポートします。

import motor.motor_asyncio
from beanie import Document
from pydantic import Field
import beanie
import asyncio

Documentの定義

今回はUserクラスを使用してDocumentを定義します。

この時、Fieldを使用してデータ型を指定します。

class User(Document):
    user_id: int = Field()
    description: str = Field(max_length = 200,default = None)
    def to_JSON(self):
        return {
            "id" : str(self.id),
            "revision_id": str(self.revision_id),
            "user_id": self.user_id,
            "description": self.description
        }

データベースへの接続

非同期処理のため、以下に非同期関数の内容を書いていきます。

async def main():
    #Connect to the database
    client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")
    #Create Collection
    await beanie.init_beanie(database = client.db_name, document_models=[User])

Create 処理

Userクラスのインスタンスを作成して、それぞれのインスタンスをデータベースに登録します。

users = [User(user_id=i,description=f"user{i}_description") for i in range(1,5)]
for i in users:
        await i.insert()
>>>
{'id': '641f637d844dee5b03e1b82e', 'revision_id': 'None', 'user_id': 1, 'description': 'user1_description'}
{'id': '641f637e844dee5b03e1b82f', 'revision_id': 'None', 'user_id': 2, 'description': 'user2_description'}
{'id': '641f637e844dee5b03e1b830', 'revision_id': 'None', 'user_id': 3, 'description': 'user3_description'}
{'id': '641f637e844dee5b03e1b831', 'revision_id': 'None', 'user_id': 4, 'description': 'user4_description'}

Read 処理

findメソッドを使用して、Documentを一括取得します。

users = await User.find().to_list()
for i in users:
    print(i.to_JSON())
>>>
{'id': '641f637d844dee5b03e1b82e', 'revision_id': 'None', 'user_id': 1, 'description': 'user1_description'}
{'id': '641f637e844dee5b03e1b82f', 'revision_id': 'None', 'user_id': 2, 'description': 'user2_description'}
{'id': '641f637e844dee5b03e1b830', 'revision_id': 'None', 'user_id': 3, 'description': 'user3_description'}
{'id': '641f637e844dee5b03e1b831', 'revision_id': 'None', 'user_id': 4, 'description': 'user4_description'}

findメソッドに以下のようなパラメータを渡すことで、検索条件を指定できます。

users = await User.find(User.user_id == 2).to_list()
for i in users:
    print(i.to_JSON())
>>>
{'id': '641f637e844dee5b03e1b82f', 'revision_id': 'None', 'user_id': 2, 'description': 'user2_description'}

以下のように、直接指定することもできます。

users = await User.find({"user_id":2}).to_list()
for i in users:
    print(i.to_JSON())
>>>
{'id': '641f637e844dee5b03e1b82f', 'revision_id': 'None', 'user_id': 2, 'description': 'user2_description'}

Update 処理

以下のように、一旦条件に合うオブジェクトを取得してから、そのプロパティを書き換えます。

この時、検索条件に合うオブジェクトが存在しない場合のエラーハンドリングをしておきます。

user1 = await User.find_one(User.user_id == 1)
    try:
        user1.user_id = 9
        await user1.save()
    except AttributeError as e:
        print("No match found.")
    print(user1)

Delete 処理

Updateと似ていますが、検索条件に一致したオブジェクトに対して削除を行います。

user1 = await User.find_one(User.user_id == 3)
    try:
        await user1.delete()
    except AttributeError:
        print("File not found.")

まとめ

以下にCRUD操作のまとめを置いておきます。

import motor.motor_asyncio
from beanie import Document
from pydantic import Field
import beanie
import asyncio

class User(Document):
    user_id: int = Field()
    description: str = Field(max_length = 200,default = None)
    def to_JSON(self):
        return {
            "id" : str(self.id),
            "revision_id": str(self.revision_id),
            "user_id": self.user_id,
            "description": self.description
        }

async def main():
    client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")
    await beanie.init_beanie(database = client.db_name, document_models=[User])

    #users = [User(user_id=i,description=f"user{i}_description") for i in range(1,5)]
    #for i in users:
        #await i.insert()
    users = await User.find(User.user_id == 1).to_list()
    for i in users:
        print(i.to_JSON())

    users = await User.find({"user_id":1}).to_list()
    for i in users:
        print(i.to_JSON())
asyncio.run(main())

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です