Support for re-using the same connection when we set "block" greater than 0 in XREADGROUP (using the Lettuce library underneath) #2248

Open
@arulned

We continuously listen for messages on a stream. When we use block > 0 in the XREADGROUP command (we do not want to set it to 0), Spring creates a new connection for every read. This leaves many client-side sockets in TIME_WAIT, and the number builds up rapidly when there are many incoming messages. For our use case, we need to set block > 0 and also re-use the same connection.

Looking at the code in DefaultStreamReceiver's constructor, it seems that overriding StreamReadOptions.isBlocking() to always return false would solve our problem, but DefaultStreamReceiver does not accept a custom StreamReadOptions. Could you expose a method such as setReadOptions(readOptions) on DefaultStreamReceiver so that it can be passed in?

Sample code I use:

StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofSeconds(30)).build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(factory, options);

receiver.receiveAutoAck(Consumer.from("group", "consumer"), StreamOffset.create("stream", ReadOffset.lastConsumed()))//
        .flatMap(record -> Mono.fromCallable(() -> consume(record)).subscribeOn(Schedulers.boundedElastic()))//
        .subscribe();
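
To make the request concrete, here is a rough plain-Java sketch of the difference between opening a connection per blocking read and re-using one dedicated connection. It is only a model: FakeConnectionFactory, FakeConnection, connectionPerRead and dedicatedConnection are hypothetical names invented for illustration, and none of them are part of Spring Data Redis or Lettuce. The counter stands in for the number of client sockets that would be opened and later closed.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model only: none of these types belong to Spring Data Redis or Lettuce.
class ConnectionReuseSketch {

    /** Counts how many "connections" were opened, standing in for client sockets. */
    static final class FakeConnectionFactory {
        private final AtomicInteger opened = new AtomicInteger();

        FakeConnection open() {
            opened.incrementAndGet();
            return new FakeConnection();
        }

        int openedCount() {
            return opened.get();
        }
    }

    /** Stand-in for a connection that serves a blocking XREADGROUP call. */
    static final class FakeConnection implements AutoCloseable {
        String blockingRead(int i) {
            return "record-" + i;
        }

        @Override
        public void close() {
            // A real client socket would typically sit in TIME_WAIT after closing.
        }
    }

    /** Behaviour described in this report: a fresh connection for every blocking read. */
    static int connectionPerRead(int reads) {
        FakeConnectionFactory factory = new FakeConnectionFactory();
        for (int i = 0; i < reads; i++) {
            try (FakeConnection connection = factory.open()) {
                connection.blockingRead(i);
            }
        }
        return factory.openedCount();
    }

    /** Requested behaviour: one dedicated connection re-used for all blocking reads. */
    static int dedicatedConnection(int reads) {
        FakeConnectionFactory factory = new FakeConnectionFactory();
        try (FakeConnection connection = factory.open()) {
            for (int i = 0; i < reads; i++) {
                connection.blockingRead(i);
            }
        }
        return factory.openedCount();
    }

    public static void main(String[] args) {
        System.out.println("per read:  " + connectionPerRead(1000));
        System.out.println("dedicated: " + dedicatedConnection(1000));
    }
}
```

In this model, 1000 reads open 1000 connections with the first strategy and a single connection with the second, which is the behaviour we would like to be able to opt into while keeping block > 0.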
