diff --git a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java index 37f611774..f2a2ac28c 100644 --- a/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java +++ b/java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/IngesterOperation.java @@ -80,7 +80,7 @@ private static IngesterOperation createOperation(BulkOperation operation, JsonpM BinaryData binaryDoc = BinaryData.of(create.document(), mapper); size += binaryDoc.size(); newOperation = BulkOperation.of(bo -> bo.create(idx -> { - copyBaseProperties(create, idx); + copyCreateProperties(create, idx); return idx.document(binaryDoc); })); } @@ -102,7 +102,7 @@ private static IngesterOperation indexOperation(BulkOperation operation, JsonpMa BinaryData binaryDoc = BinaryData.of(index.document(), mapper); size += binaryDoc.size(); newOperation = BulkOperation.of(bo -> bo.index(idx -> { - copyBaseProperties(index, idx); + copyIndexProperties(index, idx); return idx.document(binaryDoc); })); } @@ -154,6 +154,18 @@ private static void copyBaseProperties(BulkOperationBase op, BulkOperationBase.A .versionType(op.versionType()); } + private static void copyIndexProperties(IndexOperation op, IndexOperation.Builder builder) { + copyBaseProperties(op, builder); + builder.pipeline(op.pipeline()); + builder.requireAlias(op.requireAlias()); + } + + private static void copyCreateProperties(CreateOperation op, CreateOperation.Builder builder) { + copyBaseProperties(op, builder); + builder.pipeline(op.pipeline()); + builder.requireAlias(op.requireAlias()); + } + private static int size(String name, @Nullable Boolean value) { if (value != null) { return name.length() + 12; // 12 added chars for "name":"false", diff --git a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java index 80d9daec2..4353a696d 100644 --- a/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java +++ b/java-client/src/test/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngesterTest.java @@ -29,6 +29,7 @@ import co.elastic.clients.elasticsearch.core.bulk.OperationType; import co.elastic.clients.elasticsearch.end_to_end.RequestTest; import co.elastic.clients.json.JsonpMapper; +import co.elastic.clients.json.JsonpUtils; import co.elastic.clients.json.SimpleJsonpMapper; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.Endpoint; @@ -358,6 +359,28 @@ public void beforeBulk(long executionId, BulkRequest request, List context assertEquals("bar", storedRequest.get().routing()); } + @Test + public void pipelineTest() { + String json = "{\"create\":{\"_id\":\"some_id\",\"_index\":\"some_idx\",\"pipeline\":\"pipe\",\"require_alias\":true}}"; + JsonpMapper mapper = new SimpleJsonpMapper(); + + BulkOperation create = BulkOperation.of(o -> o.create(c -> c + .pipeline("pipe") + .requireAlias(true) + .index("some_idx") + .id("some_id") + .document("Some doc") + )); + + String createStr = JsonpUtils.toJsonString(create, mapper); + assertEquals(json, createStr); + + BulkOperation create1 = IngesterOperation.of(create, mapper).operation(); + + String create1Str = JsonpUtils.toJsonString(create1, mapper); + assertEquals(json, create1Str); + } + @Test public void endToEndTest() throws Exception { String index = "bulk-ingester-test";