Error while encoding: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of double #2055

Open
@sarath-mec

Description

What kind of issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.
  • Feature Request. Start by telling us what problem you’re trying to solve.
    Often a solution already exists! Don’t send pull requests to implement new features without
    first getting our support. Sometimes we leave features out on purpose to keep the project small.

Issue description

Trying a simple example of reading Elasticsearch GeoIP data from the Kibana sample data set kibana_sample_data_ecommerce:
https://www.elastic.co/guide/en/kibana/8.6/get-started.html

If we try to read the geoip.location field, which is a geoip field, df.show() fails with the following message:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of double
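
One possible explanation, which is an assumption and not something confirmed in this report: some documents may store a whole-number coordinate (for example a lat written as 26 instead of 26.0), so the value is parsed as an integer while the inferred schema declares lat and lon as double. The sketch below only illustrates that mismatch with Python's own json module, not with the connector's parser; the sample documents and both helper names are hypothetical.

```python
import json

# Hypothetical _source fragments; the coordinate values are made up for illustration.
SAMPLE_DOCS = [
    '{"geoip": {"location": {"lat": 40.7, "lon": -74.0}}}',
    '{"geoip": {"location": {"lat": 26, "lon": 50.5}}}',
]


def non_float_coordinates(raw_doc):
    """Return the lat/lon keys whose parsed JSON value is not a float."""
    location = json.loads(raw_doc)["geoip"]["location"]
    return [key for key in ("lat", "lon") if not isinstance(location[key], float)]


def coerce_location(location):
    """Return a copy of the location with every coordinate widened to float."""
    return {key: float(value) for key, value in location.items()}


if __name__ == "__main__":
    for doc in SAMPLE_DOCS:
        # An empty list means both coordinates already parse as floats.
        print(non_float_coordinates(doc))
```

If this is what happens, only the rows containing such a value would fail, which would fit df.printSchema() and df.count() succeeding while df.show() raises the encoding error.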

Tried various options without success:

  • .option("es.mapping.date.rich", False)
  • .option("es.field.read.empty.as.null", False)
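
Another option that could be tried is es.read.field.exclude, which asks the connector to leave a field out of the result. The sketch below shows one way to assemble the read options so that the exclusion is easy to toggle. The helper names build_read_options and apply_options are hypothetical, and whether excluding geoip.location avoids the error is not verified here.

```python
def build_read_options(exclude_fields=()):
    """Build connector read options as strings (hypothetical helper)."""
    options = {
        "es.nodes": "<host_ip>",
        "es.port": "9200",
        "es.resource": "kibana_sample_data_ecommerce",
        "es.read.metadata": "true",
        "es.read.field.as.array.include": "products,manufacturer,category,sku",
        "es.query": "?q=*:*",
    }
    if exclude_fields:
        # Comma-separated list of field names to drop from the result.
        options["es.read.field.exclude"] = ",".join(exclude_fields)
    return options


def apply_options(reader, options):
    """Chain .option(key, value) calls on any reader-like object."""
    for key, value in options.items():
        reader = reader.option(key, value)
    return reader
```

With a live session this would be used as apply_options(spark.read.format("org.elasticsearch.spark.sql"), build_read_options(["geoip.location"])).load(). The cost is that the excluded field is no longer available in the DataFrame.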

Steps to reproduce

Code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("SparkTest").getOrCreate()

df = (spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "<host_ip>")
      .option("es.port", "9200")
      .option("es.resource", "kibana_sample_data_ecommerce")
      .option("es.read.metadata", True)
      .option("es.read.field.as.array.include", "products,manufacturer,category,sku")
      .option("es.mapping.date.rich", False)
      .option("es.field.read.empty.as.null", False)
      #.option("es.read.field.exclude", "geoip.location")
      .option("es.query", "?q=*:*")
      .load()
      )
df.printSchema()
df.count()

Run
df.show()

Stack trace:

23/01/12 19:21:54 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 13.0 (TID 22) (cluster-a104-w-0.c.api-staging-367614.internal executor 2): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of double
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -1), StringType), true, false), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, category), ArrayType(StringType,true)), None) AS category#1236
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, currency), StringType), true, false) AS currency#1237
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, customer_birth_date), StringType), true, false) AS customer_birth_date#1238
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, customer_first_name), StringType), true, false) AS customer_first_name#1239
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, customer_full_name), StringType), true, false) AS customer_full_name#1240
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, customer_gender), StringType), true, false) AS customer_gender#1241
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, customer_id), StringType), true, false) AS customer_id#1242
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, customer_last_name), StringType), true, false) AS customer_last_name#1243
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, customer_phone), StringType), true, false) AS customer_phone#1244
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, day_of_week), StringType), true, false) AS day_of_week#1245
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, day_of_week_i), IntegerType) AS day_of_week_i#1246
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, email), StringType), true, false) AS email#1247
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else named_struct(dataset, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, event), StructField(dataset,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, event), StructField(dataset,StringType,true)), 0, dataset), StringType), true, false)) AS event#1248
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else named_struct(city_name, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 0, city_name), StringType), true, false), continent_name, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), 
StructField(region_name,StringType,true)), 1, continent_name), StringType), true, false), country_iso_code, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 2, country_iso_code), StringType), true, false), location, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)).isNullAt) null else named_struct(lat, if (validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 3, location), 
StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 3, location), StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)), 0, lat), DoubleType), lon, if (validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 3, location), StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 3, location), StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)), 1, lon), DoubleType)), region_name, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), 
StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, geoip), StructField(city_name,StringType,true), StructField(continent_name,StringType,true), StructField(country_iso_code,StringType,true), StructField(location,StructType(StructField(lat,DoubleType,true), StructField(lon,DoubleType,true)),true), StructField(region_name,StringType,true)), 4, region_name), StringType), true, false)) AS geoip#1249
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -2), StringType), true, false), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, manufacturer), ArrayType(StringType,true)), None) AS manufacturer#1250
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 15, order_date), StringType), true, false) AS order_date#1251
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 16, order_id), StringType), true, false) AS order_id#1252
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), if (isnull(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)))) null else named_struct(_id, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 0, _id), StringType), true, false), base_price, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), 
StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 1, base_price), FloatType), base_unit_price, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), 
StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 2, base_unit_price), FloatType), category, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), 
StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 3, category), StringType), true, false), created_on, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), 
StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 4, created_on), StringType), true, false), discount_amount, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), 
StructField(unit_discount_amount,FloatType,true)), 5, discount_amount), FloatType), discount_percentage, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 6, discount_percentage), FloatType), manufacturer, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), 
StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 7, manufacturer), StringType), true, false), min_price, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), 
StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 8, min_price), FloatType), price, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), 
StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 9, price), FloatType), product_id, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), 
StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) null else validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 10, product_id), LongType), product_name, if (validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)).isNullAt) 
null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -3), StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)), 11, product_name), StringType), true, false), ... 12 more fields), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 17, products), ArrayType(StructType(StructField(_id,StringType,true), StructField(base_price,FloatType,true), StructField(base_unit_price,FloatType,true), StructField(category,StringType,true), StructField(created_on,StringType,true), StructField(discount_amount,FloatType,true), StructField(discount_percentage,FloatType,true), StructField(manufacturer,StringType,true), StructField(min_price,FloatType,true), StructField(price,FloatType,true), StructField(product_id,LongType,true), StructField(product_name,StringType,true), StructField(quantity,IntegerType,true), StructField(sku,StringType,true), StructField(tax_amount,FloatType,true), StructField(taxful_price,FloatType,true), StructField(taxless_price,FloatType,true), StructField(unit_discount_amount,FloatType,true)),true)), None) AS products#1253
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObject, ObjectType(class java.lang.Object), true, -4), StringType), true, false), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 18, sku), ArrayType(StringType,true)), None) AS sku#1254
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 19, taxful_total_price), FloatType) AS taxful_total_price#1255
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 20, taxless_total_price), FloatType) AS taxless_total_price#1256
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 21, total_quantity), IntegerType) AS total_quantity#1257
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 22, total_unique_products), IntegerType) AS total_unique_products#1258
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 23, type), StringType), true, false) AS type#1259
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 24, user), StringType), true, false) AS user#1260
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS _metadata#1261
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:213)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:195)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.Integer is not a valid external type for schema of double
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_2$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.createNamedStruct_0_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_2_7$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:209)
	... 19 more

23/01/12 19:21:55 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 13.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[13], line 20
     18 df.printSchema()
     19 df.count()
---> 20 df.show()

File /usr/lib/spark/python/pyspark/sql/dataframe.py:484, in DataFrame.show(self, n, truncate, vertical)
    441 """Prints the first ``n`` rows to the console.
    442 
    443 .. versionadded:: 1.3.0
   (...)
    481  name | Bob
    482 """
    483 if isinstance(truncate, bool) and truncate:
--> 484     print(self._jdf.showString(n, 20, vertical))
    485 else:
    486     print(self._jdf.showString(n, int(truncate), vertical))

File /usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
   1298 command = proto.CALL_COMMAND_NAME +\
   1299     self.command_header +\
   1300     args_command +\
   1301     proto.END_COMMAND_PART
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1307 for temp_arg in temp_args:
   1308     temp_arg._detach()

File /usr/lib/spark/python/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
    109 def deco(*a, **kw):
    110     try:
--> 111         return f(*a, **kw)
    112     except py4j.protocol.Py4JJavaError as e:
    113         converted = convert_exception(e.java_exception)

File /usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o275.showString.
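
The `Caused by` line above says a value arrived as a `java.lang.Integer` where the schema declares `double`. One possible explanation, which is an assumption and not confirmed anywhere in this report, is that some documents store a whole-number coordinate (for example `"lat": 25`) under `geoip.location`. The sketch below is a plain-Python diagnostic, not a fix: it walks a document fetched outside Spark and lists every field whose value is an integer. The helper name `find_whole_number_values` and the sample document are hypothetical and are not taken from the real index.

```python
from typing import Any, Iterator, Tuple


def find_whole_number_values(doc: Any, path: str = "") -> Iterator[Tuple[str, int]]:
    """Yield (dotted_path, value) for every int found in a nested document."""
    # bool is a subclass of int in Python, so skip it explicitly.
    if isinstance(doc, bool):
        return
    if isinstance(doc, int):
        yield path, doc
    elif isinstance(doc, dict):
        for key, value in doc.items():
            child = f"{path}.{key}" if path else key
            yield from find_whole_number_values(value, child)
    elif isinstance(doc, list):
        for index, value in enumerate(doc):
            yield from find_whole_number_values(value, f"{path}[{index}]")


# Hypothetical document shaped like an ecommerce hit; values are made up.
sample_doc = {
    "geoip": {"location": {"lat": 25, "lon": 55.3}},
    "total_quantity": 2,
}

hits = list(find_whole_number_values(sample_doc))

# Keep only the fields under the geo point that the schema treats as double.
geo_hits = [(p, v) for p, v in hits if p.startswith("geoip.location")]
print(geo_hits)
```

If a real document shows integer coordinates under `geoip.location`, that would be consistent with the error, though it would not by itself prove the cause.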

Version Info

Jar : elasticsearch-spark-30_2.12-8.6.0.jar

OS : Google Dataproc, 3-node default cluster
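JVM : OpenJDK 64-Bit Server VM, 1.8.0_352
Hadoop/Spark : Spark 3.1.3 (Scala 2.12.14)
ES-Hadoop : 8.6.0 (elasticsearch-spark-30_2.12-8.6.0.jar)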
ES :

PySpark Version Info:

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.3
      /_/
                        
Using Scala version 2.12.14, OpenJDK 64-Bit Server VM, 1.8.0_352
Branch HEAD
Compiled by user  on 2022-11-01T22:00:39Z
Revision b28f046c307a8374984c0231d76debeb3a3beb97
Url https://bigdataoss-internal.googlesource.com/third_party/apache/spark

