content/microservices/kafka.md
> info **Hint** The `KafkaRetriableException` class is exported from the `@nestjs/microservices` package.
### Kafka Exception Handling
In addition to the default error handling mechanisms, you can implement a custom Exception Filter for Kafka events to handle retry logic. For example, the following sample shows how to skip a problematic event after a configurable number of retries:
```typescript
    // ... (the earlier part of the filter — which counts retries, invokes the
    // optional skipHandler, and pulls `consumer`, `topic`, `partition`, and
    // `offset` from the KafkaContext — is omitted from this hunk)
    if (!topic || partition === undefined || offset === undefined) {
      throw new Error('Incomplete Kafka message context for committing offset.');
    }

    await consumer.commitOffsets([
      {
        topic,
        partition,
        // When committing an offset, commit the next number (i.e., current offset + 1)
        offset: (Number(offset) + 1).toString(),
      },
    ]);
  }
}
```
This filter provides a mechanism to retry processing a Kafka event up to a configurable number of times. Once the maximum number of retries is reached, it executes a custom `skipHandler` (if provided) and commits the offset, effectively skipping the problematic event. This allows subsequent events to be processed.
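For reference, a complete filter along these lines might look roughly like the sketch below. It is a minimal sketch, not the exact sample from the diff: the class name `KafkaMaxRetryExceptionFilter` and the `retryCount` message header used to track attempts are illustrative assumptions, while `BaseExceptionFilter`, `KafkaContext`, and the decorators are the standard `@nestjs/core`, `@nestjs/common`, and `@nestjs/microservices` exports.

```typescript
import { ArgumentsHost, Catch, Logger } from '@nestjs/common';
import { BaseExceptionFilter } from '@nestjs/core';
import { KafkaContext } from '@nestjs/microservices';

@Catch()
export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter {
  private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name);

  constructor(
    private readonly maxRetries: number,
    // Optional handler invoked before a message that exhausted its retries is skipped
    private readonly skipHandler?: (message: any) => Promise<void>,
  ) {
    super();
  }

  async catch(exception: unknown, host: ArgumentsHost) {
    const context = host.switchToRpc().getContext<KafkaContext>();
    const message = context.getMessage();
    const currentRetryCount = this.getRetryCountFromContext(context);

    if (currentRetryCount >= this.maxRetries) {
      this.logger.warn(
        `Max retries (${this.maxRetries}) exceeded for message: ${JSON.stringify(message)}`,
      );

      if (this.skipHandler) {
        try {
          await this.skipHandler(message);
        } catch (err) {
          this.logger.error('Error in skipHandler:', err);
        }
      }

      // Commit the offset so the problematic event is skipped
      await this.commitOffset(context);
      return; // Swallow the exception so the event is not redelivered
    }

    // Below the retry limit: fall back to the default exception filter behavior
    super.catch(exception, host);
  }

  private getRetryCountFromContext(context: KafkaContext): number {
    // Assumption: the producer attaches a `retryCount` header to retried messages
    const headers = context.getMessage().headers || {};
    const retryHeader = headers['retryCount'];
    return retryHeader ? Number(retryHeader.toString()) : 0;
  }

  private async commitOffset(context: KafkaContext): Promise<void> {
    const consumer = context.getConsumer();
    const topic = context.getTopic();
    const partition = context.getPartition();
    const { offset } = context.getMessage();

    if (!topic || partition === undefined || offset === undefined) {
      throw new Error('Incomplete Kafka message context for committing offset.');
    }

    await consumer.commitOffsets([
      // Commit the next offset (current offset + 1), as in the snippet above
      { topic, partition, offset: (Number(offset) + 1).toString() },
    ]);
  }
}
```

The constructor takes the maximum retry count and the optional `skipHandler`, matching the behavior described above.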
You can integrate this filter by adding it to your event handlers:
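For example, wiring the filter into a controller might look roughly like this. The controller name, handler name, topic `'my-topic'`, and the retry count are placeholders, and `KafkaMaxRetryExceptionFilter` is the illustrative name from the sketch above; `@UseFilters` comes from `@nestjs/common`.

```typescript
import { Controller, UseFilters } from '@nestjs/common';
import { Ctx, EventPattern, KafkaContext, Payload } from '@nestjs/microservices';

@Controller()
// Retry up to 3 times before skipping the event
@UseFilters(new KafkaMaxRetryExceptionFilter(3))
export class MyEventsController {
  @EventPattern('my-topic') // placeholder topic name
  async handleEvent(@Payload() data: any, @Ctx() context: KafkaContext) {
    // If this handler throws, the filter above decides whether to retry or skip the event
    // ...
  }
}
```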
### Committing Offsets

Committing offsets is essential when working with Kafka. By default, messages are automatically committed after a specific interval. For more information, visit the [KafkaJS docs](https://kafka.js.org/docs/consuming#autocommit). `KafkaContext` offers a way to access the active consumer for manually committing offsets. The consumer is the KafkaJS consumer and works just like the [native KafkaJS implementation](https://kafka.js.org/docs/consuming#manual-committing).
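As an illustration, a manual commit from inside an event handler could look roughly as follows. The controller, handler, and topic names are placeholders; `getConsumer()`, `getTopic()`, `getPartition()`, and `getMessage()` are the `KafkaContext` accessors.

```typescript
import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, KafkaContext, Payload } from '@nestjs/microservices';

@Controller()
export class UserEventsController {
  @EventPattern('user.created') // placeholder topic name
  async handleUserCreated(@Payload() data: any, @Ctx() context: KafkaContext) {
    // ... business logic ...

    const { offset } = context.getMessage();
    const partition = context.getPartition();
    const topic = context.getTopic();
    const consumer = context.getConsumer();

    // As noted above, commit the offset of the *next* message to be consumed
    await consumer.commitOffsets([
      { topic, partition, offset: (Number(offset) + 1).toString() },
    ]);
  }
}
```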