Description
I have a very simple app called "myapp". It uses the AsyncElasticsearch client:
from elasticsearch_async import AsyncElasticsearch

def create_app():
    app = dict()
    app['es_client'] = AsyncElasticsearch('http://index:9200/')
    app['stuff'] = Stuff(app['es_client'])
    return app

class Stuff:
    def __init__(self, es_client):
        self.es_client = es_client

    def do_async_stuff(self):
        return self.es_client.index(index='test',
                                    doc_type='test',
                                    body={'field': 'sample content'})
My question is not about AsyncElasticsearch specifically; it just happens to be the async thing I want to work with. It could just as well be something else, like a Mongo driver.
I want to test do_async_stuff(), so I wrote the following conftest.py:
import pytest
from myapp import create_app

@pytest.fixture(scope='session')
def app():
    return create_app()
... and test_stuff.py
import pytest

@pytest.mark.asyncio
async def test_stuff(app):
    await app['stuff'].do_async_stuff()
    assert True
When I run the test, I get an exception with the message "attached to a different loop". Digging into it, I found that pytest-asyncio creates a new event_loop for each test case (right?). The Elasticsearch client, however, picks up the default loop when it is instantiated and sticks with it. So I tried to make it use the pytest-asyncio event_loop like so:
import pytest

@pytest.mark.asyncio
async def test_stuff(app, event_loop):
    app['es_client'].transport.loop = event_loop
    await app['stuff'].do_async_stuff()
    assert True
This however gives me another exception:
__________________________________ test_stuff __________________________________
app = {'es_client': <Elasticsearch([{'host': 'index', 'port': 9200, 'scheme': 'http'}])>, 'stuff': <myapp.Stuff object at 0x7ffbbaff1860>}
    @pytest.mark.asyncio
    async def test_stuff(app):
>       await app['stuff'].do_async_stuff()
test/test_stuff.py:6:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Task pending coro=<AsyncTransport.main_loop() running at /usr/local/lib/python3.5/dist-packages/elasticsearch_async/transport.py:133>>
    def __iter__(self):
        if not self.done():
            self._blocking = True
>           yield self  # This tells Task to wait for completion.
E RuntimeError: Task <Task pending coro=<test_stuff() running at /srv/app/backend/test/test_stuff.py:6> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:164]> got Future <Task pending coro=<AsyncTransport.main_loop() running at /usr/local/lib/python3.5/dist-packages/elasticsearch_async/transport.py:133>> attached to a different loop
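As far as I can tell, the error itself does not depend on Elasticsearch at all. Below is a minimal sketch that reproduces the same message with plain asyncio, assuming a recent Python 3 standard library. The names wait_for, demo, creation_loop and test_loop are made up for illustration: creation_loop stands in for whatever loop the client grabbed at instantiation, and test_loop stands in for the per-test event_loop.

```python
import asyncio


async def wait_for(fut):
    # Awaiting a pending future only works from a task on the loop that owns it.
    return await fut


def demo():
    # Hypothetical stand-ins: the loop the client bound to, and the per-test loop.
    creation_loop = asyncio.new_event_loop()
    test_loop = asyncio.new_event_loop()
    try:
        # A future owned by creation_loop, awaited from a task running on test_loop.
        foreign = creation_loop.create_future()
        try:
            test_loop.run_until_complete(wait_for(foreign))
            error = None
        except RuntimeError as exc:
            error = str(exc)

        # The same await works when the future belongs to the loop running the task.
        own = test_loop.create_future()
        own.set_result("ok")
        result = test_loop.run_until_complete(wait_for(own))
        return error, result
    finally:
        creation_loop.close()
        test_loop.close()


if __name__ == "__main__":
    error, result = demo()
    print(error)
    print(result)
```

If that reading is right, anything the client created on its original loop (such as the AsyncTransport.main_loop() task in the traceback) stays bound to that loop, which would explain why reassigning transport.loop afterwards does not help.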
How am I supposed to test this scenario?