Skip to content

Commit 836c5b4

Browse files
committed
#48 - Atomically close connections.
We now make sure to close connections only once by tracking the cleanup state. Flux.usingWhen/Mono.usingWhen do not ensure atomic cleanup in situations where the subscription completes and then the subscription is terminated. This behavior has lead to closing a connection multiple times. Related ticket: reactor/reactor-core#1486
1 parent 2dfb1ff commit 836c5b4

File tree

2 files changed

+120
-8
lines changed

2 files changed

+120
-8
lines changed

src/main/java/org/springframework/data/r2dbc/function/DefaultDatabaseClient.java

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.List;
3939
import java.util.Map;
4040
import java.util.Set;
41+
import java.util.concurrent.atomic.AtomicBoolean;
4142
import java.util.function.BiFunction;
4243
import java.util.function.Function;
4344
import java.util.function.Supplier;
@@ -122,15 +123,16 @@ public <T> Mono<T> inConnection(Function<Connection, Mono<T>> action) throws Dat
122123

123124
Assert.notNull(action, "Callback object must not be null");
124125

125-
Mono<Connection> connectionMono = getConnection();
126-
// Create close-suppressing Connection proxy, also preparing returned Statements.
126+
Mono<ConnectionCloseHolder> connectionMono = getConnection()
127+
.map(it -> new ConnectionCloseHolder(it, this::closeConnection));
127128

128129
return Mono.usingWhen(connectionMono, it -> {
129130

130-
Connection connectionToUse = createConnectionProxy(it);
131+
// Create close-suppressing Connection proxy
132+
Connection connectionToUse = createConnectionProxy(it.connection);
131133

132134
return doInConnection(connectionToUse, action);
133-
}, this::closeConnection, this::closeConnection, this::closeConnection) //
135+
}, ConnectionCloseHolder::close, ConnectionCloseHolder::close, ConnectionCloseHolder::close) //
134136
.onErrorMap(R2dbcException.class, ex -> translateException("execute", getSql(action), ex));
135137
}
136138

@@ -149,15 +151,16 @@ public <T> Flux<T> inConnectionMany(Function<Connection, Flux<T>> action) throws
149151

150152
Assert.notNull(action, "Callback object must not be null");
151153

152-
Mono<Connection> connectionMono = getConnection();
153-
// Create close-suppressing Connection proxy, also preparing returned Statements.
154+
Mono<ConnectionCloseHolder> connectionMono = getConnection()
155+
.map(it -> new ConnectionCloseHolder(it, this::closeConnection));
154156

155157
return Flux.usingWhen(connectionMono, it -> {
156158

157-
Connection connectionToUse = createConnectionProxy(it);
159+
// Create close-suppressing Connection proxy, also preparing returned Statements.
160+
Connection connectionToUse = createConnectionProxy(it.connection);
158161

159162
return doInConnectionMany(connectionToUse, action);
160-
}, this::closeConnection, this::closeConnection, this::closeConnection) //
163+
}, ConnectionCloseHolder::close, ConnectionCloseHolder::close, ConnectionCloseHolder::close) //
161164
.onErrorMap(R2dbcException.class, ex -> translateException("executeMany", getSql(action), ex));
162165
}
163166

@@ -1104,4 +1107,26 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
11041107
}
11051108
}
11061109
}
1110+
1111+
/**
1112+
* Holder for a connection that makes sure the close action is invoked atomically only once.
1113+
*/
1114+
@RequiredArgsConstructor
1115+
static class ConnectionCloseHolder extends AtomicBoolean {
1116+
1117+
final Connection connection;
1118+
final Function<Connection, Publisher<Void>> closeFunction;
1119+
1120+
Mono<Void> close() {
1121+
1122+
return Mono.defer(() -> {
1123+
1124+
if (compareAndSet(false, true)) {
1125+
return Mono.from(closeFunction.apply(connection));
1126+
}
1127+
1128+
return Mono.empty();
1129+
});
1130+
}
1131+
}
11071132
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.r2dbc.function;
17+
18+
import static org.mockito.Mockito.*;
19+
20+
import io.r2dbc.spi.Connection;
21+
import io.r2dbc.spi.ConnectionFactory;
22+
import reactor.core.CoreSubscriber;
23+
import reactor.core.publisher.Flux;
24+
import reactor.core.publisher.Mono;
25+
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import org.junit.runner.RunWith;
29+
import org.mockito.Mock;
30+
import org.mockito.junit.MockitoJUnitRunner;
31+
import org.reactivestreams.Publisher;
32+
import org.reactivestreams.Subscription;
33+
import org.springframework.data.r2dbc.support.R2dbcExceptionTranslator;
34+
35+
/**
36+
* Unit tests for {@link DefaultDatabaseClient}.
37+
*
38+
* @author Mark Paluch
39+
*/
40+
@RunWith(MockitoJUnitRunner.class)
41+
public class DefaultDatabaseClientUnitTests {
42+
43+
@Mock ConnectionFactory connectionFactory;
44+
@Mock Connection connection;
45+
@Mock ReactiveDataAccessStrategy strategy;
46+
@Mock R2dbcExceptionTranslator translator;
47+
48+
@Before
49+
public void before() {
50+
when(connectionFactory.create()).thenReturn((Publisher) Mono.just(connection));
51+
when(connection.close()).thenReturn(Mono.empty());
52+
}
53+
54+
@Test // gh-48
55+
public void shouldCloseConnectionOnlyOnce() {
56+
57+
DefaultDatabaseClient databaseClient = (DefaultDatabaseClient) DatabaseClient.builder()
58+
.connectionFactory(connectionFactory).dataAccessStrategy(strategy).exceptionTranslator(translator).build();
59+
60+
Flux<Object> flux = databaseClient.inConnectionMany(it -> {
61+
return Flux.empty();
62+
});
63+
64+
flux.subscribe(new CoreSubscriber<Object>() {
65+
Subscription subscription;
66+
67+
@Override
68+
public void onSubscribe(Subscription s) {
69+
s.request(1);
70+
subscription = s;
71+
}
72+
73+
@Override
74+
public void onNext(Object o) {}
75+
76+
@Override
77+
public void onError(Throwable t) {}
78+
79+
@Override
80+
public void onComplete() {
81+
subscription.cancel();
82+
}
83+
});
84+
85+
verify(connection, times(1)).close();
86+
}
87+
}

0 commit comments

Comments
 (0)