Open
Description
I can't get numeric.mapping
to work with MySQL and Confluent Platform 5.1. Steps to reproduce below.
Create MySQL table:
use demo;
create table transactions (
txn_id INT,
customer_id INT,
amount DECIMAL(5,2),
currency VARCHAR(50),
txn_timestamp VARCHAR(50)
);
insert into transactions (txn_id, customer_id, amount, currency, txn_timestamp) values (3, 2, 17.13, 'EUR', '2018-04-30T21:30:39Z');
Inspect table:
mysql> describe transactions;
+---------------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------+-------+
| txn_id | int(11) | YES | | NULL | |
| customer_id | int(11) | YES | | NULL | |
| amount | decimal(5,2) | YES | | NULL | |
| currency | varchar(50) | YES | | NULL | |
| txn_timestamp | varchar(50) | YES | | NULL | |
+---------------+--------------+------+-----+---------+-------+
5 rows in set (0.00 sec)
Create connector
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mysql_12a",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/demo",
"connection.user": "connect_user",
"connection.password": "asgard",
"topic.prefix": "mysql-12a-",
"numeric.mapping": "best_fit",
"table.whitelist" : "demo.transactions",
"mode":"bulk",
"poll.interval.ms" : 3600000
}
}'
Even though "numeric.mapping": "best_fit"
, Kafka Connect stores the DECIMAL(5,2)
as a Decimal
, serialised to bytes in Avro:
$ curl -s "http://localhost:8081/subjects/mysql-12a-transactions-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name == "amount")'
{
"name": "amount",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
Connect Worker log excerpt:
INFO Kafka version : 2.1.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
…
INFO JdbcSourceTaskConfig values:
batch.max.rows = 100
catalog.pattern = null
connection.attempts = 3
connection.backoff.ms = 10000
connection.password = [hidden]
connection.url = jdbc:mysql://mysql:3306/demo
connection.user = connect_user
dialect.name =
incrementing.column.name =
mode = bulk
numeric.mapping = best_fit
numeric.precision.mapping = false
poll.interval.ms = 3600000
query =
schema.pattern = null
table.blacklist = []
table.poll.interval.ms = 60000
table.types = [TABLE]
table.whitelist = [demo.transactions]
tables = [`demo`.`transactions`]
timestamp.column.name = []
timestamp.delay.interval.ms = 0
topic.prefix = mysql-12a-
validate.non.null = true
(io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)
…
DEBUG Checking for next block of results from BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
DEBUG BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} prepared SQL query: SELECT * FROM `demo`.`transactions` (io.confluent.connect.jdbc.source.BulkTableQuerier)
DEBUG DECIMAL with precision: '5' and scale: '2' (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
DEBUG DECIMAL with precision: '5' and scale: '2' (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
DEBUG Returning 100 records for BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
…
kafka-connect_1_8eb73e80dda1 | [2019-01-07 13:37:50,920] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"transactions\",\"fields\":[{\"name\":\"txn_id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"customer_id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":2,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"currency\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"txn_timestamp\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"transactions\"}"} to http://schema-registry:8081/subjects/mysql-12a-transactions-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService)
I've tried this with three different settings, each still results in the amount
field serialised to bytes in Avro:
"numeric.mapping": "best_fit"
"numeric.mapping": "precision_only"
"numeric.precision.mapping": true
Per docs I am expecting to see the decimal(5,2)
serialised to Avro FLOAT64
(I think - but at least, not bytes
)