Skip to content

Commit df2133a

Browse files
committed
[#1687] Fix race condition in ReactiveUpdateCoordinator
Hibernate ORM can use the same update coordinator among multiple update operations. In Hibernate Reactive, the reactive update coordinator has a state that cannot be shared. So, we decided to create a new scoped coordinator for each update operation. Right now, we want to merge a fix for the issue that doesn't require us to copy or change code from Hibernate ORM. The plan is to find a more efficient solution as a separate issue. And, possible add some infrastructure to make it easier to track these type of issues.
1 parent 3b7fe84 commit df2133a

9 files changed

+107
-29
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveCoordinatorFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.hibernate.reactive.persister.entity.mutation.ReactiveInsertCoordinator;
1515
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinator;
1616
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinatorNoOp;
17-
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinatorStandard;
1817

1918
public final class ReactiveCoordinatorFactory {
2019

@@ -32,7 +31,7 @@ public static ReactiveUpdateCoordinator buildUpdateCoordinator(
3231
for ( int i = 0; i < attributeMappings.size(); i++ ) {
3332
AttributeMapping attributeMapping = attributeMappings.get( i );
3433
if ( attributeMapping instanceof SingularAttributeMapping ) {
35-
return new ReactiveUpdateCoordinatorStandard( entityPersister, factory );
34+
return new ReactiveUpdateCoordinatorStandardScopeFactory( entityPersister, factory );
3635
}
3736
}
3837

hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveJoinedSubclassEntityPersister.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,9 @@ public CompletionStage<Void> updateReactive(
212212
Object rowId,
213213
SharedSessionContractImplementor session) {
214214
return ( (ReactiveUpdateCoordinator) getUpdateCoordinator() )
215+
// This is required in Hibernate because our reactive update coordinator cannot be share among
216+
// multiple update operations
217+
.makeScopedCoordinator()
215218
.coordinateReactiveUpdate( object, id, rowId, values, oldVersion, oldValues, dirtyAttributeIndexes, hasDirtyCollection, session );
216219
}
217220

hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveSingleTableEntityPersister.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ public CompletionStage<Void> updateReactive(
303303
final Object rowId,
304304
final SharedSessionContractImplementor session) throws HibernateException {
305305
return ( (ReactiveUpdateCoordinator) getUpdateCoordinator() )
306+
.makeScopedCoordinator()
306307
.coordinateReactiveUpdate( object, id, rowId, values, oldVersion, oldValues, dirtyAttributeIndexes, hasDirtyCollection, session );
307308
}
308309

hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/impl/ReactiveUnionSubclassEntityPersister.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ public CompletionStage<Void> updateReactive(
326326
Object rowId,
327327
SharedSessionContractImplementor session) {
328328
return ( (ReactiveUpdateCoordinator) getUpdateCoordinator() )
329+
.makeScopedCoordinator()
329330
.coordinateReactiveUpdate( object, id, rowId, values, oldVersion, oldValues, dirtyAttributeIndexes, hasDirtyCollection, session );
330331
}
331332

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.persister.entity.impl;
7+
8+
import org.hibernate.engine.spi.SessionFactoryImplementor;
9+
import org.hibernate.persister.entity.AbstractEntityPersister;
10+
import org.hibernate.persister.entity.mutation.UpdateCoordinatorStandard;
11+
import org.hibernate.reactive.persister.entity.mutation.ReactiveScopedUpdateCoordinator;
12+
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinator;
13+
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinatorStandard;
14+
15+
public class ReactiveUpdateCoordinatorStandardScopeFactory extends UpdateCoordinatorStandard implements ReactiveUpdateCoordinator {
16+
17+
private final AbstractEntityPersister entityPersister;
18+
private final SessionFactoryImplementor factory;
19+
20+
public ReactiveUpdateCoordinatorStandardScopeFactory(
21+
AbstractEntityPersister entityPersister,
22+
SessionFactoryImplementor factory) {
23+
super( entityPersister, factory );
24+
this.entityPersister = entityPersister;
25+
this.factory = factory;
26+
}
27+
28+
@Override
29+
public ReactiveScopedUpdateCoordinator makeScopedCoordinator() {
30+
return new ReactiveUpdateCoordinatorStandard( entityPersister, factory );
31+
}
32+
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive.persister.entity.mutation;
7+
8+
import java.util.concurrent.CompletionStage;
9+
10+
import org.hibernate.engine.spi.SharedSessionContractImplementor;
11+
12+
/**
13+
* Scoped to a single operation, so that we can keep
14+
* instance scoped state.
15+
*
16+
* @see org.hibernate.persister.entity.mutation.UpdateCoordinator
17+
* @see ReactiveUpdateCoordinator
18+
*/
19+
public interface ReactiveScopedUpdateCoordinator {
20+
21+
CompletionStage<Void> coordinateReactiveUpdate(
22+
Object entity,
23+
Object id,
24+
Object rowId,
25+
Object[] values,
26+
Object oldVersion,
27+
Object[] incomingOldValues,
28+
int[] dirtyAttributeIndexes,
29+
boolean hasDirtyCollection,
30+
SharedSessionContractImplementor session);
31+
32+
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/mutation/ReactiveUpdateCoordinator.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,14 @@
55
*/
66
package org.hibernate.reactive.persister.entity.mutation;
77

8-
import java.util.concurrent.CompletionStage;
9-
10-
import org.hibernate.engine.spi.SharedSessionContractImplementor;
118
import org.hibernate.persister.entity.mutation.UpdateCoordinator;
129

10+
/**
11+
* A reactive {@link UpdateCoordinator} that allows the creation of a {@link ReactiveScopedUpdateCoordinator} scoped
12+
* to a single update operation.
13+
*/
1314
public interface ReactiveUpdateCoordinator extends UpdateCoordinator {
1415

15-
CompletionStage<Void> coordinateReactiveUpdate(
16-
Object entity,
17-
Object id,
18-
Object rowId,
19-
Object[] values,
20-
Object oldVersion,
21-
Object[] incomingOldValues,
22-
int[] dirtyAttributeIndexes,
23-
boolean hasDirtyCollection,
24-
SharedSessionContractImplementor session);
16+
ReactiveScopedUpdateCoordinator makeScopedCoordinator();
2517

2618
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/mutation/ReactiveUpdateCoordinatorNoOp.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
1515

16-
public class ReactiveUpdateCoordinatorNoOp extends UpdateCoordinatorNoOp implements ReactiveUpdateCoordinator {
16+
public class ReactiveUpdateCoordinatorNoOp extends UpdateCoordinatorNoOp implements ReactiveScopedUpdateCoordinator, ReactiveUpdateCoordinator {
1717

1818
public ReactiveUpdateCoordinatorNoOp(AbstractEntityPersister entityPersister) {
1919
super( entityPersister );
@@ -45,4 +45,11 @@ public CompletionStage<Void> coordinateReactiveUpdate(
4545
SharedSessionContractImplementor session) {
4646
return voidFuture();
4747
}
48+
49+
@Override
50+
public ReactiveScopedUpdateCoordinator makeScopedCoordinator() {
51+
//This particular implementation is stateless, so we can return ourselves w/o needing to create a scope.
52+
return this;
53+
}
54+
4855
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/persister/entity/mutation/ReactiveUpdateCoordinatorStandard.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.concurrent.CompletionStage;
1010

1111
import java.util.concurrent.atomic.AtomicInteger;
12+
1213
import org.hibernate.engine.jdbc.mutation.ParameterUsage;
1314
import org.hibernate.engine.jdbc.mutation.spi.MutationExecutorService;
1415
import org.hibernate.engine.spi.EntityEntry;
@@ -23,7 +24,6 @@
2324
import org.hibernate.persister.entity.mutation.EntityTableMapping;
2425
import org.hibernate.persister.entity.mutation.UpdateCoordinatorStandard;
2526
import org.hibernate.reactive.engine.jdbc.env.internal.ReactiveMutationExecutor;
26-
import org.hibernate.reactive.util.impl.CompletionStages;
2727
import org.hibernate.sql.model.MutationOperationGroup;
2828
import org.hibernate.tuple.entity.EntityMetamodel;
2929

@@ -35,24 +35,31 @@
3535
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
3636
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
3737

38-
public class ReactiveUpdateCoordinatorStandard extends UpdateCoordinatorStandard implements ReactiveUpdateCoordinator {
39-
40-
private CompletionStage<Void> stage;
38+
/**
39+
* Reactive version of {@link UpdateCoordinatorStandard}, but it cannot be share between multiple update operations.
40+
*/
41+
public class ReactiveUpdateCoordinatorStandard extends UpdateCoordinatorStandard implements ReactiveScopedUpdateCoordinator {
4142

43+
private CompletableFuture<Void> updateResultStage;
4244

4345
public ReactiveUpdateCoordinatorStandard(AbstractEntityPersister entityPersister, SessionFactoryImplementor factory) {
4446
super( entityPersister, factory );
4547
}
4648

47-
private void complete(Object o, Throwable throwable) {
49+
// Utility method to use method reference
50+
private void complete(final Object o, final Throwable throwable) {
4851
if ( throwable != null ) {
49-
stage.toCompletableFuture().completeExceptionally( throwable );
52+
fail( throwable );
5053
}
5154
else {
52-
stage.toCompletableFuture().complete( null );
55+
updateResultStage.complete( null );
5356
}
5457
}
5558

59+
private void fail(Throwable throwable) {
60+
updateResultStage.completeExceptionally( throwable );
61+
}
62+
5663
@Override
5764
public CompletionStage<Void> coordinateReactiveUpdate(
5865
Object entity,
@@ -85,7 +92,8 @@ public CompletionStage<Void> coordinateReactiveUpdate(
8592
// Ensure that an immutable or non-modifiable entity is not being updated unless it is
8693
// in the process of being deleted.
8794
if ( entry == null && !entityPersister().isMutable() ) {
88-
return CompletionStages.failedFuture(new IllegalStateException( "Updating immutable entity that is not in session yet" ));
95+
fail( new IllegalStateException( "Updating immutable entity that is not in session yet" ) );
96+
return updateResultStage;
8997
}
9098

9199
CompletionStage<Void> s = voidFuture();
@@ -144,8 +152,10 @@ && entityPersister().hasLazyDirtyFields( dirtyAttributeIndexes ) ) {
144152
forceDynamicUpdate
145153
);
146154

147-
// stage gets updated by doDynamicUpdate and doStaticUpdate which get called by performUpdate
148-
return stage != null ? stage : voidFuture();
155+
// doDynamicUpdate, doVersionUpdate, or doStaticUpdate will initialize the stage,
156+
// if an update is necessary.
157+
// Otherwise, updateResultStage could be null.
158+
return updateResultStage != null ? updateResultStage : voidFuture();
149159
});
150160
}
151161

@@ -202,7 +212,7 @@ protected void doVersionUpdate(
202212
Object oldVersion,
203213
SharedSessionContractImplementor session) {
204214
assert getVersionUpdateGroup() != null;
205-
this.stage = new CompletableFuture<>();
215+
this.updateResultStage = new CompletableFuture<>();
206216

207217
final EntityTableMapping mutatingTableDetails = (EntityTableMapping) getVersionUpdateGroup()
208218
.getSingleOperation().getTableDetails();
@@ -274,7 +284,7 @@ protected void doDynamicUpdate(
274284
UpdateCoordinatorStandard.InclusionChecker dirtinessChecker,
275285
UpdateCoordinatorStandard.UpdateValuesAnalysisImpl valuesAnalysis,
276286
SharedSessionContractImplementor session) {
277-
this.stage = new CompletableFuture<>();
287+
this.updateResultStage = new CompletableFuture<>();
278288
// Create the JDBC operation descriptors
279289
final MutationOperationGroup dynamicUpdateGroup = generateDynamicUpdateGroup(
280290
id,
@@ -342,7 +352,7 @@ protected void doStaticUpdate(
342352
Object[] oldValues,
343353
UpdateValuesAnalysisImpl valuesAnalysis,
344354
SharedSessionContractImplementor session) {
345-
this.stage = new CompletableFuture<>();
355+
this.updateResultStage = new CompletableFuture<>();
346356
final MutationOperationGroup staticUpdateGroup = getStaticUpdateGroup();
347357
final ReactiveMutationExecutor mutationExecutor = mutationExecutor( session, staticUpdateGroup );
348358

0 commit comments

Comments
 (0)