Skip to content

Commit 1844363

Browse files
docs: minor improvements
1 parent 7cd8ea5 commit 1844363

File tree

1 file changed

+10
-10
lines changed

1 file changed

+10
-10
lines changed

content/microservices/kafka.md

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -433,9 +433,9 @@ throw new KafkaRetriableException('...');
433433

434434
> info **Hint** `KafkaRetriableException` class is exported from the `@nestjs/microservices` package.
435435
436-
### Kafka Exception Handling
436+
### Custom exception handling
437437

438-
In addition to the default error handling mechanisms, you can implement a custom Exception Filter for Kafka events to handle retry logic. For example, the following sample shows how to skip a problematic event after a configurable number of retries:
438+
Along with the default error handling mechanisms, you can create a custom Exception Filter for Kafka events to manage retry logic. For instance, the example below demonstrates how to skip a problematic event after a configurable number of retries:
439439

440440
```typescript
441441
import { Catch, ArgumentsHost, Logger } from '@nestjs/common';
@@ -457,13 +457,13 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
457457
async catch(exception: unknown, host: ArgumentsHost) {
458458
const kafkaContext = host.switchToRpc().getContext<KafkaContext>();
459459
const message = kafkaContext.getMessage();
460-
461-
// Assume that the retryCount can be retrieved from the KafkaContext (or message headers)
462460
const currentRetryCount = this.getRetryCountFromContext(kafkaContext);
463461

464462
if (currentRetryCount >= this.maxRetries) {
465463
this.logger.warn(
466-
`Max retries (${this.maxRetries}) exceeded for message: ${JSON.stringify(message)}`,
464+
`Max retries (${
465+
this.maxRetries
466+
}) exceeded for message: ${JSON.stringify(message)}`,
467467
);
468468

469469
if (this.skipHandler) {
@@ -474,7 +474,6 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
474474
}
475475
}
476476

477-
// Attempt to commit the message offset
478477
try {
479478
await this.commitOffset(kafkaContext);
480479
} catch (commitError) {
@@ -487,14 +486,12 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
487486
super.catch(exception, host);
488487
}
489488

490-
// Extracts retryCount from the KafkaContext or message headers
491489
private getRetryCountFromContext(context: KafkaContext): number {
492490
const headers = context.getMessage().headers || {};
493491
const retryHeader = headers['retryCount'] || headers['retry-count'];
494492
return retryHeader ? Number(retryHeader) : 0;
495493
}
496494

497-
// Commits the offset of the message (dependent on the KafkaJS API)
498495
private async commitOffset(context: KafkaContext): Promise<void> {
499496
const consumer = context.getConsumer && context.getConsumer();
500497
if (!consumer) {
@@ -507,7 +504,9 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
507504
const offset = message.offset;
508505

509506
if (!topic || partition === undefined || offset === undefined) {
510-
throw new Error('Incomplete Kafka message context for committing offset.');
507+
throw new Error(
508+
'Incomplete Kafka message context for committing offset.',
509+
);
511510
}
512511

513512
await consumer.commitOffsets([
@@ -522,7 +521,8 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
522521
}
523522
```
524523

525-
This filter provides a mechanism to retry processing a Kafka event up to a configurable number of times. Once the maximum retries are reached, it executes a custom skipHandler (if provided) and commits the offset, effectively skipping the problematic event. This ensures that subsequent events can be processed.
524+
This filter offers a way to retry processing a Kafka event up to a configurable number of times. Once the maximum retries are reached, it triggers a custom `skipHandler` (if provided) and commits the offset, effectively skipping the problematic event. This allows subsequent events to be processed without interruption.
525+
526526
You can integrate this filter by adding it to your event handlers:
527527

528528
```typescript

0 commit comments

Comments
 (0)