diff --git a/pom.xml b/pom.xml
index 4afbede17..3b5b617d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,6 +230,10 @@
httpcore
provided
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+        </dependency>
commons-logging
commons-logging
@@ -275,6 +279,11 @@
httpcore
4.4.11
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.4</version>
+        </dependency>
commons-codec
commons-codec
diff --git a/src/main/java/com/arangodb/ArangoCollection.java b/src/main/java/com/arangodb/ArangoCollection.java
index 0f0dd8c10..b00f8b6dc 100644
--- a/src/main/java/com/arangodb/ArangoCollection.java
+++ b/src/main/java/com/arangodb/ArangoCollection.java
@@ -150,6 +150,23 @@ MultiDocumentEntity> insertDocuments(
*/
DocumentImportEntity importDocuments(Collection> values, DocumentImportOptions options) throws ArangoDBException;
+	/**
+	 * Bulk imports the given values into the collection, splitting them into batches
+	 * that are imported by parallel threads.
+	 *
+	 * @param values
+	 *            a list of Objects that will be stored as documents
+	 * @param options
+	 *            Additional options, can be null
+	 * @param batchSize
+	 *            Size for individual data batches of the original collection of values, must be positive
+	 * @param numThreads
+	 *            Number of parallel import threads, must be positive
+	 * @return list of information about the imported batches
+	 * @throws ArangoDBException
+	 *             if an import request fails or an import thread is interrupted
+	 * @throws IllegalArgumentException
+	 *             if {@code batchSize} or {@code numThreads} is not positive
+	 */
+	Collection<DocumentImportEntity> importDocuments(Collection<?> values, DocumentImportOptions options,
+			int batchSize, int numThreads) throws ArangoDBException;
+
/**
* Bulk imports the given values into the collection.
*
diff --git a/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java b/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java
index fb3f78154..4fda706c6 100644
--- a/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java
+++ b/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java
@@ -20,8 +20,15 @@
package com.arangodb.internal;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,6 +112,32 @@ public DocumentImportEntity importDocuments(final Collection> values, final Do
return executor.execute(importDocumentsRequest(values, options), DocumentImportEntity.class);
}
+ @Override
+ public Collection importDocuments(Collection> values, DocumentImportOptions options,
+ int batchSize, int numThreads) throws ArangoDBException {
+ List extends List>> batches = ListUtils.partition(new ArrayList<>(values), batchSize);
+ ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+ List> completableFutureList = new ArrayList<>();
+ for (List> batch : batches) {
+ CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
+ DocumentImportEntity documentImportEntity = importDocuments(batch, options);
+ return documentImportEntity;
+ }, executorService);
+ completableFutureList.add(completableFuture);
+ }
+ List documentImportEntityList = new ArrayList<>();
+ for (CompletableFuture completableFuture : completableFutureList) {
+ DocumentImportEntity documentImportEntity = null;
+ try {
+ documentImportEntity = completableFuture.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new ArangoDBException(e);
+ }
+ documentImportEntityList.add(documentImportEntity);
+ }
+ return documentImportEntityList;
+ }
+
@Override
public DocumentImportEntity importDocuments(final String values) throws ArangoDBException {
return importDocuments(values, new DocumentImportOptions());
diff --git a/src/test/java/com/arangodb/ArangoCollectionTest.java b/src/test/java/com/arangodb/ArangoCollectionTest.java
index c19c63473..502a9b7c6 100644
--- a/src/test/java/com/arangodb/ArangoCollectionTest.java
+++ b/src/test/java/com/arangodb/ArangoCollectionTest.java
@@ -1726,6 +1726,64 @@ public void importDocumentsJsonFromToPrefix() {
}
}
+	@Test
+	public void importDocumentsBatchSizeNumThreads() {
+		final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
+		for (int i = 1; i <= 100; i++) {
+			values.add(new BaseDocument(String.valueOf(i)));
+		}
+		final int batchSize = 5;
+		final int numThreads = 8;
+		final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values,
+			new DocumentImportOptions(), batchSize, numThreads);
+		// 100 documents split into batches of 5 -> 20 per-batch results.
+		assertThat(docsList.size(), is(values.size() / batchSize));
+		for (final DocumentImportEntity docs : docsList) {
+			assertThat(docs, is(notNullValue()));
+			assertThat(docs.getCreated(), is(batchSize));
+			assertThat(docs.getEmpty(), is(0));
+			assertThat(docs.getErrors(), is(0));
+			assertThat(docs.getIgnored(), is(0));
+			assertThat(docs.getUpdated(), is(0));
+			assertThat(docs.getDetails(), is(empty()));
+		}
+	}
+
+	@Test
+	public void importDocumentsBatchSizeNumThreadsIllegalBatchSize() {
+		final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
+		for (int i = 1; i <= 10; i++) {
+			values.add(new BaseDocument(String.valueOf(i)));
+		}
+		final int batchSize = 0;
+		final int numThreads = 8;
+		try {
+			db.collection(COLLECTION_NAME).importDocuments(values, new DocumentImportOptions(), batchSize,
+				numThreads);
+			fail("a non-positive batchSize should be rejected");
+		} catch (final IllegalArgumentException e) {
+			// expected: batchSize <= 0 is illegal
+		}
+	}
+
+	@Test
+	public void importDocumentsBatchSizeNumThreadsIllegalNumThreads() {
+		final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
+		for (int i = 1; i <= 10; i++) {
+			values.add(new BaseDocument(String.valueOf(i)));
+		}
+		final int batchSize = 5;
+		final int numThreads = 0;
+		try {
+			db.collection(COLLECTION_NAME).importDocuments(values, new DocumentImportOptions(), batchSize,
+				numThreads);
+			fail("a non-positive numThreads should be rejected");
+		} catch (final IllegalArgumentException e) {
+			// expected: numThreads <= 0 is illegal
+		}
+	}
+
@Test
public void deleteDocumentsByKey() {
final Collection values = new ArrayList();