diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md index 65852c06ca..d45607db69 100644 --- a/docs/utilities/idempotency.md +++ b/docs/utilities/idempotency.md @@ -109,10 +109,11 @@ If you're not [changing the default configuration for the DynamoDB persistence l Larger items cannot be written to DynamoDB and will cause exceptions. ???+ info "Info: DynamoDB" - Each function invocation will generally make 2 requests to DynamoDB. If the - result returned by your Lambda is less than 1kb, you can expect 2 WCUs per invocation. For retried invocations, you will - see 1WCU and 1RCU. Review the [DynamoDB pricing documentation](https://aws.amazon.com/dynamodb/pricing/){target="_blank"} to - estimate the cost. + Each function invocation will make only 1 request to DynamoDB by using DynamoDB's [conditional expressions](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.ConditionExpressions.html){target="_blank"} to ensure that we don't overwrite existing records, + and [ReturnValuesOnConditionCheckFailure](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html#DDB-PutItem-request-ReturnValuesOnConditionCheckFailure){target="_blank"} to return the record if it exists. + See [AWS Blog post on handling conditional write errors](https://aws.amazon.com/blogs/database/handle-conditional-write-errors-in-high-concurrency-scenarios-with-amazon-dynamodb/) for more details. + For retried invocations, you will see 1WCU and 1RCU. + Review the [DynamoDB pricing documentation](https://aws.amazon.com/dynamodb/pricing/){target="_blank"} to estimate the cost. ### MakeIdempotent function wrapper diff --git a/packages/idempotency/src/IdempotencyHandler.ts b/packages/idempotency/src/IdempotencyHandler.ts index 7deb2c53f7..0808edf999 100644 --- a/packages/idempotency/src/IdempotencyHandler.ts +++ b/packages/idempotency/src/IdempotencyHandler.ts @@ -314,9 +314,10 @@ export class IdempotencyHandler { } catch (e) { if (e instanceof IdempotencyItemAlreadyExistsError) { const idempotencyRecord: IdempotencyRecord = - await this.#persistenceStore.getRecord( + e.existingRecord || + (await this.#persistenceStore.getRecord( this.#functionPayloadToBeHashed - ); + )); return IdempotencyHandler.determineResultFromIdempotencyRecord( idempotencyRecord diff --git a/packages/idempotency/src/errors.ts b/packages/idempotency/src/errors.ts index da96692bc8..674e05d6a6 100644 --- a/packages/idempotency/src/errors.ts +++ b/packages/idempotency/src/errors.ts @@ -1,7 +1,16 @@ +import type { IdempotencyRecord } from './persistence'; + /** * Item attempting to be inserted into persistence store already exists and is not expired */ -class IdempotencyItemAlreadyExistsError extends Error {} +class IdempotencyItemAlreadyExistsError extends Error { + public existingRecord?: IdempotencyRecord; + + public constructor(message?: string, existingRecord?: IdempotencyRecord) { + super(message); + this.existingRecord = existingRecord; + } +} /** * Item does not exist in persistence store diff --git a/packages/idempotency/src/persistence/BasePersistenceLayer.ts b/packages/idempotency/src/persistence/BasePersistenceLayer.ts index e90118d042..108a7c0adf 100644 --- a/packages/idempotency/src/persistence/BasePersistenceLayer.ts +++ b/packages/idempotency/src/persistence/BasePersistenceLayer.ts @@ -154,7 +154,10 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface { } if (this.getFromCache(idempotencyRecord.idempotencyKey)) { - throw new IdempotencyItemAlreadyExistsError(); + throw new IdempotencyItemAlreadyExistsError( + `Failed to put record for already existing idempotency key: ${idempotencyRecord.idempotencyKey}`, + idempotencyRecord + ); } await this._putRecord(idempotencyRecord); diff --git a/packages/idempotency/src/persistence/DynamoDBPersistenceLayer.ts b/packages/idempotency/src/persistence/DynamoDBPersistenceLayer.ts index e13bdebdfd..f30fa83b57 100644 --- a/packages/idempotency/src/persistence/DynamoDBPersistenceLayer.ts +++ b/packages/idempotency/src/persistence/DynamoDBPersistenceLayer.ts @@ -6,10 +6,10 @@ import { IdempotencyRecordStatus } from '../constants'; import type { DynamoDBPersistenceOptions } from '../types'; import { AttributeValue, + ConditionalCheckFailedException, DeleteItemCommand, DynamoDBClient, DynamoDBClientConfig, - DynamoDBServiceException, GetItemCommand, PutItemCommand, UpdateItemCommand, @@ -198,15 +198,26 @@ class DynamoDBPersistenceLayer extends BasePersistenceLayer { ':inprogress': IdempotencyRecordStatus.INPROGRESS, }), ConditionExpression: conditionExpression, + ReturnValuesOnConditionCheckFailure: 'ALL_OLD', }) ); } catch (error) { - if (error instanceof DynamoDBServiceException) { - if (error.name === 'ConditionalCheckFailedException') { - throw new IdempotencyItemAlreadyExistsError( - `Failed to put record for already existing idempotency key: ${record.idempotencyKey}` - ); - } + if (error instanceof ConditionalCheckFailedException) { + const item = error.Item && unmarshall(error.Item); + const idempotencyRecord = + item && + new IdempotencyRecord({ + idempotencyKey: item[this.keyAttr], + status: item[this.statusAttr], + expiryTimestamp: item[this.expiryAttr], + inProgressExpiryTimestamp: item[this.inProgressExpiryAttr], + responseData: item[this.dataAttr], + payloadHash: item[this.validationKeyAttr], + }); + throw new IdempotencyItemAlreadyExistsError( + `Failed to put record for already existing idempotency key: ${record.idempotencyKey}`, + idempotencyRecord + ); } throw error; diff --git a/packages/idempotency/tests/unit/IdempotencyHandler.test.ts b/packages/idempotency/tests/unit/IdempotencyHandler.test.ts index deeccee875..1ebb14e0be 100644 --- a/packages/idempotency/tests/unit/IdempotencyHandler.test.ts +++ b/packages/idempotency/tests/unit/IdempotencyHandler.test.ts @@ -119,6 +119,28 @@ describe('Class IdempotencyHandler', () => { expect(saveInProgressSpy).toHaveBeenCalledTimes(1); }); + test('when IdempotencyAlreadyInProgressError is thrown and it contains the existing item, it returns it directly', async () => { + // Prepare + const saveInProgressSpy = jest + .spyOn(persistenceStore, 'saveInProgress') + .mockRejectedValueOnce( + new IdempotencyItemAlreadyExistsError( + 'Failed to put record for already existing idempotency key: idempotence-key', + new IdempotencyRecord({ + idempotencyKey: 'key', + status: IdempotencyRecordStatus.COMPLETED, + responseData: 'Hi', + }) + ) + ); + const getRecordSpy = jest.spyOn(persistenceStore, 'getRecord'); + + // Act & Assess + await expect(idempotentHandler.handle()).resolves.toEqual('Hi'); + expect(saveInProgressSpy).toHaveBeenCalledTimes(1); + expect(getRecordSpy).toHaveBeenCalledTimes(0); + }); + test('when IdempotencyInconsistentStateError is thrown, it retries until max retries are exhausted', async () => { // Prepare const mockProcessIdempotency = jest diff --git a/packages/idempotency/tests/unit/persistence/BasePersistenceLayer.test.ts b/packages/idempotency/tests/unit/persistence/BasePersistenceLayer.test.ts index 3224c0661b..b615434c9f 100644 --- a/packages/idempotency/tests/unit/persistence/BasePersistenceLayer.test.ts +++ b/packages/idempotency/tests/unit/persistence/BasePersistenceLayer.test.ts @@ -407,7 +407,7 @@ describe('Class: BasePersistenceLayer', () => { // Act & Assess await expect( persistenceLayer.saveInProgress({ foo: 'bar' }) - ).rejects.toThrow(new IdempotencyItemAlreadyExistsError()); + ).rejects.toThrow(IdempotencyItemAlreadyExistsError); expect(putRecordSpy).toHaveBeenCalledTimes(0); }); diff --git a/packages/idempotency/tests/unit/persistence/DynamoDbPersistenceLayer.test.ts b/packages/idempotency/tests/unit/persistence/DynamoDbPersistenceLayer.test.ts index aa817e7b77..4a0195c19e 100644 --- a/packages/idempotency/tests/unit/persistence/DynamoDbPersistenceLayer.test.ts +++ b/packages/idempotency/tests/unit/persistence/DynamoDbPersistenceLayer.test.ts @@ -12,8 +12,8 @@ import { IdempotencyRecord } from '../../../src/persistence'; import type { DynamoDBPersistenceOptions } from '../../../src/types'; import { IdempotencyRecordStatus } from '../../../src'; import { + ConditionalCheckFailedException, DynamoDBClient, - DynamoDBServiceException, PutItemCommand, GetItemCommand, UpdateItemCommand, @@ -395,19 +395,30 @@ describe('Class: DynamoDBPersistenceLayer', () => { expiryTimestamp: 0, }); client.on(PutItemCommand).rejects( - new DynamoDBServiceException({ - $fault: 'client', + new ConditionalCheckFailedException({ $metadata: { httpStatusCode: 400, requestId: 'someRequestId', }, - name: 'ConditionalCheckFailedException', + message: 'Conditional check failed', + Item: { + id: { S: 'test-key' }, + status: { S: 'INPROGRESS' }, + expiration: { N: Date.now().toString() }, + }, }) ); // Act & Assess await expect(persistenceLayer._putRecord(record)).rejects.toThrowError( - IdempotencyItemAlreadyExistsError + new IdempotencyItemAlreadyExistsError( + `Failed to put record for already existing idempotency key: ${record.idempotencyKey}`, + new IdempotencyRecord({ + idempotencyKey: record.idempotencyKey, + status: IdempotencyRecordStatus.EXPIRED, + expiryTimestamp: Date.now() / 1000 - 1, + }) + ) ); }); @@ -676,4 +687,26 @@ describe('Class: DynamoDBPersistenceLayer', () => { }); }); }); + + test('_putRecord throws Error when Item is undefined', async () => { + // Prepare + const persistenceLayer = new TestDynamoDBPersistenceLayer({ + tableName: dummyTableName, + }); + const mockRecord = new IdempotencyRecord({ + idempotencyKey: 'test-key', + status: 'INPROGRESS', + expiryTimestamp: Date.now(), + }); + + DynamoDBClient.prototype.send = jest.fn().mockRejectedValueOnce( + new ConditionalCheckFailedException({ + message: 'Conditional check failed', + $metadata: {}, + }) + ); + await expect( + persistenceLayer._putRecord(mockRecord) + ).rejects.toThrowError(); + }); });