
Adding support for update output mode to structured streaming #1839

Merged

Conversation

masseyke
Member

This commit adds support for "update" as the output mode for Spark Structured Streaming writes to Elasticsearch.
Closes #1123
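
For anyone skimming, here is a minimal sketch of how the new mode is used from the Spark side (the streaming Dataset, index name, and id column below are illustrative, not taken from this PR):

// Illustrative only: assumes a streaming Dataset `people` with a `name` column to use as the document id.
val query = people.writeStream
  .outputMode("update")                              // the mode added by this PR
  .option("checkpointLocation", "/save/location")
  .option("es.mapping.id", "name")                   // required so documents can be upserted by id
  .format("es")
  .start("people")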

@masseyke
Member Author

I'm new to Structured Streaming, but I think this does what we'd expect: if you set the output mode to "update", documents are inserted if they didn't exist before, and only documents that have changed are updated. That matches my understanding of the expected behavior from https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts. In addition to the integration test that's part of this PR, I ran a manual test (using the Docker image at https://github.com/masseyke/es-spark-docker):

import org.apache.spark.sql.SparkSession
case class Person(id: String, name: String, surname: String, age: Int)
// Stream CSV lines from HDFS and parse each line into a Person
val people = spark.readStream.textFile("/test/*").map(_.split(",")).map(p => Person(p(0), p(1).trim, p(2).trim, p(3).trim.toInt))
// Write to the "people" index in append mode, using the name column as the document id
val appendStream = people.writeStream.outputMode("append").option("checkpointLocation", "/save/location").option("es.mapping.id", "name").format("es").start("people")

Then I put a 4-row, 4-column CSV file in HDFS at /test/ and made sure those four rows showed up as documents in Elasticsearch. Back in the spark shell:

appendStream.stop

Then I added a second CSV file to /test/ in HDFS that had the same rows as the first file, except that one of them had an updated value. Back in the spark shell:

// Restart the write in "update" mode; only changed documents should be written
val updateStream = people.writeStream.outputMode("update").option("checkpointLocation", "/save/location").option("es.mapping.id", "name").format("es").start("people")

Then, with a query to Elasticsearch that returns document versions (GET people/_search?version=true), I verified that only that one document had been updated.
As @jbaiera mentioned in #1123, I'm intentionally failing fast if "es.mapping.id" is not provided, since we can't really do updates otherwise. I'm also intentionally failing if "es.write.operation" is set to something other than "upsert", since based on the Spark documentation upsert is what we want here. If it is unset, I set it to "upsert".
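
To make that concrete, here is a rough sketch of what those settings checks amount to (an illustration of the behavior described above, not the connector's actual code; the method name and error messages are hypothetical):

// Hypothetical sketch of the validation described above, not the actual connector code.
def validateUpdateModeSettings(settings: Map[String, String]): Map[String, String] = {
  if (!settings.contains("es.mapping.id")) {
    throw new IllegalArgumentException("The update output mode requires es.mapping.id to be set")
  }
  settings.get("es.write.operation") match {
    case Some(op) if op != "upsert" =>
      throw new IllegalArgumentException(s"The update output mode only supports upsert, but es.write.operation was $op")
    case Some(_) => settings                                        // already upsert
    case None    => settings + ("es.write.operation" -> "upsert")   // default to upsert
  }
}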

masseyke marked this pull request as ready for review January 14, 2022 22:00

@jbaiera (Member) left a comment

LGTM!

/**
 * Waits until all inputs are processed on the streaming query, but leaves the query open with the listener still in place, expecting
 * another batch of inputs.
 */
def waitForPartialCompletion(): Unit = {

Nice!
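
For readers who haven't seen the test utilities, here is a rough sketch of how a helper like this could be built on Spark's StreamingQueryListener: it waits for a micro-batch that reports no new input rows, then re-arms itself, leaving the query running and the listener registered for the next batch. This is an assumption about the approach, not the code in this PR:

import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical sketch, not the helper in this PR.
class PartialCompletionListener extends StreamingQueryListener {
  @volatile private var latch = new CountDownLatch(1)

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // A progress update with no new input rows suggests the current inputs have been drained.
    if (event.progress.numInputRows == 0) latch.countDown()
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = latch.countDown()

  // Block until the current inputs are processed, then re-arm for the next batch;
  // the streaming query itself is left running and the listener stays registered.
  def waitForPartialCompletion(timeoutSeconds: Long = 60): Unit = {
    latch.await(timeoutSeconds, TimeUnit.SECONDS)
    latch = new CountDownLatch(1)
  }
}

// Usage: register once with spark.streams.addListener(listener), write a batch of input files,
// then call listener.waitForPartialCompletion() before asserting on the documents in Elasticsearch.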

masseyke merged commit 3ef547d into elastic:master Jan 20, 2022
masseyke deleted the feature/structured-streaming-update branch January 20, 2022 14:12
masseyke added a commit that referenced this pull request Jan 24, 2022
…1878)

AbstractMROldApiSaveTest.testUpdateWithoutId broke when #1839 was merged because we are now failing earlier if
upsert is used but es.mapping.id is not set. The exception is now the same as the one you get when update is not
configured to use es.mapping.id.
Relates #1839 #69

Successfully merging this pull request may close these issues.

Support UPDATE output mode for Spark Structured Streaming