Skip to content

Commit 97ef1ed

Browse files
committed
#244 - Add Reactor Checkpoint operator for SQL execution.
We now integrate with Reactor's checkpoint operator to include more information for debugging.
1 parent 86ce98d commit 97ef1ed

File tree

1 file changed

+14
-4
lines changed

1 file changed

+14
-4
lines changed

src/main/java/org/springframework/data/r2dbc/core/DefaultDatabaseClient.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ <T> FetchSpec<T> exchange(Supplier<String> sqlSupplier, BiFunction<Row, RowMetad
386386
return statement;
387387
};
388388

389-
Function<Connection, Flux<Result>> resultFunction = it -> Flux.from(executeFunction.apply(it).execute());
389+
Function<Connection, Flux<Result>> resultFunction = toExecuteFunction(sql, executeFunction);
390390

391391
return new DefaultSqlResult<>(DefaultDatabaseClient.this, //
392392
sql, //
@@ -707,7 +707,7 @@ <R> FetchSpec<R> execute(PreparedOperation<?> preparedOperation, BiFunction<Row,
707707

708708
String sql = getRequiredSql(preparedOperation);
709709
Function<Connection, Statement> selectFunction = wrapPreparedOperation(sql, preparedOperation);
710-
Function<Connection, Flux<Result>> resultFunction = it -> Flux.from(selectFunction.apply(it).execute());
710+
Function<Connection, Flux<Result>> resultFunction = DefaultDatabaseClient.toExecuteFunction(sql, selectFunction);
711711

712712
return new DefaultSqlResult<>(DefaultDatabaseClient.this, //
713713
sql, //
@@ -1377,7 +1377,7 @@ private <R> FetchSpec<R> exchangeInsert(BiFunction<Row, RowMetadata, R> mappingF
13771377
String sql = getRequiredSql(operation);
13781378
Function<Connection, Statement> insertFunction = wrapPreparedOperation(sql, operation)
13791379
.andThen(statement -> statement.returnGeneratedValues());
1380-
Function<Connection, Flux<Result>> resultFunction = it -> Flux.from(insertFunction.apply(it).execute());
1380+
Function<Connection, Flux<Result>> resultFunction = toExecuteFunction(sql, insertFunction);
13811381

13821382
return new DefaultSqlResult<>(this, //
13831383
sql, //
@@ -1390,7 +1390,7 @@ private UpdatedRowsFetchSpec exchangeUpdate(PreparedOperation<?> operation) {
13901390

13911391
String sql = getRequiredSql(operation);
13921392
Function<Connection, Statement> executeFunction = wrapPreparedOperation(sql, operation);
1393-
Function<Connection, Flux<Result>> resultFunction = it -> Flux.from(executeFunction.apply(it).execute());
1393+
Function<Connection, Flux<Result>> resultFunction = toExecuteFunction(sql, executeFunction);
13941394

13951395
return new DefaultSqlResult<>(this, //
13961396
sql, //
@@ -1421,6 +1421,16 @@ private Function<Connection, Statement> wrapPreparedOperation(String sql, Prepar
14211421
};
14221422
}
14231423

1424+
private static Function<Connection, Flux<Result>> toExecuteFunction(String sql,
1425+
Function<Connection, Statement> executeFunction) {
1426+
1427+
return it -> {
1428+
1429+
Flux<Result> from = Flux.defer(() -> executeFunction.apply(it).execute()).cast(Result.class);
1430+
return from.checkpoint("SQL \"" + sql + "\" [DatabaseClient]");
1431+
};
1432+
}
1433+
14241434
private static <T> Flux<T> doInConnectionMany(Connection connection, Function<Connection, Flux<T>> action) {
14251435

14261436
try {

0 commit comments

Comments
 (0)