From 971710578b48bf4b5190a4bcb596a7b7080659fd Mon Sep 17 00:00:00 2001
From: Keith Massey
Date: Wed, 2 Feb 2022 13:30:18 -0600
Subject: [PATCH 1/5] Adding support for commas in field names

---
 .../hadoop/util/StringUtils.java              | 69 ++++++++++++-------
 .../hadoop/util/StringUtilsTest.java          | 32 +++++++++
 .../integration/AbstractScalaEsSparkSQL.scala | 25 ++++++-
 3 files changed, 100 insertions(+), 26 deletions(-)

diff --git a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
index 866e13768..c899c8b84 100644
--- a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
+++ b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
@@ -29,6 +29,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.StringTokenizer;
+import java.util.stream.Collectors;
 
 
 /**
@@ -126,20 +127,44 @@ public static List<String> tokenize(String string, String delimiters, boolean tr
         if (!StringUtils.hasText(string)) {
             return Collections.emptyList();
         }
-        StringTokenizer st = new StringTokenizer(string, delimiters);
-        List<String> tokens = new ArrayList<String>();
-        while (st.hasMoreTokens()) {
-            String token = st.nextToken();
-            if (trimTokens) {
-                token = token.trim();
+        List<String> tokens = new ArrayList<>();
+        char[] delims = delimiters.toCharArray();
+        StringBuilder currentToken = new StringBuilder();
+        boolean inQuotedToken = false;
+        for (char character : string.toCharArray()) {
+            if (character == '\"') {
+                inQuotedToken = !inQuotedToken;
             }
-            if (!ignoreEmptyTokens || token.length() > 0) {
-                tokens.add(token);
+            else if (inQuotedToken == false && isCharacterInArray(character, delims)) {
+                addTokenToList(tokens, currentToken, trimTokens, ignoreEmptyTokens);
+                currentToken = new StringBuilder();
+            } else {
+                currentToken.append(character);
             }
         }
+        addTokenToList(tokens, currentToken, trimTokens, ignoreEmptyTokens);
         return tokens;
     }
 
+    private static void addTokenToList(List<String> tokens, StringBuilder newToken, boolean trimTokens, boolean ignoreEmptyTokens) {
+        String token = newToken.toString();
+        if (trimTokens) {
+            token = token.trim();
+        }
+        if (!ignoreEmptyTokens || token.length() > 0) {
+            tokens.add(token);
+        }
+    }
+
+    private static boolean isCharacterInArray(char character, char[] charArray) {
+        for (char arrayChar : charArray) {
+            if (character == arrayChar) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public static String concatenate(Collection<?> list) {
         return concatenate(list, DEFAULT_DELIMITER);
     }
@@ -151,15 +176,10 @@ public static String concatenate(Collection<?> list, String delimiter) {
         if (delimiter == null) {
             delimiter = EMPTY;
         }
-        StringBuilder sb = new StringBuilder();
-
-        for (Object object : list) {
-            sb.append(object.toString());
-            sb.append(delimiter);
-        }
-
-        sb.setLength(sb.length() - delimiter.length());
-        return sb.toString();
+        final String finalDelimiter = delimiter;
+        return list.stream().map(item -> item.toString())
+                .map(token -> optionallyWrapToken(token, finalDelimiter))
+                .collect(Collectors.joining(delimiter));
     }
 
     public static String concatenate(Object[] array, String delimiter) {
@@ -169,15 +189,14 @@
         if (delimiter == null) {
             delimiter = EMPTY;
         }
+        final String finalDelimiter = delimiter;
+        return Arrays.stream(array).map(item -> item.toString())
+                .map(token -> optionallyWrapToken(token, finalDelimiter))
+                .collect(Collectors.joining(delimiter));
+    }
 
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < array.length; i++) {
-            if (i > 0) {
-                sb.append(delimiter);
-            }
-            sb.append(array[i]);
-        }
-        return sb.toString();
+    private static String optionallyWrapToken(String token, String delimiter) {
+        return token.contains(delimiter) ? "\"" + token + "\"" : token;
     }
 
     public static String deleteWhitespace(CharSequence sequence) {
diff --git a/mr/src/test/java/org/elasticsearch/hadoop/util/StringUtilsTest.java b/mr/src/test/java/org/elasticsearch/hadoop/util/StringUtilsTest.java
index 5ddfb1623..e38ea5887 100644
--- a/mr/src/test/java/org/elasticsearch/hadoop/util/StringUtilsTest.java
+++ b/mr/src/test/java/org/elasticsearch/hadoop/util/StringUtilsTest.java
@@ -20,6 +20,10 @@
 
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import static org.junit.Assert.*;
 
 public class StringUtilsTest {
@@ -72,4 +76,32 @@ public void testSingularIndexNames() {
         assertFalse(StringUtils.isValidSingularIndexName("abc{date|yyyy-MM-dd}defg"));
     }
 
+    @Test
+    public void testTokenize() {
+        List<String> test1 = Arrays.asList(new String[]{"this", "is a", "test"});
+        String concatenatedString = StringUtils.concatenate(test1);
+        List<String> tokens = StringUtils.tokenize(concatenatedString, ",", true, true);
+        assertEquals(test1, tokens);
+
+        List<String> test2 = Arrays.asList(new String[]{"this", " is a", " test ", " "});
+        concatenatedString = StringUtils.concatenate(test2);
+        tokens = StringUtils.tokenize(concatenatedString, ",", false, false);
+        assertEquals(test2, tokens);
+
+        List<String> test3 = Arrays.asList(new String[]{"this", "is, a", "test"});
+        concatenatedString = StringUtils.concatenate(test3);
+        tokens = StringUtils.tokenize(concatenatedString, ",", true, true);
+        assertEquals(test3, tokens);
+
+        Object[] test4 = new String[]{"this", "is, a", "test"};
+        concatenatedString = StringUtils.concatenate(test4, ";");
+        tokens = StringUtils.tokenize(concatenatedString, ";", true, true);
+        assertEquals(Arrays.asList(test4), tokens);
+
+        List<String> test5 = Arrays.asList(new String[]{"this", "is, a", "test"});
+        concatenatedString = StringUtils.concatenate(test5, ",");
+        tokens = StringUtils.tokenize(concatenatedString, ";,", true, true);
+        assertEquals(test5, tokens);
+    }
+
 }
diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
index 53255bf62..b6de5d2cc 100644
--- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
+++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
@@ -2284,7 +2284,30 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals(nested(0).getLong(1), 6)
   }
 
-
+  @Test
+  def testCommasInFieldNames(): Unit = {
+    val index = wrapIndex("commas-in-names-index")
+    val typed = "data"
+    val (target, docPath) = makeTargets(index, typed)
+    val mapping = wrapMapping("data", s"""{
+      |    "dynamic": "strict",
+      |    "properties" : {
+      |      "some column with a, comma and then some" : {
+      |        "type" : "keyword"
+      |      }
+      |    }
+      |  }
+    """.stripMargin)
+    RestUtils.touch(index)
+    RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8))
+    RestUtils.postData(docPath, "{\"some column with a, comma and then some\": \"sdfdsf\"}".getBytes("UTF-8"))
+    RestUtils.refresh(target)
+    val df = sqc.read.format("es").load(index)
+    df.printSchema()
+    df.show()
+    assertEquals(1, df.count())
+  }
+
   @Test
   def testMultiIndexes() {
     // add some data
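To make the intent of this first patch concrete: concatenate() now wraps any token that contains the delimiter in double quotes, and tokenize() treats a quoted stretch as opaque, so a field name with an embedded comma survives the round trip through a comma-separated setting. A minimal standalone sketch of that round trip (the main() wrapper is for illustration only; the StringUtils calls are the patched methods above):

    import java.util.Arrays;
    import java.util.List;

    import org.elasticsearch.hadoop.util.StringUtils;

    public class QuotedTokenRoundTrip {
        public static void main(String[] args) {
            List<String> fields = Arrays.asList("plain_field", "some column with a, comma and then some");

            // concatenate() quotes the second token because it contains the delimiter:
            String joined = StringUtils.concatenate(fields, ",");
            System.out.println(joined); // plain_field,"some column with a, comma and then some"

            // tokenize() toggles "quoted" mode on each double quote, so the embedded
            // comma is not treated as a separator and the original names come back.
            List<String> roundTripped = StringUtils.tokenize(joined, ",", true, true);
            System.out.println(roundTripped.equals(fields)); // true
        }
    }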
From a333af2a6fcb5323d9934ec396323306e6adbcd4 Mon Sep 17 00:00:00 2001
From: Keith Massey
Date: Tue, 8 Feb 2022 13:23:17 -0600
Subject: [PATCH 2/5] Trying to force a build

---
 .../spark/integration/AbstractScalaEsSparkSQL.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
index b6de5d2cc..99e3c601e 100644
--- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
+++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
@@ -2292,7 +2292,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     val mapping = wrapMapping("data", s"""{
       |    "dynamic": "strict",
       |    "properties" : {
-      |      "some column with a, comma and then some" : {
+      |      "some column with a comma, and then some" : {
       |        "type" : "keyword"
       |      }
       |    }
@@ -2300,7 +2300,7 @@
     """.stripMargin)
     RestUtils.touch(index)
     RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8))
-    RestUtils.postData(docPath, "{\"some column with a, comma and then some\": \"sdfdsf\"}".getBytes("UTF-8"))
+    RestUtils.postData(docPath, "{\"some column with a comma, and then some\": \"sdfdsf\"}".getBytes("UTF-8"))
     RestUtils.refresh(target)
     val df = sqc.read.format("es").load(index)
     df.printSchema()

From 1ee69928108e3da9e3f29fc8092067e5f83b35f9 Mon Sep 17 00:00:00 2001
From: Keith Massey
Date: Thu, 10 Feb 2022 14:12:04 -0600
Subject: [PATCH 3/5] Switching to http encoding

---
 .../hadoop/util/StringUtils.java              | 69 +++++++------------
 .../integration/AbstractScalaEsSparkSQL.scala | 25 ++++++-
 .../integration/AbstractScalaEsSparkSQL.scala | 25 ++++++-
 3 files changed, 74 insertions(+), 45 deletions(-)

diff --git a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
index c899c8b84..dfc4d10fd 100644
--- a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
+++ b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
@@ -21,6 +21,7 @@
 import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
 import org.elasticsearch.hadoop.serialization.json.BackportedJsonStringEncoder;
 import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.io.JsonStringEncoder;
+import org.elasticsearch.hadoop.util.encoding.HttpEncodingTools;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -127,44 +128,20 @@ public static List<String> tokenize(String string, String delimiters, boolean tr
         if (!StringUtils.hasText(string)) {
             return Collections.emptyList();
         }
-        List<String> tokens = new ArrayList<>();
-        char[] delims = delimiters.toCharArray();
-        StringBuilder currentToken = new StringBuilder();
-        boolean inQuotedToken = false;
-        for (char character : string.toCharArray()) {
-            if (character == '\"') {
-                inQuotedToken = !inQuotedToken;
+        StringTokenizer st = new StringTokenizer(string, delimiters);
+        List<String> tokens = new ArrayList<String>();
+        while (st.hasMoreTokens()) {
+            String token = st.nextToken();
+            if (trimTokens) {
+                token = token.trim();
             }
-            else if (inQuotedToken == false && isCharacterInArray(character, delims)) {
-                addTokenToList(tokens, currentToken, trimTokens, ignoreEmptyTokens);
-                currentToken = new StringBuilder();
-            } else {
-                currentToken.append(character);
+            if (!ignoreEmptyTokens || token.length() > 0) {
+                tokens.add(HttpEncodingTools.decode(token));
             }
         }
-        addTokenToList(tokens, currentToken, trimTokens, ignoreEmptyTokens);
         return tokens;
     }
 
-    private static void addTokenToList(List<String> tokens, StringBuilder newToken, boolean trimTokens, boolean ignoreEmptyTokens) {
-        String token = newToken.toString();
-        if (trimTokens) {
-            token = token.trim();
-        }
-        if (!ignoreEmptyTokens || token.length() > 0) {
-            tokens.add(token);
-        }
-    }
-
-    private static boolean isCharacterInArray(char character, char[] charArray) {
-        for (char arrayChar : charArray) {
-            if (character == arrayChar) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     public static String concatenate(Collection<?> list) {
         return concatenate(list, DEFAULT_DELIMITER);
     }
@@ -176,10 +153,15 @@ public static String concatenate(Collection<?> list, String delimiter) {
         if (delimiter == null) {
             delimiter = EMPTY;
         }
-        final String finalDelimiter = delimiter;
-        return list.stream().map(item -> item.toString())
-                .map(token -> optionallyWrapToken(token, finalDelimiter))
-                .collect(Collectors.joining(delimiter));
+        StringBuilder sb = new StringBuilder();
+
+        for (Object object : list) {
+            sb.append(HttpEncodingTools.encode(object.toString()));
+            sb.append(delimiter);
+        }
+
+        sb.setLength(sb.length() - delimiter.length());
+        return sb.toString();
     }
 
     public static String concatenate(Object[] array, String delimiter) {
@@ -189,14 +171,15 @@
         if (delimiter == null) {
             delimiter = EMPTY;
         }
-        final String finalDelimiter = delimiter;
-        return Arrays.stream(array).map(item -> item.toString())
-                .map(token -> optionallyWrapToken(token, finalDelimiter))
-                .collect(Collectors.joining(delimiter));
-    }
 
-    private static String optionallyWrapToken(String token, String delimiter) {
-        return token.contains(delimiter) ? "\"" + token + "\"" : token;
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < array.length; i++) {
+            if (i > 0) {
+                sb.append(delimiter);
+            }
+            sb.append(HttpEncodingTools.encode(array[i].toString()));
+        }
+        return sb.toString();
     }
 
     public static String deleteWhitespace(CharSequence sequence) {
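The StringUtils change above trades quoting for percent-encoding: concatenate() encodes every token before joining, and tokenize() decodes each piece after splitting, so a literal comma never appears inside a joined token. A rough sketch of the intended round trip (standalone main() for illustration; the exact escaped output, e.g. "%2C" for the comma, is an assumption about HttpEncodingTools, whose encode/decode calls are all this diff shows):

    import java.util.List;

    import org.elasticsearch.hadoop.util.StringUtils;
    import org.elasticsearch.hadoop.util.encoding.HttpEncodingTools;

    public class EncodedTokenRoundTrip {
        public static void main(String[] args) {
            // Assumption: encode() percent-escapes the comma (and spaces),
            // roughly "a,b" -> "a%2Cb".
            String encoded = HttpEncodingTools.encode("some column with a comma, and then some");
            System.out.println(encoded);

            // concatenate() now encodes every token before joining, so the only
            // literal commas in the joined string are the separators themselves...
            String joined = StringUtils.concatenate(
                    new Object[]{"plain_field", "some column with a comma, and then some"}, ",");

            // ...and tokenize() decodes each piece after splitting on them.
            List<String> back = StringUtils.tokenize(joined, ",", true, true);
            System.out.println(back); // [plain_field, some column with a comma, and then some]
        }
    }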
diff --git a/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
index 0cd96120f..d0cf23f1a 100644
--- a/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
+++ b/spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
@@ -2223,7 +2223,30 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals(nested(0).getLong(1), 6)
   }
 
-
+  @Test
+  def testCommasInFieldNames(): Unit = {
+    val index = wrapIndex("commas-in-names-index")
+    val typed = "data"
+    val (target, docPath) = makeTargets(index, typed)
+    val mapping = wrapMapping("data", s"""{
+      |    "dynamic": "strict",
+      |    "properties" : {
+      |      "some column with a comma, and then some" : {
+      |        "type" : "keyword"
+      |      }
+      |    }
+      |  }
+    """.stripMargin)
+    RestUtils.touch(index)
+    RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8))
+    RestUtils.postData(docPath, "{\"some column with a comma, and then some\": \"sdfdsf\"}".getBytes("UTF-8"))
+    RestUtils.refresh(target)
+    val df = sqc.read.format("es").load(index)
+    df.printSchema()
+    df.show()
+    assertEquals(1, df.count())
+  }
+
   @Test
   def testMultiIndexes() {
     // add some data
diff --git a/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
index fd146041a..a76d08885 100644
--- a/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
+++ b/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
@@ -2284,7 +2284,30 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals(nested(0).getLong(1), 6)
   }
 
-
+  @Test
+  def testCommasInFieldNames(): Unit = {
+    val index = wrapIndex("commas-in-names-index")
+    val typed = "data"
+    val (target, docPath) = makeTargets(index, typed)
+    val mapping = wrapMapping("data", s"""{
+      |    "dynamic": "strict",
+      |    "properties" : {
+      |      "some column with a comma, and then some" : {
+      |        "type" : "keyword"
+      |      }
+      |    }
+      |  }
+    """.stripMargin)
+    RestUtils.touch(index)
+    RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8))
+    RestUtils.postData(docPath, "{\"some column with a comma, and then some\": \"sdfdsf\"}".getBytes("UTF-8"))
+    RestUtils.refresh(target)
+    val df = sqc.read.format("es").load(index)
+    df.printSchema()
+    df.show()
+    assertEquals(1, df.count())
+  }
+
   @Test
   def testMultiIndexes() {
     // add some data
From b6be271ccfc6614a2899ff98d767aa90c32503ba Mon Sep 17 00:00:00 2001
From: Keith Massey
Date: Thu, 10 Feb 2022 14:13:03 -0600
Subject: [PATCH 4/5] optimizing imports

---
 mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
index dfc4d10fd..fc2a16591 100644
--- a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
+++ b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
@@ -30,7 +30,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.StringTokenizer;
-import java.util.stream.Collectors;
 
 
 /**
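For context before the final revert: the integration tests added in patch 3 all verify the same thing, namely that a mapping whose field name contains a comma survives the trip into a Spark schema. Once it does, reading such an index back is ordinary Spark usage. A minimal Java sketch of the equivalent read (Spark 2+/3 API; the index name comes from the tests, while the local master and the assumption that es-hadoop connection settings such as es.nodes are configured elsewhere are illustrative only):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    import static org.apache.spark.sql.functions.col;

    public class ReadCommaColumn {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("comma-columns")
                    .master("local[*]") // local run for illustration only
                    .getOrCreate();

            // Mirrors the tests: load the index whose mapping has a comma in a field name.
            Dataset<Row> df = spark.read().format("es").load("commas-in-names-index");

            // Column objects accept arbitrary characters in names; inside a SQL
            // string the same name would need backtick quoting instead.
            df.select(col("some column with a comma, and then some")).show();

            spark.stop();
        }
    }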
From 7bd4997b4eb268a999670b2dcf91a625fe6fdb41 Mon Sep 17 00:00:00 2001
From: Keith Massey
Date: Thu, 10 Feb 2022 15:00:19 -0600
Subject: [PATCH 5/5] Reverting back to the escaped commas solution

---
 .../hadoop/util/StringUtils.java              | 70 ++++++++++++-------
 1 file changed, 44 insertions(+), 26 deletions(-)

diff --git a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
index fc2a16591..c899c8b84 100644
--- a/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
+++ b/mr/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
@@ -21,7 +21,6 @@
 import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
 import org.elasticsearch.hadoop.serialization.json.BackportedJsonStringEncoder;
 import org.elasticsearch.hadoop.thirdparty.codehaus.jackson.io.JsonStringEncoder;
-import org.elasticsearch.hadoop.util.encoding.HttpEncodingTools;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -30,6 +29,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.StringTokenizer;
+import java.util.stream.Collectors;
 
 
 /**
@@ -127,20 +127,44 @@ public static List<String> tokenize(String string, String delimiters, boolean tr
         if (!StringUtils.hasText(string)) {
             return Collections.emptyList();
         }
-        StringTokenizer st = new StringTokenizer(string, delimiters);
-        List<String> tokens = new ArrayList<String>();
-        while (st.hasMoreTokens()) {
-            String token = st.nextToken();
-            if (trimTokens) {
-                token = token.trim();
+        List<String> tokens = new ArrayList<>();
+        char[] delims = delimiters.toCharArray();
+        StringBuilder currentToken = new StringBuilder();
+        boolean inQuotedToken = false;
+        for (char character : string.toCharArray()) {
+            if (character == '\"') {
+                inQuotedToken = !inQuotedToken;
             }
-            if (!ignoreEmptyTokens || token.length() > 0) {
-                tokens.add(HttpEncodingTools.decode(token));
+            else if (inQuotedToken == false && isCharacterInArray(character, delims)) {
+                addTokenToList(tokens, currentToken, trimTokens, ignoreEmptyTokens);
+                currentToken = new StringBuilder();
+            } else {
+                currentToken.append(character);
             }
         }
+        addTokenToList(tokens, currentToken, trimTokens, ignoreEmptyTokens);
         return tokens;
     }
 
+    private static void addTokenToList(List<String> tokens, StringBuilder newToken, boolean trimTokens, boolean ignoreEmptyTokens) {
+        String token = newToken.toString();
+        if (trimTokens) {
+            token = token.trim();
+        }
+        if (!ignoreEmptyTokens || token.length() > 0) {
+            tokens.add(token);
+        }
+    }
+
+    private static boolean isCharacterInArray(char character, char[] charArray) {
+        for (char arrayChar : charArray) {
+            if (character == arrayChar) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public static String concatenate(Collection<?> list) {
         return concatenate(list, DEFAULT_DELIMITER);
     }
@@ -152,15 +176,10 @@ public static String concatenate(Collection<?> list, String delimiter) {
         if (delimiter == null) {
             delimiter = EMPTY;
         }
-        StringBuilder sb = new StringBuilder();
-
-        for (Object object : list) {
-            sb.append(HttpEncodingTools.encode(object.toString()));
-            sb.append(delimiter);
-        }
-
-        sb.setLength(sb.length() - delimiter.length());
-        return sb.toString();
+        final String finalDelimiter = delimiter;
+        return list.stream().map(item -> item.toString())
+                .map(token -> optionallyWrapToken(token, finalDelimiter))
+                .collect(Collectors.joining(delimiter));
     }
 
     public static String concatenate(Object[] array, String delimiter) {
@@ -170,15 +189,14 @@
         if (delimiter == null) {
             delimiter = EMPTY;
         }
+        final String finalDelimiter = delimiter;
+        return Arrays.stream(array).map(item -> item.toString())
+                .map(token -> optionallyWrapToken(token, finalDelimiter))
+                .collect(Collectors.joining(delimiter));
+    }
 
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < array.length; i++) {
-            if (i > 0) {
-                sb.append(delimiter);
-            }
-            sb.append(HttpEncodingTools.encode(array[i].toString()));
-        }
-        return sb.toString();
+    private static String optionallyWrapToken(String token, String delimiter) {
+        return token.contains(delimiter) ? "\"" + token + "\"" : token;
     }
 
     public static String deleteWhitespace(CharSequence sequence) {
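With this final patch applied, the escaped-comma behavior from patch 1 is the end state. A small standalone sketch of the round trip the unit tests above exercise (values borrowed from testTokenize; that the default delimiter is the comma is assumed from the first test case, which relies on it):

    import java.util.Arrays;
    import java.util.List;

    import org.elasticsearch.hadoop.util.StringUtils;

    public class TokenizeRoundTripDemo {
        public static void main(String[] args) {
            List<String> fields = Arrays.asList("this", "is, a", "test");

            // The default delimiter is ",", so "is, a" gets quoted in transit:
            String joined = StringUtils.concatenate(fields);
            System.out.println(joined); // this,"is, a",test

            // The quotes keep the embedded comma from acting as a separator:
            List<String> back = StringUtils.tokenize(joined, ",", true, true);
            System.out.println(back.equals(fields)); // true

            // Splitting on several delimiters also works, as long as the embedded
            // one was quoted at concatenate() time (mirrors test5 above):
            List<String> multi = StringUtils.tokenize(joined, ";,", true, true);
            System.out.println(multi.equals(fields)); // true
        }
    }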