MySQL / MSSQL : numeric.mapping doesn't work for DECIMAL fields #563

Open
@rmoff

Description

I can't get numeric.mapping to work with MySQL on Confluent Platform 5.1. Steps to reproduce are below.

Create MySQL table:

use demo;

create table transactions (
	txn_id INT,
	customer_id INT,
	amount DECIMAL(5,2),
	currency VARCHAR(50),
	txn_timestamp VARCHAR(50)
);

insert into transactions (txn_id, customer_id, amount, currency, txn_timestamp) values (3, 2, 17.13, 'EUR', '2018-04-30T21:30:39Z');

Inspect table:

mysql> describe transactions;
+---------------+--------------+------+-----+---------+-------+
| Field         | Type         | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------+-------+
| txn_id        | int(11)      | YES  |     | NULL    |       |
| customer_id   | int(11)      | YES  |     | NULL    |       |
| amount        | decimal(5,2) | YES  |     | NULL    |       |
| currency      | varchar(50)  | YES  |     | NULL    |       |
| txn_timestamp | varchar(50)  | YES  |     | NULL    |       |
+---------------+--------------+------+-----+---------+-------+
5 rows in set (0.00 sec)

Create the connector:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
          "name": "jdbc_source_mysql_12a",
          "config": {
                  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                  "connection.url": "jdbc:mysql://mysql:3306/demo",
                  "connection.user": "connect_user",
                  "connection.password": "asgard",
                  "topic.prefix": "mysql-12a-",
                  "numeric.mapping": "best_fit",
                  "table.whitelist" : "demo.transactions",
                  "mode":"bulk",
                  "poll.interval.ms" : 3600000
                  }
          }'
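To rule out the setting simply not being applied, the worker's REST API can confirm what the connector is actually running with. A minimal sanity check in Python, assuming the worker is reachable on localhost:8083 as above:

import json
import urllib.request

# Fetch the live connector config from the Kafka Connect REST API
with urllib.request.urlopen(
    "http://localhost:8083/connectors/jdbc_source_mysql_12a/config"
) as resp:
    config = json.load(resp)

print(config["numeric.mapping"])  # prints: best_fit

(The JdbcSourceTaskConfig dump in the worker log below confirms the same thing.)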

Even though "numeric.mapping": "best_fit" is set, Kafka Connect stores the DECIMAL(5,2) as a Connect Decimal, serialised to bytes in Avro:

$ curl -s "http://localhost:8081/subjects/mysql-12a-transactions-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name == "amount")'
{
  "name": "amount",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
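This matters for consumers: with the Connect Decimal logical type, the Avro bytes field carries the unscaled value as a big-endian two's-complement integer, and the scale lives in the schema parameters. A minimal sketch of what a consumer has to do to get the number back (decode_connect_decimal is a hypothetical helper, not part of any Connect API):

from decimal import Decimal

def decode_connect_decimal(raw: bytes, scale: int) -> Decimal:
    # The bytes hold the unscaled value as a big-endian
    # two's-complement integer; the scale comes from the schema.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)

# 17.13 at scale 2 is the unscaled integer 1713, i.e. bytes 0x06 0xB1
print(decode_connect_decimal(b"\x06\xb1", 2))  # 17.13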

Connect Worker log excerpt:


INFO Kafka version : 2.1.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
…
INFO JdbcSourceTaskConfig values:
 batch.max.rows = 100
 catalog.pattern = null
 connection.attempts = 3
 connection.backoff.ms = 10000
 connection.password = [hidden]
 connection.url = jdbc:mysql://mysql:3306/demo
 connection.user = connect_user
 dialect.name =
 incrementing.column.name =
 mode = bulk
 numeric.mapping = best_fit
 numeric.precision.mapping = false
 poll.interval.ms = 3600000
 query =
 schema.pattern = null
 table.blacklist = []
 table.poll.interval.ms = 60000
 table.types = [TABLE]
 table.whitelist = [demo.transactions]
 tables = [`demo`.`transactions`]
 timestamp.column.name = []
 timestamp.delay.interval.ms = 0
 topic.prefix = mysql-12a-
 validate.non.null = true
 (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)
…
DEBUG Checking for next block of results from BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
DEBUG BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} prepared SQL query: SELECT * FROM `demo`.`transactions` (io.confluent.connect.jdbc.source.BulkTableQuerier)
DEBUG DECIMAL with precision: '5' and scale: '2' (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
DEBUG DECIMAL with precision: '5' and scale: '2' (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
DEBUG Returning 100 records for BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
…
kafka-connect_1_8eb73e80dda1 | [2019-01-07 13:37:50,920] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"transactions\",\"fields\":[{\"name\":\"txn_id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"customer_id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":2,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"currency\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"txn_timestamp\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"transactions\"}"} to http://schema-registry:8081/subjects/mysql-12a-transactions-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService)

I've tried this with three different settings; each still results in the amount field being serialised to bytes in Avro:

  • "numeric.mapping": "best_fit"
  • "numeric.mapping": "precision_only"
  • "numeric.precision.mapping": true

Per the docs, I'm expecting to see the DECIMAL(5,2) serialised to Avro FLOAT64 (I think; at the very least, not bytes).
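For comparison, this is roughly what I'd expect best_fit to register for the amount field, assuming DECIMAL(5,2) maps to Connect FLOAT64 / Avro double (my reading of the docs, not actual output):

{
  "name": "amount",
  "type": [
    "null",
    "double"
  ],
  "default": null
}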
