Skip to content

Commit 0c1c026

Browse files
committed
New stress test for insertion of entities whose ID is generated by a sequence optimiser
1 parent e35ed94 commit 0c1c026

File tree

1 file changed

+236
-0
lines changed

1 file changed

+236
-0
lines changed
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive;
7+
8+
import io.vertx.core.*;
9+
import io.vertx.ext.unit.Async;
10+
import io.vertx.ext.unit.TestContext;
11+
import io.vertx.ext.unit.junit.VertxUnitRunner;
12+
import jakarta.persistence.Entity;
13+
import jakarta.persistence.GeneratedValue;
14+
import jakarta.persistence.Id;
15+
import org.hibernate.SessionFactory;
16+
import org.hibernate.boot.registry.StandardServiceRegistry;
17+
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
18+
import org.hibernate.cfg.Configuration;
19+
import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder;
20+
import org.hibernate.reactive.provider.Settings;
21+
import org.hibernate.reactive.stage.Stage;
22+
import org.hibernate.reactive.util.impl.CompletionStages;
23+
import org.hibernate.reactive.vertx.VertxInstance;
24+
import org.junit.AfterClass;
25+
import org.junit.BeforeClass;
26+
import org.junit.Test;
27+
import org.junit.runner.RunWith;
28+
29+
import java.util.concurrent.*;
30+
31+
/**
32+
* This is a multi-threaded stress test, intentionally consuming some time.
33+
* The purpose is to verify that the sequence optimizer used by Hibernate Reactive
34+
* is indeed able to generate unique IDs backed by the database sequences, while
35+
* running multiple operations in different threads and on multiple Vert.x eventloops.
36+
* This is very similar to MultithreadedIdentityGenerationTest except it models
37+
* the full operations including the insert statements, while the latter focuses
38+
* on the generated IDs to be unique; it's useful to maintain both tests as:
39+
* - ID generation needs to be unique so it's good to stress that aspect
40+
* in isolation
41+
* - insert operations are downstream events, so this allows us to test that
42+
* such downstream events are not being unintentionally duplicated/dropped,
43+
* which could actually happen when the id generator triggers unintended
44+
* threading behaviours.
45+
*
46+
* N.B. We actually had a case in which the IDs were uniquely generated but the
47+
* downstream event was being processed twice (or more) concurrently, so it's
48+
* useful to have both integration tests.
49+
*
50+
* A typical reactive application will not require multiple threads, but we
51+
* specifically want to test for the case in which the single ID source is being
52+
* shared across multiple threads and also multiple eventloops.
53+
*/
54+
@RunWith(VertxUnitRunner.class)
55+
public class MultithreadedInsertionTest {
56+
57+
/**
58+
* The number of threads should be higher than the default size of the connection pool so that
59+
* this test is also effective in detecting problems with resource starvation.
60+
*/
61+
private static final int N_THREADS = 12;
62+
private static final int ENTITIES_STORED_PER_THREAD = 2000;
63+
64+
//Should finish much sooner, but generating this amount of IDs could be slow on some CIs
65+
private static final int TIMEOUT_MINUTES = 10;
66+
67+
private static final boolean LOG_SQL = false;
68+
private static final Latch startLatch = new Latch( "start", N_THREADS );
69+
private static final Latch endLatch = new Latch( "end", N_THREADS );
70+
71+
private static Stage.SessionFactory stageSessionFactory;
72+
private static Vertx vertx;
73+
private static SessionFactory sessionFactory;
74+
75+
@BeforeClass
76+
public static void setupSessionFactory() {
77+
final VertxOptions vertxOptions = new VertxOptions();
78+
vertxOptions.setEventLoopPoolSize( N_THREADS );
79+
//We relax the blocked thread checks as we'll actually use latches to block them
80+
//intentionally for the purpose of the test; functionally this isn't required
81+
//but it's useful as self-test in the design of this, to ensure that the way
82+
//things are setup are indeed being run in multiple, separate threads.
83+
vertxOptions.setBlockedThreadCheckInterval( TIMEOUT_MINUTES );
84+
vertxOptions.setBlockedThreadCheckIntervalUnit( TimeUnit.MINUTES );
85+
vertx = Vertx.vertx( vertxOptions );
86+
Configuration configuration = new Configuration();
87+
configuration.addAnnotatedClass( EntityWithGeneratedId.class );
88+
BaseReactiveTest.setDefaultProperties( configuration );
89+
configuration.setProperty( Settings.SHOW_SQL, String.valueOf( LOG_SQL ) );
90+
StandardServiceRegistryBuilder builder = new ReactiveServiceRegistryBuilder()
91+
.applySettings( configuration.getProperties() )
92+
//Inject our custom vert.x instance:
93+
.addService( VertxInstance.class, () -> vertx );
94+
StandardServiceRegistry registry = builder.build();
95+
sessionFactory = configuration.buildSessionFactory( registry );
96+
stageSessionFactory = sessionFactory.unwrap( Stage.SessionFactory.class );
97+
}
98+
99+
@AfterClass
100+
public static void closeSessionFactory() {
101+
stageSessionFactory.close();
102+
}
103+
104+
@Test(timeout = ( 1000 * 60 * 10 ))//10 minutes timeout
105+
public void testIdentityGenerator(TestContext context) {
106+
final Async async = context.async();
107+
108+
final DeploymentOptions deploymentOptions = new DeploymentOptions();
109+
deploymentOptions.setInstances( N_THREADS );
110+
111+
vertx
112+
.deployVerticle( () -> new InsertEntitiesVerticle(), deploymentOptions )
113+
.onSuccess( res -> {
114+
endLatch.waitForEveryone();
115+
async.complete();
116+
} )
117+
.onFailure( context::fail )
118+
.eventually( unused -> vertx.close() );
119+
}
120+
121+
private static class InsertEntitiesVerticle extends AbstractVerticle {
122+
123+
int sequentialOperation = 0;
124+
125+
public InsertEntitiesVerticle() {
126+
}
127+
128+
@Override
129+
public void start(Promise<Void> startPromise) {
130+
startLatch.reached();
131+
startLatch.waitForEveryone();//Not essential, but to ensure a good level of parallelism
132+
final String initialThreadName = Thread.currentThread().getName();
133+
stageSessionFactory.withSession(
134+
s -> storeMultipleEntities( s )
135+
)
136+
.whenComplete( (o, throwable) -> {
137+
endLatch.reached();
138+
if ( throwable != null ) {
139+
startPromise.fail( throwable );
140+
}
141+
else {
142+
if ( !initialThreadName.equals( Thread.currentThread().getName() ) ) {
143+
startPromise.fail( "Thread switch detected!" );
144+
}
145+
else {
146+
startPromise.complete();
147+
}
148+
}
149+
} );
150+
}
151+
152+
private CompletionStage<Void> storeMultipleEntities( Stage.Session s) {
153+
return CompletionStages.loop( 0, ENTITIES_STORED_PER_THREAD, index -> storeEntity( s ) );
154+
}
155+
156+
private CompletionStage<Void> storeEntity(Stage.Session s) {
157+
final Thread beforeOperationThread = Thread.currentThread();
158+
final int localVerticleOperationSequence = sequentialOperation++;
159+
final EntityWithGeneratedId entity = new EntityWithGeneratedId();
160+
entity.name = beforeOperationThread + "__" + localVerticleOperationSequence;
161+
162+
return s.persist( entity )
163+
.thenCompose( v -> s.flush() )
164+
.thenAccept( v -> {
165+
s.clear();
166+
if ( beforeOperationThread != Thread.currentThread() ) {
167+
throw new IllegalStateException( "Detected an unexpected switch of carrier threads!" );
168+
}
169+
});
170+
}
171+
172+
@Override
173+
public void stop() {
174+
prettyOut( "Verticle stopped " + super.toString() );
175+
}
176+
}
177+
178+
179+
/**
180+
* Trivial entity using a Sequence for Id generation
181+
*/
182+
@Entity
183+
private static class EntityWithGeneratedId {
184+
@Id
185+
@GeneratedValue
186+
Long id;
187+
188+
String name;
189+
190+
public EntityWithGeneratedId() {
191+
}
192+
}
193+
194+
/**
195+
* Custom latch which is rather verbose about threads reaching the milestones, to help verifying the design
196+
*/
197+
private static final class Latch {
198+
private final String label;
199+
private final CountDownLatch countDownLatch;
200+
201+
public Latch(String label, int membersCount) {
202+
this.label = label;
203+
this.countDownLatch = new CountDownLatch( membersCount );
204+
}
205+
206+
public void reached() {
207+
final long count = countDownLatch.getCount();
208+
countDownLatch.countDown();
209+
prettyOut( "Reached latch '" + label + "', current countdown is " + ( count - 1 ) );
210+
}
211+
212+
public void waitForEveryone() {
213+
try {
214+
countDownLatch.await( TIMEOUT_MINUTES, TimeUnit.MINUTES );
215+
prettyOut( "Everyone has now breached '" + label + "'" );
216+
}
217+
catch ( InterruptedException e ) {
218+
e.printStackTrace();
219+
}
220+
}
221+
}
222+
223+
private static void prettyOut(final String message) {
224+
final String threadName = Thread.currentThread().getName();
225+
final long l = System.currentTimeMillis();
226+
final long seconds = ( l / 1000 ) - initialSecond;
227+
//We prefix log messages by seconds since bootstrap; I'm preferring this over millisecond precision
228+
//as it's not very relevant to see exactly how long each stage took (it's actually distracting)
229+
//but it's more useful to group things coarsely when some lock or timeout introduces a significant
230+
//divide between some operations (when a starvation or timeout happens it takes some seconds).
231+
System.out.println( seconds + " - " + threadName + ": " + message );
232+
}
233+
234+
private static final long initialSecond = ( System.currentTimeMillis() / 1000 );
235+
236+
}

0 commit comments

Comments
 (0)