Open
Description
We are continuously listening for messages on a stream. When we use block > 0(We do not want to set it to 0) in the XREADGROUP command, spring is creating new connections for every read. This results in so many TIME_WAIT ports on the client which builds up rapidly if there are many incoming messages. For our use case, we need to set block > 0 and also re-use the same connection. Checking the code in DefaultStreamReceiver
's constructor, looks like overriding the StreamReadOptions.isBlocking()
to always return false will solve our problem but the DefaultStreamReceiver
does not take a custom StreamReadOptions
. Can you expose a method like setReadOptions(reaOptions)
on DefaultStreamReceiver
to take it as an argument?
Sample code I use:
StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofSeconds(30)).build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(factory, options);
receiver.receiveAutoAck(Consumer.from("group", "consumer"), StreamOffset.create("stream", ReadOffset.lastConsumed()))//
.flatMap(record -> Mono.fromCallable(() -> consume(record)).subscribeOn(Schedulers.boundedElastic()))//
.subscribe();