Skip to content

Commit 3e0b4e7

Browse files
rkhajahkernbach
authored and committed
Feature/import documents batch threads (#276)
* Defined new interface method and Javadoc for importDocuments that accepts two additional parameters batchSize and numThreads.
* Added dependency commons-collections4 so that we can use ListUtils.partition.
* Implemented new importDocuments method.
* Added test for importDocuments method with batchSize and numThreads.
* Minor refactoring of test for importDocuments method with batchSize and numThreads.
* Added two false positive tests
1 parent ce94b93 commit 3e0b4e7

File tree

4 files changed

+117
-0
lines changed

4 files changed

+117
-0
lines changed

pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@
230230
<artifactId>httpcore</artifactId>
231231
<scope>provided</scope>
232232
</dependency>
233+
<dependency>
234+
<groupId>org.apache.commons</groupId>
235+
<artifactId>commons-collections4</artifactId>
236+
</dependency>
233237
<dependency>
234238
<groupId>commons-logging</groupId>
235239
<artifactId>commons-logging</artifactId>
@@ -275,6 +279,11 @@
275279
<artifactId>httpcore</artifactId>
276280
<version>4.4.11</version>
277281
</dependency>
282+
<dependency>
283+
<groupId>org.apache.commons</groupId>
284+
<artifactId>commons-collections4</artifactId>
285+
<version>4.4</version>
286+
</dependency>
278287
<dependency>
279288
<groupId>commons-codec</groupId>
280289
<artifactId>commons-codec</artifactId>

src/main/java/com/arangodb/ArangoCollection.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,23 @@ <T> MultiDocumentEntity<DocumentCreateEntity<T>> insertDocuments(
150150
*/
151151
DocumentImportEntity importDocuments(Collection<?> values, DocumentImportOptions options) throws ArangoDBException;
152152

153+
/**
154+
* Bulk imports the given values into the collection.
155+
*
156+
* @param values
157+
* a list of Objects that will be stored as documents
158+
* @param options
159+
* Additional options, can be null
160+
* @param batchSize
161+
* Size for individual data batches of the original collection of values
162+
* @param numThreads
163+
* Number of parallel import threads
164+
* @return list of information about the imported batches
165+
* @throws ArangoDBException
166+
*/
167+
Collection<DocumentImportEntity> importDocuments(Collection<?> values, DocumentImportOptions options,
168+
int batchSize, int numThreads) throws ArangoDBException;
169+
153170
/**
154171
* Bulk imports the given values into the collection.
155172
*

src/main/java/com/arangodb/internal/ArangoCollectionImpl.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,15 @@
2020

2121
package com.arangodb.internal;
2222

23+
import java.util.ArrayList;
2324
import java.util.Collection;
25+
import java.util.List;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
2430

31+
import org.apache.commons.collections4.ListUtils;
2532
import org.slf4j.Logger;
2633
import org.slf4j.LoggerFactory;
2734

@@ -105,6 +112,32 @@ public DocumentImportEntity importDocuments(final Collection<?> values, final Do
105112
return executor.execute(importDocumentsRequest(values, options), DocumentImportEntity.class);
106113
}
107114

115+
@Override
116+
public Collection<DocumentImportEntity> importDocuments(Collection<?> values, DocumentImportOptions options,
117+
int batchSize, int numThreads) throws ArangoDBException {
118+
List<? extends List<?>> batches = ListUtils.partition(new ArrayList<>(values), batchSize);
119+
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
120+
List<CompletableFuture<DocumentImportEntity>> completableFutureList = new ArrayList<>();
121+
for (List<?> batch : batches) {
122+
CompletableFuture<DocumentImportEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
123+
DocumentImportEntity documentImportEntity = importDocuments(batch, options);
124+
return documentImportEntity;
125+
}, executorService);
126+
completableFutureList.add(completableFuture);
127+
}
128+
List<DocumentImportEntity> documentImportEntityList = new ArrayList<>();
129+
for (CompletableFuture<DocumentImportEntity> completableFuture : completableFutureList) {
130+
DocumentImportEntity documentImportEntity = null;
131+
try {
132+
documentImportEntity = completableFuture.get();
133+
} catch (InterruptedException | ExecutionException e) {
134+
throw new ArangoDBException(e);
135+
}
136+
documentImportEntityList.add(documentImportEntity);
137+
}
138+
return documentImportEntityList;
139+
}
140+
108141
@Override
109142
public DocumentImportEntity importDocuments(final String values) throws ArangoDBException {
110143
return importDocuments(values, new DocumentImportOptions());

src/test/java/com/arangodb/ArangoCollectionTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1879,6 +1879,64 @@ public void importDocumentsJsonFromToPrefix() {
18791879
}
18801880
}
18811881

1882+
@Test
1883+
public void importDocumentsBatchSizeNumThreads() {
1884+
final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
1885+
for (int i = 1; i <= 100; i++) {
1886+
values.add(new BaseDocument(String.valueOf(i)));
1887+
}
1888+
int batchSize = 5;
1889+
int numThreads = 8;
1890+
final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values,
1891+
new DocumentImportOptions(), batchSize, numThreads);
1892+
assertThat(docsList.size(), is(values.size() / batchSize));
1893+
for (final DocumentImportEntity docs : docsList) {
1894+
assertThat(docs, is(notNullValue()));
1895+
assertThat(docs.getCreated(), is(batchSize));
1896+
assertThat(docs.getEmpty(), is(0));
1897+
assertThat(docs.getErrors(), is(0));
1898+
assertThat(docs.getIgnored(), is(0));
1899+
assertThat(docs.getUpdated(), is(0));
1900+
assertThat(docs.getDetails(), is(empty()));
1901+
}
1902+
}
1903+
1904+
@Test
1905+
public void importDocumentsBatchSizeNumThreadsIllegalBatchSize() {
1906+
final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
1907+
for (int i = 1; i <= 10; i++) {
1908+
values.add(new BaseDocument(String.valueOf(i)));
1909+
}
1910+
1911+
int batchSize = 0;
1912+
int numThreads = 8;
1913+
1914+
try {
1915+
final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values,
1916+
new DocumentImportOptions(), batchSize, numThreads);
1917+
fail();
1918+
} catch (IllegalArgumentException e) {
1919+
}
1920+
}
1921+
1922+
@Test
1923+
public void importDocumentsBatchSizeNumThreadsIllegalNumThreads() {
1924+
final Collection<BaseDocument> values = new ArrayList<BaseDocument>();
1925+
for (int i = 1; i <= 10; i++) {
1926+
values.add(new BaseDocument(String.valueOf(i)));
1927+
}
1928+
1929+
int batchSize = 5;
1930+
int numThreads = 0;
1931+
1932+
try {
1933+
final Collection<DocumentImportEntity> docsList = db.collection(COLLECTION_NAME).importDocuments(values,
1934+
new DocumentImportOptions(), batchSize, numThreads);
1935+
fail();
1936+
} catch (IllegalArgumentException e) {
1937+
}
1938+
}
1939+
18821940
@Test
18831941
public void deleteDocumentsByKey() {
18841942
final Collection<BaseDocument> values = new ArrayList<BaseDocument>();

0 commit comments

Comments
 (0)