@@ -38,7 +38,27 @@ public abstract class BlockingIdentifierGenerator implements ReactiveIdentifierG
38
38
private int loValue ;
39
39
private long hiValue ;
40
40
41
- private volatile List <CompletableFuture <Long >> queue = null ;
41
+ private volatile List <Queued > queue = null ;
42
+
43
+ /**
44
+ * A queued request for an identifier that can't be satisfied immediately
45
+ * because we're currently fetching a new "hi" value in a different request.
46
+ */
47
+ private final class Queued {
48
+ private final ReactiveConnectionSupplier session ;
49
+ private final Object entity ;
50
+ private final CompletableFuture <Long > completion ;
51
+
52
+ private Queued (ReactiveConnectionSupplier session , Object entity , CompletableFuture <Long > completion ) {
53
+ this .session = session ;
54
+ this .entity = entity ;
55
+ this .completion = completion ;
56
+ }
57
+
58
+ private void complete () {
59
+ generate (session , entity ).thenAccept (completion ::complete );
60
+ }
61
+ }
42
62
43
63
protected synchronized long next () {
44
64
return loValue >0 && loValue <getBlockSize ()
@@ -70,7 +90,7 @@ public CompletionStage<Long> generate(ReactiveConnectionSupplier session, Object
70
90
// go off and fetch the next hi value from db
71
91
nextHiValue (session ).thenAccept ( id -> {
72
92
// Vertx.currentContext().runOnContext(v -> {
73
- List <CompletableFuture < Long > > list ;
93
+ List <Queued > list ;
74
94
synchronized (this ) {
75
95
// clone ref to the queue
76
96
list = queue ;
@@ -79,14 +99,13 @@ public CompletionStage<Long> generate(ReactiveConnectionSupplier session, Object
79
99
result .complete ( next (id ) );
80
100
}
81
101
// send waiting streams back to try again
82
- list .forEach ( completion -> generate (session , entity )
83
- .thenAccept (completion ::complete ) );
102
+ list .forEach (Queued ::complete );
84
103
// } );
85
104
} );
86
105
}
87
106
else {
88
107
// wait for the concurrent fetch to complete
89
- queue .add (result );
108
+ queue .add ( new Queued ( session , entity , result ) );
90
109
}
91
110
return result ;
92
111
}
0 commit comments