Skip to content

fix(idempotency): validate idempotency record returned in conditional write #2083

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions packages/idempotency/src/IdempotencyHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,10 @@ export class IdempotencyHandler<Func extends AnyFunction> {
};

/**
* Save an in progress record to the idempotency store or return an existing result.
* Save an in progress record to the idempotency store or return an stored result.
*
* If the record already exists, return the result from the record.
* Before returning a result, we might neede to look up the idempotency record
* and validate it to ensure that it is consistent with the payload to be hashed.
*/
#saveInProgressOrReturnExistingResult =
async (): Promise<JSONValue | void> => {
Expand All @@ -313,11 +314,22 @@ export class IdempotencyHandler<Func extends AnyFunction> {
);
} catch (e) {
if (e instanceof IdempotencyItemAlreadyExistsError) {
const idempotencyRecord: IdempotencyRecord =
e.existingRecord ||
(await this.#persistenceStore.getRecord(
let idempotencyRecord = e.existingRecord;
if (idempotencyRecord !== undefined) {
// If the error includes the existing record, we can use it to validate
// the record being processed and cache it in memory.
idempotencyRecord = this.#persistenceStore.processExistingRecord(
idempotencyRecord,
this.#functionPayloadToBeHashed
));
);
// If the error doesn't include the existing record, we need to fetch
// it from the persistence layer. In doing so, we also call the processExistingRecord
// method to validate the record and cache it in memory.
} else {
idempotencyRecord = await this.#persistenceStore.getRecord(
this.#functionPayloadToBeHashed
);
}

return IdempotencyHandler.determineResultFromIdempotencyRecord(
idempotencyRecord
Expand Down
52 changes: 44 additions & 8 deletions packages/idempotency/src/persistence/BasePersistenceLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
}

const record = await this._getRecord(idempotencyKey);
this.saveToCache(record);
this.validatePayload(data, record);
this.processExistingRecord(record, data);

return record;
}
Expand All @@ -127,6 +126,29 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
return this.payloadValidationEnabled;
}

/**
* Validates an existing record against the data payload being processed.
* If the payload does not match the stored record, an `IdempotencyValidationError` error is thrown.
*
* Whenever a record is retrieved from the persistence layer, it should be validated against the data payload
* being processed. This is to ensure that the data payload being processed is the same as the one that was
* used to create the record in the first place.
*
* The record is also saved to the local cache if local caching is enabled.
*
* @param record - the stored record to validate against
* @param data - the data payload being processed and to be validated against the stored record
*/
public processExistingRecord(
storedDataRecord: IdempotencyRecord,
processedData: JSONValue | IdempotencyRecord
): IdempotencyRecord {
this.validatePayload(processedData, storedDataRecord);
this.saveToCache(storedDataRecord);

return storedDataRecord;
}

/**
* Saves a record indicating that the function's execution is currently in progress
*
Expand Down Expand Up @@ -303,8 +325,9 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
/**
* Save record to local cache except for when status is `INPROGRESS`.
*
* We can't cache `INPROGRESS` records because we have no way to reflect updates
* that might happen to the record outside the execution context of the function.
* Records with `INPROGRESS` status are not cached because we have no way to
* reflect updates that might happen to the record outside the execution
* context of the function.
*
* @param record - record to save
*/
Expand All @@ -314,13 +337,26 @@ abstract class BasePersistenceLayer implements BasePersistenceLayerInterface {
this.cache?.add(record.idempotencyKey, record);
}

private validatePayload(data: JSONValue, record: IdempotencyRecord): void {
/**
* Validates the payload against the stored record. If the payload does not match the stored record,
* an `IdempotencyValidationError` error is thrown.
*
* @param data - The data payload to validate against the stored record
* @param storedDataRecord - The stored record to validate against
*/
private validatePayload(
data: JSONValue | IdempotencyRecord,
storedDataRecord: IdempotencyRecord
): void {
if (this.payloadValidationEnabled) {
const hashedPayload: string = this.getHashedPayload(data);
if (hashedPayload !== record.payloadHash) {
const hashedPayload =
data instanceof IdempotencyRecord
? data.payloadHash
: this.getHashedPayload(data);
if (hashedPayload !== storedDataRecord.payloadHash) {
throw new IdempotencyValidationError(
'Payload does not match stored record for this event key',
record
storedDataRecord
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*
* @group unit/idempotency/persistence/base
*/
import { createHash } from 'node:crypto';
import { ContextExamples as dummyContext } from '@aws-lambda-powertools/commons';
import { IdempotencyConfig, IdempotencyRecordStatus } from '../../../src';
import {
Expand Down Expand Up @@ -464,6 +465,92 @@ describe('Class: BasePersistenceLayer', () => {
});
});

describe('Method: processExistingRecord', () => {
it('throws an error if the payload does not match the stored record', () => {
// Prepare
const persistenceLayer = new PersistenceLayerTestClass();
persistenceLayer.configure({
config: new IdempotencyConfig({
payloadValidationJmesPath: 'foo',
}),
});
const existingRecord = new IdempotencyRecord({
idempotencyKey: 'my-lambda-function#mocked-hash',
status: IdempotencyRecordStatus.INPROGRESS,
payloadHash: 'different-hash',
});

// Act & Assess
expect(() =>
persistenceLayer.processExistingRecord(existingRecord, { foo: 'bar' })
).toThrow(
new IdempotencyValidationError(
'Payload does not match stored record for this event key',
existingRecord
)
);
});

it('returns if the payload matches the stored record', () => {
// Prepare
const persistenceLayer = new PersistenceLayerTestClass();
persistenceLayer.configure({
config: new IdempotencyConfig({
payloadValidationJmesPath: 'foo',
}),
});
const existingRecord = new IdempotencyRecord({
idempotencyKey: 'my-lambda-function#mocked-hash',
status: IdempotencyRecordStatus.INPROGRESS,
payloadHash: 'mocked-hash',
});

// Act & Assess
expect(() =>
persistenceLayer.processExistingRecord(existingRecord, { foo: 'bar' })
).not.toThrow();
});

it('skips validation if payload validation is not enabled', () => {
// Prepare
const persistenceLayer = new PersistenceLayerTestClass();
const existingRecord = new IdempotencyRecord({
idempotencyKey: 'my-lambda-function#mocked-hash',
status: IdempotencyRecordStatus.INPROGRESS,
payloadHash: 'different-hash',
});

// Act & Assess
expect(() =>
persistenceLayer.processExistingRecord(existingRecord, { foo: 'bar' })
).not.toThrow();
});

it('skips hashing if the payload is already an IdempotencyRecord', () => {
// Prepare
const persistenceLayer = new PersistenceLayerTestClass();
persistenceLayer.configure({
config: new IdempotencyConfig({
payloadValidationJmesPath: 'foo',
}),
});
const existingRecord = new IdempotencyRecord({
idempotencyKey: 'my-lambda-function#mocked-hash',
status: IdempotencyRecordStatus.INPROGRESS,
payloadHash: 'mocked-hash',
});
const payload = new IdempotencyRecord({
idempotencyKey: 'my-lambda-function#mocked-hash',
status: IdempotencyRecordStatus.INPROGRESS,
payloadHash: 'mocked-hash',
});

// Act
persistenceLayer.processExistingRecord(existingRecord, payload);
expect(createHash).toHaveBeenCalledTimes(0);
});
});

describe('Method: getExpiresAfterSeconds', () => {
it('returns the configured value', () => {
// Prepare
Expand Down