ESORM - Python ElasticSearch ORM based on Pydantic

ESORM is an ElasticSearch Object Relational Mapper (ORM), or Object Document Mapper (ODM) if you like, for Python, based on Pydantic. It is a high-level library for managing ElasticSearch documents in Python. It is fully async and uses annotations and type hints for type checking and IDE autocompletion.

💾 Installation

pip install pyesorm

🚀 Features

  • Pydantic model representation of ElasticSearch documents
  • Automatic mapping and index creation
  • CRUD operations
  • Full async support (no sync version at all)
  • Mapping to and from ElasticSearch types
  • Support for nested documents
  • Automatic optimistic concurrency control
  • Custom id field
  • Context for bulk operations
  • IDE autocompletion and type checking support (tested with PyCharm)
  • Everything in the source code is documented and annotated
  • TypedDicts for ElasticSearch queries and aggregations
  • Docstring support for fields
  • Shard routing support
  • Lazy properties
  • Support >= Python 3.8 (tested with 3.8 through 3.12)
  • Support for ElasticSearch 8.x and 7.x
  • Watcher support (you may need an ElasticSearch subscription license for this)
  • Pagination and sorting
  • FastAPI integration

Not all ElasticSearch features are supported yet; pull requests are welcome.

Supported ElasticSearch versions

It is tested with ElasticSearch 7.x and 8.x.

Supported Python versions

Tested with Python 3.8 through 3.12.

📖 Usage

Define a model

You can use all Pydantic model features, because ESModel is a subclass of pydantic.BaseModel. (Actually it is a subclass of ESBaseModel, see more below...)

ESModel extends Pydantic's BaseModel with ElasticSearch-specific features. It serializes and deserializes documents to and from ElasticSearch types and handles ElasticSearch operations in the background.

Python basic types

from esorm import ESModel


class User(ESModel):
    name: str
    age: int

This is how the python types are converted to ES types:

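| Python type       | ES type | Comment                     |
|-------------------|---------|-----------------------------|
| str               | text    |                             |
| int               | long    |                             |
| float             | double  |                             |
| bool              | boolean |                             |
| datetime.datetime | date    |                             |
| datetime.date     | date    |                             |
| datetime.time     | date    | Stored as 1970-01-01 + time |
| typing.Literal    | keyword |                             |
| UUID              | keyword |                             |
| Path              | keyword |                             |
| IntEnum           | integer |                             |
| Enum              | keyword | also StrEnum                |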
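
To make the table more concrete, here is a minimal sketch of how such a type-to-mapping translation could look. It is not ESORM's implementation: `PY_TO_ES` and `build_properties` are hypothetical names used only for illustration, and only a few rows of the table are covered.

```python
import datetime
from typing import Any, Dict, get_type_hints

# Hypothetical lookup table mirroring a few rows of the table above
PY_TO_ES: Dict[type, str] = {
    str: "text",
    int: "long",
    float: "double",
    bool: "boolean",
    datetime.datetime: "date",
    datetime.date: "date",
}


def build_properties(cls: type) -> Dict[str, Any]:
    """Build an ES-style `properties` mapping from class annotations."""
    return {
        name: {"type": PY_TO_ES[py_type]}
        for name, py_type in get_type_hints(cls).items()
    }


class User:
    name: str
    age: int
    registered: datetime.datetime


print(build_properties(User))
```

The real library handles many more cases (nested models, lists, enums, optional fields), so treat this only as a picture of the idea.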

Some special Pydantic types are also supported:

| Pydantic type | ES type | Comment |
|---------------|---------|---------|
| URL           | keyword |         |
| IPvAnyAddress | ip      |         |

ESORM field types

You can specify ElasticSearch-specific field types using the esorm.fields module.

from esorm import ESModel
from esorm.fields import keyword, text, byte, geo_point


class User(ESModel):
    name: text
    email: keyword
    age: byte
    location: geo_point
    ...

The supported fields are:

| Field name              | ES type       |
|-------------------------|---------------|
| keyword                 | keyword       |
| text                    | text          |
| binary                  | binary        |
| byte                    | byte          |
| short                   | short         |
| integer or int32        | integer       |
| long or int64           | long          |
| unsigned_long or uint64 | unsigned_long |
| float16 or half_float   | half_float    |
| float32                 | float         |
| double                  | double        |
| boolean                 | boolean       |
| geo_point               | geo_point     |

The binary field accepts base64 encoded strings. However, if you provide bytes to it, they will be automatically converted to a base64 string during serialization. When you retrieve the field, it will always be a base64 encoded string. You can easily convert it back to bytes using the bytes() method: binary_field.bytes().
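
The base64 round trip itself can be illustrated with the standard library alone. This sketch does not use the ESORM binary field; it only shows what "bytes in, base64 string stored, bytes back out" means.

```python
import base64

raw = b"\x00\x01binary payload"

# What a binary field would hold: a base64-encoded string
encoded = base64.b64encode(raw).decode("ascii")

# Converting the stored string back to the original bytes
decoded = base64.b64decode(encoded)

print(encoded)
print(decoded == raw)
```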

You can also use Annotated types to specify the ES type, such as Pydantic's PositiveInt, NegativeInt and similar types.

Nested documents

from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    name: text
    email: keyword
    age: byte = 18


class Post(ESModel):
    title: text
    content: text
    writer: User  # User is a nested document

List primitive fields

You can use lists of primitive fields:

from typing import List
from esorm import ESModel


class User(ESModel):
    emails: List[str]
    favorite_ids: List[int] 
    ...   

ESBaseModel

ESBaseModel is the base of ESModel.

Use it for abstract models

from esorm import ESModel, ESBaseModel
from esorm.fields import keyword, text, byte


# This way the `BaseUser` model won't be in the index
class BaseUser(ESBaseModel):  # <---------------
    # This config will be inherited by User
    class ESConfig:
        id_field = 'email'    
    
    name: text
    email: keyword
    

# This will be in the index because it is a subclass of ESModel
class UserExtended(BaseUser, ESModel):
    age: byte = 18


async def create_user():
    user = UserExtended(
        name='John Doe',
        email="john@example.com",
        age=25
    )
    await user.save()

Use it for nested documents

ESBaseModel is also useful for nested documents because, unlike an ESModel subclass, a model derived from it will not get its own index in ElasticSearch.

from esorm import ESModel, ESBaseModel
from esorm.fields import keyword, text, byte


# This way `User` model won't be in the index
class User(ESBaseModel):  # <---------------
    name: text
    email: keyword
    age: byte = 18


class Post(ESModel):
    title: text
    content: text
    writer: User  # User is a nested document

Id field

You can specify the id field in the model settings:

from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    class ESConfig:
        id_field = 'email'

    name: text
    email: keyword
    age: byte = 18

This way the field specified in id_field will be removed from the document and used as the document _id in the index.
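
Conceptually, this means the document is split into an id and a source body before it is indexed. The sketch below shows that idea with a plain dictionary; `split_id` is a hypothetical helper for illustration, not part of ESORM.

```python
from typing import Any, Dict, Tuple


def split_id(doc: Dict[str, Any], id_field: str) -> Tuple[str, Dict[str, Any]]:
    """Return (_id, source), with the id field removed from the source."""
    source = dict(doc)  # copy so the caller's dict is left untouched
    doc_id = str(source.pop(id_field))
    return doc_id, source


doc_id, source = split_id(
    {"name": "John Doe", "email": "john@example.com", "age": 25},
    id_field="email",
)
print(doc_id)
print(source)
```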

If you specify a field named id in your model, it will be used as the document _id in the index (it will automatically override the id_field setting):

from esorm import ESModel


class User(ESModel):
    id: int  # This will be used as the document _id in the index
    name: str

You can also create an __id__ property in your model to return a custom id:

from esorm import ESModel
from esorm.fields import keyword, text, byte


class User(ESModel):
    name: text
    email: keyword
    age: byte = 18

    @property
    def __id__(self) -> str:
        return self.email

NOTE: The return type annotation of __id__ is important, and __id__ must be declared as a property.

Model Settings

You can specify model settings using the ESConfig inner class.

from typing import Optional, List, Dict, Any
from esorm import ESModel


class User(ESModel):
    class ESConfig:
        """ ESModel Config """
        # The index name
        index: Optional[str] = None
        # The name of the 'id' field
        id_field: Optional[str] = None
        # Default sort
        default_sort: Optional[List[Dict[str, Dict[str, str]]]] = None
        # ElasticSearch index settings (https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html)
        settings: Optional[Dict[str, Any]] = None
        # Maximum recursion depth of lazy properties
        lazy_property_max_recursion_depth: int = 1

ESModelTimestamp

You can use the ESModelTimestamp class to add created_at and updated_at fields to your model:

from esorm import ESModelTimestamp


class User(ESModelTimestamp):
    name: str
    age: int

These fields are set automatically to the current datetime: created_at is set only when the document is created, while updated_at is set both on creation and on every update.
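
The timestamp rule can be sketched without ElasticSearch. `TimestampedDoc` and its `touch` method are hypothetical stand-ins for illustration, and the use of UTC is an assumption; ESORM's own behaviour may differ in detail.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class TimestampedDoc:
    """Hypothetical stand-in for a model with created_at / updated_at."""
    name: str
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    def touch(self) -> None:
        now = datetime.now(timezone.utc)  # UTC is an assumption of this sketch
        if self.created_at is None:  # set on the first save only
            self.created_at = now
        self.updated_at = now  # set on every save


doc = TimestampedDoc(name="John Doe")
doc.touch()  # simulates the first save
first_created = doc.created_at
doc.touch()  # simulates a later update
print(doc.created_at == first_created)
```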

Describe fields

You can use the usual Pydantic field description, but you can also use docstrings like this:

from esorm import ESModel
from esorm.fields import TextField


class User(ESModel):
    name: str = 'John Doe'
    """ The name of the user """
    age: int = 18
    """ The age of the user """

    # This is the usual Pydantic way, but I think docstrings are more intuitive and readable
    address: str = TextField(description="The address of the user")

The documentation is useful if you create an API and want to generate documentation from the model. It can be used in FastAPI, for example.

Aliases

You can specify aliases for fields:

from esorm import ESModel
from esorm.fields import keyword, Field


class User(ESModel):
    full_name: keyword = Field(alias='fullName')  # In ES `fullName` will be the field name

This is good for renaming fields in the model without changing the ElasticSearch field name.

Connecting to ElasticSearch

You can connect with a simple connection string:

from esorm import connect


async def es_init():
    await connect('localhost:9200')

You can also connect to multiple hosts if you have a cluster:

from esorm import connect


async def es_init():
    await connect(['localhost:9200', 'localhost:9201'])

You can wait for the node or cluster to be ready (recommended):

from esorm import connect


async def es_init():
    await connect('localhost:9200', wait=True)

This will ping the node at 2-second intervals until it is ready, which can take a long time.
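
The waiting behaviour is essentially a polling loop. Here is a minimal sketch of that pattern using only asyncio; `wait_until_ready` and the fake ping function are hypothetical and stand in for the real client, and the interval is shortened so the demo finishes quickly.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_until_ready(
    ping: Callable[[], Awaitable[bool]],
    interval: float = 2.0,
) -> int:
    """Call `ping` until it returns True; return the number of attempts."""
    attempts = 1
    while not await ping():
        await asyncio.sleep(interval)
        attempts += 1
    return attempts


async def demo() -> int:
    answers = iter([False, False, True])  # fake node: ready on the 3rd ping

    async def fake_ping() -> bool:
        return next(answers)

    # A tiny interval keeps the demo fast
    return await wait_until_ready(fake_ping, interval=0.01)


attempts = asyncio.run(demo())
print(attempts)
```

In production code you would probably also add an overall timeout so that a node that never comes up does not block startup forever.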

You can pass any arguments that AsyncElasticsearch supports:

from esorm import connect


async def es_init():
    await connect('localhost:9200', wait=True, sniff_on_start=True, sniff_on_connection_fail=True)

Client

The connect function is a wrapper for the AsyncElasticsearch constructor. It creates and stores a global instance of a proxy to an AsyncElasticsearch instance. The model operations will use this instance to communicate with ElasticSearch. You can retrieve the proxy client instance and use it the same way as an AsyncElasticsearch instance:

from esorm import es


async def es_init():
    await es.ping()
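
A proxy like this can be imported before the connection exists, because it only forwards attribute access once a real client has been set. The sketch below shows the general pattern; `ClientProxy`, `set_client` and `FakeClient` are hypothetical names and do not reflect how ESORM implements its proxy.

```python
from typing import Any, Optional


class ClientProxy:
    """Forwards attribute access to a client that is set later."""

    def __init__(self) -> None:
        self._client: Optional[Any] = None

    def set_client(self, client: Any) -> None:
        self._client = client

    def __getattr__(self, name: str) -> Any:
        # Only called for attributes not found on the proxy itself
        if self._client is None:
            raise RuntimeError("Not connected yet")
        return getattr(self._client, name)


class FakeClient:
    """Stand-in for a real client object."""

    def ping(self) -> bool:
        return True


proxy = ClientProxy()          # can be imported anywhere before connecting
proxy.set_client(FakeClient())  # what a connect() call would do
print(proxy.ping())
```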

Create index templates

You can create index templates easily:

from esorm import model as esorm_model


# Create index template
async def prepare_es():
    await esorm_model.create_index_template('default_template',
                                            prefix_name='esorm_',
                                            shards=3,
                                            auto_expand_replicas='1-5')

This template will be applied to all indices with the esorm_ prefix (the default).

All indices created by ESORM have a prefix, which you can modify globally if you want:

from esorm.model import set_default_index_prefix

set_default_index_prefix('custom_prefix_')

The default prefix is esorm_.

Create indices and mappings

You can create indices and mappings automatically from your models:

from esorm import setup_mappings


# Create indices and mappings
async def prepare_es():
    import models  # Import your models
    # Here models argument is not needed, but you can pass it to prevent unused import warning
    await setup_mappings(models)  

First you must create (import) all model classes. Model classes will be registered into a global registry. Then you can call setup_mappings function to create indices and mappings for all registered models.

IMPORTANT: This method ignores mapping errors if an index with the same name already exists. It can add new fields to existing indices, but it cannot modify or delete existing fields. For that you need to reindex your data, which is an ElasticSearch limitation.
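
To see which changes fall on which side of that limitation, you can compare the existing mapping with the desired one. This is a minimal sketch with plain dictionaries; `diff_mappings` is a hypothetical helper, not an ESORM function.

```python
from typing import Dict, List, Tuple

Mapping = Dict[str, Dict[str, str]]


def diff_mappings(existing: Mapping, desired: Mapping) -> Tuple[List[str], List[str]]:
    """Return (fields that can be added, fields whose definition conflicts)."""
    addable = [name for name in desired if name not in existing]
    conflicting = [
        name for name in desired
        if name in existing and existing[name] != desired[name]
    ]
    return addable, conflicting


existing = {"name": {"type": "text"}, "age": {"type": "long"}}
desired = {
    "name": {"type": "keyword"},   # changed type: needs a reindex
    "age": {"type": "long"},       # unchanged
    "email": {"type": "keyword"},  # new field: can be added in place
}

addable, conflicting = diff_mappings(existing, desired)
print(addable)
print(conflicting)
```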

Model instances

When you get a model instance from elasticsearch by search or get methods, you will get the following private attributes filled automatically:

| Attribute     | Description                         |
|---------------|-------------------------------------|
| _id           | The ES id of the document           |
| _routing      | The routing value of the document   |
| _version      | Version of the document             |
| _primary_term | The primary term of the document    |
| _seq_no       | The sequence number of the document |

CRUD: Create

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def create_user():
    # Create a new user 
    user = User(name='John Doe', age=25)
    # Save the user to ElasticSearch
    new_user_id = await user.save()
    print(new_user_id)

CRUD: Read

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def get_user(user_id: str):
    user = await User.get(user_id)
    print(user.name)

CRUD: Update

On update, race conditions are checked automatically (with the help of the _primary_term and _seq_no fields). This implements an optimistic locking mechanism.

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def update_user(user_id: str):
    user = await User.get(user_id)
    user.name = 'Jane Doe'
    await user.save()
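
The optimistic locking idea can be demonstrated with an in-memory stand-in. ElasticSearch itself checks the if_seq_no and if_primary_term request parameters; this sketch tracks only a sequence number for brevity, and `FakeIndex` and `ConflictError` are hypothetical names.

```python
from typing import Any, Dict, Optional, Tuple


class ConflictError(Exception):
    """Raised when the document changed since it was read."""


class FakeIndex:
    """In-memory stand-in that tracks a sequence number per document."""

    def __init__(self) -> None:
        self._docs: Dict[str, Tuple[int, Dict[str, Any]]] = {}

    def get(self, doc_id: str) -> Tuple[int, Dict[str, Any]]:
        seq_no, source = self._docs[doc_id]
        return seq_no, dict(source)

    def index(self, doc_id: str, source: Dict[str, Any],
              if_seq_no: Optional[int] = None) -> int:
        current = self._docs.get(doc_id)
        # Reject the write if the caller's view of the document is stale
        if if_seq_no is not None and current is not None and current[0] != if_seq_no:
            raise ConflictError(f"expected seq_no {if_seq_no}, found {current[0]}")
        new_seq_no = 0 if current is None else current[0] + 1
        self._docs[doc_id] = (new_seq_no, dict(source))
        return new_seq_no


index = FakeIndex()
index.index("1", {"name": "John Doe"})

seq_a, doc_a = index.get("1")  # first reader
seq_b, doc_b = index.get("1")  # second reader sees the same version

doc_a["name"] = "Jane Doe"
index.index("1", doc_a, if_seq_no=seq_a)  # succeeds, sequence number advances

doc_b["name"] = "Jack Doe"
try:
    index.index("1", doc_b, if_seq_no=seq_b)  # stale sequence number
    conflict = False
except ConflictError:
    conflict = True
print(conflict)
```

The second writer has to re-read the document and retry, which is the usual way to resolve such a conflict.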

CRUD: Delete

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def delete_user(user_id: str):
    user = await User.get(user_id)
    await user.delete()

Bulk operations

Bulk operations can be much faster than single operations if you have a lot of documents to create, update or delete.

You can use context for bulk operations:

from typing import List
from esorm import ESModel, ESBulk


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def bulk_create_users():
    async with ESBulk() as bulk:
        # Creating or modifying models
        for i in range(10):
            user = User(name=f'User {i}', age=i)
            await bulk.save(user)


async def bulk_delete_users(users: List[User]):
    async with ESBulk(wait_for=True) as bulk:  # Here we wait for the bulk operation to finish
        # Deleting models
        for user in users:
            await bulk.delete(user)

The wait_for argument is optional. If it is True, the context will wait for the bulk operation to finish.
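
For background, the ElasticSearch bulk API takes newline-delimited JSON: one action line per operation, followed by a source line for operations that carry a document. The sketch below builds such a body by hand; `bulk_body` is a hypothetical helper and the index name `esorm_user` is an assumption, so this is not necessarily what ESBulk sends.

```python
import json
from typing import Any, Dict, Iterable, Tuple

# (action, index name, document id, source)
Op = Tuple[str, str, str, Dict[str, Any]]


def bulk_body(ops: Iterable[Op]) -> str:
    """Serialize operations into newline-delimited JSON for the bulk API."""
    lines = []
    for action, index, doc_id, source in ops:
        lines.append(json.dumps({action: {"_index": index, "_id": doc_id}}))
        if action != "delete":  # delete actions carry no source line
            lines.append(json.dumps(source))
    return "\n".join(lines) + "\n"  # the body must end with a newline


body = bulk_body([
    ("index", "esorm_user", "1", {"name": "User 1", "age": 1}),
    ("delete", "esorm_user", "2", {}),
])
print(body)
```

Sending many operations in one request like this is what makes bulk operations cheaper than issuing one request per document.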

Search

General search

You can search for documents using the search method, where an ES query can be specified as a dictionary. Pass res_dict=True to get the result as a dictionary instead of a list; the keys will be the document ids: await User.search(query, res_dict=True).

If you only need one result, you can use the search_one method.

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def search_users():
    # Search for users at least 18 years old
    users = await User.search(
        query={
            'bool': {
                'must': [{
                    'range': {
                        'age': {
                            'gte': 18
                        }
                    }
                }]
            }
        }
    )
    for user in users:
        print(user.name)


async def search_one_user():
    # Search a user named John Doe
    user = await User.search_one(
        query={
            'bool': {
                'must': [{
                    'match': {
                        'name': {
                            'query': 'John Doe'
                        }
                    }
                }]
            }
        }
    )
    print(user.name)

Queries are type checked, because they are annotated as TypedDicts. You can use IDE autocompletion and type checking.
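
If you have not used TypedDicts before, the following sketch shows how they make a query dictionary checkable. The class names here (`RangeBounds`, `RangeQuery`, `BoolClause`, `Query`) are hypothetical and much simpler than the types ESORM ships.

```python
from typing import Dict, List, TypedDict


class RangeBounds(TypedDict, total=False):
    gte: int
    lte: int


class RangeQuery(TypedDict):
    range: Dict[str, RangeBounds]  # field name -> bounds


class BoolClause(TypedDict, total=False):
    must: List[RangeQuery]
    filter: List[RangeQuery]


class Query(TypedDict, total=False):
    bool: BoolClause


query: Query = {
    "bool": {"must": [{"range": {"age": {"gte": 18}}}]}
}

# A static type checker would flag a misspelled key such as "msut" here,
# because it is not declared in BoolClause. At runtime this is a plain dict.
print(type(query) is dict)
```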

Search with field value terms (dictionary search)

You can search for documents using the search_by_fields method, where you specify fields and their values as a dictionary. It also has a res_dict argument and a search_one_by_fields variant.

from esorm import ESModel


# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int


async def search_users():
    # Search for users whose age is 18
    users = await User.search_by_fields({'age': 18})
    for user in users:
        print(user.name)
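
One plausible way to picture a field/value search is as a bool query made of term clauses. The sketch below builds such a query; `fields_to_query` is a hypothetical helper, and the query ESORM actually generates may differ.

```python
from typing import Any, Dict


def fields_to_query(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Turn {'field': value} pairs into a bool query of term clauses."""
    return {
        "bool": {
            "must": [
                {"term": {name: {"value": value}}}
                for name, value in fields.items()
            ]
        }
    }


print(fields_to_query({"age": 18, "country": "Hungary"}))
```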

Aggregations

You can use the aggregate method to get aggregations. You can specify an ES aggregation query as a dictionary. It also accepts normal ES queries, so you can filter which documents you want to aggregate. Both the aggs parameter and the query parameter are type checked, because they are annotated as TypedDicts. You can use IDE autocompletion and type checking.

from esorm import ESModel

# Here the model has an automatically generated id
class User(ESModel):
    name: str
    age: int
    country: str
    
async def aggregate_avg():
    # Get average age of users
    aggs_def = {
        'avg_age': {
            'avg': {
                'field': 'age'
            }
        }
    }
    aggs = await User.aggregate(aggs_def)
    print(aggs['avg_age']['value'])
    
async def aggregate_avg_by_country(country: str = 'Hungary'):
    # Get average age of users in the given country
    aggs_def = {
        'avg_age': {
            'avg': {
                'field': 'age'
            }
        }
    }
    query = {
        'bool': {
            'must': [{
                'match': {
                    'country': {
                        'query': country
                    }
                }
            }]
        }
    }
    aggs = await User.aggregate(aggs_def, query)
    print(aggs['avg_age']['value'])
    
    
async def aggregate_terms():
    # Get number of users by country
    aggs_def = {
        'countries': {
            'terms': {
                'field': 'country'
            }
        }
    }
    aggs = await User.aggregate(aggs_def)
    for bucket in aggs['countries']['buckets']:
        print(bucket['key'], bucket['doc_count'])
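
To get a feel for the result shapes used above (a value for avg, a list of buckets for terms), here is the same computation done in memory with the standard library. The sample data is made up, and this only mimics the response shape, not ElasticSearch itself.

```python
from collections import Counter
from statistics import mean

users = [
    {"country": "Hungary", "age": 30},
    {"country": "Hungary", "age": 20},
    {"country": "Austria", "age": 40},
]

# Shaped like the result of an `avg` aggregation
avg_age = {"value": mean(u["age"] for u in users)}

# Shaped like the buckets of a `terms` aggregation (most frequent first)
buckets = [
    {"key": key, "doc_count": count}
    for key, count in Counter(u["country"] for u in users).most_common()
]

print(avg_age["value"])
for bucket in buckets:
    print(bucket["key"], bucket["doc_count"])
```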

Pagination and sorting

You can use Pagination and Sort classes to decorate your models. They simply wrap your models and add pagination and sorting functionality to them.

Pagination

You can add a callback parameter to the Pagination class which will be invoked after the search with the total number of documents found.

from esorm.model import ESModel, Pagination


class User(ESModel):
    id: int  # This will be used as the document _id in the index
    name: str
    age: int


async def get_users(page: int = 1, page_size: int = 10):

    def pagination_callback(total: int):
        # You may set a header value or something else here
        print(f'Total users: {total}')

    # 1st create the decorator itself, passing the callback
    pagination = Pagination(page=page, page_size=page_size, callback=pagination_callback)

    # Then decorate your model (search methods are coroutines, so await them)
    res = await pagination(User).search_by_fields({'age': 18})

    # Here the result has at most `page_size` items
    return res
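
In ElasticSearch terms, a page number and page size correspond to the from and size search parameters. The sketch below shows that arithmetic; `page_to_from_size` is a hypothetical helper and it assumes pages are numbered from 1, as the default page value above suggests.

```python
from typing import Dict


def page_to_from_size(page: int, page_size: int) -> Dict[str, int]:
    """Translate a 1-based page number into ES `from`/`size` values."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be positive")
    return {"from": (page - 1) * page_size, "size": page_size}


print(page_to_from_size(1, 10))
print(page_to_from_size(3, 10))
```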

Sorting

It is similar to pagination:

from esorm.model import ESModel, Sort


class User(ESModel):
    id: int  # This will be used as the document _id in the index
    name: str
    age: int
    
    
async def get_users():
    # 1st create the decorator itself
    sort = Sort(sort=[
        {'age': {'order': 'desc'}},
        {'name': {'order': 'asc'}}
    ])

    # Then decorate your model (search methods are coroutines, so await them)
    res = await sort(User).search_by_fields({'age': 18})

    # Here the result is sorted by age descending, then by name ascending
    return res


async def get_user_sorted_by_name():
    # You can also use this simplified syntax
    sort = Sort(sort='name')

    # Then decorate your model
    res = await sort(User).all()

    # Here the result is sorted by name
    return res
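
If the meaning of a multi-key sort definition is unclear, the following in-memory sketch applies the same kind of definition to a list of dictionaries. `apply_sort` is a hypothetical helper for illustration; the real sorting happens inside ElasticSearch.

```python
from typing import Any, Dict, List

users = [
    {"name": "Carol", "age": 30},
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30},
]

sort_spec = [{"age": {"order": "desc"}}, {"name": {"order": "asc"}}]


def apply_sort(rows: List[Dict[str, Any]],
               spec: List[Dict[str, Dict[str, str]]]) -> List[Dict[str, Any]]:
    """Sort rows by several keys, the first clause having the highest priority."""
    result = list(rows)
    # Python's sort is stable, so apply the clauses from last to first
    for clause in reversed(spec):
        field, options = next(iter(clause.items()))
        result.sort(key=lambda row: row[field], reverse=options["order"] == "desc")
    return result


sorted_users = apply_sort(users, sort_spec)
print([u["name"] for u in sorted_users])
```

The first clause decides the main order, and later clauses only break ties among documents that are equal on the earlier ones.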

🧪 Testing

For testing you can use test.sh in the root directory. It is a script for running the tests on multiple Python interpreters in virtual environments. At the top of the file you can specify which Python interpreters to test with. The ES versions are specified in the tests/docker-compose.yml file.

If you already have a virtual environment, simply use pytest to run the tests.

🛡 License

This project is licensed under the terms of the Mozilla Public License 2.0 (MPL 2.0).

📃 Citation

If you use this project in your research, please cite it using the following BibTeX entry:

@misc{esorm,
  author = {Adam Wallner},
  title = {ESORM: ElasticSearch Object Relational Mapper},
  year = {2023},
  publisher = {GitHub},
  journal = {GitHub repository},
  howpublished = {\url{https://github.com/wallneradam/esorm}},
}