Description
I'm running into problems using AWS X-Ray with async functions and aiobotocore.
Packages:
aioboto3==6.4.1
aiobotocore==0.10.2
aws-xray-sdk==2.4.2
boto3==1.9.176
botocore==1.12.176
I've created a minimal script that reproduces the issue:
import asyncio
import logging
import sys
import time
from datetime import datetime, timedelta

import aioboto3
from aws_xray_sdk.core import patch
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext
# Both the root logger and the X-Ray SDK's own 'aws_xray_sdk' logger emit at
# INFO and above; everything is written to stdout through one handler.
logger = logging.getLogger()
xray_logger = logging.getLogger('aws_xray_sdk')
logger.setLevel(logging.INFO)
xray_logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
# logging.getLogger() with no name is the root logger, i.e. `logger` itself.
# The handler must be attached before patch() below so its log output shows.
logger.addHandler(handler)

# Configure the recorder with an AsyncContext and sampling=False, then
# instrument aiobotocore (raise_errors=False: a failed patch is not raised).
xray_recorder.configure(service='Test', context=AsyncContext(), sampling=False)
patch(['aiobotocore'], raise_errors=False)
async def perform_test(gather=False, loop_count=10):
    """Call ``list_directories`` ``loop_count`` times inside one X-Ray segment.

    A single ``clouddirectory`` client is shared by every call. Each call runs
    through ``execute``, which is traced by ``capture_async('execute')``, and
    all calls are grouped under a ``gather`` or ``loop`` subsegment within the
    ``ASYNC`` segment.

    Args:
        gather: If true, issue the calls concurrently with ``asyncio.gather``
            (the path that fails with ``AlreadyEndedException`` in the report);
            otherwise await them one after another.
        loop_count: Number of ``list_directories`` calls to make.

    Returns:
        A flat list of every ``Directories`` entry from all responses.
    """
    async_client = aioboto3.client('clouddirectory', region_name='eu-west-1')
    try:
        # The recorder's *_async context managers are entered with
        # ``async with`` since we are inside a coroutine.
        async with xray_recorder.in_segment_async('ASYNC'):

            @xray_recorder.capture_async('execute')
            async def execute(client):
                retval = await client.list_directories(state='ENABLED')
                return retval['Directories']

            if gather:
                async with xray_recorder.capture_async('gather'):
                    batches = await asyncio.gather(
                        *[execute(async_client) for _ in range(loop_count)]
                    )
                    return [entry for batch in batches for entry in batch]

            results = []
            async with xray_recorder.capture_async('loop'):
                for _ in range(loop_count):
                    results.extend(await execute(async_client))
            return results
    finally:
        # Release the client even if a call (or the tracing) raised.
        await async_client.close()
def _run_perform_test(label, gather, loop_count):
    """Run ``perform_test`` to completion, logging PASS with elapsed time or FAIL.

    Args:
        label: Prefix for the log line, e.g. ``'GATHER'`` or ``'LOOP'``.
        gather: Forwarded to ``perform_test``.
        loop_count: Forwarded to ``perform_test``.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # wall-clock adjustments the way a utcnow() difference can.
    start = time.perf_counter()
    try:
        asyncio.get_event_loop().run_until_complete(
            perform_test(gather=gather, loop_count=loop_count))
    except Exception:
        # Log the traceback and carry on so the next scenario still runs;
        # KeyboardInterrupt/SystemExit are deliberately not swallowed.
        logger.exception('%s: FAIL', label)
    else:
        # timedelta keeps the original "H:MM:SS.ffffff" output format.
        elapsed = timedelta(seconds=time.perf_counter() - start)
        logger.info('%s: PASS %s', label, elapsed)


def test_gather(loop_count=10):
    """Run ``perform_test`` with concurrent calls and log ``GATHER: PASS/FAIL``."""
    _run_perform_test('GATHER', True, loop_count)


def test_loop(loop_count=10):
    """Run ``perform_test`` with sequential calls and log ``LOOP: PASS/FAIL``."""
    _run_perform_test('LOOP', False, loop_count)
# Run each scenario framed by separator lines so its output is easy to find.
_SEPARATOR = '=========================================='
for _scenario in (test_gather, test_loop):
    logger.info(_SEPARATOR)
    _scenario()
    logger.info(_SEPARATOR)
The expected output is something like:
==========================================
Found credentials in shared credentials file: ~/.aws/credentials
GATHER: PASS 0:00:00.251635
==========================================
==========================================
LOOP: PASS 0:00:01.386801
==========================================
The timings are irrelevant; what matters is that both approaches (`asyncio.gather()` and a sequential loop) work with aiobotocore and X-Ray. However, the actual output is:
successfully patched module aiobotocore
==========================================
Found credentials in shared credentials file: ~/.aws/credentials
GATHER: FAIL
Traceback (most recent call last):
File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 82, in record_subsegment_async
return_value = await wrapped(*args, **kwargs)
File "/home/idoorn/xray_test.py", line 36, in execute
retval = await client.list_directories(state='ENABLED')
File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/ext/aiobotocore/patch.py", line 36, in _xray_traced_aiobotocore
meta_processor=aws_meta_processor,
File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 101, in record_subsegment_async
stack=stack,
File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/ext/boto_utils.py", line 57, in aws_meta_processor
resp_meta.get('HTTPStatusCode'))
File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 102, in put_http_meta
self._check_ended()
File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 283, in _check_ended
raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/idoorn/xray_test.py", line 58, in test_gather
asyncio.get_event_loop().run_until_complete(perform_test(gather=True, loop_count=loop_count))
File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
return future.result()
File "/home/idoorn/xray_test.py", line 42, in perform_test
for result in await asyncio.gather(*[execute(async_client) for i in range(loop_count)]):
File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 33, in __call__
meta_processor=None,
File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 105, in record_subsegment_async
subsegment.add_exception(exception, stack)
File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 220, in add_exception
self._check_ended()
File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 283, in _check_ended
raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.
==========================================
==========================================
LOOP: PASS 0:00:01.989784
==========================================
Using `asyncio.gather()` doesn't seem to play well with X-Ray: by the time the clouddirectory call completes, the subsegment used to trace it has already been closed. X-Ray only works when the async calls are made sequentially, which defeats much of the benefit of using async. :)
NOTE: if I remove the lines:
patch([
'aiobotocore',
], raise_errors=False)
from the script above, everything works, but of course the clouddirectory calls are no longer traced.