Skip to content

Commit e35ed94

Browse files
committed
Introduce new multi-threaded stress test for the Id generation based on sequences
1 parent 22da4d9 commit e35ed94

File tree

1 file changed

+295
-0
lines changed

1 file changed

+295
-0
lines changed
Lines changed: 295 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive;
7+
8+
import io.vertx.core.AbstractVerticle;
9+
import io.vertx.core.DeploymentOptions;
10+
import io.vertx.core.Promise;
11+
import io.vertx.core.Vertx;
12+
import io.vertx.core.VertxOptions;
13+
import io.vertx.ext.unit.Async;
14+
import io.vertx.ext.unit.TestContext;
15+
import io.vertx.ext.unit.junit.VertxUnitRunner;
16+
import jakarta.persistence.Entity;
17+
import jakarta.persistence.GeneratedValue;
18+
import jakarta.persistence.Id;
19+
20+
import org.hibernate.SessionFactory;
21+
import org.hibernate.boot.registry.StandardServiceRegistry;
22+
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
23+
import org.hibernate.cfg.Configuration;
24+
import org.hibernate.reactive.id.impl.ReactiveGeneratorWrapper;
25+
import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder;
26+
import org.hibernate.reactive.provider.Settings;
27+
import org.hibernate.reactive.session.ReactiveConnectionSupplier;
28+
import org.hibernate.reactive.session.impl.ReactiveSessionFactoryImpl;
29+
import org.hibernate.reactive.stage.Stage;
30+
import org.hibernate.reactive.stage.impl.StageSessionImpl;
31+
import org.hibernate.reactive.util.impl.CompletionStages;
32+
import org.hibernate.reactive.vertx.VertxInstance;
33+
34+
import org.junit.AfterClass;
35+
import org.junit.BeforeClass;
36+
import org.junit.Test;
37+
import org.junit.runner.RunWith;
38+
39+
import java.util.ArrayList;
40+
import java.util.BitSet;
41+
import java.util.List;
42+
import java.util.concurrent.CompletionStage;
43+
import java.util.concurrent.ConcurrentHashMap;
44+
import java.util.concurrent.ConcurrentMap;
45+
import java.util.concurrent.CountDownLatch;
46+
import java.util.concurrent.TimeUnit;
47+
48+
/**
49+
* This is a multi-threaded stress test, intentionally consuming some time.
50+
* The purpose is to verify that the sequence optimizer used by Hibernate Reactive
51+
* is indeed able to generate unique IDs backed by the database sequences, while
52+
* running multiple operations in different threads and on multiple Vert.x eventloops.
53+
* A typical reactive application will not require multiple threads, but we
54+
* specifically want to test for the case in which the single ID source is being
55+
* shared across multiple threads and eventloops.
56+
*/
57+
@RunWith(VertxUnitRunner.class)
58+
public class MultithreadedIdentityGenerationTest {
59+
60+
/* The number of threads should be higher than the default size of the connection pool so that
61+
* this test is also effective in detecting problems with resource starvation.
62+
*/
63+
private static final int N_THREADS = 48;
64+
private static final int IDS_GENERATED_PER_THREAD = 10000;
65+
66+
//Should finish much sooner, but generating this amount of IDs could be slow on some CIs
67+
private static final int TIMEOUT_MINUTES = 10;
68+
69+
private static final boolean LOG_SQL = false;
70+
private static final Latch startLatch = new Latch( "start", N_THREADS );
71+
private static final Latch endLatch = new Latch( "end", N_THREADS );
72+
73+
private static Stage.SessionFactory stageSessionFactory;
74+
private static Vertx vertx;
75+
private static SessionFactory sessionFactory;
76+
77+
@BeforeClass
78+
public static void setupSessionFactory() {
79+
final VertxOptions vertxOptions = new VertxOptions();
80+
vertxOptions.setEventLoopPoolSize( N_THREADS );
81+
//We relax the blocked thread checks as we'll actually use latches to block them
82+
//intentionally for the purpose of the test; functionally this isn't required
83+
//but it's useful as self-test in the design of this, to ensure that the way
84+
//things are setup are indeed being run in multiple, separate threads.
85+
vertxOptions.setBlockedThreadCheckInterval( TIMEOUT_MINUTES );
86+
vertxOptions.setBlockedThreadCheckIntervalUnit( TimeUnit.MINUTES );
87+
vertx = Vertx.vertx( vertxOptions );
88+
Configuration configuration = new Configuration();
89+
configuration.addAnnotatedClass( EntityWithGeneratedId.class );
90+
BaseReactiveTest.setDefaultProperties( configuration );
91+
configuration.setProperty( Settings.SHOW_SQL, String.valueOf( LOG_SQL ) );
92+
StandardServiceRegistryBuilder builder = new ReactiveServiceRegistryBuilder()
93+
.applySettings( configuration.getProperties() )
94+
//Inject our custom vert.x instance:
95+
.addService( VertxInstance.class, () -> vertx );
96+
StandardServiceRegistry registry = builder.build();
97+
sessionFactory = configuration.buildSessionFactory( registry );
98+
stageSessionFactory = sessionFactory.unwrap( Stage.SessionFactory.class );
99+
}
100+
101+
@AfterClass
102+
public static void closeSessionFactory() {
103+
stageSessionFactory.close();
104+
}
105+
106+
private ReactiveGeneratorWrapper getIdGenerator() {
107+
final ReactiveSessionFactoryImpl hibernateSessionFactory = (ReactiveSessionFactoryImpl) sessionFactory;
108+
final ReactiveGeneratorWrapper identifierGenerator = (ReactiveGeneratorWrapper) hibernateSessionFactory.getIdentifierGenerator(
109+
"org.hibernate.reactive.MultithreadedIdentityGenerationTest$EntityWithGeneratedId" );
110+
return identifierGenerator;
111+
}
112+
113+
@Test(timeout = ( 1000 * 60 * 10 ))//10 minutes timeout
114+
public void testIdentityGenerator(TestContext context) {
115+
final Async async = context.async();
116+
final ReactiveGeneratorWrapper idGenerator = getIdGenerator();
117+
context.assertNotNull( idGenerator );
118+
119+
final DeploymentOptions deploymentOptions = new DeploymentOptions();
120+
deploymentOptions.setInstances( N_THREADS );
121+
122+
ResultsCollector allResults = new ResultsCollector();
123+
124+
vertx
125+
.deployVerticle( () -> new IdGenVerticle( idGenerator, allResults ), deploymentOptions )
126+
.onSuccess( res -> {
127+
endLatch.waitForEveryone();
128+
if ( allResultsAreUnique( allResults ) ) {
129+
async.complete();
130+
}
131+
else {
132+
context.fail( "Non unique numbers detected" );
133+
}
134+
} )
135+
.onFailure( context::fail )
136+
.eventually( unused -> vertx.close() );
137+
}
138+
139+
private boolean allResultsAreUnique(ResultsCollector allResults) {
140+
//Add 50 per thread to the total amount of generated ids to allow for gaps
141+
//in the hi/lo partitioning (not likely to be necessary)
142+
final int expectedSize = N_THREADS * ( IDS_GENERATED_PER_THREAD + 50 );
143+
BitSet resultsSeen = new BitSet( expectedSize );
144+
boolean failed = false;
145+
for ( List<Long> partialResult : allResults.resultsByThread.values() ) {
146+
for ( Long aLong : partialResult ) {
147+
final int intValue = aLong.intValue();
148+
final boolean existing = resultsSeen.get( intValue );
149+
if ( existing ) {
150+
System.out.println( "Duplicate ID detected: " + intValue );
151+
failed = true;
152+
}
153+
resultsSeen.set( intValue );
154+
}
155+
}
156+
return !failed;
157+
}
158+
159+
private static class IdGenVerticle extends AbstractVerticle {
160+
161+
private final ReactiveGeneratorWrapper idGenerator;
162+
private final ResultsCollector allResults;
163+
private final ArrayList<Long> generatedIds = new ArrayList<>( IDS_GENERATED_PER_THREAD );
164+
165+
public IdGenVerticle(ReactiveGeneratorWrapper idGenerator, ResultsCollector allResults) {
166+
this.idGenerator = idGenerator;
167+
this.allResults = allResults;
168+
}
169+
170+
@Override
171+
public void start(Promise<Void> startPromise) {
172+
try {
173+
startLatch.reached();
174+
startLatch.waitForEveryone();//Not essential, but to ensure a good level of parallelism
175+
final String initialThreadName = Thread.currentThread().getName();
176+
stageSessionFactory.withSession(
177+
s -> generateMultipleIds( idGenerator, s, generatedIds )
178+
)
179+
.whenComplete( (o, throwable) -> {
180+
endLatch.reached();
181+
if ( throwable != null ) {
182+
startPromise.fail( throwable );
183+
}
184+
else {
185+
if ( !initialThreadName.equals( Thread.currentThread().getName() ) ) {
186+
startPromise.fail( "Thread switch detected!" );
187+
}
188+
else {
189+
allResults.deliverResulst( generatedIds );
190+
startPromise.complete();
191+
}
192+
}
193+
} );
194+
}
195+
catch (RuntimeException e) {
196+
startPromise.fail( e );
197+
}
198+
}
199+
200+
@Override
201+
public void stop() {
202+
prettyOut( "Verticle stopped " + super.toString() );
203+
}
204+
}
205+
206+
private static class ResultsCollector {
207+
208+
private final ConcurrentMap<String,List<Long>> resultsByThread = new ConcurrentHashMap<>();
209+
210+
public void deliverResulst(List<Long> generatedIds) {
211+
final String threadName = Thread.currentThread().getName();
212+
resultsByThread.put( threadName, generatedIds );
213+
}
214+
}
215+
216+
private static CompletionStage<Void> generateMultipleIds(
217+
ReactiveGeneratorWrapper idGenerator,
218+
Stage.Session s,
219+
ArrayList<Long> collector) {
220+
return CompletionStages.loop( 0, IDS_GENERATED_PER_THREAD, index -> generateIds( idGenerator, s, collector ) );
221+
}
222+
223+
private static CompletionStage<Void> generateIds(
224+
ReactiveGeneratorWrapper idGenerator,
225+
Stage.Session s,
226+
ArrayList<Long> collector) {
227+
final Thread beforeOperationThread = Thread.currentThread();
228+
return idGenerator.generate( ( (StageSessionImpl) s )
229+
.unwrap( ReactiveConnectionSupplier.class ), new EntityWithGeneratedId() )
230+
.thenAccept( o -> {
231+
if ( beforeOperationThread != Thread.currentThread() ) {
232+
throw new IllegalStateException( "Detected an unexpected switch of carrier threads!" );
233+
}
234+
collector.add( (Long) o );
235+
} );
236+
}
237+
238+
/**
239+
* Trivial entity using a Sequence for Id generation
240+
*/
241+
@Entity
242+
private static class EntityWithGeneratedId {
243+
@Id
244+
@GeneratedValue
245+
Long id;
246+
247+
String name;
248+
249+
public EntityWithGeneratedId() {
250+
}
251+
}
252+
253+
/**
254+
* Custom latch which is rather verbose about threads reaching the milestones, to help verifying the design
255+
*/
256+
private static final class Latch {
257+
private final String label;
258+
private final CountDownLatch countDownLatch;
259+
260+
public Latch(String label, int membersCount) {
261+
this.label = label;
262+
this.countDownLatch = new CountDownLatch( membersCount );
263+
}
264+
265+
public void reached() {
266+
final long count = countDownLatch.getCount();
267+
countDownLatch.countDown();
268+
prettyOut( "Reached latch '" + label + "', current countdown is " + ( count - 1 ) );
269+
}
270+
271+
public void waitForEveryone() {
272+
try {
273+
countDownLatch.await( TIMEOUT_MINUTES, TimeUnit.MINUTES );
274+
prettyOut( "Everyone has now breached '" + label + "'" );
275+
}
276+
catch ( InterruptedException e ) {
277+
e.printStackTrace();
278+
}
279+
}
280+
}
281+
282+
private static void prettyOut(final String message) {
283+
final String threadName = Thread.currentThread().getName();
284+
final long l = System.currentTimeMillis();
285+
final long seconds = ( l / 1000 ) - initialSecond;
286+
//We prefix log messages by seconds since bootstrap; I'm preferring this over millisecond precision
287+
//as it's not very relevant to see exactly how long each stage took (it's actually distracting)
288+
//but it's more useful to group things coarsely when some lock or timeout introduces a significant
289+
//divide between some operations (when a starvation or timeout happens it takes some seconds).
290+
System.out.println( seconds + " - " + threadName + ": " + message );
291+
}
292+
293+
private static final long initialSecond = ( System.currentTimeMillis() / 1000 );
294+
295+
}

0 commit comments

Comments
 (0)