
Support for ArrayType in es.update.script.params #2036

Open
@vararo27

Description


What kind of issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.
  • Feature Request. Start by telling us what problem you’re trying to solve.
    Often a solution already exists! Don’t send pull requests to implement new features without
    first getting our support. Sometimes we leave features out on purpose to keep the project small.

Issue description

Array fields do not work with the ES update script. `tags` is an array in the shared example, and the write fails at the transformation stage. Is there a reference for using ArrayType fields with `es.update.script.params`?

Steps to reproduce

Code:

        List<String> data = Collections.singletonList("{\"Context\":\"129\",\"MessageType\":{\"id\":\"1013\",\"content\":\"Hello World\"},\"Module\":\"1203\",\"time\":3249,\"critical\":0,\"id\":1, \"tags\":[\"user\",\"device\"]}");

        SparkConf sparkConf = new SparkConf();

        sparkConf.set("es.nodes", "localhost");
        sparkConf.set("es.port", "9200");
        sparkConf.set("es.net.ssl", "false");
        sparkConf.set("es.nodes.wan.only", "true");

        SparkSession session = SparkSession.builder().appName("SparkElasticSearchTest").master("local[*]").config(sparkConf).getOrCreate();

        Dataset<Row> df = session.createDataset(data, Encoders.STRING()).toDF();
        Dataset<String> df1 = df.as(Encoders.STRING());
        Dataset<Row> df2 = session.read().json(df1.javaRDD());

        df2.printSchema();
        df2.show(false);

        String script = "ctx._source.Context = params.Context; ctx._source.Module = params.Module; ctx._source.critical = params.critical; ctx._source.id = params.id; if (ctx._source.time == null) {ctx._source.time = params.time} ctx._source.MessageType = new HashMap(); ctx._source.MessageType.put('id', params.MessageTypeId); ctx._source.MessageType.put('content', params.MessageTypeContent); ctx._source.tags = params.tags";

        String ja = "MessageTypeId:MessageType.id, MessageTypeContent:MessageType.content, Context:Context, Module:Module, time:time, critical:critical, id:id, tags:tags";

        DataFrameWriter<Row> dsWriter = df2.write()
                .format("org.elasticsearch.spark.sql")
                .option(ConfigurationOptions.ES_NODES, "localhost")
                .option(ConfigurationOptions.ES_PORT, "9200")
                .option(ConfigurationOptions.ES_NET_USE_SSL, false)
                .option(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
                .option(ConfigurationOptions.ES_MAPPING_ID, "id")
                .option(ConfigurationOptions.ES_WRITE_OPERATION, ConfigurationOptions.ES_OPERATION_UPSERT)
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true")
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, script)
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "painless")
                .option(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, ja);


        dsWriter.mode("append");
        dsWriter.save("user-details");

Stack trace:

org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
	at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:83)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:103)
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:103)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
	at org.elasticsearch.spark.sql.DataFrameValueWriter.writeArray(DataFrameValueWriter.scala:75)
	at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:69)
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.doWrite(AbstractBulkFactory.java:153)
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:123)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
	at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:68)
	... 12 more
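A possible workaround (a sketch only, not something confirmed by the es-hadoop docs): instead of passing the array column directly via `es.update.script.params`, join the tags into a single delimited string column before writing (e.g. with `concat_ws(",", col("tags"))` in Spark SQL, under a hypothetical name like `tagsJoined`), then split it back inside the Painless script with `splitOnToken`. The round-trip the workaround relies on, in plain Java:

```java
import java.util.Arrays;
import java.util.List;

public class TagsWorkaroundSketch {
    public static void main(String[] args) {
        // Instead of "tags:tags" in es.update.script.params, pass a joined
        // string (e.g. produced with concat_ws(",", col("tags")) in Spark SQL;
        // the column name "tagsJoined" below is hypothetical).
        List<String> tags = Arrays.asList("user", "device");
        String joined = String.join(",", tags);          // "user,device"

        // Inside the Painless script the array could then be restored with:
        //   ctx._source.tags = params.tagsJoined.splitOnToken(',')
        // which mirrors the plain-Java split below:
        String[] restored = joined.split(",");
        System.out.println(Arrays.toString(restored));   // [user, device]
    }
}
```

This sidesteps `DataFrameValueWriter.writeArray`, which is where the ClassCastException above originates.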

Version Info

OS: Any
JVM : 1.8
Hadoop/Spark: 3.2.1
ES-Hadoop : 8.2.2
ES : 7.10.2
