Skip to content

improv(batch): improve errors #1648

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/batch/src/AsyncBatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import { BatchProcessingError } from './errors';
import type { BaseRecord, FailureResponse, SuccessResponse } from './types';

/**
Expand All @@ -24,7 +25,9 @@ class AsyncBatchProcessor extends BasePartialBatchProcessor {
* @returns response of success or failure
*/
public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse {
throw new Error('Not implemented. Use asyncProcess() instead.');
throw new BatchProcessingError(
'Not implemented. Use asyncProcess() instead.'
);
}
}

Expand Down
13 changes: 4 additions & 9 deletions packages/batch/src/BasePartialBatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type {
} from 'aws-lambda';
import { BasePartialProcessor } from './BasePartialProcessor';
import { DATA_CLASS_MAPPING, DEFAULT_RESPONSE, EventType } from './constants';
import { BatchProcessingError } from './errors';
import { FullBatchFailureError } from './errors';
import type {
EventSourceDataClassTypes,
PartialItemFailureResponse,
Expand Down Expand Up @@ -46,12 +46,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
}

if (this.entireBatchFailed()) {
throw new BatchProcessingError(
'All records failed processing. ' +
this.exceptions.length +
' individual errors logged separately below.',
this.exceptions
);
throw new FullBatchFailureError(this.errors);
}

const messages: PartialItemFailures[] = this.getMessagesToReport();
Expand Down Expand Up @@ -110,7 +105,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
* @returns true if all records resulted in exception results
*/
public entireBatchFailed(): boolean {
return this.exceptions.length == this.records.length;
return this.errors.length == this.records.length;
}

/**
Expand All @@ -135,7 +130,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
public prepare(): void {
this.successMessages.length = 0;
this.failureMessages.length = 0;
this.exceptions.length = 0;
this.errors.length = 0;
this.batchResponse = DEFAULT_RESPONSE;
}

Expand Down
6 changes: 3 additions & 3 deletions packages/batch/src/BasePartialProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type {
* Abstract class for batch processors.
*/
abstract class BasePartialProcessor {
public exceptions: Error[];
public errors: Error[];

public failureMessages: EventSourceDataClassTypes[];

Expand All @@ -29,7 +29,7 @@ abstract class BasePartialProcessor {
public constructor() {
this.successMessages = [];
this.failureMessages = [];
this.exceptions = [];
this.errors = [];
this.records = [];
this.handler = new Function();
}
Expand Down Expand Up @@ -84,7 +84,7 @@ abstract class BasePartialProcessor {
exception: Error
): FailureResponse {
const entry: FailureResponse = ['fail', exception.message, record];
this.exceptions.push(exception);
this.errors.push(exception);
this.failureMessages.push(record);

return entry;
Expand Down
3 changes: 2 additions & 1 deletion packages/batch/src/BatchProcessor.ts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to update the error passed to the failure handler here (and in AsyncBatchProcessor) to be the BatchProcessingError as well?

And potentially add tests for the error type checking like with the SqsFifoPartialProcessor tests if we add the typing here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should leave Error there because that's the type expected by failureHandler and we want that method to stay generic so that customers creating their own partial processor can pass any type of error. Making the error more strict would force them to have to subclass our own error class.

Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import { BatchProcessingError } from './errors';
import type { BaseRecord, FailureResponse, SuccessResponse } from './types';

/**
Expand All @@ -8,7 +9,7 @@ class BatchProcessor extends BasePartialBatchProcessor {
public async asyncProcessRecord(
_record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
throw new Error('Not implemented. Use process() instead.');
throw new BatchProcessingError('Not implemented. Use process() instead.');
}

/**
Expand Down
6 changes: 2 additions & 4 deletions packages/batch/src/SqsFifoPartialProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BatchProcessor } from './BatchProcessor';
import { EventType } from './constants';
import { SqsFifoShortCircuitError } from './errors';
import type { FailureResponse, SuccessResponse } from './types';

/**
Expand Down Expand Up @@ -52,10 +53,7 @@ class SqsFifoPartialProcessor extends BatchProcessor {
for (const record of remainingRecords) {
const data = this.toBatchType(record, this.eventType);
processedRecords.push(
this.failureHandler(
data,
new Error('A previous record failed processing')
)
this.failureHandler(data, new SqsFifoShortCircuitError())
);
}

Expand Down
11 changes: 3 additions & 8 deletions packages/batch/src/asyncProcessPartialResponse.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import { EventType } from './constants';
import { UnexpectedBatchTypeError } from './errors';
import type {
BaseRecord,
BatchProcessingOptions,
Expand All @@ -19,13 +19,8 @@ const asyncProcessPartialResponse = async (
processor: BasePartialBatchProcessor,
options?: BatchProcessingOptions
): Promise<PartialItemFailureResponse> => {
if (!event.Records) {
const eventTypes: string = Object.values(EventType).toString();
throw new Error(
'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' +
eventTypes +
' event.'
);
if (!event.Records || !Array.isArray(event.Records)) {
throw new UnexpectedBatchTypeError();
}

processor.register(event.Records, recordHandler, options);
Expand Down
82 changes: 46 additions & 36 deletions packages/batch/src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,59 @@
import { EventType } from './constants';

/**
* Base error type for batch processing
* All errors thrown by major failures extend this base class
* Base error thrown by the Batch Processing utility
*/
class BaseBatchProcessingError extends Error {
public childErrors: Error[];

public msg: string;

public constructor(msg: string, childErrors: Error[]) {
super(msg);
this.msg = msg;
this.childErrors = childErrors;
class BatchProcessingError extends Error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we want the error thrown in the BatchProcessor here to be as this BatchProcessingError instead of just Error, now that we have this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, very good catch. Updated in the latest commit.

public constructor(message: string) {
super(message);
this.name = 'BatchProcessingError';
}
}

/**
* Generates a list of errors that were generated by the major failure
* @returns Formatted string listing all the errors that occurred
*
* @example
* When all batch records fail to be processed, this will generate a string like:
* All records failed processing. 3 individual errors logged separately below.
* ,Failed to process record.
* ,Failed to process record.
* ,Failed to process record.
*/
public formatErrors(parentErrorString: string): string {
const errorList: string[] = [parentErrorString + '\n'];

for (const error of this.childErrors) {
errorList.push(error.message + '\n');
}
/**
* Error thrown by the Batch Processing utility when all batch records failed to be processed
*/
class FullBatchFailureError extends BatchProcessingError {
public recordErrors: Error[];

return '\n' + errorList;
public constructor(childErrors: Error[]) {
super('All records failed processing. See individual errors below.');
this.recordErrors = childErrors;
this.name = 'FullBatchFailureError';
}
}

/**
* When all batch records failed to be processed
* Error thrown by the Batch Processing utility when a SQS FIFO queue is short-circuited.
* This happens when a record fails processing and the remaining records are not processed
* to avoid out-of-order delivery.
*/
class BatchProcessingError extends BaseBatchProcessingError {
public constructor(msg: string, childErrors: Error[]) {
super(msg, childErrors);
const parentErrorString: string = this.message;
this.message = this.formatErrors(parentErrorString);
class SqsFifoShortCircuitError extends BatchProcessingError {
public constructor() {
super(
'A previous record failed processing. The remaining records were not processed to avoid out-of-order delivery.'
);
this.name = 'SqsFifoShortCircuitError';
}
}

export { BaseBatchProcessingError, BatchProcessingError };
/**
* Error thrown by the Batch Processing utility when a partial processor receives an unexpected
* batch type.
*/
class UnexpectedBatchTypeError extends BatchProcessingError {
public constructor() {
super(
`Unexpected batch type. Possible values are: ${Object.values(
EventType
).join(', ')}`
);
this.name = 'UnexpectedBatchTypeError';
}
}
export {
BatchProcessingError,
FullBatchFailureError,
SqsFifoShortCircuitError,
UnexpectedBatchTypeError,
};
11 changes: 3 additions & 8 deletions packages/batch/src/processPartialResponse.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor';
import { EventType } from './constants';
import { UnexpectedBatchTypeError } from './errors';
import type {
BaseRecord,
BatchProcessingOptions,
Expand All @@ -19,13 +19,8 @@ const processPartialResponse = (
processor: BasePartialBatchProcessor,
options?: BatchProcessingOptions
): PartialItemFailureResponse => {
if (!event.Records) {
const eventTypes: string = Object.values(EventType).toString();
throw new Error(
'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' +
eventTypes +
' event.'
);
if (!event.Records || !Array.isArray(event.Records)) {
throw new UnexpectedBatchTypeError();
}

processor.register(event.Records, recordHandler, options);
Expand Down
14 changes: 6 additions & 8 deletions packages/batch/tests/unit/AsyncBatchProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { Context } from 'aws-lambda';
import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts';
import { AsyncBatchProcessor } from '../../src/AsyncBatchProcessor';
import { EventType } from '../../src/constants';
import { BatchProcessingError } from '../../src/errors';
import { BatchProcessingError, FullBatchFailureError } from '../../src/errors';
import type { BatchProcessingOptions } from '../../src/types';
import {
dynamodbRecordFactory,
Expand Down Expand Up @@ -95,7 +95,7 @@ describe('Class: AsyncBatchProcessor', () => {

// Assess
await expect(processor.asyncProcess()).rejects.toThrowError(
BatchProcessingError
FullBatchFailureError
);
});
});
Expand Down Expand Up @@ -160,7 +160,7 @@ describe('Class: AsyncBatchProcessor', () => {

// Assess
await expect(processor.asyncProcess()).rejects.toThrowError(
BatchProcessingError
FullBatchFailureError
);
});
});
Expand Down Expand Up @@ -225,7 +225,7 @@ describe('Class: AsyncBatchProcessor', () => {

// Assess
await expect(processor.asyncProcess()).rejects.toThrowError(
BatchProcessingError
FullBatchFailureError
);
});
});
Expand Down Expand Up @@ -279,7 +279,7 @@ describe('Class: AsyncBatchProcessor', () => {
// Act
processor.register(records, asyncHandlerWithContext, badOptions);
await expect(() => processor.asyncProcess()).rejects.toThrowError(
BatchProcessingError
FullBatchFailureError
);
});
});
Expand All @@ -289,8 +289,6 @@ describe('Class: AsyncBatchProcessor', () => {
const processor = new AsyncBatchProcessor(EventType.SQS);

// Act & Assess
expect(() => processor.process()).toThrowError(
'Not implemented. Use asyncProcess() instead.'
);
expect(() => processor.process()).toThrowError(BatchProcessingError);
});
});
14 changes: 7 additions & 7 deletions packages/batch/tests/unit/BatchProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { Context } from 'aws-lambda';
import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts';
import { BatchProcessor } from '../../src/BatchProcessor';
import { EventType } from '../../src/constants';
import { BatchProcessingError } from '../../src/errors';
import { BatchProcessingError, FullBatchFailureError } from '../../src/errors';
import type { BatchProcessingOptions } from '../../src/types';
import {
dynamodbRecordFactory,
Expand Down Expand Up @@ -92,7 +92,7 @@ describe('Class: BatchProcessor', () => {

// Act & Assess
processor.register(records, sqsRecordHandler);
expect(() => processor.process()).toThrowError(BatchProcessingError);
expect(() => processor.process()).toThrowError(FullBatchFailureError);
});
});

Expand Down Expand Up @@ -154,7 +154,7 @@ describe('Class: BatchProcessor', () => {
processor.register(records, kinesisRecordHandler);

// Assess
expect(() => processor.process()).toThrowError(BatchProcessingError);
expect(() => processor.process()).toThrowError(FullBatchFailureError);
});
});

Expand Down Expand Up @@ -217,7 +217,7 @@ describe('Class: BatchProcessor', () => {
processor.register(records, dynamodbRecordHandler);

// Assess
expect(() => processor.process()).toThrowError(BatchProcessingError);
expect(() => processor.process()).toThrowError(FullBatchFailureError);
});
});

Expand Down Expand Up @@ -269,7 +269,7 @@ describe('Class: BatchProcessor', () => {

// Act
processor.register(records, handlerWithContext, badOptions);
expect(() => processor.process()).toThrowError(BatchProcessingError);
expect(() => processor.process()).toThrowError(FullBatchFailureError);
});
});

Expand All @@ -278,8 +278,8 @@ describe('Class: BatchProcessor', () => {
const processor = new BatchProcessor(EventType.SQS);

// Act & Assess
await expect(() => processor.asyncProcess()).rejects.toThrow(
'Not implemented. Use process() instead.'
await expect(() => processor.asyncProcess()).rejects.toThrowError(
BatchProcessingError
);
});
});
7 changes: 6 additions & 1 deletion packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
*
* @group unit/batch/class/sqsfifobatchprocessor
*/
import { SqsFifoPartialProcessor, processPartialResponse } from '../../src';
import {
SqsFifoPartialProcessor,
processPartialResponse,
SqsFifoShortCircuitError,
} from '../../src';
import { sqsRecordFactory } from '../helpers/factories';
import { sqsRecordHandler } from '../helpers/handlers';

Expand Down Expand Up @@ -54,6 +58,7 @@ describe('Class: SqsFifoBatchProcessor', () => {
expect(result['batchItemFailures'][1]['itemIdentifier']).toBe(
thirdRecord.messageId
);
expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError);
});
});
});
Loading