Skip to content

async operations with patched aiobotocore gives error "Already ended segment and subsegment cannot be modified." #164

Open
@IvDoorn

Description

@IvDoorn

I'm running into some problems with XRay in combination with async functions and aiobotocore.

Packages:

aioboto3==6.4.1
aiobotocore==0.10.2
aws-xray-sdk==2.4.2
boto3==1.9.176
botocore==1.12.176

I've created a very small sample script to demonstrate my issue:

import aioboto3
import asyncio
import logging
import sys

from datetime import datetime

from aws_xray_sdk.core import patch
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core.async_context import AsyncContext

logger = logging.getLogger()  # no name given: this is the root logger
logger.setLevel(logging.INFO)
# The X-Ray SDK's own logger is set to INFO as well.
xray_logger = logging.getLogger('aws_xray_sdk')
xray_logger.setLevel(logging.INFO)
# Send records to stdout through a handler attached to the root logger.
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)  # handler accepts DEBUG; the loggers above still filter at INFO
logging.getLogger().addHandler(handler)
# Configure the recorder with an AsyncContext and sampling switched off.
xray_recorder.configure(service='Test', context=AsyncContext(), sampling=False)
# Patch aiobotocore; per the NOTE further down, removing this call makes the error go away.
patch([
    'aiobotocore',
], raise_errors=False)


async def perform_test(gather=False, loop_count=10):  # repro driver: returns the directories from all calls
    async_client = aioboto3.client('clouddirectory', region_name='eu-west-1')
    # Makes loop_count list_directories calls; the client is closed in `finally` either way.
    try:
        with xray_recorder.in_segment_async('ASYNC'):
            @xray_recorder.capture_async('execute')
            async def execute(client):
                retval = await client.list_directories(state='ENABLED')
                return retval['Directories']
            # Accumulates the 'Directories' entries returned by every execute() call.
            results = []
            if gather:
                with xray_recorder.capture_async('gather'):
                    for result in await asyncio.gather(*[execute(async_client) for i in range(loop_count)]):
                        results += result
            # Sequential variant: each call is awaited before the next one starts.
            else:
                with xray_recorder.capture_async('loop'):
                    for i in range(loop_count):
                        results += await execute(async_client)
            # NOTE(review): the gather branch above is the one the traceback below points at.
            return results
    finally:
        await async_client.close()


def test_gather(loop_count=10):  # runs the asyncio.gather() variant; logs PASS + elapsed time, or FAIL + traceback
    try:
        now = datetime.utcnow()  # start time for the elapsed-time report
        asyncio.get_event_loop().run_until_complete(perform_test(gather=True, loop_count=loop_count))
        logger.info('GATHER: PASS %s' % str(datetime.utcnow() - now))
    except:  # NOTE(review): bare except also catches KeyboardInterrupt/SystemExit
        logger.exception('GATHER: FAIL')  # logs the message together with the active traceback


def test_loop(loop_count=10):  # runs the sequential variant; logs PASS + elapsed time, or FAIL + traceback
    try:
        now = datetime.utcnow()  # start time for the elapsed-time report
        asyncio.get_event_loop().run_until_complete(perform_test(gather=False, loop_count=loop_count))
        logger.info('LOOP: PASS %s' % str(datetime.utcnow() - now))
    except:  # NOTE(review): bare except also catches KeyboardInterrupt/SystemExit
        logger.exception('LOOP: FAIL')  # logs the message together with the active traceback


logger.info('==========================================')
test_gather()  # concurrent variant; reported below as FAIL
logger.info('==========================================')
logger.info('==========================================')
test_loop()  # sequential variant; reported below as PASS
logger.info('==========================================')

The expected outcome should be something like:

==========================================
Found credentials in shared credentials file: ~/.aws/credentials
GATHER: PASS 0:00:00.251635
==========================================
==========================================
LOOP: PASS 0:00:01.386801
==========================================

The timings are irrelevant; what matters is that both approaches of using aiobotocore and XRay work together. However, the actual outcome is:

successfully patched module aiobotocore
==========================================
Found credentials in shared credentials file: ~/.aws/credentials
GATHER: FAIL
Traceback (most recent call last):
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 82, in record_subsegment_async
    return_value = await wrapped(*args, **kwargs)
  File "/home/idoorn/xray_test.py", line 36, in execute
    retval = await client.list_directories(state='ENABLED')
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/ext/aiobotocore/patch.py", line 36, in _xray_traced_aiobotocore
    meta_processor=aws_meta_processor,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 101, in record_subsegment_async
    stack=stack,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/ext/boto_utils.py", line 57, in aws_meta_processor
    resp_meta.get('HTTPStatusCode'))
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 102, in put_http_meta
    self._check_ended()
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 283, in _check_ended
    raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/idoorn/xray_test.py", line 58, in test_gather
    asyncio.get_event_loop().run_until_complete(perform_test(gather=True, loop_count=loop_count))
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "/home/idoorn/xray_test.py", line 42, in perform_test
    for result in await asyncio.gather(*[execute(async_client) for i in range(loop_count)]):
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 33, in __call__
    meta_processor=None,
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/async_recorder.py", line 105, in record_subsegment_async
    subsegment.add_exception(exception, stack)
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 220, in add_exception
    self._check_ended()
  File "/home/idoorn/anaconda2/envs/bazelenv36/lib/python3.6/site-packages/aws_xray_sdk/core/models/entity.py", line 283, in _check_ended
    raise AlreadyEndedException("Already ended segment and subsegment cannot be modified.")
aws_xray_sdk.core.exceptions.exceptions.AlreadyEndedException: Already ended segment and subsegment cannot be modified.
==========================================
==========================================
LOOP: PASS 0:00:01.989784
==========================================

Using asyncio.gather() doesn't seem to play nice with xray, as when the call to clouddirectory comes, the subsegment which was used for tracing the call has already been closed. Only when calling the async calls sequentially will xray work nicely. But that obviously would kind of defeat the benefits of using async. :)

NOTE: if I remove the lines:

patch([
    'aiobotocore',
], raise_errors=False)

from the above test script, everything works well, but no tracing is occurring on the clouddirectory calls obviously.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions