diff --git a/packages/idempotency/src/IdempotencyHandler.ts b/packages/idempotency/src/IdempotencyHandler.ts index 0808edf999..a1ef92053a 100644 --- a/packages/idempotency/src/IdempotencyHandler.ts +++ b/packages/idempotency/src/IdempotencyHandler.ts @@ -300,9 +300,10 @@ export class IdempotencyHandler { }; /** - * Save an in progress record to the idempotency store or return an existing result. + * Save an in progress record to the idempotency store or return an stored result. * - * If the record already exists, return the result from the record. + * Before returning a result, we might neede to look up the idempotency record + * and validate it to ensure that it is consistent with the payload to be hashed. */ #saveInProgressOrReturnExistingResult = async (): Promise => { @@ -313,11 +314,22 @@ export class IdempotencyHandler { ); } catch (e) { if (e instanceof IdempotencyItemAlreadyExistsError) { - const idempotencyRecord: IdempotencyRecord = - e.existingRecord || - (await this.#persistenceStore.getRecord( + let idempotencyRecord = e.existingRecord; + if (idempotencyRecord !== undefined) { + // If the error includes the existing record, we can use it to validate + // the record being processed and cache it in memory. + idempotencyRecord = this.#persistenceStore.processExistingRecord( + idempotencyRecord, this.#functionPayloadToBeHashed - )); + ); + // If the error doesn't include the existing record, we need to fetch + // it from the persistence layer. In doing so, we also call the processExistingRecord + // method to validate the record and cache it in memory. + } else { + idempotencyRecord = await this.#persistenceStore.getRecord( + this.#functionPayloadToBeHashed + ); + } return IdempotencyHandler.determineResultFromIdempotencyRecord( idempotencyRecord diff --git a/packages/idempotency/src/persistence/BasePersistenceLayer.ts b/packages/idempotency/src/persistence/BasePersistenceLayer.ts index f6eb24e2c4..7f28a45d6a 100644 --- a/packages/idempotency/src/persistence/BasePersistenceLayer.ts +++ b/packages/idempotency/src/persistence/BasePersistenceLayer.ts @@ -114,8 +114,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface { } const record = await this._getRecord(idempotencyKey); - this.saveToCache(record); - this.validatePayload(data, record); + this.processExistingRecord(record, data); return record; } @@ -127,6 +126,29 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface { return this.payloadValidationEnabled; } + /** + * Validates an existing record against the data payload being processed. + * If the payload does not match the stored record, an `IdempotencyValidationError` error is thrown. + * + * Whenever a record is retrieved from the persistence layer, it should be validated against the data payload + * being processed. This is to ensure that the data payload being processed is the same as the one that was + * used to create the record in the first place. + * + * The record is also saved to the local cache if local caching is enabled. + * + * @param record - the stored record to validate against + * @param data - the data payload being processed and to be validated against the stored record + */ + public processExistingRecord( + storedDataRecord: IdempotencyRecord, + processedData: JSONValue | IdempotencyRecord + ): IdempotencyRecord { + this.validatePayload(processedData, storedDataRecord); + this.saveToCache(storedDataRecord); + + return storedDataRecord; + } + /** * Saves a record indicating that the function's execution is currently in progress * @@ -303,8 +325,9 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface { /** * Save record to local cache except for when status is `INPROGRESS`. * - * We can't cache `INPROGRESS` records because we have no way to reflect updates - * that might happen to the record outside the execution context of the function. + * Records with `INPROGRESS` status are not cached because we have no way to + * reflect updates that might happen to the record outside the execution + * context of the function. * * @param record - record to save */ @@ -314,13 +337,26 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface { this.cache?.add(record.idempotencyKey, record); } - private validatePayload(data: JSONValue, record: IdempotencyRecord): void { + /** + * Validates the payload against the stored record. If the payload does not match the stored record, + * an `IdempotencyValidationError` error is thrown. + * + * @param data - The data payload to validate against the stored record + * @param storedDataRecord - The stored record to validate against + */ + private validatePayload( + data: JSONValue | IdempotencyRecord, + storedDataRecord: IdempotencyRecord + ): void { if (this.payloadValidationEnabled) { - const hashedPayload: string = this.getHashedPayload(data); - if (hashedPayload !== record.payloadHash) { + const hashedPayload = + data instanceof IdempotencyRecord + ? data.payloadHash + : this.getHashedPayload(data); + if (hashedPayload !== storedDataRecord.payloadHash) { throw new IdempotencyValidationError( 'Payload does not match stored record for this event key', - record + storedDataRecord ); } } diff --git a/packages/idempotency/tests/unit/persistence/BasePersistenceLayer.test.ts b/packages/idempotency/tests/unit/persistence/BasePersistenceLayer.test.ts index 71f271706d..cb1943237e 100644 --- a/packages/idempotency/tests/unit/persistence/BasePersistenceLayer.test.ts +++ b/packages/idempotency/tests/unit/persistence/BasePersistenceLayer.test.ts @@ -3,6 +3,7 @@ * * @group unit/idempotency/persistence/base */ +import { createHash } from 'node:crypto'; import { ContextExamples as dummyContext } from '@aws-lambda-powertools/commons'; import { IdempotencyConfig, IdempotencyRecordStatus } from '../../../src'; import { @@ -464,6 +465,92 @@ describe('Class: BasePersistenceLayer', () => { }); }); + describe('Method: processExistingRecord', () => { + it('throws an error if the payload does not match the stored record', () => { + // Prepare + const persistenceLayer = new PersistenceLayerTestClass(); + persistenceLayer.configure({ + config: new IdempotencyConfig({ + payloadValidationJmesPath: 'foo', + }), + }); + const existingRecord = new IdempotencyRecord({ + idempotencyKey: 'my-lambda-function#mocked-hash', + status: IdempotencyRecordStatus.INPROGRESS, + payloadHash: 'different-hash', + }); + + // Act & Assess + expect(() => + persistenceLayer.processExistingRecord(existingRecord, { foo: 'bar' }) + ).toThrow( + new IdempotencyValidationError( + 'Payload does not match stored record for this event key', + existingRecord + ) + ); + }); + + it('returns if the payload matches the stored record', () => { + // Prepare + const persistenceLayer = new PersistenceLayerTestClass(); + persistenceLayer.configure({ + config: new IdempotencyConfig({ + payloadValidationJmesPath: 'foo', + }), + }); + const existingRecord = new IdempotencyRecord({ + idempotencyKey: 'my-lambda-function#mocked-hash', + status: IdempotencyRecordStatus.INPROGRESS, + payloadHash: 'mocked-hash', + }); + + // Act & Assess + expect(() => + persistenceLayer.processExistingRecord(existingRecord, { foo: 'bar' }) + ).not.toThrow(); + }); + + it('skips validation if payload validation is not enabled', () => { + // Prepare + const persistenceLayer = new PersistenceLayerTestClass(); + const existingRecord = new IdempotencyRecord({ + idempotencyKey: 'my-lambda-function#mocked-hash', + status: IdempotencyRecordStatus.INPROGRESS, + payloadHash: 'different-hash', + }); + + // Act & Assess + expect(() => + persistenceLayer.processExistingRecord(existingRecord, { foo: 'bar' }) + ).not.toThrow(); + }); + + it('skips hashing if the payload is already an IdempotencyRecord', () => { + // Prepare + const persistenceLayer = new PersistenceLayerTestClass(); + persistenceLayer.configure({ + config: new IdempotencyConfig({ + payloadValidationJmesPath: 'foo', + }), + }); + const existingRecord = new IdempotencyRecord({ + idempotencyKey: 'my-lambda-function#mocked-hash', + status: IdempotencyRecordStatus.INPROGRESS, + payloadHash: 'mocked-hash', + }); + const payload = new IdempotencyRecord({ + idempotencyKey: 'my-lambda-function#mocked-hash', + status: IdempotencyRecordStatus.INPROGRESS, + payloadHash: 'mocked-hash', + }); + + // Act + persistenceLayer.processExistingRecord(existingRecord, payload); + expect(createHash).toHaveBeenCalledTimes(0); + }); + }); + describe('Method: getExpiresAfterSeconds', () => { it('returns the configured value', () => { // Prepare