Skip to content

ReactiveRedisTemplate.listenToLater throws CancellationException Disconnected when used #2489

Closed
@cfredri4

Description

@cfredri4

I'm new to reactive programming, so either I misunderstand something or something is broken with `ReactiveRedisTemplate.listenToLater`. Reproducing tests are below: `listenToChannel` works fine and prints `bar`, but `listenToChannelLater` throws `CancellationException: Disconnected` (full stack trace at the end). Tested using 3.0.0.

import org.junit.jupiter.api.Test;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;

/**
 * Reproduction case that compares {@code listenToChannel} with
 * {@code listenToChannelLater} on a {@link ReactiveStringRedisTemplate}.
 *
 * <p>Both tests connect to Redis at 127.0.0.1:6379 through a pooled Lettuce
 * client configuration, subscribe to the same channel, wait one second,
 * publish a single message and then dispose the subscription.
 *
 * <p>NOTE(review): the class name is the same simple name as the imported
 * JUnit {@code Test} annotation type; this likely needs a rename before the
 * file compiles locally.
 */
public class Test {

    /** Channel both tests subscribe and publish to. */
    private static final String CHANNEL = "foo";

    /** Payload published once the subscription has had time to settle. */
    private static final String PAYLOAD = "bar";

    /**
     * Subscribes directly via {@code listenToChannel}; received messages are
     * printed to standard out and errors have their stack trace printed.
     */
    @Test
    public void listenToChannel() throws Exception {

        var template = newTemplate();

        var subscription = template.listenToChannel(CHANNEL)
                .subscribe(System.out::println, Throwable::printStackTrace);

        publishAfterPause(template);

        subscription.dispose();
    }

    /**
     * Subscribes via {@code listenToChannelLater}, flattening the deferred
     * message stream with {@code flatMapMany}. According to the report this is
     * the variant that surfaces {@code CancellationException: Disconnected}.
     */
    @Test
    public void listenToChannelLater() throws Exception {

        var template = newTemplate();

        var subscription = template.listenToChannelLater(CHANNEL)
                .flatMapMany(messages -> messages)
                .subscribe(System.out::println, Throwable::printStackTrace);

        publishAfterPause(template);

        subscription.dispose();
    }

    /** Builds a template on a freshly initialised, pooled Lettuce connection factory. */
    private static ReactiveStringRedisTemplate newTemplate() {
        var standalone = new RedisStandaloneConfiguration("127.0.0.1", 6379);
        var clientConfig = LettucePoolingClientConfiguration.defaultConfiguration();
        var connectionFactory = new LettuceConnectionFactory(standalone, clientConfig);
        // The factory is created outside a Spring context, so it is initialised by hand.
        connectionFactory.afterPropertiesSet();
        return new ReactiveStringRedisTemplate(connectionFactory);
    }

    /** Waits one second, then publishes the payload and blocks on the publish call. */
    private static void publishAfterPause(ReactiveStringRedisTemplate template) throws InterruptedException {
        Thread.sleep(1000);
        template.convertAndSend(CHANNEL, PAYLOAD).block();
    }
}
java.util.concurrent.CancellationException: Disconnected
	at reactor.core.publisher.FluxPublish$PublishSubscriber.disconnectAction(FluxPublish.java:327)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.dispose(FluxPublish.java:318)
	at org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription$State.terminate(LettuceReactiveSubscription.java:263)
	at org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription.lambda$cancel$8(LettuceReactiveSubscription.java:147)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
	at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onComplete(RedisPublisher.java:896)
	at io.lettuce.core.RedisPublisher$State.onAllDataRead(RedisPublisher.java:698)
	at io.lettuce.core.RedisPublisher$State$3.read(RedisPublisher.java:608)
	at io.lettuce.core.RedisPublisher$State$3.onDataAvailable(RedisPublisher.java:565)
	at io.lettuce.core.RedisPublisher$RedisSubscription.onDataAvailable(RedisPublisher.java:326)
	at io.lettuce.core.RedisPublisher$RedisSubscription.onAllDataRead(RedisPublisher.java:341)
	at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:778)
	at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65)
	at io.lettuce.core.pubsub.PubSubCommandHandler.completeCommand(PubSubCommandHandler.java:260)
	at io.lettuce.core.pubsub.PubSubCommandHandler.notifyPushListeners(PubSubCommandHandler.java:220)
	at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:646)
	at io.lettuce.core.pubsub.PubSubCommandHandler.decode(PubSubCommandHandler.java:112)
	at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:598)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)

Metadata

Metadata

Assignees

Labels

status: invalid — An issue that we don't feel is valid

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions