Skip to content

Commit 9bff925

Browse files
committed
[#1909] Run queries for schema creation using the pool
Before we were creating a connection and then ignoring it for each query required to update the schema or collect metatada. Now the method for running queries outside the "current" transaction is in the SqlClientPool. Note that nowdays it might not be necessary to run these queries in a separate transaction, but it simplify the code quite a bit and it's consistent to what we were doing before.
1 parent a03614b commit 9bff925

File tree

7 files changed

+131
-67
lines changed

7 files changed

+131
-67
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/BatchingConnection.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.List;
1212
import java.util.concurrent.CompletionStage;
1313

14-
1514
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
1615

1716
import io.vertx.sqlclient.spi.DatabaseMetadata;
@@ -192,11 +191,6 @@ public CompletionStage<ResultSet> selectJdbc(String sql, Object[] paramValues) {
192191
: delegate.selectJdbc( sql, paramValues );
193192
}
194193

195-
@Override
196-
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
197-
return delegate.selectJdbcOutsideTransaction( sql, paramValues );
198-
}
199-
200194
public <T> CompletionStage<T> selectIdentifier(String sql, Object[] paramValues, Class<T> idClass) {
201195
// Do not want to execute the batch here
202196
// because we want to be able to select

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnection.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -60,23 +60,6 @@ interface Expectation {
6060

6161
CompletionStage<ResultSet> selectJdbc(String sql, Object[] paramValues);
6262

63-
/**
64-
* This method is intended to be used only for queries returning
65-
* a ResultSet that must be executed outside of any "current"
66-
* transaction (i.e with autocommit=true).
67-
* <p/>
68-
* For example, it would be appropriate to use this method when
69-
* performing queries on information_schema or system tables in
70-
* order to obtain metadata information about catalogs, schemas,
71-
* tables, etc.
72-
*
73-
* @param sql - the query to execute outside of a transaction
74-
* @param paramValues - a non-null array of parameter values
75-
*
76-
* @return the CompletionStage<ResultSet> from executing the query.
77-
*/
78-
CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues);
79-
8063
<T> CompletionStage<T> insertAndSelectIdentifier(String sql, Object[] paramValues, Class<T> idClass, String idColumnName);
8164
CompletionStage<ResultSet> insertAndSelectIdentifierAsResultSet(String sql, Object[] paramValues, Class<?> idClass, String idColumnName);
8265

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnectionPool.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder;
1111
import org.hibernate.service.Service;
1212

13+
import java.sql.ResultSet;
1314
import java.util.concurrent.CompletionStage;
1415

16+
1517
/**
1618
* A Hibernate {@link Service} that provides access to pooled
1719
* {@link ReactiveConnection reactive connections}.
@@ -63,6 +65,8 @@ public interface ReactiveConnectionPool extends Service {
6365
*/
6466
CompletionStage<ReactiveConnection> getConnection(String tenantId, SqlExceptionHelper sqlExceptionHelper);
6567

68+
CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues);
69+
6670
/**
6771
* The shutdown of the pool is actually asynchronous but the
6872
* core service registry won't return the {@link CompletionStage}.

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/ExternalSqlClientPool.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,26 @@
55
*/
66
package org.hibernate.reactive.pool.impl;
77

8+
import java.sql.ResultSet;
9+
import java.sql.SQLException;
10+
import java.util.Objects;
811
import java.util.concurrent.CompletionStage;
912

13+
import org.hibernate.engine.jdbc.internal.FormatStyle;
1014
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
1115
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
16+
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
1217
import org.hibernate.reactive.mutiny.Mutiny;
1318
import org.hibernate.reactive.stage.Stage;
1419
import org.hibernate.reactive.util.impl.CompletionStages;
1520

21+
import io.vertx.sqlclient.DatabaseException;
1622
import io.vertx.sqlclient.Pool;
23+
import io.vertx.sqlclient.Row;
24+
import io.vertx.sqlclient.RowSet;
25+
import io.vertx.sqlclient.Tuple;
26+
27+
import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;
1728

1829
/**
1930
* A pool of reactive connections backed by a Vert.x {@link Pool}.
@@ -82,4 +93,41 @@ public SqlExceptionHelper getSqlExceptionHelper() {
8293
public CompletionStage<Void> getCloseFuture() {
8394
return CompletionStages.voidFuture();
8495
}
96+
97+
98+
@Override
99+
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
100+
return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) )
101+
.thenApply( ResultSetAdaptor::new );
102+
}
103+
104+
public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql, Tuple parameters) {
105+
feedback( sql );
106+
return getPool().preparedQuery( sql ).execute( parameters ).toCompletionStage()
107+
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
108+
}
109+
110+
/**
111+
* Similar to {@link org.hibernate.exception.internal.SQLExceptionTypeDelegate#convert(SQLException, String, String)}
112+
*/
113+
private <T> T convertException(T rows, String sql, Throwable sqlException) {
114+
if ( sqlException == null ) {
115+
return rows;
116+
}
117+
if ( sqlException instanceof DatabaseException ) {
118+
DatabaseException de = (DatabaseException) sqlException;
119+
sqlException = sqlExceptionHelper
120+
.convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql );
121+
}
122+
return rethrow( sqlException );
123+
}
124+
125+
private void feedback(String sql) {
126+
Objects.requireNonNull( sql, "SQL query cannot be null" );
127+
// DDL already gets formatted by the client, so don't reformat it
128+
FormatStyle formatStyle = sqlStatementLogger.isFormat() && !sql.contains( System.lineSeparator() )
129+
? FormatStyle.BASIC
130+
: FormatStyle.NONE;
131+
sqlStatementLogger.logStatement( sql, formatStyle.getFormatter() );
132+
}
85133
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,6 @@ public CompletionStage<ResultSet> selectJdbc(String sql, Object[] paramValues) {
128128
.thenApply( ResultSetAdaptor::new );
129129
}
130130

131-
@Override
132-
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
133-
return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) )
134-
.thenApply( ResultSetAdaptor::new );
135-
}
136-
137131
@Override
138132
public CompletionStage<Void> execute(String sql) {
139133
return preparedQuery( sql )
@@ -278,12 +272,6 @@ public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql)
278272
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
279273
}
280274

281-
public CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql, Tuple parameters) {
282-
feedback( sql );
283-
return pool.preparedQuery( sql ).execute( parameters ).toCompletionStage()
284-
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
285-
}
286-
287275
private void feedback(String sql) {
288276
Objects.requireNonNull( sql, "SQL query cannot be null" );
289277
// DDL already gets formatted by the client, so don't reformat it

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,29 @@
55
*/
66
package org.hibernate.reactive.pool.impl;
77

8+
import java.sql.ResultSet;
9+
import java.sql.SQLException;
10+
import java.util.Objects;
811
import java.util.concurrent.CompletableFuture;
912
import java.util.concurrent.CompletionStage;
1013
import java.util.function.Consumer;
1114

15+
import org.hibernate.engine.jdbc.internal.FormatStyle;
1216
import org.hibernate.engine.jdbc.spi.SqlExceptionHelper;
1317
import org.hibernate.engine.jdbc.spi.SqlStatementLogger;
18+
import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor;
1419
import org.hibernate.reactive.pool.ReactiveConnection;
1520
import org.hibernate.reactive.pool.ReactiveConnectionPool;
1621

1722
import io.vertx.core.Future;
23+
import io.vertx.sqlclient.DatabaseException;
1824
import io.vertx.sqlclient.Pool;
25+
import io.vertx.sqlclient.Row;
26+
import io.vertx.sqlclient.RowSet;
1927
import io.vertx.sqlclient.SqlConnection;
28+
import io.vertx.sqlclient.Tuple;
29+
30+
import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;
2031

2132
/**
2233
* A pool of reactive connections backed by a supplier of
@@ -99,6 +110,56 @@ private CompletionStage<ReactiveConnection> getConnectionFromPool(Pool pool, Sql
99110
);
100111
}
101112

113+
/**
114+
* This method is intended to be used only for queries returning
115+
* a ResultSet that must be executed outside any "current"
116+
* transaction (i.e. with autocommit=true).
117+
* <p/>
118+
* For example, it would be appropriate to use this method when
119+
* performing queries on information_schema or system tables in
120+
* order to obtain metadata information about catalogs, schemas,
121+
* tables, etc.
122+
*
123+
* @param sql - the query to execute outside a transaction
124+
* @param paramValues - a non-null array of parameter values
125+
*
126+
* @return the CompletionStage<ResultSet> from executing the query.
127+
*/
128+
public CompletionStage<ResultSet> selectJdbcOutsideTransaction(String sql, Object[] paramValues) {
129+
return preparedQueryOutsideTransaction( sql, Tuple.wrap( paramValues ) )
130+
.thenApply( ResultSetAdaptor::new );
131+
}
132+
133+
private CompletionStage<RowSet<Row>> preparedQueryOutsideTransaction(String sql, Tuple parameters) {
134+
feedback( sql );
135+
return getPool().preparedQuery( sql ).execute( parameters ).toCompletionStage()
136+
.handle( (rows, throwable) -> convertException( rows, sql, throwable ) );
137+
}
138+
139+
/**
140+
* Similar to {@link org.hibernate.exception.internal.SQLExceptionTypeDelegate#convert(SQLException, String, String)}
141+
*/
142+
private <T> T convertException(T rows, String sql, Throwable sqlException) {
143+
if ( sqlException == null ) {
144+
return rows;
145+
}
146+
if ( sqlException instanceof DatabaseException ) {
147+
DatabaseException de = (DatabaseException) sqlException;
148+
sqlException = getSqlExceptionHelper()
149+
.convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql );
150+
}
151+
return rethrow( sqlException );
152+
}
153+
154+
private void feedback(String sql) {
155+
Objects.requireNonNull( sql, "SQL query cannot be null" );
156+
// DDL already gets formatted by the client, so don't reformat it
157+
FormatStyle formatStyle = getSqlStatementLogger().isFormat() && !sql.contains( System.lineSeparator() )
158+
? FormatStyle.BASIC
159+
: FormatStyle.NONE;
160+
getSqlStatementLogger().logStatement( sql, formatStyle.getFormatter() );
161+
}
162+
102163
/**
103164
* @param onCancellation invoke when converted {@link java.util.concurrent.CompletionStage} cancellation.
104165
*/

hibernate-reactive-core/src/main/java/org/hibernate/reactive/provider/service/ReactiveImprovedExtractionContextImpl.java

Lines changed: 18 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,10 @@
3434
import java.util.Calendar;
3535
import java.util.Map;
3636
import java.util.Properties;
37-
import java.util.concurrent.CompletionStage;
3837
import java.util.concurrent.Executor;
3938

4039
import org.hibernate.boot.model.relational.SqlStringGenerationContext;
4140
import org.hibernate.engine.jdbc.env.spi.JdbcEnvironment;
42-
import org.hibernate.reactive.pool.ReactiveConnection;
4341
import org.hibernate.reactive.pool.ReactiveConnectionPool;
4442
import org.hibernate.reactive.pool.impl.Parameters;
4543
import org.hibernate.resource.transaction.spi.DdlTransactionIsolator;
@@ -48,11 +46,10 @@
4846
import org.hibernate.tool.schema.internal.exec.JdbcContext;
4947

5048
import static org.hibernate.reactive.util.impl.CompletionStages.logSqlException;
51-
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
5249

5350
public class ReactiveImprovedExtractionContextImpl extends ImprovedExtractionContextImpl {
5451

55-
private final ReactiveConnectionPool service;
52+
private final ReactiveConnectionPool connectionPool;
5653

5754
public ReactiveImprovedExtractionContextImpl(
5855
ServiceRegistry registry,
@@ -65,54 +62,43 @@ public ReactiveImprovedExtractionContextImpl(
6562
NoopDdlTransactionIsolator.INSTANCE,
6663
databaseObjectAccess
6764
);
68-
service = registry.getService( ReactiveConnectionPool.class );
65+
connectionPool = registry.getService( ReactiveConnectionPool.class );
6966
}
7067

7168
@Override
7269
public <T> T getQueryResults(
7370
String queryString,
7471
Object[] positionalParameters,
7572
ResultSetProcessor<T> resultSetProcessor) throws SQLException {
76-
77-
final CompletionStage<ReactiveConnection> connectionStage = service.getConnection();
78-
79-
try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters, connectionStage )) {
73+
try (final ResultSet resultSet = getQueryResultSet( queryString, positionalParameters )) {
8074
return resultSetProcessor.process( resultSet );
8175
}
82-
finally {
83-
// This method doesn't return a reactive type, so we start closing the connection and ignore the result
84-
connectionStage
85-
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
86-
.thenCompose( ReactiveImprovedExtractionContextImpl::closeConnection );
87-
88-
}
89-
}
90-
91-
private static ReactiveConnection ignoreException(ReactiveConnection reactiveConnection, Throwable throwable) {
92-
return reactiveConnection;
93-
}
94-
95-
private static CompletionStage<Void> closeConnection(ReactiveConnection connection) {
96-
// Avoid NullPointerException if we couldn't create a connection
97-
return connection != null ? connection.close() : voidFuture();
9876
}
9977

10078
private ResultSet getQueryResultSet(
10179
String queryString,
102-
Object[] positionalParameters,
103-
CompletionStage<ReactiveConnection> connectionStage) {
80+
Object[] positionalParameters) {
10481
final Object[] parametersToUse = positionalParameters != null ? positionalParameters : new Object[0];
105-
final Parameters parametersDialectSpecific = Parameters.instance(
106-
getJdbcEnvironment().getDialect()
107-
);
82+
final Parameters parametersDialectSpecific = Parameters.instance( getJdbcEnvironment().getDialect() );
10883
final String queryToUse = parametersDialectSpecific.process( queryString, parametersToUse.length );
109-
return connectionStage.thenCompose( c -> c.selectJdbcOutsideTransaction( queryToUse, parametersToUse ) )
84+
return connectionPool
85+
// It might not be necessary anymore to run the queries outside the current transaction,
86+
// but we don't have to deal with creating and closing connections.
87+
// Because the schema migration API, we can change this part when we will make the schema migration
88+
// reactive
89+
.selectJdbcOutsideTransaction( queryToUse, parametersToUse )
11090
.whenComplete( (resultSet, err) -> logSqlException( err, () -> "could not execute query ", queryToUse ) )
111-
.thenApply(ResultSetWorkaround::new)
91+
.thenApply( ResultSetWorkaround::new )
92+
// During schema migration, errors are ignored
93+
.handle( ReactiveImprovedExtractionContextImpl::ignoreException )
11294
.toCompletableFuture()
11395
.join();
11496
}
11597

98+
private static <T> T ignoreException(T result, Throwable throwable) {
99+
return result;
100+
}
101+
116102
private static class NoopDdlTransactionIsolator implements DdlTransactionIsolator {
117103
static final NoopDdlTransactionIsolator INSTANCE = new NoopDdlTransactionIsolator();
118104

0 commit comments

Comments
 (0)