diff --git a/java/IcebergDatastreamSink/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/java/IcebergDatastreamSink/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..e69de29
diff --git a/java/PaimonCDCSink/README.md b/java/PaimonCDCSink/README.md
new file mode 100644
index 0000000..49418b9
--- /dev/null
+++ b/java/PaimonCDCSink/README.md
@@ -0,0 +1,165 @@
+## Flink Apache Paimon Sink using DataStream API
+
+* Flink version: 1.20
+* Flink API: DataStream API
+* Language: Java (11)
+* Apache Paimon: 1.0.1
+* Flink connectors: Flink CDC-MySQL / PostgreSQL / MongoDB / Kafka
+
+This example demonstrates how to use the Apache Paimon CDC ingestion components (MySQL / PostgreSQL / MongoDB / Kafka) to sink
+data to Amazon S3 in the Apache Paimon table format. The Apache Paimon Hive Catalog can work with the Glue Data Catalog.
+
+The project can run both on Amazon Managed Service for Apache Flink, and locally for development.
+
+### Prerequisites
+* A database source (MySQL, PostgreSQL, or MongoDB) with binlog enabled, or a Kafka / Amazon MSK source streaming data in a CDC format
+  supported by Apache Paimon (Canal CDC, Debezium CDC, Maxwell CDC, OGG CDC, JSON, aws-dms-json); see the example record after this list.
+* If you want to use the Apache Paimon Hive catalog with the Glue Data Catalog, install the aws-glue-datacatalog-hive3-client
+  jar into your local Maven repository (refer to this [GitHub repo](https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore) to build and install it,
+  or take the jar from an EMR cluster and install it into your local Maven repository), then copy your EMR cluster's `hive-site.xml` file into the project and repackage the project.
+* An S3 bucket to write the Paimon table.
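+
+For example, if the source is a Kafka topic carrying `debezium-json` data, each record would look roughly like the following
+(a minimal, illustrative record with made-up table and column names; real Debezium payloads carry additional `source` and schema metadata):
+
+```json
+{
+  "before": null,
+  "after": {
+    "ID": 1,
+    "NAME": "example"
+  },
+  "source": {
+    "db": "inventory",
+    "table": "customers"
+  },
+  "op": "c",
+  "ts_ms": 1717000000000
+}
+```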
+
+
+#### IAM Permissions
+
+The application must have IAM permissions to:
+* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables.
+ See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html).
+* Read from and write to the S3 bucket.
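+
+As an illustration, the S3 portion could look like the following policy statement (a sketch only; the bucket name is a placeholder and should be scoped to your own resources):
+
+```json
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
+      "Resource": ["arn:aws:s3:::your-paimon-bucket", "arn:aws:s3:::your-paimon-bucket/*"]
+    }
+  ]
+}
+```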
+
+
+### Runtime configuration
+
+When running on Amazon Managed Service for Apache Flink the runtime configuration is read from *Runtime Properties*.
+
+When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.
+
+This example parses runtime parameters according to the following rules and passes the parsed parameters to Apache Paimon Actions.
+
+- The Paimon CDC ingestion action name is parsed from the key named `action` in the `ActionConf` parameter group.
+- Global or common parameters can also be placed in the `ActionConf` parameter group. The parameter names should match those documented for the specific ingestion [action](https://paimon.apache.org/docs/1.0/cdc-ingestion/overview/).
+- For parameters such as `table_conf` and `catalog_conf` that take `Key=Value` pairs, the name of the parameter group can be customized, for example `TableConf` or `CatalogConf`.
+  Within the group, each parameter name must follow the format `<Paimon parameter name>@_<Key>`,
+  for example `table_conf@_bucket`, and the parameter value is the corresponding `Value`.
+
+
+Runtime parameters (sample):
+
+| Group ID | Key | Description |
+|---------------|--------------------------------------------|----------------------------------------------------------------------------------------|
+| `ActionConf`  | `action`                                   | Name of the Apache Paimon CDC ingestion action, e.g. `kafka_sync_database`, `mysql_sync_database`. |
+| `ActionConf`  | `database`                                 | Target Paimon database name.                                                              |
+| `ActionConf`  | `primary_keys`                             | (Optional) The primary keys of the Paimon tables.                                         |
+| `KafkaConf`   | `kafka_conf@_properties.bootstrap.servers` | Bootstrap servers of the Kafka cluster.                                                   |
+| `KafkaConf`   | `kafka_conf@_properties.auto.offset.reset` | Starting offset of the Kafka consumer.                                                    |
+| `KafkaConf`   | `kafka_conf@_properties.group.id`          | Consumer group ID.                                                                        |
+| `CatalogConf` | `catalog_conf@_metastore.client.class`     | Paimon Hive Catalog metastore client class name.                                         |
+| `CatalogConf` | `...`                                      | ...                                                                                       |
+| `TableConf`   | `table_conf@_bucket`                       | Number of buckets of the Paimon table.                                                    |
+| `TableConf` | `...` | ... |
+
+All parameters are case-sensitive.
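+
+For illustration, the sample parameters above are translated by this application into Paimon action arguments roughly equivalent to the
+following invocation (all values are placeholders):
+
+```shell
+kafka_sync_database \
+    --database my_paimon_db \
+    --warehouse s3://my-bucket/paimon/warehouse \
+    --kafka_conf properties.bootstrap.servers=broker-1:9092 \
+    --kafka_conf topic=my_cdc_topic \
+    --catalog_conf metastore.client.class=com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient \
+    --table_conf bucket=4
+```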
+
+### Samples
+**Create an MSF application**
+
+First, compile and package the application using Maven, then copy the packaged jar file to your S3 bucket (an example upload command is shown after the Maven command below).
+
+```shell
+mvn clean package -P KafkaCDC
+```
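+
+For example, to upload the jar (the bucket, prefix, and jar file name are placeholders; check the actual file name produced under `target/`):
+
+```shell
+aws s3 cp target/kafka-paimon-cdc-sink-1.0.jar s3://your-bucket/jars/kafka-paimon-cdc-sink-1.0.jar
+```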
+
+Second, prepare an input JSON file to create the MSF application. You can add required information (such as VPC, subnets, and security groups) to this file.
+
+**Notice:** Your service execution role should have the appropriate permissions, such as S3 bucket access, and Glue access if you want to use the Glue Data Catalog as the Paimon Hive Catalog.
+```json
+{
+ "ApplicationName": "kafka-cdc-paimon",
+ "ApplicationDescription": "Sink CDC from Kafka as Apache Paimon table",
+ "RuntimeEnvironment": "FLINK-1_20",
+ "ServiceExecutionRole": "Your service role arn",
+ "ApplicationConfiguration": {
+ "ApplicationCodeConfiguration": {
+ "CodeContent": {
+ "S3ContentLocation": {
+ "BucketARN": "Your bucket arn",
+ "FileKey": "Your jar file s3 key"
+ }
+ },
+ "CodeContentType": "ZIPFILE"
+ },
+ "EnvironmentProperties": {
+ "PropertyGroups": [
+ {
+ "PropertyGroupId": "ActionConf",
+ "PropertyMap": {
+ "action": "kafka_sync_database",
+ "database": "Your Paimon Database",
+ "warehouse": "Your paimon warehouse path"
+ }
+ },
+ {
+ "PropertyGroupId": "KafkaConf",
+ "PropertyMap": {
+ "kafka_conf@_properties.bootstrap.servers": "MSK bootstrap servers",
+ "kafka_conf@_properties.auto.offset.reset": "earliest",
+ "kafka_conf@_properties.group.id": "group id",
+ "kafka_conf@_topic": "Your cdc topic",
+ "kafka_conf@_value.format": "debezium-json"
+ }
+ },
+ {
+ "PropertyGroupId": "CatalogConf",
+ "PropertyMap": {
+ "catalog_conf@_hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+ "catalog_conf@_hadoop.fs.s3.buffer.dir": "/var/tmp"
+ }
+ },
+ {
+ "PropertyGroupId": "TableConf",
+ "PropertyMap": {
+ "table_conf@_bucket": "4",
+ "table_conf@_metadata.iceberg.storage": "hive-catalog",
+ "table_conf@_metadata.iceberg.manifest-legacy-version": "true",
+ "table_conf@_metadata.iceberg.hive-client-class": "com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient",
+ "table_conf@_fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
+ "table_conf@_fs.s3.buffer.dir": "/var/tmp",
+ "table_conf@_sink.parallelism": "4"
+ }
+ }
+ ]
+ }
+ },
+ "FlinkApplicationConfiguration": {
+ "ParallelismConfiguration": {
+ "AutoScalingEnabled": true,
+ "Parallelism": 4,
+ "ParallelismPerKPU": 1
+ }
+ },
+ "CloudWatchLoggingOptions": [
+ {
+ "LogStreamARN": "arn:aws:logs:us-west-2:YourAccountId:log-group:/aws/kinesis-analytics/kafka-cdc-paimon:log-stream:kinesis-analytics-log-stream"
+ }
+ ]
+}
+```
+
+Last, create the MSF application using the AWS CLI.
+
+```shell
+aws kinesisanalyticsv2 create-application \
+--cli-input-json file://create-kafkacdc-paimon.json
+```
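+
+Once the application has been created, you can start it (assuming the application name used in the sample above):
+
+```shell
+aws kinesisanalyticsv2 start-application --application-name kafka-cdc-paimon
+```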
+
+### Running in IntelliJ
+
+You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
+
+See [Running examples locally](../running-examples-locally.md) for details.
+
+### Generating data
+
+You can use [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator),
+also available in a [hosted version](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html),
+to send random data to a Kinesis Data Stream and test the application.
\ No newline at end of file
diff --git a/java/PaimonCDCSink/pom.xml b/java/PaimonCDCSink/pom.xml
new file mode 100644
index 0000000..e4a4fdd
--- /dev/null
+++ b/java/PaimonCDCSink/pom.xml
@@ -0,0 +1,340 @@
+
+
+ 4.0.0
+
+ com.amazonaws
+ amazon-msf-examples
+ 1.0
+
+
+ paimon-cdc-sink
+
+
+ UTF-8
+ ${project.basedir}/target
+ ${project.name}-${project.version}
+ 11
+ ${target.java.version}
+ ${target.java.version}
+ 1.20.0
+ 5.0.0-1.20
+ 1.2.0
+ 2.23.1
+ 2.16.2
+ 1.0.1
+ 3.4.0-1.20
+ 3.3.0
+ 8.4.0
+ 3.4.0
+ 2.30.16
+
+
+
+
+
+ com.amazonaws
+ aws-java-sdk-bom
+
+ 1.12.676
+ pom
+ import
+
+
+
+
+
+
+
+ com.amazonaws
+ aws-kinesisanalytics-runtime
+ ${kda.runtime.version}
+ provided
+
+
+
+
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-runtime-web
+ ${flink.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-table-runtime
+ ${flink.version}
+ provided
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-s3-fs-hadoop
+ ${flink.version}
+
+
+
+
+ org.apache.logging.log4j
+ log4j-api
+ ${log4j.version}
+ provided
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+ ${log4j.version}
+ provided
+
+
+ org.apache.logging.log4j
+ log4j-core
+ ${log4j.version}
+ provided
+
+
+
+ org.apache.paimon
+ paimon-flink-action
+ ${paimon.version}
+
+
+
+ org.apache.paimon
+ paimon-flink-cdc
+ ${paimon.version}
+
+
+
+ org.apache.paimon
+ paimon-flink-1.20
+ ${paimon.version}
+
+
+ org.apache.paimon
+ paimon-hive-connector-3.1
+ ${paimon.version}
+
+
+
+ org.apache.thrift
+ libthrift
+ 0.21.0
+
+
+ org.apache.thrift
+ libfb303
+ 0.9.3
+
+
+ org.apache.hive
+ hive-exec
+ 3.1.3
+
+
+ org.apache.hadoop
+ hadoop-client
+ ${hadoop.version}
+
+
+ org.apache.avro
+ avro
+
+
+
+ org.slf4j
+ slf4j-reload4j
+
+
+
+
+
+ com.amazonaws.glue
+ aws-glue-datacatalog-hive3-client
+ 4.2.0
+
+
+
+ software.amazon.awssdk
+ glue
+ ${aws.sdkv2.version}
+ compile
+
+
+ software.amazon.awssdk
+ aws-core
+ ${aws.sdkv2.version}
+ compile
+
+
+ software.amazon.awssdk
+ sts
+ ${aws.sdkv2.version}
+ compile
+
+
+ software.amazon.awssdk
+ utils
+ ${aws.sdkv2.version}
+
+
+
+
+
+
+ KafkaCDC
+
+
+ org.apache.flink
+ flink-sql-connector-kafka
+ ${flink.kafka.sql.version}
+
+
+
+ kafka
+
+
+
+
+ MySQLCDC
+
+
+ org.apache.flink
+ flink-connector-mysql-cdc
+ ${flink.cdc.version}
+
+
+ com.mysql
+ mysql-connector-j
+ ${mysql.driver.version}
+
+
+
+ mysql
+
+
+
+ PostgresCDC
+
+
+ org.apache.flink
+ flink-connector-postgres-cdc
+ ${flink.cdc.version}
+
+
+
+ postgre
+
+
+
+ MongoDBCDC
+
+
+ org.apache.flink
+ flink-connector-mongodb-cdc
+ ${flink.cdc.version}
+
+
+
+ mongo
+
+
+
+
+
+
+ ${buildDirectory}
+ ${cdc.source}-${jar.finalName}
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+ ${target.java.version}
+ ${target.java.version}
+ ${target.java.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.1
+
+
+
+ package
+
+ shade
+
+
+
+
+ org.apache.flink:force-shading
+ com.google.code.findbugs:jsr305
+ org.slf4j:*
+ log4j:*
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+ META-INF/versions/17/**/*.class
+ META-INF/versions/19/**/*.class
+ META-INF/versions/15/**/*.class
+
+
+
+
+
+
+ org.apache.kafka.connect
+ org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect
+
+
+ org.apache.kafka
+ org.apache.flink.kafka.shaded.org.apache.kafka
+
+
+
+
+
+
+ com.amazonaws.services.msf.PaimonCDCSinkJob
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/java/PaimonCDCSink/src/main/java/com/amazonaws/services/msf/PaimonCDCSinkJob.java b/java/PaimonCDCSink/src/main/java/com/amazonaws/services/msf/PaimonCDCSinkJob.java
new file mode 100644
index 0000000..c6ff043
--- /dev/null
+++ b/java/PaimonCDCSink/src/main/java/com/amazonaws/services/msf/PaimonCDCSinkJob.java
@@ -0,0 +1,107 @@
+package com.amazonaws.services.msf;
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.ActionFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class PaimonCDCSinkJob {
+
+    private static final Logger LOGGER = LogManager.getLogger(PaimonCDCSinkJob.class);
+    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+    private static final String SEP_KEY = "@_";
+    private static final String ACTION_CONF_GROUP = "ActionConf";
+    private static final String ACTION_KEY = "action";
+    private static final String PARAM_KEY_PREFIX = "--";
+
+    public static void main(String[] args) throws Exception {
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Translate the runtime properties into Paimon action arguments (e.g. "--table_conf bucket=4")
+        Map<String, Properties> confMap = loadApplicationProperties(env);
+        String[] actionArgs = configToActionParameters(confMap);
+        if (actionArgs.length < 1) {
+            LOGGER.error("No action specified");
+            System.exit(1);
+        }
+
+        LOGGER.info("actionArgs: {}", Arrays.toString(actionArgs));
+
+        // Let Paimon resolve the CDC ingestion action (e.g. kafka_sync_database) from the parsed arguments
+        Optional<Action> actionOpt = ActionFactory.createAction(actionArgs);
+
+        if (actionOpt.isPresent()) {
+            Action action = actionOpt.get();
+            if (action instanceof ActionBase) {
+                LOGGER.info("ActionBase: {}", action.getClass().getName());
+                // Reuse this job's execution environment so the action runs within this Flink application
+                ((ActionBase) action).withStreamExecutionEnvironment(env).run();
+            } else {
+                action.run();
+            }
+        } else {
+            LOGGER.info("No paimon flink action service found");
+            System.exit(1);
+        }
+    }
+
+    private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
+        if (env instanceof LocalStreamEnvironment) {
+            LOGGER.debug("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+            return KinesisAnalyticsRuntime.getApplicationProperties(
+                    PaimonCDCSinkJob.class.getClassLoader()
+                            .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
+        } else {
+            LOGGER.debug("Loading application properties from Amazon Managed Service for Apache Flink");
+            return KinesisAnalyticsRuntime.getApplicationProperties();
+        }
+    }
+
+    private static String[] configToActionParameters(Map<String, Properties> confMap) {
+
+        Properties actionProp = confMap.get(ACTION_CONF_GROUP);
+        if (actionProp == null) {
+            LOGGER.error("ActionConf not found in application properties");
+            System.exit(1);
+        }
+
+        String action = actionProp.getProperty(ACTION_KEY);
+        if (action == null || action.isEmpty()) {
+            LOGGER.error("Action not found in application properties");
+            System.exit(1);
+        }
+
+        actionProp.remove(ACTION_KEY);
+
+        // The action name is the first argument; every other property becomes a "--key value" pair
+        List<String> params = new ArrayList<>();
+        params.add(action);
+
+        for (Map.Entry<String, Properties> confEntry : confMap.entrySet()) {
+            confEntry.getValue().forEach(
+                    (k, v) -> {
+                        String ks = k.toString();
+                        int idx = ks.indexOf(SEP_KEY);
+                        String paramKey;
+                        String paramVal;
+                        if (idx != -1) {
+                            // e.g. "table_conf@_bucket" -> "--table_conf bucket=<value>"
+                            paramKey = String.format("%s%s", PARAM_KEY_PREFIX, ks.substring(0, idx));
+                            paramVal = String.format("%s=%s", ks.substring(idx + SEP_KEY.length()), v);
+                        } else {
+                            // e.g. "database" -> "--database <value>"
+                            paramKey = String.format("%s%s", PARAM_KEY_PREFIX, ks);
+                            paramVal = v.toString();
+                        }
+                        params.add(paramKey);
+                        params.add(paramVal);
+                    }
+            );
+        }
+
+        return params.toArray(new String[0]);
+    }
+
+}
diff --git a/java/PaimonCDCSink/src/main/resources/flink-application-properties-dev.json b/java/PaimonCDCSink/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 0000000..cb0fa4d
--- /dev/null
+++ b/java/PaimonCDCSink/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,39 @@
+[
+ {
+ "PropertyGroupId": "ActionConf",
+ "PropertyMap": {
+ "action": "kafka_sync_database",
+ "warehouse": "s3://bucket/data/prefix",
+ "database": "paimon_flink",
+ "primary_keys": "ID",
+ "table_prefix": "ods_"
+ }
+ },
+ {
+ "PropertyGroupId": "KafkaConf",
+ "PropertyMap": {
+ "kafka_conf@_properties.bootstrap.servers": "b-2.mycluster.bzvtby.c8.kafka.us-west-2.amazonaws.com:9092,b-1.mycluster.bzvtby.c8.kafka.us-west-2.amazonaws.com:9092",
+ "kafka_conf@_topic": "kafka_topic",
+ "kafka_conf@_properties.group.id": 1234546,
+ "kafka_conf@_properties.auto.offset.reset": "earliest"
+ }
+ },
+ {
+ "PropertyGroupId": "CatalogConf",
+ "PropertyMap": {
+ "catalog_conf@_metastore": "hive",
+ "catalog_conf@_hive-conf-dir": "/etc/hive/conf.dist",
+ "catalog_conf@_lock.enabled": "false",
+ "catalog_conf@_metastore.client.class": "com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient",
+ "catalog_conf@_warehouse": "s3://bucket/data/prefix"
+ }
+ },
+ {
+ "PropertyGroupId": "TableConf",
+ "PropertyMap": {
+ "table_conf@_bucket": "4",
+ "table_conf@_changelog-producer": "input",
+ "table_conf@_sink.parallelism": "4"
+ }
+ }
+]
\ No newline at end of file
diff --git a/java/PaimonCDCSink/src/main/resources/hive-site.xml b/java/PaimonCDCSink/src/main/resources/hive-site.xml
new file mode 100644
index 0000000..95bb612
--- /dev/null
+++ b/java/PaimonCDCSink/src/main/resources/hive-site.xml
@@ -0,0 +1,273 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ hbase.master
+
+ http://wiki.apache.org/hadoop/Hive/HBaseIntegration
+
+
+
+ hive.zookeeper.quorum
+ ip-xx-xx-xx-xx.us-west-2.compute.internal:2181
+
+
+
+ hive.llap.zk.sm.connectionString
+ ip-xx-xx-xx-xx.us-west-2.compute.internal:2181
+
+
+
+ hbase.zookeeper.quorum
+ ip-xx-xx-xx-xx.us-west-2.compute.internal
+ http://wiki.apache.org/hadoop/Hive/HBaseIntegration
+
+
+
+ hive.execution.engine
+ tez
+
+
+
+ fs.defaultFS
+ hdfs://ip-xx-xx-xx-xx.us-west-2.compute.internal:8020
+
+
+
+
+ hive.metastore.uris
+ thrift://ip-xx-xx-xx-xx.us-west-2.compute.internal:9083
+ JDBC connect string for a JDBC metastore
+
+
+
+ javax.jdo.option.ConnectionURL
+ jdbc:mysql://ip-xx-xx-xx-xx.us-west-2.compute.internal:3306/hive?createDatabaseIfNotExist=true
+ username to use against metastore database
+
+
+
+ javax.jdo.option.ConnectionDriverName
+ org.mariadb.jdbc.Driver
+ username to use against metastore database
+
+
+
+ javax.jdo.option.ConnectionUserName
+ hive
+ username to use against metastore database
+
+
+
+ javax.jdo.option.ConnectionPassword
+ kWs5sQ8HnZaEC2kj
+ password to use against metastore database
+
+
+
+ hive.server2.allow.user.substitution
+ true
+
+
+
+ hive.server2.enable.doAs
+ true
+
+
+
+ hive.server2.thrift.port
+ 10000
+
+
+
+ hive.server2.thrift.http.port
+ 10001
+
+
+
+
+
+ hive.optimize.ppd.input.formats
+ com.amazonaws.emr.s3select.hive.S3SelectableTextInputFormat
+
+
+
+ s3select.filter
+ false
+
+
+
+ hive.server2.in.place.progress
+ false
+
+
+
+ hive.llap.zk.registry.user
+ hadoop
+
+
+
+ hive.security.metastore.authorization.manager
+ org.apache.hadoop.hive.ql.security.authorization.StorageBasedAuthorizationProvider
+
+
+
+ hive.log.explain.output
+ false
+
+
+
+ datanucleus.fixedDatastore
+ true
+
+
+
+ mapred.reduce.tasks
+ -1
+
+
+
+ mapred.max.split.size
+ 256000000
+
+
+
+ hive.mapjoin.hybridgrace.hashtable
+ false
+
+
+
+ hive.merge.nway.joins
+ false
+
+
+
+ hive.metastore.connect.retries
+ 15
+
+
+
+ hive.optimize.joinreducededuplication
+ false
+
+
+
+ hive.optimize.sort.dynamic.partition.threshold
+ 1
+
+
+
+ hive.server2.materializedviews.registry.impl
+ DUMMY
+
+
+
+ hive.tez.auto.reducer.parallelism
+ true
+
+
+
+ hive.vectorized.execution.mapjoin.minmax.enabled
+ true
+
+
+
+ hive.vectorized.execution.mapjoin.native.fast.hashtable.enabled
+ true
+
+
+
+ hive.optimize.dynamic.partition.hashjoin
+ true
+
+
+
+ hive.compactor.initiator.on
+ true
+
+
+
+ hive.blobstore.use.output-committer
+ true
+
+
+
+ hive.llap.daemon.service.hosts
+ @llap0
+
+
+
+ hive.llap.execution.mode
+ only
+
+
+
+ hive.optimize.metadataonly
+ true
+
+
+
+ hive.tez.bucket.pruning
+ true
+
+
+
+ hive.exec.mode.local.auto
+ true
+
+
+
+ hive.exec.mode.local.auto.inputbytes.max
+ 50000000
+
+
+
+ hive.query.reexecution.stats.persist.scope
+ hiveserver
+
+
+
+ hive.metastore.client.factory.class
+ com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
+
+
+
+ hive.auto.convert.join.noconditionaltask.size
+ 1073741824
+
+
+
+ hive.compactor.worker.threads
+ 1
+
+
+
+
+
+
+
diff --git a/java/PaimonCDCSink/src/main/resources/log4j2.properties b/java/PaimonCDCSink/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..3546643
--- /dev/null
+++ b/java/PaimonCDCSink/src/main/resources/log4j2.properties
@@ -0,0 +1,7 @@
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/java/pom.xml b/java/pom.xml
index bc44dbe..010fd2b 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -37,6 +37,7 @@
         <module>Serialization/CustomTypeInfo</module>
         <module>SideOutputs</module>
         <module>PrometheusSink</module>
+        <module>PaimonCDCSink</module>
         <module>SQSSink</module>
\ No newline at end of file