Description
What kind of issue is this?
- Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below. The easier it is to track down the bug, the faster it is solved.
- Feature Request. Start by telling us what problem you’re trying to solve. Often a solution already exists! Don’t send pull requests to implement new features without first getting our support. Sometimes we leave features out on purpose to keep the project small.
Issue description
Array fields are not working with the ES update script. In the example shared below, tags is an array field, and the write fails in the transformation stage. Is there a reference for how to use an ArrayType field with es.update.script.params?
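For context, the es.update.script.params value used below is a comma-separated list of "scriptParamName:documentFieldPath" pairs. The following helper is purely illustrative (not part of es-hadoop); it just builds such a string from a map to make the format explicit:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ParamsBuilder {
    // Joins entries into the "param:fieldPath, param:fieldPath" form
    // expected by es.update.script.params.
    static String build(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        Map<String, String> p = new LinkedHashMap<>();
        p.put("MessageTypeId", "MessageType.id");
        p.put("tags", "tags"); // the array field that triggers the failure
        System.out.println(build(p)); // MessageTypeId:MessageType.id, tags:tags
    }
}
```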
Steps to reproduce
Code:
List<String> data = Collections.singletonList("{\"Context\":\"129\",\"MessageType\":{\"id\":\"1013\",\"content\":\"Hello World\"},\"Module\":\"1203\",\"time\":3249,\"critical\":0,\"id\":1, \"tags\":[\"user\",\"device\"]}");
SparkConf sparkConf = new SparkConf();
sparkConf.set("es.nodes", "localhost");
sparkConf.set("es.port", "9200");
sparkConf.set("es.net.ssl", "false");
sparkConf.set("es.nodes.wan.only", "true");
SparkSession session = SparkSession.builder().appName("SparkElasticSearchTest").master("local[*]").config(sparkConf).getOrCreate();
Dataset<Row> df = session.createDataset(data, Encoders.STRING()).toDF();
Dataset<String> df1 = df.as(Encoders.STRING());
Dataset<Row> df2 = session.read().json(df1.javaRDD());
df2.printSchema();
df2.show(false);
String script = "ctx._source.Context = params.Context; ctx._source.Module = params.Module; ctx._source.critical = params.critical; ctx._source.id = params.id; if (ctx._source.time == null) {ctx._source.time = params.time} ctx._source.MessageType = new HashMap(); ctx._source.MessageType.put('id', params.MessageTypeId); ctx._source.MessageType.put('content', params.MessageTypeContent); ctx._source.tags = params.tags";
String ja = "MessageTypeId:MessageType.id, MessageTypeContent:MessageType.content, Context:Context, Module:Module, time:time, critical:critical, id:id, tags:tags";
DataFrameWriter<Row> dsWriter = df2.write()
.format("org.elasticsearch.spark.sql")
.option(ConfigurationOptions.ES_NODES, "localhost")
.option(ConfigurationOptions.ES_PORT, "9200")
.option(ConfigurationOptions.ES_NET_USE_SSL, false)
.option(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
.option(ConfigurationOptions.ES_MAPPING_ID, "id")
.option(ConfigurationOptions.ES_WRITE_OPERATION, ConfigurationOptions.ES_OPERATION_UPSERT)
.option(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true")
.option(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, script)
.option(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "painless")
.option(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, ja);
dsWriter.mode("append");
dsWriter.save("user-details");
Stack trace:
org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:83)
at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:103)
at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:103)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
at org.elasticsearch.spark.sql.DataFrameValueWriter.writeArray(DataFrameValueWriter.scala:75)
at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:69)
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.doWrite(AbstractBulkFactory.java:153)
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:123)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:68)
... 12 more
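Judging from the trace, DataFrameValueWriter.writeArray appears to assume every array element is a Row (an array of structs), while tags is an array of plain strings, so the cast fails. A minimal standalone sketch of that failure mode, using a hypothetical Row stand-in rather than the real org.apache.spark.sql.Row:

```java
public class CastFailureSketch {
    // Hypothetical stand-in for org.apache.spark.sql.Row.
    static class Row {}

    // Mirrors the cast apparently performed in DataFrameValueWriter.writeArray:
    // every element is cast to Row before serialization.
    static String tryWrite(Object[] elements) {
        try {
            for (Object e : elements) {
                Row r = (Row) e;
            }
            return "ok";
        } catch (ClassCastException ex) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // An array of structs serializes fine...
        System.out.println(tryWrite(new Object[] { new Row(), new Row() })); // ok
        // ...but an array of strings, like the "tags" field above, blows up.
        System.out.println(tryWrite(new Object[] { "user", "device" })); // ClassCastException
    }
}
```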
Version Info
OS: Any
JVM: 1.8
Hadoop/Spark: 3.2.1
ES-Hadoop: 8.2.2
ES: 7.10.2