Skip to content

Commit 0cfb2ed

Browse files
committed
[hibernate#712] Make ReactiveConnection#close reactive
They made it reactive with the upgrade to Vert.x SQL client 4. We also have to update the tests becaus now they have to wait for the session to be close before moving to the next chained operation.
1 parent d613aa5 commit 0cfb2ed

File tree

83 files changed

+2258
-2518
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+2258
-2518
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.hibernate.engine.spi.SharedSessionContractImplementor;
1919
import org.hibernate.proxy.HibernateProxy;
2020
import org.hibernate.reactive.common.AffectedEntities;
21-
import org.hibernate.reactive.common.AutoCloseable;
2221
import org.hibernate.reactive.common.Identifier;
2322
import org.hibernate.reactive.common.ResultSetMapping;
2423
import org.hibernate.reactive.session.ReactiveSession;
@@ -304,7 +303,7 @@ interface Query<R> {
304303
*
305304
* @see org.hibernate.Session
306305
*/
307-
interface Session extends AutoCloseable {
306+
interface Session {
308307

309308
/**
310309
* Asynchronously return the persistent instance of the given entity
@@ -1073,7 +1072,7 @@ <R> Query<R> createNativeQuery(String queryString, ResultSetMapping<R> resultSet
10731072
* Close the reactive session and release the underlying database
10741073
* connection.
10751074
*/
1076-
void close();
1075+
Uni<Void> close();
10771076

10781077
/**
10791078
* @return false if {@link #close()} has been called
@@ -1111,7 +1110,7 @@ <R> Query<R> createNativeQuery(String queryString, ResultSetMapping<R> resultSet
11111110
*
11121111
* @see org.hibernate.StatelessSession
11131112
*/
1114-
interface StatelessSession extends AutoCloseable {
1113+
interface StatelessSession {
11151114

11161115
/**
11171116
* Retrieve a row.
@@ -1382,15 +1381,13 @@ interface StatelessSession extends AutoCloseable {
13821381
/**
13831382
* @return false if {@link #close()} has been called
13841383
*/
1385-
@Override
13861384
boolean isOpen();
13871385

13881386
/**
13891387
* Close the reactive session and release the underlying database
13901388
* connection.
13911389
*/
1392-
@Override
1393-
void close();
1390+
Uni<Void> close();
13941391
}
13951392

13961393
/**
@@ -1568,6 +1565,7 @@ interface SessionFactory extends AutoCloseable {
15681565
/**
15691566
* Destroy the session factory and clean up its connection pool.
15701567
*/
1568+
@Override
15711569
void close();
15721570

15731571
/**

hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import javax.persistence.criteria.CriteriaUpdate;
2626
import javax.persistence.metamodel.Attribute;
2727
import java.util.List;
28+
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.CompletionStage;
2930
import java.util.function.Function;
3031
import java.util.function.Supplier;
@@ -468,8 +469,12 @@ public boolean isMarkedForRollback() {
468469
}
469470

470471
@Override
471-
public void close() {
472-
delegate.close();
472+
public Uni<Void> close() {
473+
return uni( () -> {
474+
CompletableFuture<Void> closing = new CompletableFuture<>();
475+
delegate.close( closing );
476+
return closing;
477+
} );
473478
}
474479

475480
@Override

hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinyStatelessSessionImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import javax.persistence.criteria.CriteriaDelete;
1818
import javax.persistence.criteria.CriteriaQuery;
1919
import javax.persistence.criteria.CriteriaUpdate;
20+
import java.util.concurrent.CompletableFuture;
2021
import java.util.concurrent.CompletionStage;
2122
import java.util.function.Function;
2223
import java.util.function.Supplier;
@@ -215,8 +216,12 @@ public boolean isMarkedForRollback() {
215216
}
216217

217218
@Override
218-
public void close() {
219-
delegate.close();
219+
public Uni<Void> close() {
220+
return uni( () -> {
221+
CompletableFuture<Void> closing = new CompletableFuture<>();
222+
delegate.close( closing );
223+
return closing;
224+
} );
220225
}
221226

222227
@Override

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/BatchingConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public CompletionStage<Void> rollbackTransaction() {
176176
return delegate.rollbackTransaction();
177177
}
178178

179-
public void close() {
180-
delegate.close();
179+
public CompletionStage<Void> close() {
180+
return delegate.close();
181181
}
182182
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnection.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,5 @@ interface Result extends Iterator<Object[]> {
6161

6262
CompletionStage<Void> executeBatch();
6363

64-
void close();
65-
64+
CompletionStage<Void> close();
6665
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/ProxyConnection.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import org.hibernate.reactive.pool.ReactiveConnection;
1515
import org.hibernate.reactive.pool.ReactiveConnectionPool;
16+
import org.hibernate.reactive.util.impl.CompletionStages;
1617

1718
import static org.hibernate.reactive.common.InternalStateAssertions.assertUseOnEventLoop;
1819

@@ -144,12 +145,16 @@ public CompletionStage<Void> executeBatch() {
144145
}
145146

146147
@Override
147-
public void close() {
148+
public CompletionStage<Void> close() {
149+
CompletionStage<Void> stage = CompletionStages.voidFuture();
148150
if ( connection != null ) {
149-
connection.close();
150-
connection = null;
151+
stage = stage.thenCompose( v -> connection.close() );
152+
151153
}
152-
closed = true;
154+
return stage.thenAccept( v -> {
155+
connection = null;
156+
closed = true;
157+
} );
153158
}
154159

155160
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,8 @@ public CompletionStage<Void> rollbackTransaction() {
217217
}
218218

219219
@Override
220-
public void close() {
221-
connection.close();
220+
public CompletionStage<Void> close() {
221+
return connection.close().toCompletionStage();
222222
}
223223

224224
/**

hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveSession.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.Serializable;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.concurrent.CompletableFuture;
2728
import java.util.concurrent.CompletionStage;
2829

2930
/**
@@ -147,5 +148,7 @@ <T> CompletionStage<T> reactiveFind(Class<T> entityClass, Object id,
147148

148149
boolean isDirty();
149150
boolean isOpen();
150-
void close();
151+
152+
// Different approach so that we can overload the method in SessionImpl
153+
void close(CompletableFuture<Void> closing);
151154
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/ReactiveStatelessSession.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.hibernate.LockMode;
1010

1111
import javax.persistence.EntityGraph;
12+
import java.util.concurrent.CompletableFuture;
1213
import java.util.concurrent.CompletionStage;
1314

1415
/**
@@ -60,6 +61,6 @@ public interface ReactiveStatelessSession extends ReactiveQueryExecutor {
6061

6162
boolean isOpen();
6263

63-
void close();
64+
void close(CompletableFuture<Void> closing);
6465

6566
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
import java.util.List;
104104
import java.util.Map;
105105
import java.util.Set;
106+
import java.util.concurrent.CompletableFuture;
106107
import java.util.concurrent.CompletionException;
107108
import java.util.concurrent.CompletionStage;
108109
import java.util.function.BiFunction;
@@ -1516,10 +1517,24 @@ public ReactiveConnection getReactiveConnection() {
15161517

15171518
@Override
15181519
public void close() throws HibernateException {
1520+
throw new UnsupportedOperationException( "Non reactive close method called. Use close(CompletableFuture<Void> closing) instead." );
1521+
}
1522+
1523+
@Override
1524+
public void close(CompletableFuture<Void> closing) {
1525+
CompletionStage<Void> stage = voidFuture();
15191526
if ( reactiveConnection != null ) {
1520-
reactiveConnection.close();
1527+
stage = stage.thenCompose( v -> reactiveConnection.close() );
15211528
}
1522-
super.close();
1529+
stage.thenAccept( v -> super.close() )
1530+
.whenComplete( (v, t) -> {
1531+
if ( t != null ) {
1532+
closing.completeExceptionally( t );
1533+
}
1534+
else {
1535+
closing.complete( null );
1536+
}
1537+
} );
15231538
}
15241539

15251540
@Override @SuppressWarnings("unchecked")

hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import javax.persistence.Tuple;
5656
import java.io.Serializable;
5757
import java.util.List;
58+
import java.util.concurrent.CompletableFuture;
5859
import java.util.concurrent.CompletionStage;
5960

6061
import static org.hibernate.reactive.id.impl.IdentifierGeneration.assignIdIfNecessary;
@@ -704,7 +705,20 @@ public <T> ReactiveQuery<T> createReactiveCriteriaQuery(String jpaqlString,
704705

705706
@Override
706707
public void close() {
707-
reactiveConnection.close();
708-
super.close();
708+
throw new UnsupportedOperationException( "Non reactive close method called. Use close(CompletableFuture<Void> closing) instead." );
709+
}
710+
711+
@Override
712+
public void close(CompletableFuture<Void> closing) {
713+
reactiveConnection.close()
714+
.thenAccept( v -> super.close() )
715+
.whenComplete( (unused, throwable) -> {
716+
if ( throwable != null ) {
717+
closing.completeExceptionally( throwable );
718+
}
719+
else {
720+
closing.complete( null );
721+
}
722+
} );
709723
}
710724
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.hibernate.engine.spi.SharedSessionContractImplementor;
1818
import org.hibernate.proxy.HibernateProxy;
1919
import org.hibernate.reactive.common.AffectedEntities;
20-
import org.hibernate.reactive.common.AutoCloseable;
2120
import org.hibernate.reactive.common.Identifier;
2221
import org.hibernate.reactive.common.ResultSetMapping;
2322
import org.hibernate.reactive.session.ReactiveSession;
@@ -304,7 +303,7 @@ interface Query<R> {
304303
*
305304
* @see org.hibernate.Session
306305
*/
307-
interface Session extends AutoCloseable {
306+
interface Session {
308307

309308
/**
310309
* Asynchronously return the persistent instance of the given entity
@@ -1075,7 +1074,7 @@ <R> Query<R> createNativeQuery(String queryString,
10751074
* Close the reactive session and release the underlying database
10761075
* connection.
10771076
*/
1078-
void close();
1077+
CompletionStage<Void> close();
10791078

10801079
/**
10811080
* @return false if {@link #close()} has been called
@@ -1113,7 +1112,7 @@ <R> Query<R> createNativeQuery(String queryString,
11131112
*
11141113
* @see org.hibernate.StatelessSession
11151114
*/
1116-
interface StatelessSession extends AutoCloseable {
1115+
interface StatelessSession {
11171116

11181117
/**
11191118
* Retrieve a row.
@@ -1384,15 +1383,13 @@ interface StatelessSession extends AutoCloseable {
13841383
/**
13851384
* @return false if {@link #close()} has been called
13861385
*/
1387-
@Override
13881386
boolean isOpen();
13891387

13901388
/**
13911389
* Close the reactive session and release the underlying database
13921390
* connection.
13931391
*/
1394-
@Override
1395-
void close();
1392+
CompletionStage<Void> close();
13961393
}
13971394

13981395
/**
@@ -1570,6 +1567,7 @@ interface SessionFactory extends AutoCloseable {
15701567
/**
15711568
* Destroy the session factory and clean up its connection pool.
15721569
*/
1570+
@Override
15731571
void close();
15741572

15751573
/**

hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import javax.persistence.criteria.CriteriaUpdate;
2626
import javax.persistence.metamodel.Attribute;
2727
import java.util.List;
28+
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.CompletionStage;
2930
import java.util.function.Function;
3031

@@ -477,8 +478,12 @@ public boolean isMarkedForRollback() {
477478
}
478479

479480
@Override
480-
public void close() {
481-
delegate.close();
481+
public CompletionStage<Void> close() {
482+
return stage( v -> {
483+
CompletableFuture<Void> closing = new CompletableFuture<>();
484+
delegate.close( closing );
485+
return closing;
486+
} );
482487
}
483488

484489
@Override

hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageStatelessSessionImpl.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import javax.persistence.criteria.CriteriaDelete;
1818
import javax.persistence.criteria.CriteriaQuery;
1919
import javax.persistence.criteria.CriteriaUpdate;
20+
import java.util.concurrent.CompletableFuture;
2021
import java.util.concurrent.CompletionStage;
2122
import java.util.function.Function;
2223

@@ -177,9 +178,13 @@ public <T> CompletionStage<T> withTransaction(Function<Stage.Transaction, Comple
177178
}
178179

179180
@Override
180-
public void close() {
181-
delegate.close();
182-
}
181+
public CompletionStage<Void> close() {
182+
return stage( v -> {
183+
CompletableFuture<Void> closing = new CompletableFuture<>();
184+
delegate.close( closing );
185+
return closing;
186+
} );
187+
}
183188

184189
@Override
185190
public boolean isOpen() {

0 commit comments

Comments
 (0)