diff --git a/packages/batch/src/AsyncBatchProcessor.ts b/packages/batch/src/AsyncBatchProcessor.ts index f4e02a327e..10c404a323 100644 --- a/packages/batch/src/AsyncBatchProcessor.ts +++ b/packages/batch/src/AsyncBatchProcessor.ts @@ -1,4 +1,5 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; +import { BatchProcessingError } from './errors'; import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; /** @@ -24,7 +25,9 @@ class AsyncBatchProcessor extends BasePartialBatchProcessor { * @returns response of success or failure */ public processRecord(_record: BaseRecord): SuccessResponse | FailureResponse { - throw new Error('Not implemented. Use asyncProcess() instead.'); + throw new BatchProcessingError( + 'Not implemented. Use asyncProcess() instead.' + ); } } diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index d4cfd7e9ce..3cf4b30309 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -5,7 +5,7 @@ import type { } from 'aws-lambda'; import { BasePartialProcessor } from './BasePartialProcessor'; import { DATA_CLASS_MAPPING, DEFAULT_RESPONSE, EventType } from './constants'; -import { BatchProcessingError } from './errors'; +import { FullBatchFailureError } from './errors'; import type { EventSourceDataClassTypes, PartialItemFailureResponse, @@ -46,12 +46,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { } if (this.entireBatchFailed()) { - throw new BatchProcessingError( - 'All records failed processing. ' + - this.exceptions.length + - ' individual errors logged separately below.', - this.exceptions - ); + throw new FullBatchFailureError(this.errors); } const messages: PartialItemFailures[] = this.getMessagesToReport(); @@ -110,7 +105,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { * @returns true if all records resulted in exception results */ public entireBatchFailed(): boolean { - return this.exceptions.length == this.records.length; + return this.errors.length == this.records.length; } /** @@ -135,7 +130,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { public prepare(): void { this.successMessages.length = 0; this.failureMessages.length = 0; - this.exceptions.length = 0; + this.errors.length = 0; this.batchResponse = DEFAULT_RESPONSE; } diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index ecd62c29b0..a51400d4a5 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -11,7 +11,7 @@ import type { * Abstract class for batch processors. */ abstract class BasePartialProcessor { - public exceptions: Error[]; + public errors: Error[]; public failureMessages: EventSourceDataClassTypes[]; @@ -29,7 +29,7 @@ abstract class BasePartialProcessor { public constructor() { this.successMessages = []; this.failureMessages = []; - this.exceptions = []; + this.errors = []; this.records = []; this.handler = new Function(); } @@ -84,7 +84,7 @@ abstract class BasePartialProcessor { exception: Error ): FailureResponse { const entry: FailureResponse = ['fail', exception.message, record]; - this.exceptions.push(exception); + this.errors.push(exception); this.failureMessages.push(record); return entry; diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index fdab0c6f44..730b3e94ef 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -1,4 +1,5 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; +import { BatchProcessingError } from './errors'; import type { BaseRecord, FailureResponse, SuccessResponse } from './types'; /** @@ -8,7 +9,7 @@ class BatchProcessor extends BasePartialBatchProcessor { public async asyncProcessRecord( _record: BaseRecord ): Promise { - throw new Error('Not implemented. Use process() instead.'); + throw new BatchProcessingError('Not implemented. Use process() instead.'); } /** diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 0c10993273..ebdd4d73e8 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -1,5 +1,6 @@ import { BatchProcessor } from './BatchProcessor'; import { EventType } from './constants'; +import { SqsFifoShortCircuitError } from './errors'; import type { FailureResponse, SuccessResponse } from './types'; /** @@ -52,10 +53,7 @@ class SqsFifoPartialProcessor extends BatchProcessor { for (const record of remainingRecords) { const data = this.toBatchType(record, this.eventType); processedRecords.push( - this.failureHandler( - data, - new Error('A previous record failed processing') - ) + this.failureHandler(data, new SqsFifoShortCircuitError()) ); } diff --git a/packages/batch/src/asyncProcessPartialResponse.ts b/packages/batch/src/asyncProcessPartialResponse.ts index eee584ed1f..5a33b5b534 100644 --- a/packages/batch/src/asyncProcessPartialResponse.ts +++ b/packages/batch/src/asyncProcessPartialResponse.ts @@ -1,5 +1,5 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; -import { EventType } from './constants'; +import { UnexpectedBatchTypeError } from './errors'; import type { BaseRecord, BatchProcessingOptions, @@ -19,13 +19,8 @@ const asyncProcessPartialResponse = async ( processor: BasePartialBatchProcessor, options?: BatchProcessingOptions ): Promise => { - if (!event.Records) { - const eventTypes: string = Object.values(EventType).toString(); - throw new Error( - 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + - eventTypes + - ' event.' - ); + if (!event.Records || !Array.isArray(event.Records)) { + throw new UnexpectedBatchTypeError(); } processor.register(event.Records, recordHandler, options); diff --git a/packages/batch/src/errors.ts b/packages/batch/src/errors.ts index ed5bd4fc9e..8d2b0327b0 100644 --- a/packages/batch/src/errors.ts +++ b/packages/batch/src/errors.ts @@ -1,49 +1,59 @@ +import { EventType } from './constants'; + /** - * Base error type for batch processing - * All errors thrown by major failures extend this base class + * Base error thrown by the Batch Processing utility */ -class BaseBatchProcessingError extends Error { - public childErrors: Error[]; - - public msg: string; - - public constructor(msg: string, childErrors: Error[]) { - super(msg); - this.msg = msg; - this.childErrors = childErrors; +class BatchProcessingError extends Error { + public constructor(message: string) { + super(message); + this.name = 'BatchProcessingError'; } +} - /** - * Generates a list of errors that were generated by the major failure - * @returns Formatted string listing all the errors that occurred - * - * @example - * When all batch records fail to be processed, this will generate a string like: - * All records failed processing. 3 individual errors logged separately below. - * ,Failed to process record. - * ,Failed to process record. - * ,Failed to process record. - */ - public formatErrors(parentErrorString: string): string { - const errorList: string[] = [parentErrorString + '\n']; - - for (const error of this.childErrors) { - errorList.push(error.message + '\n'); - } +/** + * Error thrown by the Batch Processing utility when all batch records failed to be processed + */ +class FullBatchFailureError extends BatchProcessingError { + public recordErrors: Error[]; - return '\n' + errorList; + public constructor(childErrors: Error[]) { + super('All records failed processing. See individual errors below.'); + this.recordErrors = childErrors; + this.name = 'FullBatchFailureError'; } } /** - * When all batch records failed to be processed + * Error thrown by the Batch Processing utility when a SQS FIFO queue is short-circuited. + * This happens when a record fails processing and the remaining records are not processed + * to avoid out-of-order delivery. */ -class BatchProcessingError extends BaseBatchProcessingError { - public constructor(msg: string, childErrors: Error[]) { - super(msg, childErrors); - const parentErrorString: string = this.message; - this.message = this.formatErrors(parentErrorString); +class SqsFifoShortCircuitError extends BatchProcessingError { + public constructor() { + super( + 'A previous record failed processing. The remaining records were not processed to avoid out-of-order delivery.' + ); + this.name = 'SqsFifoShortCircuitError'; } } -export { BaseBatchProcessingError, BatchProcessingError }; +/** + * Error thrown by the Batch Processing utility when a partial processor receives an unexpected + * batch type. + */ +class UnexpectedBatchTypeError extends BatchProcessingError { + public constructor() { + super( + `Unexpected batch type. Possible values are: ${Object.values( + EventType + ).join(', ')}` + ); + this.name = 'UnexpectedBatchTypeError'; + } +} +export { + BatchProcessingError, + FullBatchFailureError, + SqsFifoShortCircuitError, + UnexpectedBatchTypeError, +}; diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index d09e7be6b9..2385f28666 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -1,5 +1,5 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor'; -import { EventType } from './constants'; +import { UnexpectedBatchTypeError } from './errors'; import type { BaseRecord, BatchProcessingOptions, @@ -19,13 +19,8 @@ const processPartialResponse = ( processor: BasePartialBatchProcessor, options?: BatchProcessingOptions ): PartialItemFailureResponse => { - if (!event.Records) { - const eventTypes: string = Object.values(EventType).toString(); - throw new Error( - 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + - eventTypes + - ' event.' - ); + if (!event.Records || !Array.isArray(event.Records)) { + throw new UnexpectedBatchTypeError(); } processor.register(event.Records, recordHandler, options); diff --git a/packages/batch/tests/unit/AsyncBatchProcessor.test.ts b/packages/batch/tests/unit/AsyncBatchProcessor.test.ts index 9079a1c464..0bccdd6d07 100644 --- a/packages/batch/tests/unit/AsyncBatchProcessor.test.ts +++ b/packages/batch/tests/unit/AsyncBatchProcessor.test.ts @@ -7,7 +7,7 @@ import type { Context } from 'aws-lambda'; import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; import { AsyncBatchProcessor } from '../../src/AsyncBatchProcessor'; import { EventType } from '../../src/constants'; -import { BatchProcessingError } from '../../src/errors'; +import { BatchProcessingError, FullBatchFailureError } from '../../src/errors'; import type { BatchProcessingOptions } from '../../src/types'; import { dynamodbRecordFactory, @@ -95,7 +95,7 @@ describe('Class: AsyncBatchProcessor', () => { // Assess await expect(processor.asyncProcess()).rejects.toThrowError( - BatchProcessingError + FullBatchFailureError ); }); }); @@ -160,7 +160,7 @@ describe('Class: AsyncBatchProcessor', () => { // Assess await expect(processor.asyncProcess()).rejects.toThrowError( - BatchProcessingError + FullBatchFailureError ); }); }); @@ -225,7 +225,7 @@ describe('Class: AsyncBatchProcessor', () => { // Assess await expect(processor.asyncProcess()).rejects.toThrowError( - BatchProcessingError + FullBatchFailureError ); }); }); @@ -279,7 +279,7 @@ describe('Class: AsyncBatchProcessor', () => { // Act processor.register(records, asyncHandlerWithContext, badOptions); await expect(() => processor.asyncProcess()).rejects.toThrowError( - BatchProcessingError + FullBatchFailureError ); }); }); @@ -289,8 +289,6 @@ describe('Class: AsyncBatchProcessor', () => { const processor = new AsyncBatchProcessor(EventType.SQS); // Act & Assess - expect(() => processor.process()).toThrowError( - 'Not implemented. Use asyncProcess() instead.' - ); + expect(() => processor.process()).toThrowError(BatchProcessingError); }); }); diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 5be28271d2..97f50b7d11 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -7,7 +7,7 @@ import type { Context } from 'aws-lambda'; import { helloworldContext as dummyContext } from '../../../commons/src/samples/resources/contexts'; import { BatchProcessor } from '../../src/BatchProcessor'; import { EventType } from '../../src/constants'; -import { BatchProcessingError } from '../../src/errors'; +import { BatchProcessingError, FullBatchFailureError } from '../../src/errors'; import type { BatchProcessingOptions } from '../../src/types'; import { dynamodbRecordFactory, @@ -92,7 +92,7 @@ describe('Class: BatchProcessor', () => { // Act & Assess processor.register(records, sqsRecordHandler); - expect(() => processor.process()).toThrowError(BatchProcessingError); + expect(() => processor.process()).toThrowError(FullBatchFailureError); }); }); @@ -154,7 +154,7 @@ describe('Class: BatchProcessor', () => { processor.register(records, kinesisRecordHandler); // Assess - expect(() => processor.process()).toThrowError(BatchProcessingError); + expect(() => processor.process()).toThrowError(FullBatchFailureError); }); }); @@ -217,7 +217,7 @@ describe('Class: BatchProcessor', () => { processor.register(records, dynamodbRecordHandler); // Assess - expect(() => processor.process()).toThrowError(BatchProcessingError); + expect(() => processor.process()).toThrowError(FullBatchFailureError); }); }); @@ -269,7 +269,7 @@ describe('Class: BatchProcessor', () => { // Act processor.register(records, handlerWithContext, badOptions); - expect(() => processor.process()).toThrowError(BatchProcessingError); + expect(() => processor.process()).toThrowError(FullBatchFailureError); }); }); @@ -278,8 +278,8 @@ describe('Class: BatchProcessor', () => { const processor = new BatchProcessor(EventType.SQS); // Act & Assess - await expect(() => processor.asyncProcess()).rejects.toThrow( - 'Not implemented. Use process() instead.' + await expect(() => processor.asyncProcess()).rejects.toThrowError( + BatchProcessingError ); }); }); diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index 564886b1d8..127c87a0a2 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -3,7 +3,11 @@ * * @group unit/batch/class/sqsfifobatchprocessor */ -import { SqsFifoPartialProcessor, processPartialResponse } from '../../src'; +import { + SqsFifoPartialProcessor, + processPartialResponse, + SqsFifoShortCircuitError, +} from '../../src'; import { sqsRecordFactory } from '../helpers/factories'; import { sqsRecordHandler } from '../helpers/handlers'; @@ -54,6 +58,7 @@ describe('Class: SqsFifoBatchProcessor', () => { expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( thirdRecord.messageId ); + expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); }); }); }); diff --git a/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts b/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts index fde15ccf42..aef38f33ec 100644 --- a/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts +++ b/packages/batch/tests/unit/asyncProcessPartialResponse.test.ts @@ -173,7 +173,6 @@ describe('Function: processPartialResponse()', () => { // Prepare const processor = new AsyncBatchProcessor(EventType.SQS); const event = dummyEvent; - const eventTypes: string = Object.values(EventType).toString(); const handler = async ( event: SQSEvent, @@ -190,11 +189,9 @@ describe('Function: processPartialResponse()', () => { await expect(() => handler(event as unknown as SQSEvent, context) ).rejects.toThrowError( - new Error( - 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + - eventTypes + - ' event.' - ) + `Unexpected batch type. Possible values are: ${Object.keys( + EventType + ).join(', ')}` ); }); diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts index 3de2edcce3..f33caa6f06 100644 --- a/packages/batch/tests/unit/processPartialResponse.test.ts +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -157,7 +157,6 @@ describe('Function: processPartialResponse()', () => { // Prepare const processor = new BatchProcessor(EventType.SQS); const event = dummyEvent; - const eventTypes: string = Object.values(EventType).toString(); const handler = ( event: SQSEvent, @@ -168,11 +167,9 @@ describe('Function: processPartialResponse()', () => { // Act & Assess expect(() => handler(event as unknown as SQSEvent, context)).toThrowError( - new Error( - 'Failed to convert event to record batch for processing.\nPlease ensure batch event is a valid ' + - eventTypes + - ' event.' - ) + `Unexpected batch type. Possible values are: ${Object.keys( + EventType + ).join(', ')}` ); });