Skip to content

Feature/import documents batch threads #276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 24, 2019
9 changes: 9 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@
<artifactId>httpcore</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
Expand Down Expand Up @@ -275,6 +279,11 @@
<artifactId>httpcore</artifactId>
<version>4.4.11</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/com/arangodb/ArangoCollection.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,23 @@ <T> MultiDocumentEntity<DocumentCreateEntity<T>> insertDocuments(
*/
DocumentImportEntity importDocuments(Collection<?> values, DocumentImportOptions options) throws ArangoDBException;

/**
* Bulk imports the given values into the collection, splitting them into batches that are imported by
* parallel import threads.
*
* @param values
* a list of Objects that will be stored as documents
* @param options
* Additional options, can be null
* @param batchSize
* Size for individual data batches of the original collection of values
* @param numThreads
* Number of parallel import threads
* @return list of information about the imported batches (one entry per batch)
* @throws ArangoDBException
* if importing a batch fails
*
* NOTE(review): the accompanying tests expect an IllegalArgumentException when batchSize or numThreads
* is 0 - confirm whether that should be documented as part of this contract.
*/
Collection<DocumentImportEntity> importDocuments(Collection<?> values, DocumentImportOptions options,
int batchSize, int numThreads) throws ArangoDBException;

/**
* Bulk imports the given values into the collection.
*
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/com/arangodb/internal/ArangoCollectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@

package com.arangodb.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -105,6 +112,32 @@ public DocumentImportEntity importDocuments(final Collection<?> values, final Do
return executor.execute(importDocumentsRequest(values, options), DocumentImportEntity.class);
}

@Override
public Collection<DocumentImportEntity> importDocuments(Collection<?> values, DocumentImportOptions options,
int batchSize, int numThreads) throws ArangoDBException {
List<? extends List<?>> batches = ListUtils.partition(new ArrayList<>(values), batchSize);
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
List<CompletableFuture<DocumentImportEntity>> completableFutureList = new ArrayList<>();
for (List<?> batch : batches) {
CompletableFuture<DocumentImportEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
DocumentImportEntity documentImportEntity = importDocuments(batch, options);
return documentImportEntity;
}, executorService);
completableFutureList.add(completableFuture);
}
List<DocumentImportEntity> documentImportEntityList = new ArrayList<>();
for (CompletableFuture<DocumentImportEntity> completableFuture : completableFutureList) {
DocumentImportEntity documentImportEntity = null;
try {
documentImportEntity = completableFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new ArangoDBException(e);
}
documentImportEntityList.add(documentImportEntity);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hkernbach I noticed a small problem.
RIght before the return statement on this line
we need to shutdown the executorService.

        executorService.shutdown();

If another pull request is needed, please let me know.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix that, thanks for spotting :)

return documentImportEntityList;
}

@Override
public DocumentImportEntity importDocuments(final String values) throws ArangoDBException {
return importDocuments(values, new DocumentImportOptions());
Expand Down
58 changes: 58 additions & 0 deletions src/test/java/com/arangodb/ArangoCollectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1726,6 +1726,64 @@ public void importDocumentsJsonFromToPrefix() {
}
}

@Test
public void importDocumentsBatchSizeNumThreads() {
    final int batchSize = 5;
    final int numThreads = 8;

    // 100 documents with keys "1".."100", which divides evenly into batches of 5.
    final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
    for (int key = 1; key <= 100; key++) {
        values.add(new BaseDocument(String.valueOf(key)));
    }

    final Collection<DocumentImportEntity> batchResults = db.collection(COLLECTION_NAME)
            .importDocuments(values, new DocumentImportOptions(), batchSize, numThreads);

    // One result per batch, and every batch must have created exactly batchSize documents.
    assertThat(batchResults.size(), is(values.size() / batchSize));
    for (final DocumentImportEntity batchResult : batchResults) {
        assertThat(batchResult, is(notNullValue()));
        assertThat(batchResult.getCreated(), is(batchSize));
        assertThat(batchResult.getEmpty(), is(0));
        assertThat(batchResult.getErrors(), is(0));
        assertThat(batchResult.getIgnored(), is(0));
        assertThat(batchResult.getUpdated(), is(0));
        assertThat(batchResult.getDetails(), is(empty()));
    }
}

@Test
public void importDocumentsBatchSizeNumThreadsIllegalBatchSize() {
    final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
    for (int i = 1; i <= 10; i++) {
        values.add(new BaseDocument(String.valueOf(i)));
    }

    final int batchSize = 0;
    final int numThreads = 8;

    try {
        db.collection(COLLECTION_NAME).importDocuments(values, new DocumentImportOptions(), batchSize,
                numThreads);
        fail("expected IllegalArgumentException for batchSize " + batchSize);
    } catch (IllegalArgumentException expected) {
        // A batch size of 0 must be rejected; reaching this handler is the passing outcome.
    }
}

@Test
public void importDocumentsBatchSizeNumThreadsIllegalNumThreads() {
    final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
    for (int i = 1; i <= 10; i++) {
        values.add(new BaseDocument(String.valueOf(i)));
    }

    final int batchSize = 5;
    final int numThreads = 0;

    try {
        db.collection(COLLECTION_NAME).importDocuments(values, new DocumentImportOptions(), batchSize,
                numThreads);
        fail("expected IllegalArgumentException for numThreads " + numThreads);
    } catch (IllegalArgumentException expected) {
        // A thread count of 0 must be rejected; reaching this handler is the passing outcome.
    }
}

@Test
public void deleteDocumentsByKey() {
final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
Expand Down