[파이썬] mongoengine 실시간 데이터 처리와 `mongoengine`

Installing mongoengine

To get started with mongoengine, you need to install it using pip:

pip install mongoengine

Connecting to MongoDB

The first step is to establish a connection to your MongoDB server. mongoengine provides a connect method for this:

from mongoengine import connect

connect('mydatabase')

Here, 'mydatabase' is the name of your MongoDB database. If the database doesn’t exist, mongoengine will create it automatically.

Defining Document Schema

In mongoengine, data is represented as documents. Each document corresponds to a collection in MongoDB. You can define a document schema using Python classes:

from mongoengine import Document, StringField, DateTimeField

class User(Document):
    name = StringField(required=True)
    email = StringField(required=True)
    created_at = DateTimeField(default=datetime.datetime.now)

    def __str__(self):
        return self.name

In the example above, we defined a User document with three fields: name, email, and created_at. The required=True attribute specifies that these fields are mandatory. The default attribute defines the default value for created_at field.

Querying and Filtering Data

Once you have defined your document schema, you can perform CRUD operations on your MongoDB data. For example, to query all users from the User collection, you can use the objects attribute:

users = User.objects().all()
for user in users:
    print(user)

objects() returns a QuerySet, which allows you to filter and manipulate the query results easily. For instance, to find users created after a specific date, you can use the gt (greater than) operator:

from datetime import datetime

recent_users = User.objects(created_at__gt=datetime(2022, 1, 1)).all()
for user in recent_users:
    print(user)

Real-time Data Processing

Real-time data processing involves handling streams of data with low latency. With mongoengine, you can achieve this by leveraging features like change streams and triggers offered by MongoDB.

Change streams allow you to watch a collection and receive real-time notifications whenever any changes occur. You can define a change stream using mongoengine as follows:

from mongoengine import collection

@collection(User._get_collection_name())
def on_user_changed(change):
    print("User changed:", change)

The on_user_changed function is the callback that will be triggered whenever a change occurs in the User collection. Here, you can perform any real-time processing you require.

To start listening for changes, you can use the watch method:

User.watch(callback=on_user_changed)

This sets up the change stream and starts listening for real-time changes. Now, whenever a document in the User collection is inserted, updated, or deleted, the on_user_changed function will be executed.

Conclusion

In this blog post, we explored how mongoengine can be used for real-time data processing with MongoDB in Python. We discussed connecting to MongoDB, defining document schema, querying data, and implementing real-time processing using change streams. mongoengine provides a straightforward and convenient way to interact with MongoDB and handle real-time data efficiently.