Skip to content

Commit 78367f2

Browse files
authored
Fix PostgresSubscribableChannel.notifyUpdate()
When transaction is configured for the `PostgresSubscribableChannel.notifyUpdate()` and it is rolled back, the next poll in that loop will return the same message. Again and again if transaction is always rolled back. This leads to the condition when we never leave this loop even if we fully unsubscribed from this channel. The issue has need spotted after introducing `SKIP LOCKED` for `PostgresChannelMessageStoreQueryProvider` which leads to the locked record in DB in the mentioned above transaction. * Introduce `PostgresSubscribableChannel.hasHandlers` flag to check in the `notifyUpdate()` before performing poll query in DB. **Cherry-pick to `6.1.x` & `6.0.x`**
1 parent df67c59 commit 78367f2

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/channel/PostgresSubscribableChannel.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class PostgresSubscribableChannel extends AbstractSubscribableChannel
6767

6868
private Executor executor;
6969

70+
private volatile boolean hasHandlers;
71+
7072
/**
7173
* Create a subscribable channel for a Postgres database.
7274
* @param jdbcChannelMessageStore The message store to use for the relevant region.
@@ -128,6 +130,7 @@ public boolean subscribe(MessageHandler handler) {
128130
boolean subscribed = super.subscribe(handler);
129131
if (this.dispatcher.getHandlerCount() == 1) {
130132
this.messageTableSubscriber.subscribe(this);
133+
this.hasHandlers = true;
131134
notifyUpdate();
132135
}
133136
return subscribed;
@@ -138,6 +141,7 @@ public boolean unsubscribe(MessageHandler handle) {
138141
boolean unsubscribed = super.unsubscribe(handle);
139142
if (this.dispatcher.getHandlerCount() == 0) {
140143
this.messageTableSubscriber.unsubscribe(this);
144+
this.hasHandlers = false;
141145
}
142146
return unsubscribed;
143147
}
@@ -159,18 +163,7 @@ public void notifyUpdate() {
159163
try {
160164
Optional<Message<?>> dispatchedMessage;
161165
do {
162-
if (this.transactionTemplate != null) {
163-
dispatchedMessage =
164-
this.retryTemplate.execute(context ->
165-
this.transactionTemplate.execute(status ->
166-
pollMessage()
167-
.map(this::dispatch)));
168-
}
169-
else {
170-
dispatchedMessage =
171-
pollMessage()
172-
.map(message -> this.retryTemplate.execute(context -> dispatch(message)));
173-
}
166+
dispatchedMessage = askForMessage();
174167
} while (dispatchedMessage.isPresent());
175168
}
176169
catch (Exception ex) {
@@ -179,6 +172,20 @@ public void notifyUpdate() {
179172
});
180173
}
181174

175+
private Optional<Message<?>> askForMessage() {
176+
if (this.hasHandlers) {
177+
if (this.transactionTemplate != null) {
178+
return this.retryTemplate.execute(context ->
179+
this.transactionTemplate.execute(status -> pollMessage().map(this::dispatch)));
180+
}
181+
else {
182+
return pollMessage()
183+
.map(message -> this.retryTemplate.execute(context -> dispatch(message)));
184+
}
185+
}
186+
return Optional.empty();
187+
}
188+
182189
private Optional<Message<?>> pollMessage() {
183190
return Optional.ofNullable(this.jdbcChannelMessageStore.pollMessageFromGroup(this.groupId));
184191
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/channel/PostgresChannelMessageTableSubscriberTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.springframework.jdbc.datasource.init.DataSourceInitializer;
4646
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
4747
import org.springframework.jdbc.datasource.init.ScriptUtils;
48+
import org.springframework.messaging.MessageHandler;
4849
import org.springframework.messaging.support.GenericMessage;
4950
import org.springframework.retry.support.RetryTemplate;
5051
import org.springframework.test.annotation.DirtiesContext;
@@ -172,14 +173,16 @@ void testMessagesDispatchedInTransaction() throws InterruptedException {
172173
postgresSubscribableChannel.setTransactionManager(transactionManager);
173174

174175
postgresChannelMessageTableSubscriber.start();
175-
postgresSubscribableChannel.subscribe(message -> {
176+
MessageHandler messageHandler =
177+
message -> {
176178
try {
177179
throw new RuntimeException("An error has occurred");
178180
}
179181
finally {
180182
latch.countDown();
181183
}
182-
});
184+
};
185+
postgresSubscribableChannel.subscribe(messageHandler);
183186

184187
messageStore.addMessageToGroup(groupId, new GenericMessage<>("1"));
185188
messageStore.addMessageToGroup(groupId, new GenericMessage<>("2"));
@@ -188,6 +191,7 @@ void testMessagesDispatchedInTransaction() throws InterruptedException {
188191

189192
// Stop subscriber to unlock records from TX for the next verification
190193
postgresChannelMessageTableSubscriber.stop();
194+
postgresSubscribableChannel.unsubscribe(messageHandler);
191195

192196
assertThat(messageStore.messageGroupSize(groupId)).isEqualTo(2);
193197
assertThat(messageStore.pollMessageFromGroup(groupId).getPayload()).isEqualTo("1");

0 commit comments

Comments
 (0)