Closed
Description
I am new to reactive programming, and either I misunderstand something or something is broken with `ReactiveRedisTemplate.listenToLater`. Reproducing tests are below: `listenToChannel` works fine and prints `bar`, but `listenToChannelLater` throws `CancellationException: Disconnected` (full stack trace at the end). Tested using 3.0.0.
import org.junit.jupiter.api.Test;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
/**
 * Reproduction case comparing {@code listenToChannel} and {@code listenToChannelLater}
 * on a {@code ReactiveStringRedisTemplate} connected to a Redis server at 127.0.0.1:6379.
 *
 * <p>Both tests follow the same sequence: build a Lettuce connection factory, subscribe to
 * channel {@code "foo"}, wait one second, publish {@code "bar"} to that channel, then dispose
 * the subscription. They differ only in which listen method is used.
 *
 * <p>NOTE(review): the class's simple name {@code Test} is the same as the imported
 * {@code org.junit.jupiter.api.Test} annotation, which is expected to be a compile-time
 * conflict as written; presumably the class was renamed for the report - confirm.
 */
public class Test {
/**
 * Subscribes with {@code listenToChannel("foo")} and then publishes {@code "bar"}.
 * According to the report, this variant works and prints {@code bar}.
 *
 * @throws Exception declared because of {@code Thread.sleep}; any other failure also propagates
 */
@Test
public void listenToChannel() throws Exception {
// Standalone Redis at 127.0.0.1:6379 with the default pooling client configuration.
var redisConfig = new RedisStandaloneConfiguration("127.0.0.1", 6379);
var lettuceConfig = LettucePoolingClientConfiguration.defaultConfiguration();
var connnectionFactory = new LettuceConnectionFactory(redisConfig, lettuceConfig);
// Called by hand - presumably because no Spring container manages the factory here.
connnectionFactory.afterPropertiesSet();
var template = new ReactiveStringRedisTemplate(connnectionFactory);
// Received items go to stdout; errors have their stack trace printed.
var sub = template.listenToChannel("foo")
.subscribe(System.out::println, Throwable::printStackTrace);
// Pause before publishing - presumably to let the subscription become active; confirm.
Thread.sleep(1000);
// Publish "bar" on channel "foo" and wait for the send to complete.
template.convertAndSend("foo", "bar").block();
sub.dispose();
}
/**
 * Same scenario as {@link #listenToChannel()}, but subscribes through
 * {@code listenToChannelLater("foo")}, whose emitted value is itself subscribed to via
 * {@code flatMapMany(f -> f)}. According to the report, this variant fails with
 * {@code java.util.concurrent.CancellationException: Disconnected}.
 *
 * @throws Exception declared because of {@code Thread.sleep}; any other failure also propagates
 */
@Test
public void listenToChannelLater() throws Exception {
// Standalone Redis at 127.0.0.1:6379 with the default pooling client configuration.
var redisConfig = new RedisStandaloneConfiguration("127.0.0.1", 6379);
var lettuceConfig = LettucePoolingClientConfiguration.defaultConfiguration();
var connnectionFactory = new LettuceConnectionFactory(redisConfig, lettuceConfig);
// Called by hand - presumably because no Spring container manages the factory here.
connnectionFactory.afterPropertiesSet();
var template = new ReactiveStringRedisTemplate(connnectionFactory);
// The value emitted by listenToChannelLater is used directly as the stream to consume;
// received items go to stdout and errors have their stack trace printed.
var sub = template.listenToChannelLater("foo")
.flatMapMany(f -> f)
.subscribe(System.out::println, Throwable::printStackTrace);
// Pause before publishing - presumably to let the subscription become active; confirm.
Thread.sleep(1000);
// Publish "bar" on channel "foo" and wait for the send to complete.
template.convertAndSend("foo", "bar").block();
sub.dispose();
}
}
java.util.concurrent.CancellationException: Disconnected
at reactor.core.publisher.FluxPublish$PublishSubscriber.disconnectAction(FluxPublish.java:327)
at reactor.core.publisher.FluxPublish$PublishSubscriber.dispose(FluxPublish.java:318)
at org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription$State.terminate(LettuceReactiveSubscription.java:263)
at org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription.lambda$cancel$8(LettuceReactiveSubscription.java:147)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onComplete(RedisPublisher.java:896)
at io.lettuce.core.RedisPublisher$State.onAllDataRead(RedisPublisher.java:698)
at io.lettuce.core.RedisPublisher$State$3.read(RedisPublisher.java:608)
at io.lettuce.core.RedisPublisher$State$3.onDataAvailable(RedisPublisher.java:565)
at io.lettuce.core.RedisPublisher$RedisSubscription.onDataAvailable(RedisPublisher.java:326)
at io.lettuce.core.RedisPublisher$RedisSubscription.onAllDataRead(RedisPublisher.java:341)
at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:778)
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65)
at io.lettuce.core.pubsub.PubSubCommandHandler.completeCommand(PubSubCommandHandler.java:260)
at io.lettuce.core.pubsub.PubSubCommandHandler.notifyPushListeners(PubSubCommandHandler.java:220)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:646)
at io.lettuce.core.pubsub.PubSubCommandHandler.decode(PubSubCommandHandler.java:112)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:598)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)