18
18
import io .r2dbc .spi .Connection ;
19
19
import io .r2dbc .spi .ConnectionFactory ;
20
20
import reactor .core .publisher .Mono ;
21
- import reactor .util .function .Tuple2 ;
22
- import reactor .util .function .Tuples ;
23
21
24
22
import org .apache .commons .logging .Log ;
25
23
import org .apache .commons .logging .LogFactory ;
@@ -61,9 +59,10 @@ private ConnectionFactoryUtils() {}
61
59
* Is aware of a corresponding Connection bound to the current {@link reactor.util.context.Context}. Will bind a
62
60
* Connection to the {@link reactor.util.context.Context} if transaction synchronization is active.
63
61
*
64
- * @param connectionFactory the {@link io.r2dbc.spi.ConnectionFactory} to obtain Connections from
62
+ * @param connectionFactory the {@link io.r2dbc.spi.ConnectionFactory} to obtain {@link io.r2dbc.spi.Connection
63
+ * Connections} from.
65
64
* @return a R2DBC Connection from the given {@link io.r2dbc.spi.ConnectionFactory}.
66
- * @throws DataAccessResourceFailureException if the attempt to get a {@link io.r2dbc.spi.Connection} failed
65
+ * @throws DataAccessResourceFailureException if the attempt to get a {@link io.r2dbc.spi.Connection} failed.
67
66
* @see #releaseConnection
68
67
*/
69
68
public static Mono <Connection > getConnection (ConnectionFactory connectionFactory ) {
@@ -72,14 +71,14 @@ public static Mono<Connection> getConnection(ConnectionFactory connectionFactory
72
71
}
73
72
74
73
/**
75
- * Actually obtain a R2DBC Connection from the given {@link ConnectionFactory}. Same as {@link #getConnection}, but
76
- * preserving the original exceptions.
74
+ * Actually obtain a R2DBC Connection from the given {@link io.r2dbc.spi. ConnectionFactory}. Same as
75
+ * {@link #getConnection}, but preserving the original exceptions.
77
76
* <p>
78
77
* Is aware of a corresponding Connection bound to the current {@link reactor.util.context.Context}. Will bind a
79
78
* Connection to the {@link reactor.util.context.Context} if transaction synchronization is active.
80
79
*
81
- * @param connectionFactory the {@link ConnectionFactory} to obtain Connections from.
82
- * @return a R2DBC {@link io.r2dbc.spi.Connection} from the given {@link ConnectionFactory}.
80
+ * @param connectionFactory the {@link io.r2dbc.spi. ConnectionFactory} to obtain Connections from.
81
+ * @return a R2DBC {@link io.r2dbc.spi.Connection} from the given {@link io.r2dbc.spi. ConnectionFactory}.
83
82
*/
84
83
public static Mono <Connection > doGetConnection (ConnectionFactory connectionFactory ) {
85
84
@@ -143,45 +142,44 @@ public static Mono<Connection> doGetConnection(ConnectionFactory connectionFacto
143
142
}
144
143
145
144
/**
146
- * Actually fetch a {@link Connection} from the given {@link ConnectionFactory}, defensively turning an unexpected
147
- * {@code null} return value from {@link ConnectionFactory#create()} into an {@link IllegalStateException}.
145
+ * Actually fetch a {@link io.r2dbc.spi.Connection} from the given {@link io.r2dbc.spi.ConnectionFactory}, defensively
146
+ * turning an unexpected {@literal null} return value from {@link io.r2dbc.spi.ConnectionFactory#create()} into an
147
+ * {@link IllegalStateException}.
148
148
*
149
- * @param connectionFactory the {@link ConnectionFactory} to obtain {@link Connection}s from
150
- * @return a R2DBC {@link Connection} from the given {@link ConnectionFactory} (never {@code null})
151
- * @throws IllegalStateException if the {@link ConnectionFactory} returned a {@literal null} value.
149
+ * @param connectionFactory the {@link io.r2dbc.spi.ConnectionFactory} to obtain {@link io.r2dbc.spi.Connection}s from
150
+ * @return a R2DBC {@link io.r2dbc.spi.Connection} from the given {@link io.r2dbc.spi.ConnectionFactory} (never
151
+ * {@literal null}).
152
+ * @throws IllegalStateException if the {@link io.r2dbc.spi.ConnectionFactory} returned a {@literal null} value.
152
153
* @see ConnectionFactory#create()
153
154
*/
154
155
private static Mono <Connection > fetchConnection (ConnectionFactory connectionFactory ) {
155
156
return Mono .from (connectionFactory .create ());
156
157
}
157
158
158
159
/**
159
- * Close the given {@link io.r2dbc.spi.Connection}, obtained from the given {@link ConnectionFactory}, if it is not
160
- * managed externally (that is, not bound to the thread).
160
+ * Close the given {@link io.r2dbc.spi.Connection}, obtained from the given {@link io.r2dbc.spi. ConnectionFactory}, if
161
+ * it is not managed externally (that is, not bound to the thread).
161
162
*
162
163
* @param con the {@link io.r2dbc.spi.Connection} to close if necessary.
163
- * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from (may be
164
- * {@literal null}).
164
+ * @param connectionFactory the {@link io.r2dbc.spi.ConnectionFactory} that the Connection was obtained from.
165
165
* @see #getConnection
166
166
*/
167
- public static Mono <Void > releaseConnection (@ Nullable io .r2dbc .spi .Connection con ,
168
- @ Nullable ConnectionFactory connectionFactory ) {
167
+ public static Mono <Void > releaseConnection (io .r2dbc .spi .Connection con , ConnectionFactory connectionFactory ) {
169
168
170
169
return doReleaseConnection (con , connectionFactory )
171
170
.onErrorMap (e -> new DataAccessResourceFailureException ("Failed to close R2DBC Connection" , e ));
172
171
}
173
172
174
173
/**
175
- * Actually close the given {@link io.r2dbc.spi.Connection}, obtained from the given {@link ConnectionFactory}. Same
176
- * as {@link #releaseConnection}, but preserving the original exception.
174
+ * Actually close the given {@link io.r2dbc.spi.Connection}, obtained from the given
175
+ * {@link io.r2dbc.spi.ConnectionFactory}. Same as {@link #releaseConnection}, but preserving the original exception.
177
176
*
178
177
* @param connection the {@link io.r2dbc.spi.Connection} to close if necessary.
179
- * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from (may be
180
- * {@literal null}).
178
+ * @param connectionFactory the {@link io.r2dbc.spi.ConnectionFactory} that the Connection was obtained from.
181
179
* @see #doGetConnection
182
180
*/
183
- public static Mono <Void > doReleaseConnection (@ Nullable io .r2dbc .spi .Connection connection ,
184
- @ Nullable ConnectionFactory connectionFactory ) {
181
+ public static Mono <Void > doReleaseConnection (io .r2dbc .spi .Connection connection ,
182
+ ConnectionFactory connectionFactory ) {
185
183
186
184
return TransactionSynchronizationManager .forCurrentTransaction ().flatMap (it -> {
187
185
@@ -200,12 +198,17 @@ public static Mono<Void> doReleaseConnection(@Nullable io.r2dbc.spi.Connection c
200
198
* Close the {@link io.r2dbc.spi.Connection}. Translates exceptions into the Spring hierarchy of unchecked generic
201
199
* data access exceptions, simplifying calling code and making any exception that is thrown more meaningful.
202
200
*
203
- * @param connectionFactory the {@link io.r2dbc.spi.ConnectionFactory} to obtain Connections from
201
+ * @param connection the {@link io.r2dbc.spi.Connection} to close.
202
+ * @param connectionFactory the {@link io.r2dbc.spi.ConnectionFactory} that the {@link io.r2dbc.spi.Connection} was
203
+ * obtained from.
204
204
* @return a R2DBC Connection from the given {@link io.r2dbc.spi.ConnectionFactory}.
205
205
* @throws DataAccessResourceFailureException if the attempt to get a {@link io.r2dbc.spi.Connection} failed
206
206
*/
207
207
public static Mono <Void > closeConnection (Connection connection , ConnectionFactory connectionFactory ) {
208
208
209
+ Assert .notNull (connection , "Connection must not be null!" );
210
+ Assert .notNull (connectionFactory , "ConnectionFactory must not be null!" );
211
+
209
212
return doCloseConnection (connection , connectionFactory )
210
213
.onErrorMap (e -> new DataAccessResourceFailureException ("Failed to obtain R2DBC Connection" , e ));
211
214
}
@@ -214,7 +217,7 @@ public static Mono<Void> closeConnection(Connection connection, ConnectionFactor
214
217
* Close the {@link io.r2dbc.spi.Connection}, unless a {@link SmartConnectionFactory} doesn't want us to.
215
218
*
216
219
* @param connection the {@link io.r2dbc.spi.Connection} to close if necessary.
217
- * @param connectionFactory the {@link ConnectionFactory} that the Connection was obtained from.
220
+ * @param connectionFactory the {@link io.r2dbc.spi. ConnectionFactory} that the Connection was obtained from.
218
221
* @see Connection#close()
219
222
* @see SmartConnectionFactory#shouldClose(Connection)
220
223
*/
@@ -236,6 +239,7 @@ public static Mono<Void> doCloseConnection(Connection connection, @Nullable Conn
236
239
/**
237
240
* Obtain the {@link io.r2dbc.spi.ConnectionFactory} from the current subscriber {@link reactor.util.context.Context}.
238
241
*
242
+ * @param connectionFactory the {@link io.r2dbc.spi.ConnectionFactory} that the Connection was obtained from.
239
243
* @see TransactionSynchronizationManager
240
244
*/
241
245
public static Mono <ConnectionFactory > currentConnectionFactory (ConnectionFactory connectionFactory ) {
@@ -252,12 +256,13 @@ public static Mono<ConnectionFactory> currentConnectionFactory(ConnectionFactory
252
256
}
253
257
254
258
/**
255
- * Determine whether the given two {@link Connection}s are equal, asking the target {@link Connection} in case of a
256
- * proxy. Used to detect equality even if the user passed in a raw target Connection while the held one is a proxy.
259
+ * Determine whether the given two {@link io.r2dbc.spi.Connection}s are equal, asking the target
260
+ * {@link io.r2dbc.spi.Connection} in case of a proxy. Used to detect equality even if the user passed in a raw target
261
+ * Connection while the held one is a proxy.
257
262
*
258
- * @param conHolder the {@link ConnectionHolder} for the held Connection (potentially a proxy)
259
- * @param passedInCon the {@link Connection} passed-in by the user (potentially a target {@link Connection} without
260
- * proxy)
263
+ * @param conHolder the {@link . ConnectionHolder} for the held {@link io.r2dbc.spi. Connection} (potentially a proxy).
264
+ * @param passedInCon the {@link io.r2dbc.spi. Connection} passed-in by the user (potentially a target
265
+ * {@link io.r2dbc.spi.Connection} without proxy).
261
266
* @return whether the given Connections are equal
262
267
* @see #getTargetConnection
263
268
*/
@@ -273,11 +278,11 @@ private static boolean connectionEquals(ConnectionHolder conHolder, Connection p
273
278
}
274
279
275
280
/**
276
- * Return the innermost target {@link Connection} of the given {@link Connection}. If the given {@link Connection} is
277
- * a proxy, it will be unwrapped until a non-proxy {@link Connection} is found. Otherwise, the passed-in Connection
278
- * will be returned as-is.
281
+ * Return the innermost target {@link io.r2dbc.spi. Connection} of the given {@link io.r2dbc.spi. Connection}. If the
282
+ * given {@link io.r2dbc.spi.Connection} is a proxy, it will be unwrapped until a non-proxy
283
+ * {@link io.r2dbc.spi.Connection} is found. Otherwise, the passed-in Connection will be returned as-is.
279
284
*
280
- * @param con the {@link Connection} proxy to unwrap
285
+ * @param con the {@link io.r2dbc.spi. Connection} proxy to unwrap
281
286
* @return the innermost target Connection, or the passed-in one if no proxy
282
287
* @see ConnectionProxy#getTargetConnection()
283
288
*/
@@ -291,11 +296,11 @@ public static Connection getTargetConnection(Connection con) {
291
296
}
292
297
293
298
/**
294
- * Determine the connection synchronization order to use for the given {@link ConnectionFactory}. Decreased for every
295
- * level of nesting that a {@link ConnectionFactory} has, checked through the level of
296
- * {@link DelegatingConnectionFactory} nesting.
299
+ * Determine the connection synchronization order to use for the given {@link io.r2dbc.spi. ConnectionFactory}.
300
+ * Decreased for every level of nesting that a {@link io.r2dbc.spi. ConnectionFactory} has, checked through the level
301
+ * of {@link DelegatingConnectionFactory} nesting.
297
302
*
298
- * @param connectionFactory the {@link ConnectionFactory} to check.
303
+ * @param connectionFactory the {@link io.r2dbc.spi. ConnectionFactory} to check.
299
304
* @return the connection synchronization order to use.
300
305
* @see #CONNECTION_SYNCHRONIZATION_ORDER
301
306
*/
@@ -310,22 +315,6 @@ private static int getConnectionSynchronizationOrder(ConnectionFactory connectio
310
315
return order ;
311
316
}
312
317
313
- /**
314
- * Create a {@link Connection} via the given {@link ConnectionFactory#create() factory} and return a {@link Tuple2}
315
- * associating the {@link Connection} with its creating {@link ConnectionFactory}.
316
- *
317
- * @param factory must not be {@literal null}.
318
- * @return never {@literal null}
319
- */
320
- private static Mono <Tuple2 <Connection , ConnectionFactory >> createConnection (ConnectionFactory factory ) {
321
-
322
- if (logger .isDebugEnabled ()) {
323
- logger .debug ("Fetching resumed R2DBC Connection from ConnectionFactory" );
324
- }
325
-
326
- return Mono .from (factory .create ()).map (connection -> Tuples .of (connection , factory ));
327
- }
328
-
329
318
/**
330
319
* Callback for resource cleanup at the end of a non-native R2DBC transaction.
331
320
*/
0 commit comments