Skip to content

reactive search batch size #2392

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1313,7 +1313,7 @@ private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, Ind
Duration scrollTimeout = query.getScrollTime() != null ? query.getScrollTime() : Duration.ofMinutes(1);
builder.scroll(time(scrollTimeout));
// limit the number of documents in a batch
builder.size(500);
builder.size(query.getReactiveBatchSize());
}

if (!isEmpty(query.getIndicesBoost())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
*/
public class BaseQuery implements Query {

private static final int DEFAULT_REACTIVE_BATCH_SIZE = 500;

@Nullable protected Sort sort;
protected Pageable pageable = DEFAULT_PAGE;
protected List<String> fields = new ArrayList<>();
Expand Down Expand Up @@ -75,6 +77,7 @@ public class BaseQuery implements Query {
@Nullable protected PointInTime pointInTime;

private boolean queryIsUpdatedByConverter = false;
@Nullable private Integer reactiveBatchSize = null;

public BaseQuery() {}

Expand Down Expand Up @@ -105,6 +108,7 @@ public <Q extends BaseQuery, B extends BaseQueryBuilder<Q, B>> BaseQuery(BaseQue
this.requestCache = builder.getRequestCache();
this.idsWithRouting = builder.getIdsWithRouting();
this.pointInTime = builder.getPointInTime();
this.reactiveBatchSize = builder.getReactiveBatchSize();
}

@Override
Expand Down Expand Up @@ -471,6 +475,7 @@ public void setPointInTime(@Nullable PointInTime pointInTime) {

/**
* used internally. Not considered part of the API.
*
* @since 5.0
*/
public boolean queryIsUpdatedByConverter() {
Expand All @@ -479,9 +484,22 @@ public boolean queryIsUpdatedByConverter() {

/**
* used internally. Not considered part of the API.
*
* @since 5.0
*/
public void setQueryIsUpdatedByConverter(boolean queryIsUpdatedByConverter) {
this.queryIsUpdatedByConverter = queryIsUpdatedByConverter;
}

@Override
public Integer getReactiveBatchSize() {
return reactiveBatchSize != null ? reactiveBatchSize : DEFAULT_REACTIVE_BATCH_SIZE;
}

/**
* @since 5.1
*/
public void setReactiveBatchSize(Integer reactiveBatchSize) {
this.reactiveBatchSize = reactiveBatchSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public abstract class BaseQueryBuilder<Q extends BaseQuery, SELF extends BaseQue
protected final List<RuntimeField> runtimeFields = new ArrayList<>();
@Nullable protected Query.PointInTime pointInTime;

@Nullable Integer reactiveBatchSize;

@Nullable
public Sort getSort() {
return sort;
Expand Down Expand Up @@ -191,6 +193,13 @@ public Query.PointInTime getPointInTime() {
return pointInTime;
}

/**
* @since 5.1
*/
public Integer getReactiveBatchSize() {
return reactiveBatchSize;
}

public SELF withPageable(Pageable pageable) {
this.pageable = pageable;
return self();
Expand Down Expand Up @@ -375,6 +384,14 @@ public SELF withPointInTime(@Nullable Query.PointInTime pointInTime) {
return self();
}

/**
* @since 5.1
*/
public SELF withReactiveBatchSize(@Nullable Integer reactiveBatchSize) {
this.reactiveBatchSize = reactiveBatchSize;
return self();
}

public abstract Q build();

private SELF self() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,17 @@ default PointInTime getPointInTime() {
return null;
};

/**
* returns the number of documents that are requested when the reactive code does a batched search operation. This is
* the case when a query has no limit and no Pageable set.
*
* @return the batch size, defaults to 500 in {@link BaseQuery}
* @since 5.1
*/
default Integer getReactiveBatchSize() {
return 500;
}

/**
* @since 4.3
*/
Expand Down