Skip to content

Commit b5ac43b

Browse files
committed
#204 - Add SingleConnectionConnectionFactory.
1 parent 94c8f75 commit b5ac43b

File tree

3 files changed

+433
-3
lines changed

3 files changed

+433
-3
lines changed

src/main/java/org/springframework/data/r2dbc/connectionfactory/DelegatingConnectionFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
import io.r2dbc.spi.ConnectionFactory;
2020
import io.r2dbc.spi.ConnectionFactoryMetadata;
2121
import io.r2dbc.spi.Wrapped;
22-
import org.reactivestreams.Publisher;
22+
import reactor.core.publisher.Mono;
23+
2324
import org.springframework.lang.Nullable;
2425
import org.springframework.util.Assert;
2526

@@ -47,8 +48,8 @@ public DelegatingConnectionFactory(ConnectionFactory targetConnectionFactory) {
4748
* @see io.r2dbc.spi.ConnectionFactory#create()
4849
*/
4950
@Override
50-
public Publisher<? extends Connection> create() {
51-
return targetConnectionFactory.create();
51+
public Mono<? extends Connection> create() {
52+
return Mono.from(targetConnectionFactory.create());
5253
}
5354

5455
/**
Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.data.r2dbc.connectionfactory;
18+
19+
import io.r2dbc.spi.Connection;
20+
import io.r2dbc.spi.ConnectionFactories;
21+
import io.r2dbc.spi.ConnectionFactory;
22+
import io.r2dbc.spi.ConnectionFactoryMetadata;
23+
import reactor.core.publisher.Mono;
24+
25+
import java.lang.reflect.InvocationHandler;
26+
import java.lang.reflect.InvocationTargetException;
27+
import java.lang.reflect.Method;
28+
import java.lang.reflect.Proxy;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
31+
import org.reactivestreams.Publisher;
32+
33+
import org.springframework.beans.factory.DisposableBean;
34+
import org.springframework.lang.Nullable;
35+
import org.springframework.util.Assert;
36+
37+
/**
38+
* Implementation of {@link SmartConnectionFactory} that wraps a single R2DBC Connection which is not closed after use.
39+
* Obviously, this is not multi-threading capable.
40+
* <p>
41+
* Note that at shutdown, someone should close the underlying Connection via the {@code close()} method. Client code
42+
* will never call close on the Connection handle if it is SmartDataSource-aware (e.g. uses
43+
* {@link ConnectionFactoryUtils#releaseConnection(io.r2dbc.spi.Connection, ConnectionFactory)}).
44+
* <p>
45+
* If client code will call {@link #close()} in the assumption of a pooled Connection, like when using persistence
46+
* tools, set "suppressClose" to "true". This will return a close-suppressing proxy instead of the physical Connection.
47+
* <p>
48+
* This is primarily intended for testing. For example, it enables easy testing outside an application server, for code
49+
* that expects to work on a {@link ConnectionFactory}.
50+
*
51+
* @author Mark Paluch
52+
* @see #create()
53+
* @see io.r2dbc.spi.Connection#close()
54+
* @see ConnectionFactoryUtils#releaseConnection(io.r2dbc.spi.Connection, ConnectionFactory)
55+
*/
56+
public class SingleConnectionConnectionFactory extends DelegatingConnectionFactory
57+
implements SmartConnectionFactory, DisposableBean {
58+
59+
/** Create a close-suppressing proxy?. */
60+
private boolean suppressClose;
61+
62+
/** Override auto-commit state?. */
63+
private @Nullable Boolean autoCommit;
64+
65+
/** Wrapped Connection. */
66+
private final AtomicReference<Connection> target = new AtomicReference<>();
67+
68+
/** Proxy Connection. */
69+
private @Nullable Connection connection;
70+
71+
private final Mono<? extends Connection> connectionEmitter;
72+
73+
/**
74+
* Constructor for bean-style configuration.
75+
*/
76+
public SingleConnectionConnectionFactory(ConnectionFactory targetConnectionFactory) {
77+
super(targetConnectionFactory);
78+
this.connectionEmitter = super.create().cache();
79+
}
80+
81+
/**
82+
* Create a new {@link SingleConnectionConnectionFactory} using a R2DBC connection URL.
83+
*
84+
* @param url the R2DBC URL to use for accessing {@link ConnectionFactory} discovery.
85+
* @param suppressClose if the returned {@link Connection} should be a close-suppressing proxy or the physical
86+
* {@link Connection}.
87+
* @see ConnectionFactories#get(String)
88+
*/
89+
public SingleConnectionConnectionFactory(String url, boolean suppressClose) {
90+
super(ConnectionFactories.get(url));
91+
this.suppressClose = suppressClose;
92+
this.connectionEmitter = super.create().cache();
93+
}
94+
95+
/**
96+
* Create a new {@link SingleConnectionConnectionFactory} with a given {@link Connection} and
97+
* {@link ConnectionFactoryMetadata}.
98+
*
99+
* @param target underlying target {@link Connection}.
100+
* @param metadata {@link ConnectionFactory} metadata to be associated with this {@link ConnectionFactory}.
101+
* @param suppressClose if the {@link Connection} should be wrapped with a {@link Connection} that suppresses
102+
* {@code close()} calls (to allow for normal {@link #close()} usage in applications that expect a pooled
103+
* {@link Connection} but do not know our {@link SmartConnectionFactory} interface).
104+
*/
105+
public SingleConnectionConnectionFactory(Connection target, ConnectionFactoryMetadata metadata,
106+
boolean suppressClose) {
107+
super(new ConnectionFactory() {
108+
@Override
109+
public Publisher<? extends Connection> create() {
110+
return Mono.just(target);
111+
}
112+
113+
@Override
114+
public ConnectionFactoryMetadata getMetadata() {
115+
return metadata;
116+
}
117+
});
118+
Assert.notNull(target, "Connection must not be null");
119+
Assert.notNull(metadata, "ConnectionFactoryMetadata must not be null");
120+
this.target.set(target);
121+
this.connectionEmitter = Mono.just(target);
122+
this.suppressClose = suppressClose;
123+
this.connection = (suppressClose ? getCloseSuppressingConnectionProxy(target) : target);
124+
}
125+
126+
/**
127+
* Set whether the returned {@link Connection} should be a close-suppressing proxy or the physical {@link Connection}.
128+
*/
129+
public void setSuppressClose(boolean suppressClose) {
130+
this.suppressClose = suppressClose;
131+
}
132+
133+
/**
134+
* Return whether the returned {@link Connection} will be a close-suppressing proxy or the physical
135+
* {@link Connection}.
136+
*/
137+
protected boolean isSuppressClose() {
138+
return this.suppressClose;
139+
}
140+
141+
/**
142+
* Set whether the returned {@link Connection}'s "autoCommit" setting should be overridden.
143+
*/
144+
public void setAutoCommit(boolean autoCommit) {
145+
this.autoCommit = autoCommit;
146+
}
147+
148+
/**
149+
* Return whether the returned {@link Connection}'s "autoCommit" setting should be overridden.
150+
*
151+
* @return the "autoCommit" value, or {@code null} if none to be applied
152+
*/
153+
@Nullable
154+
protected Boolean getAutoCommitValue() {
155+
return this.autoCommit;
156+
}
157+
158+
@Override
159+
public Mono<? extends Connection> create() {
160+
161+
Connection connection = this.target.get();
162+
163+
return connectionEmitter.map(it -> {
164+
165+
if (connection == null) {
166+
this.target.compareAndSet(connection, it);
167+
this.connection = (isSuppressClose() ? getCloseSuppressingConnectionProxy(it) : it);
168+
}
169+
170+
return this.connection;
171+
}).flatMap(this::prepareConnection);
172+
}
173+
174+
/**
175+
* This is a single Connection: Do not close it when returning to the "pool".
176+
*/
177+
@Override
178+
public boolean shouldClose(Connection con) {
179+
return (con != this.connection && con != this.target.get());
180+
}
181+
182+
/**
183+
* Close the underlying {@link Connection}. The provider of this {@link ConnectionFactory} needs to care for proper
184+
* shutdown.
185+
* <p>
186+
* As this bean implements {@link DisposableBean}, a bean factory will automatically invoke this on destruction of its
187+
* cached singletons.
188+
*/
189+
@Override
190+
public void destroy() {
191+
resetConnection().block();
192+
}
193+
194+
/**
195+
* Reset the underlying shared Connection, to be reinitialized on next access.
196+
*/
197+
public Mono<Void> resetConnection() {
198+
199+
Connection connection = this.target.get();
200+
201+
if (connection == null) {
202+
return Mono.empty();
203+
}
204+
205+
return Mono.defer(() -> {
206+
207+
if (this.target.compareAndSet(connection, null)) {
208+
209+
this.connection = null;
210+
211+
return Mono.from(connection.close());
212+
}
213+
214+
return Mono.empty();
215+
});
216+
}
217+
218+
/**
219+
* Prepare the {@link Connection} before using it. Applies {@link #getAutoCommitValue() auto-commit} settings if
220+
* configured.
221+
*
222+
* @param connection the requested {@link Connection}.
223+
* @return the prepared {@link Connection}.
224+
*/
225+
protected Mono<Connection> prepareConnection(Connection connection) {
226+
227+
Boolean autoCommit = getAutoCommitValue();
228+
if (autoCommit != null) {
229+
return Mono.from(connection.setAutoCommit(autoCommit)).thenReturn(connection);
230+
}
231+
232+
return Mono.just(connection);
233+
}
234+
235+
/**
236+
* Wrap the given {@link Connection} with a proxy that delegates every method call to it but suppresses close calls.
237+
*
238+
* @param target the original {@link Connection} to wrap.
239+
* @return the wrapped Connection.
240+
*/
241+
protected Connection getCloseSuppressingConnectionProxy(Connection target) {
242+
return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(),
243+
new Class<?>[] { ConnectionProxy.class }, new CloseSuppressingInvocationHandler(target));
244+
}
245+
246+
/**
247+
* Invocation handler that suppresses close calls on R2DBC Connections.
248+
*
249+
* @see io.r2dbc.spi.Connection#close()
250+
*/
251+
private static class CloseSuppressingInvocationHandler implements InvocationHandler {
252+
253+
private final io.r2dbc.spi.Connection target;
254+
255+
CloseSuppressingInvocationHandler(io.r2dbc.spi.Connection target) {
256+
this.target = target;
257+
}
258+
259+
@Override
260+
@Nullable
261+
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
262+
// Invocation on ConnectionProxy interface coming in...
263+
264+
if (method.getName().equals("equals")) {
265+
// Only consider equal when proxies are identical.
266+
return proxy == args[0];
267+
} else if (method.getName().equals("hashCode")) {
268+
// Use hashCode of PersistenceManager proxy.
269+
return System.identityHashCode(proxy);
270+
} else if (method.getName().equals("unwrap")) {
271+
return target;
272+
} else if (method.getName().equals("close")) {
273+
// Handle close method: suppress, not valid.
274+
return Mono.empty();
275+
} else if (method.getName().equals("getTargetConnection")) {
276+
// Handle getTargetConnection method: return underlying Connection.
277+
return this.target;
278+
}
279+
280+
// Invoke method on target Connection.
281+
try {
282+
Object retVal = method.invoke(this.target, args);
283+
284+
return retVal;
285+
} catch (InvocationTargetException ex) {
286+
throw ex.getTargetException();
287+
}
288+
}
289+
}
290+
291+
}

0 commit comments

Comments
 (0)