Skip to content

proposed fix for #707 #708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 16, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class BlockingIdentifierGenerator implements ReactiveIdentifierG
private int loValue;
private long hiValue;

private volatile List<CompletableFuture<Long>> queue = null;
private volatile List<Runnable> queue = null;

protected synchronized long next() {
return loValue>0 && loValue<getBlockSize()
Expand All @@ -54,6 +54,11 @@ protected synchronized long next(long hi) {

@Override
public CompletionStage<Long> generate(ReactiveConnectionSupplier session, Object entity) {
if ( getBlockSize()<=1 ) {
//special case where we're not using blocking at all
return nextHiValue(session);
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

long local = next();
if ( local >= 0 ) {
// We don't need to update or initialize the hi
Expand All @@ -70,7 +75,7 @@ public CompletionStage<Long> generate(ReactiveConnectionSupplier session, Object
// go off and fetch the next hi value from db
nextHiValue(session).thenAccept( id -> {
// Vertx.currentContext().runOnContext(v -> {
List<CompletableFuture<Long>> list;
List<Runnable> list;
synchronized (this) {
// clone ref to the queue
list = queue;
Expand All @@ -79,14 +84,14 @@ public CompletionStage<Long> generate(ReactiveConnectionSupplier session, Object
result.complete( next(id) );
}
// send waiting streams back to try again
list.forEach( completion -> generate(session, entity)
.thenAccept(completion::complete) );
list.forEach(Runnable::run);
// } );
} );
}
else {
// wait for the concurrent fetch to complete
queue.add(result);
// note that we carefully capture the right session,entity here!
queue.add( () -> generate(session, entity).thenAccept(result::complete) );
}
return result;
}
Expand Down