diff --git a/pom.xml b/pom.xml
index 4afbede17..3b5b617d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,6 +230,10 @@
             <artifactId>httpcore</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+        </dependency>
         <dependency>
             <groupId>commons-logging</groupId>
             <artifactId>commons-logging</artifactId>
@@ -275,6 +279,11 @@
             <artifactId>httpcore</artifactId>
             <version>4.4.11</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.4</version>
+        </dependency>
         <dependency>
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
diff --git a/src/main/java/com/arangodb/ArangoCollection.java b/src/main/java/com/arangodb/ArangoCollection.java
index 0f0dd8c10..b00f8b6dc 100644
--- a/src/main/java/com/arangodb/ArangoCollection.java
+++ b/src/main/java/com/arangodb/ArangoCollection.java
@@ -150,6 +150,25 @@ MultiDocumentEntity<DocumentCreateEntity<T>> insertDocuments(
      */
     DocumentImportEntity importDocuments(Collection<?> values, DocumentImportOptions options) throws ArangoDBException;
 
+    /**
+     * Bulk imports the given values into the collection, splitting the work into batches
+     * that are imported by a pool of parallel threads.
+     *
+     * @param values
+     *            a list of Objects that will be stored as documents
+     * @param options
+     *            Additional options, can be null
+     * @param batchSize
+     *            Size for individual data batches of the original collection of values; must be > 0
+     * @param numThreads
+     *            Number of parallel import threads; must be > 0
+     * @return list of information about the imported batches
+     * @throws ArangoDBException
+     *             if any batch import fails or the waiting thread is interrupted
+     */
+    Collection<DocumentImportEntity> importDocuments(Collection<?> values, DocumentImportOptions options,
+            int batchSize, int numThreads) throws ArangoDBException;
+
     /**
      * Bulk imports the given values into the collection.
      *
diff --git a/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java b/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java
index fb3f78154..4fda706c6 100644
--- a/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java
+++ b/src/main/java/com/arangodb/internal/ArangoCollectionImpl.java
@@ -20,8 +20,15 @@
  */
 
 package com.arangodb.internal;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import org.apache.commons.collections4.ListUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,6 +112,34 @@ public DocumentImportEntity importDocuments(final Collection<?> values, final Do
         return executor.execute(importDocumentsRequest(values, options), DocumentImportEntity.class);
     }
 
+    @Override
+    public Collection<DocumentImportEntity> importDocuments(Collection<?> values, DocumentImportOptions options,
+            int batchSize, int numThreads) throws ArangoDBException {
+        // Split the input into fixed-size batches; ListUtils.partition rejects batchSize <= 0.
+        List<List<Object>> batches = ListUtils.partition(new ArrayList<>(values), batchSize);
+        // Executors.newFixedThreadPool rejects numThreads <= 0.
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        List<CompletableFuture<DocumentImportEntity>> completableFutureList = new ArrayList<>();
+        for (List<Object> batch : batches) {
+            completableFutureList.add(
+                    CompletableFuture.supplyAsync(() -> importDocuments(batch, options), executorService));
+        }
+        // All batches are submitted: shut down so the pool threads terminate when the work is
+        // done (otherwise every call leaks a pool of non-daemon threads).
+        executorService.shutdown();
+        List<DocumentImportEntity> documentImportEntityList = new ArrayList<>();
+        for (CompletableFuture<DocumentImportEntity> completableFuture : completableFutureList) {
+            try {
+                documentImportEntityList.add(completableFuture.get());
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the interrupt flag for callers
+                throw new ArangoDBException(e);
+            } catch (ExecutionException e) {
+                throw new ArangoDBException(e);
+            }
+        }
+        return documentImportEntityList;
+    }
+
     @Override
     public DocumentImportEntity importDocuments(final String values) throws ArangoDBException {
         return importDocuments(values, new DocumentImportOptions());
diff --git a/src/test/java/com/arangodb/ArangoCollectionTest.java b/src/test/java/com/arangodb/ArangoCollectionTest.java
index c19c63473..502a9b7c6 100644
--- a/src/test/java/com/arangodb/ArangoCollectionTest.java
+++ b/src/test/java/com/arangodb/ArangoCollectionTest.java
@@ -1726,6 +1726,66 @@ public void importDocumentsJsonFromToPrefix() {
         }
     }
 
+    @Test
+    public void importDocumentsBatchSizeNumThreads() {
+        final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
+        for (int i = 1; i <= 100; i++) {
+            values.add(new BaseDocument(String.valueOf(i)));
+        }
+        int batchSize = 5;
+        int numThreads = 8;
+        final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values,
+            new DocumentImportOptions(), batchSize, numThreads);
+        assertThat(docsList.size(), is(values.size() / batchSize));
+        for (final DocumentImportEntity docs : docsList) {
+            assertThat(docs, is(notNullValue()));
+            assertThat(docs.getCreated(), is(batchSize));
+            assertThat(docs.getEmpty(), is(0));
+            assertThat(docs.getErrors(), is(0));
+            assertThat(docs.getIgnored(), is(0));
+            assertThat(docs.getUpdated(), is(0));
+            assertThat(docs.getDetails(), is(empty()));
+        }
+    }
+
+    @Test
+    public void importDocumentsBatchSizeNumThreadsIllegalBatchSize() {
+        final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
+        for (int i = 1; i <= 10; i++) {
+            values.add(new BaseDocument(String.valueOf(i)));
+        }
+
+        int batchSize = 0;
+        int numThreads = 8;
+
+        try {
+            db.collection(COLLECTION_NAME).importDocuments(values,
+                new DocumentImportOptions(), batchSize, numThreads);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // expected: batchSize must be > 0
+        }
+    }
+
+    @Test
+    public void importDocumentsBatchSizeNumThreadsIllegalNumThreads() {
+        final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
+        for (int i = 1; i <= 10; i++) {
+            values.add(new BaseDocument(String.valueOf(i)));
+        }
+
+        int batchSize = 5;
+        int numThreads = 0;
+
+        try {
+            db.collection(COLLECTION_NAME).importDocuments(values,
+                new DocumentImportOptions(), batchSize, numThreads);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // expected: numThreads must be > 0
+        }
+    }
+
     @Test
     public void deleteDocumentsByKey() {
         final Collection<String> values = new ArrayList<String>();