Skip to content

Add support for Async I/O via asyncio #1230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ pytest
pytest-cov
coverage
mock
nosexcover
sphinx<1.7
sphinx_rtd_theme
jinja2
Expand All @@ -15,3 +14,8 @@ pandas
pyyaml<5.3

black; python_version>="3.6"

# Async dependencies
unasync; python_version>="3.6"
aiohttp; python_version>="3.6"
pytest-asyncio; python_version>="3.6"
180 changes: 180 additions & 0 deletions docs/async.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
Using Asyncio with Elasticsearch
================================

.. py:module:: elasticsearch

Starting in ``elasticsearch-py`` v7.8.0 for Python 3.6+ the ``elasticsearch`` package supports async/await with
`Asyncio <https://docs.python.org/3/library/asyncio.html>`_. Install the package with the ``async``
extra to install the ``aiohttp`` HTTP client and other dependencies required for async support:

.. code-block:: bash

$ python -m pip install elasticsearch[async]>=7.8.0

The same version specifiers for following the Elastic Stack apply to
the ``async`` extra::

# Elasticsearch 7.x
$ python -m pip install elasticsearch[async]>=7,<8

After installation all async API endpoints are available via :class:`~elasticsearch.AsyncElasticsearch`
and are used in the same way as other APIs, just with an extra ``await``:

.. code-block:: python

import asyncio
from elasticsearch import AsyncElasticsearch

es = AsyncElasticsearch()

async def main():
resp = await es.search(
index="documents",
body={"query": {"match_all": {}}}
size=20,
)
print(resp)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

.. note::

Previously asyncio was supported via the `elasticsearch-async <https://github.com/elastic/elasticsearch-py-async>`_ package.
elasticsearch-async has been deprecated in favor of ``elasticsearch`` async support.
For Elasticsearch 7.x and later you must install
``elasticsearch[async]`` and use ``elasticsearch.AsyncElasticsearch()``.

.. note::

Async support is not supported in Python 3.5 or earlier. Upgrade to Python 3.6
or later for async support.

Async Helpers
-------------

Async variants of all helpers are available in ``elasticsearch.helpers``
and are all prefixed with ``async_*``. You'll notice that these APIs
are identical to the ones in the sync :ref:`helpers` documentation.

All async helpers that accept an iterator or generator also accept async iterators
and async generators.

.. py:module:: elasticsearch.helpers

Bulk and Streaming Bulk
~~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: async_bulk

.. code-block:: python

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

es = AsyncElasticsearch()

async def gendata():
mywords = ['foo', 'bar', 'baz']
for word in mywords:
yield {
"_index": "mywords",
"doc": {"word": word},
}

async def main():
await async_bulk(es, gendata())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

.. autofunction:: async_streaming_bulk

.. code-block:: python

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

es = AsyncElasticsearch()

async def gendata():
mywords = ['foo', 'bar', 'baz']
for word in mywords:
yield {
"_index": "mywords",
"doc": {"word": word},
}

async def main():
async for ok, result in async_streaming_bulk(es, gendata()):
action, result = result.popitem()
if not ok:
print("failed to %s document %s" % ())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Scan
~~~~

.. autofunction:: async_scan

.. code-block:: python

import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_scan

es = AsyncElasticsearch()

async def main():
async for doc in async_scan(
client=es,
query={"query": {"match": {"title": "python"}}},
index="orders-*"
):
print(doc)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Reindex
~~~~~~~

.. autofunction:: async_reindex

API Reference
-------------

.. py:module:: elasticsearch

The API of :class:`~elasticsearch.AsyncElasticsearch` is nearly identical
to the API of :class:`~elasticsearch.Elasticsearch` with the exception that
every API call like :py:func:`~elasticsearch.AsyncElasticsearch.search` is
an ``async`` function and requires an ``await`` to properly return the response
body.

AsyncTransport
~~~~~~~~~~~~~~

.. autoclass:: AsyncTransport
:members:

AIOHttpConnection
~~~~~~~~~~~~~~~~~

.. autoclass:: AIOHttpConnection
:members:

AsyncElasticsearch
~~~~~~~~~~~~~~~~~~

.. note::

To reference Elasticsearch APIs that are namespaced like ``.indices.create()``
refer to the sync API reference. These APIs are identical between sync and async.

.. autoclass:: AsyncElasticsearch
:members:
12 changes: 11 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ Installation
Install the ``elasticsearch`` package with `pip
<https://pypi.python.org/pypi/elasticsearch>`_::

pip install elasticsearch
$ python -m pip install elasticsearch

If your application uses async/await in Python you can install with
the ``async`` extra::

$ python -m pip install elasticsearch[async]

Read more about `how to use asyncio with this project <async>`_.

Example Usage
-------------
Expand Down Expand Up @@ -257,6 +264,8 @@ APIKey Authentication
You can configure the client to use Elasticsearch's `API Key`_ for connecting to your cluster.
Please note this authentication method has been introduced with release of Elasticsearch ``6.7.0``.

.. code-block:: python

from elasticsearch import Elasticsearch

# you can use the api key tuple
Expand Down Expand Up @@ -374,6 +383,7 @@ Contents
api
xpack
exceptions
async
connection
transports
helpers
Expand Down
24 changes: 24 additions & 0 deletions elasticsearch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
__version__ = VERSION
__versionstr__ = ".".join(map(str, VERSION))

import sys
import logging
import warnings

Expand Down Expand Up @@ -64,3 +65,26 @@
"AuthorizationException",
"ElasticsearchDeprecationWarning",
]

try:
# Async is only supported on Python 3.6+
if sys.version_info < (3, 6):
raise ImportError()

from ._async import (
AsyncElasticsearch,
AsyncTransport,
AIOHttpConnection,
AsyncConnectionPool,
AsyncDummyConnectionPool,
)

__all__ += [
"AsyncElasticsearch",
"AsyncTransport",
"AIOHttpConnection",
"AsyncConnectionPool",
"AsyncDummyConnectionPool",
]
except (ImportError, SyntaxError):
pass
28 changes: 28 additions & 0 deletions elasticsearch/_async/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Licensed to Elasticsearch B.V under one or more agreements.
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information

from .client import Elasticsearch
from .connection_pool import AsyncConnectionPool, AsyncDummyConnectionPool
from .transport import AsyncTransport
from .http_aiohttp import AIOHttpConnection


class AsyncElasticsearch(Elasticsearch):
# This class def is for both the name 'AsyncElasticsearch'
# and all async-only additions to the class.
async def __aenter__(self):
await self.transport._async_call()
return self


AsyncElasticsearch.__doc__ = Elasticsearch.__doc__


__all__ = [
"AsyncElasticsearch",
"AsyncConnectionPool",
"AsyncDummyConnectionPool",
"AsyncTransport",
"AIOHttpConnection",
]
Loading