# Add an Apache Paimon CDC Ingestion using MSF Example #95

## Flink Apache Paimon Sink using DataStream API

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)
* Apache Paimon: 1.0.1
* Flink connectors: Flink CDC-MySQL / PostgreSQL / MongoDB / Kafka

This example demonstrates how to use the Apache Paimon CDC ingestion components (MySQL / PostgreSQL / MongoDB / Kafka) to sink
data to Amazon S3 in the Apache Paimon table format. The Apache Paimon Hive Catalog can work with the AWS Glue Data Catalog.

The project can run both on Amazon Managed Service for Apache Flink and locally for development.

### Prerequisites
* A database source (MySQL, PostgreSQL, MongoDB) with binlog enabled, or a Kafka / Amazon MSK source with data streamed in a CDC format supported by Apache Paimon (Canal CDC, Debezium CDC, Maxwell CDC, OGG CDC, JSON, aws-dms-json).
* If you want to use the Apache Paimon Hive catalog with the Glue Data Catalog, install the aws-glue-datacatalog-hive3-client
  jar file into your local Maven repository (refer to this [GitHub repo](https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore) to build it, or
  take the jar from an EMR cluster and install it locally; see the install sketch after this list), then copy your EMR cluster's `hive-site.xml` file into the project and repackage the project.
* An S3 bucket to write the Paimon table.
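
If you take the aws-glue-datacatalog-hive3-client jar from an EMR cluster, a local Maven install might look like the sketch below. The group ID, version, and jar path are assumptions; adjust them to match the jar you actually have:

```shell
# Sketch only: install the Glue Hive client jar into the local Maven repository.
# The coordinates and file path below are assumed; adjust them to the jar you obtained.
mvn install:install-file \
    -Dfile=/path/to/aws-glue-datacatalog-hive3-client.jar \
    -DgroupId=com.amazonaws.glue \
    -DartifactId=aws-glue-datacatalog-hive3-client \
    -Dversion=<version> \
    -Dpackaging=jar
```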

#### IAM Permissions

The application must have IAM permissions to:
* Show and alter Glue Data Catalog databases, and show and create Glue Data Catalog tables.
  See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html).
* Read from and write to the S3 bucket.
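
For reference, an illustrative (not exhaustive) policy for the service execution role might look like the sketch below; the specific Glue and S3 actions and the resource ARNs are assumptions to tailor to your account:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "GlueCatalogAccess",
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:GetDatabases",
        "glue:UpdateDatabase",
        "glue:GetTable",
        "glue:GetTables",
        "glue:CreateTable",
        "glue:UpdateTable"
      ],
      "Resource": "*"
    },
    {
      "Sid": "PaimonWarehouseBucketAccess",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::<your-warehouse-bucket>",
        "arn:aws:s3:::<your-warehouse-bucket>/*"
      ]
    }
  ]
}
```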

### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.

When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file.
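
For local development, a minimal `flink-application-properties-dev.json` could look like the sketch below. The group and key names follow the rules and sample table further down; the broker address, warehouse path, database, and topic names are placeholders you must replace:

```json
[
  {
    "PropertyGroupId": "ActionConf",
    "PropertyMap": {
      "action": "kafka_sync_database",
      "database": "paimon_db",
      "warehouse": "s3://<your-bucket>/paimon"
    }
  },
  {
    "PropertyGroupId": "KafkaConf",
    "PropertyMap": {
      "kafka_conf@_properties.bootstrap.servers": "localhost:9092",
      "kafka_conf@_properties.group.id": "paimon-cdc-local",
      "kafka_conf@_topic": "cdc-topic",
      "kafka_conf@_value.format": "debezium-json"
    }
  },
  {
    "PropertyGroupId": "TableConf",
    "PropertyMap": {
      "table_conf@_bucket": "4"
    }
  }
]
```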

This example parses runtime parameters according to the following rules and passes the parsed parameters to Apache Paimon Actions.

- The Paimon CDC ingestion action name is parsed from the key named `action` in the `ActionConf` parameter group.

  > **Reviewer comment:** I think it should be explained that this example reads from Kafka, so you need to set up a CDC Kafka connector.
- Some global or common parameters can be placed in the `ActionConf` parameter group. The parameter names should follow those defined for the specific ingestion [action](https://paimon.apache.org/docs/1.0/cdc-ingestion/overview/).
- For parameters like `table_conf` and `catalog_conf` that are set as Key=Value pairs, the parameter group name can be customized, for example `TableConf` or `CatalogConf`.
  The keys inside such a group must follow the format `<Paimon conf name>@_<parameter key>`,
  for example `table_conf@_bucket`, and the parameter value is the corresponding value (see the illustration after this list).
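
To make the mapping concrete, a `TableConf` entry with key `table_conf@_bucket` and value `4` corresponds to the Paimon action argument `--table_conf bucket=4`. A full set of property groups is conceptually equivalent to invoking the Paimon Kafka CDC action with CLI-style arguments along these lines (illustrative only; the jar path, warehouse path, brokers, topic, and database name are placeholders):

```shell
# Illustrative only: how the runtime property groups map onto Paimon CDC action arguments.
<FLINK_HOME>/bin/flink run /path/to/paimon-flink-action-<version>.jar \
    kafka_sync_database \
    --warehouse s3://<your-bucket>/paimon \
    --database <paimon_database> \
    --kafka_conf properties.bootstrap.servers=<brokers> \
    --kafka_conf topic=<cdc_topic> \
    --kafka_conf value.format=debezium-json \
    --catalog_conf metastore.client.class=<client_class> \
    --table_conf bucket=4 \
    --table_conf sink.parallelism=4
```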

Runtime parameters (sample):

| Group ID      | Key                                        | Description                                                                                         |
|---------------|--------------------------------------------|-----------------------------------------------------------------------------------------------------|
| `ActionConf`  | `action`                                   | Name of the Apache Paimon CDC ingestion action, e.g. `kafka_sync_database`, `mysql_sync_database`.   |
| `ActionConf`  | `database`                                 | Target Paimon database name.                                                                          |
| `ActionConf`  | `primary_keys`                             | (Optional) The primary keys for the Paimon table.                                                     |
| `KafkaConf`   | `kafka_conf@_properties.bootstrap.servers` | Bootstrap servers of the Kafka cluster.                                                               |
| `KafkaConf`   | `kafka_conf@_properties.auto.offset.reset` | Offset reset policy of the Kafka consumer.                                                            |
| `KafkaConf`   | `kafka_conf@_properties.group.id`          | Consumer group ID.                                                                                    |
| `CatalogConf` | `catalog_conf@_metastore.client.class`     | Paimon Hive Catalog metastore client class name.                                                      |
| `CatalogConf` | `...`                                      | Any other Paimon catalog configuration, passed as `catalog_conf@_<key>`.                              |
| `TableConf`   | `table_conf@_bucket`                       | Bucket number of the Paimon table.                                                                    |
| `TableConf`   | `...`                                      | Any other Paimon table configuration, passed as `table_conf@_<key>`.                                  |

> **Reviewer comment:** What does "..." mean here? What else can the user add?

All parameters are case-sensitive.

### Samples

**Create an MSF application**

> **Reviewer comment:** This is not consistent with what we are doing with all other examples.

> **Reviewer comment:** These instructions do not really explain what data the Kafka source is expecting. What schema? What serialization?

First, compile and package the application using Maven, then copy the packaged jar file to your S3 bucket.

```shell
mvn clean package -P KafkaCDC
```
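
Copying the jar to S3 might then look like this (the bucket name and the jar file name are placeholders; the actual artifact name depends on the project's `pom.xml`):

```shell
# Placeholder bucket and artifact name: adjust to your build output and bucket.
aws s3 cp target/<your-artifact>.jar s3://<your-bucket>/kafka-cdc-paimon/<your-artifact>.jar
```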

Second, prepare an input JSON file to create an MSF application. You can add required information (such as VPC, subnets, and security groups) to this JSON file.

**Notice:** Your service execution role must have the appropriate permissions, such as access to the S3 bucket, and Glue access if you want to use the Glue Data Catalog as the Paimon Hive Catalog.
```json
{
  "ApplicationName": "kafka-cdc-paimon",
  "ApplicationDescription": "Sink CDC from Kafka as Apache Paimon table",
  "RuntimeEnvironment": "FLINK-1_20",
  "ServiceExecutionRole": "Your service role arn",
  "ApplicationConfiguration": {
    "ApplicationCodeConfiguration": {
      "CodeContent": {
        "S3ContentLocation": {
          "BucketARN": "Your bucket arn",
          "FileKey": "Your jar file s3 key"
        }
      },
      "CodeContentType": "ZIPFILE"
    },
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "ActionConf",
          "PropertyMap": {
            "action": "kafka_sync_database",
            "database": "Your Paimon Database",
            "warehouse": "Your paimon warehouse path"
          }
        },
        {
          "PropertyGroupId": "KafkaConf",
          "PropertyMap": {
            "kafka_conf@_properties.bootstrap.servers": "MSK bootstrap servers",
            "kafka_conf@_properties.auto.offset.reset": "earliest",
            "kafka_conf@_properties.group.id": "group id",
            "kafka_conf@_topic": "Your cdc topic",
            "kafka_conf@_value.format": "debezium-json"
          }
        },
        {
          "PropertyGroupId": "CatalogConf",
          "PropertyMap": {
            "catalog_conf@_hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "catalog_conf@_hadoop.fs.s3.buffer.dir": "/var/tmp"
          }
        },
        {
          "PropertyGroupId": "TableConf",
          "PropertyMap": {
            "table_conf@_bucket": "4",
            "table_conf@_metadata.iceberg.storage": "hive-catalog",
            "table_conf@_metadata.iceberg.manifest-legacy-version": "true",
            "table_conf@_metadata.iceberg.hive-client-class": "com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient",
            "table_conf@_fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "table_conf@_fs.s3.buffer.dir": "/var/tmp",
            "table_conf@_sink.parallelism": "4"
          }
        }
      ]
    },
    "FlinkApplicationConfiguration": {
      "ParallelismConfiguration": {
        "AutoScalingEnabled": true,
        "Parallelism": 4,
        "ParallelismPerKPU": 1
      }
    }
  },
  "CloudWatchLoggingOptions": [
    {
      "LogStreamARN": "arn:aws:logs:us-west-2:YourAccountId:log-group:/aws/kinesis-analytics/kafka-cdc-paimon:log-stream:kinesis-analytics-log-stream"
    }
  ]
}
```

Finally, create the MSF application using the AWS CLI.

```shell
aws kinesisanalyticsv2 create-application \
    --cli-input-json file://create-kafkacdc-paimon.json
```
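
After creation, you can start the application and check its status from the CLI; the application name below matches the sample JSON above:

```shell
# Start the application.
aws kinesisanalyticsv2 start-application --application-name kafka-cdc-paimon

# Check the application status.
aws kinesisanalyticsv2 describe-application --application-name kafka-cdc-paimon \
    --query 'ApplicationDetail.ApplicationStatus'
```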

### Running in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](../running-examples-locally.md) for details.

### Generating data

You can use [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator),
also available in a [hosted version](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html),
to generate random data to a Kinesis Data Stream and test the application.

> **Reviewer comment:** I think this is not correct. The example uses Kafka. To run this example end-to-end the user should set up a DB and a CDC Kafka connector. This is quite heavyweight, particularly to test the application locally. Would it be possible to add a docker-compose the user can run to set up locally what is required? Also, doing CDC directly in Flink would simplify the requirements for running this example end-to-end, both locally and on AWS.
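
Since `kafka_conf@_value.format` is set to `debezium-json` in the sample configuration, the Kafka topic is expected to carry Debezium-style change events. A single message value might look like the illustrative record below (table, fields, and values are made up; whether a `schema` envelope is present depends on your connector settings):

```json
{
  "before": null,
  "after": {
    "id": 101,
    "name": "scooter",
    "price": 3.14
  },
  "source": {
    "connector": "mysql",
    "db": "inventory",
    "table": "products"
  },
  "op": "c",
  "ts_ms": 1717000000000
}
```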

> **Reviewer comment:** This is confusing for the reader. There is no EMR cluster to copy the file from. The `hive-site.xml` is part of this example and that's it.