Skip to content

Commit 037e71f

Browse files
authored
fix(idempotency): validate idempotency record returned in conditional write (#2083)
* fix(idempotency): apply validation to idempotency record returned in error * fix(idempotency): expose validation & caching existing record as single method * fix(idempotency): rename new method * chore(idempotency): comments & ordering
1 parent 2d82117 commit 037e71f

File tree

3 files changed

+149
-14
lines changed

3 files changed

+149
-14
lines changed

packages/idempotency/src/IdempotencyHandler.ts

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -300,9 +300,10 @@ export class IdempotencyHandler<Func extends AnyFunction> {
300300
};
301301

302302
/**
303-
* Save an in progress record to the idempotency store or return an existing result.
303+
* Save an in progress record to the idempotency store or return an stored result.
304304
*
305-
* If the record already exists, return the result from the record.
305+
* Before returning a result, we might neede to look up the idempotency record
306+
* and validate it to ensure that it is consistent with the payload to be hashed.
306307
*/
307308
#saveInProgressOrReturnExistingResult =
308309
async (): Promise<JSONValue | void> => {
@@ -313,11 +314,22 @@ export class IdempotencyHandler<Func extends AnyFunction> {
313314
);
314315
} catch (e) {
315316
if (e instanceof IdempotencyItemAlreadyExistsError) {
316-
const idempotencyRecord: IdempotencyRecord =
317-
e.existingRecord ||
318-
(await this.#persistenceStore.getRecord(
317+
let idempotencyRecord = e.existingRecord;
318+
if (idempotencyRecord !== undefined) {
319+
// If the error includes the existing record, we can use it to validate
320+
// the record being processed and cache it in memory.
321+
idempotencyRecord = this.#persistenceStore.processExistingRecord(
322+
idempotencyRecord,
319323
this.#functionPayloadToBeHashed
320-
));
324+
);
325+
// If the error doesn't include the existing record, we need to fetch
326+
// it from the persistence layer. In doing so, we also call the processExistingRecord
327+
// method to validate the record and cache it in memory.
328+
} else {
329+
idempotencyRecord = await this.#persistenceStore.getRecord(
330+
this.#functionPayloadToBeHashed
331+
);
332+
}
321333

322334
return IdempotencyHandler.determineResultFromIdempotencyRecord(
323335
idempotencyRecord

packages/idempotency/src/persistence/BasePersistenceLayer.ts

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
114114
}
115115

116116
const record = await this._getRecord(idempotencyKey);
117-
this.saveToCache(record);
118-
this.validatePayload(data, record);
117+
this.processExistingRecord(record, data);
119118

120119
return record;
121120
}
@@ -127,6 +126,29 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
127126
return this.payloadValidationEnabled;
128127
}
129128

129+
/**
130+
* Validates an existing record against the data payload being processed.
131+
* If the payload does not match the stored record, an `IdempotencyValidationError` error is thrown.
132+
*
133+
* Whenever a record is retrieved from the persistence layer, it should be validated against the data payload
134+
* being processed. This is to ensure that the data payload being processed is the same as the one that was
135+
* used to create the record in the first place.
136+
*
137+
* The record is also saved to the local cache if local caching is enabled.
138+
*
139+
* @param record - the stored record to validate against
140+
* @param data - the data payload being processed and to be validated against the stored record
141+
*/
142+
public processExistingRecord(
143+
storedDataRecord: IdempotencyRecord,
144+
processedData: JSONValue | IdempotencyRecord
145+
): IdempotencyRecord {
146+
this.validatePayload(processedData, storedDataRecord);
147+
this.saveToCache(storedDataRecord);
148+
149+
return storedDataRecord;
150+
}
151+
130152
/**
131153
* Saves a record indicating that the function's execution is currently in progress
132154
*
@@ -303,8 +325,9 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
303325
/**
304326
* Save record to local cache except for when status is `INPROGRESS`.
305327
*
306-
* We can't cache `INPROGRESS` records because we have no way to reflect updates
307-
* that might happen to the record outside the execution context of the function.
328+
* Records with `INPROGRESS` status are not cached because we have no way to
329+
* reflect updates that might happen to the record outside the execution
330+
* context of the function.
308331
*
309332
* @param record - record to save
310333
*/
@@ -314,13 +337,26 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
314337
this.cache?.add(record.idempotencyKey, record);
315338
}
316339

317-
private validatePayload(data: JSONValue, record: IdempotencyRecord): void {
340+
/**
341+
* Validates the payload against the stored record. If the payload does not match the stored record,
342+
* an `IdempotencyValidationError` error is thrown.
343+
*
344+
* @param data - The data payload to validate against the stored record
345+
* @param storedDataRecord - The stored record to validate against
346+
*/
347+
private validatePayload(
348+
data: JSONValue | IdempotencyRecord,
349+
storedDataRecord: IdempotencyRecord
350+
): void {
318351
if (this.payloadValidationEnabled) {
319-
const hashedPayload: string = this.getHashedPayload(data);
320-
if (hashedPayload !== record.payloadHash) {
352+
const hashedPayload =
353+
data instanceof IdempotencyRecord
354+
? data.payloadHash
355+
: this.getHashedPayload(data);
356+
if (hashedPayload !== storedDataRecord.payloadHash) {
321357
throw new IdempotencyValidationError(
322358
'Payload does not match stored record for this event key',
323-
record
359+
storedDataRecord
324360
);
325361
}
326362
}

packages/idempotency/tests/unit/persistence/BasePersistenceLayer.test.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*
44
* @group unit/idempotency/persistence/base
55
*/
6+
import { createHash } from 'node:crypto';
67
import { ContextExamples as dummyContext } from '@aws-lambda-powertools/commons';
78
import { IdempotencyConfig, IdempotencyRecordStatus } from '../../../src';
89
import {
@@ -464,6 +465,92 @@ describe('Class: BasePersistenceLayer', () => {
464465
});
465466
});
466467

468+
describe('Method: processExistingRecord', () => {
469+
it('throws an error if the payload does not match the stored record', () => {
470+
// Prepare
471+
const persistenceLayer = new PersistenceLayerTestClass();
472+
persistenceLayer.configure({
473+
config: new IdempotencyConfig({
474+
payloadValidationJmesPath: 'foo',
475+
}),
476+
});
477+
const existingRecord = new IdempotencyRecord({
478+
idempotencyKey: 'my-lambda-function#mocked-hash',
479+
status: IdempotencyRecordStatus.INPROGRESS,
480+
payloadHash: 'different-hash',
481+
});
482+
483+
// Act & Assess
484+
expect(() =>
485+
persistenceLayer.processExistingRecord(existingRecord, { foo: 'bar' })
486+
).toThrow(
487+
new IdempotencyValidationError(
488+
'Payload does not match stored record for this event key',
489+
existingRecord
490+
)
491+
);
492+
});
493+
494+
it('returns if the payload matches the stored record', () => {
495+
// Prepare
496+
const persistenceLayer = new PersistenceLayerTestClass();
497+
persistenceLayer.configure({
498+
config: new IdempotencyConfig({
499+
payloadValidationJmesPath: 'foo',
500+
}),
501+
});
502+
const existingRecord = new IdempotencyRecord({
503+
idempotencyKey: 'my-lambda-function#mocked-hash',
504+
status: IdempotencyRecordStatus.INPROGRESS,
505+
payloadHash: 'mocked-hash',
506+
});
507+
508+
// Act & Assess
509+
expect(() =>
510+
persistenceLayer.processExistingRecord(existingRecord, { foo: 'bar' })
511+
).not.toThrow();
512+
});
513+
514+
it('skips validation if payload validation is not enabled', () => {
515+
// Prepare
516+
const persistenceLayer = new PersistenceLayerTestClass();
517+
const existingRecord = new IdempotencyRecord({
518+
idempotencyKey: 'my-lambda-function#mocked-hash',
519+
status: IdempotencyRecordStatus.INPROGRESS,
520+
payloadHash: 'different-hash',
521+
});
522+
523+
// Act & Assess
524+
expect(() =>
525+
persistenceLayer.processExistingRecord(existingRecord, { foo: 'bar' })
526+
).not.toThrow();
527+
});
528+
529+
it('skips hashing if the payload is already an IdempotencyRecord', () => {
530+
// Prepare
531+
const persistenceLayer = new PersistenceLayerTestClass();
532+
persistenceLayer.configure({
533+
config: new IdempotencyConfig({
534+
payloadValidationJmesPath: 'foo',
535+
}),
536+
});
537+
const existingRecord = new IdempotencyRecord({
538+
idempotencyKey: 'my-lambda-function#mocked-hash',
539+
status: IdempotencyRecordStatus.INPROGRESS,
540+
payloadHash: 'mocked-hash',
541+
});
542+
const payload = new IdempotencyRecord({
543+
idempotencyKey: 'my-lambda-function#mocked-hash',
544+
status: IdempotencyRecordStatus.INPROGRESS,
545+
payloadHash: 'mocked-hash',
546+
});
547+
548+
// Act
549+
persistenceLayer.processExistingRecord(existingRecord, payload);
550+
expect(createHash).toHaveBeenCalledTimes(0);
551+
});
552+
});
553+
467554
describe('Method: getExpiresAfterSeconds', () => {
468555
it('returns the configured value', () => {
469556
// Prepare

0 commit comments

Comments
 (0)