From 9efa064602f5fb1faebd066f302266f7a077347e Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Mon, 6 Jan 2025 09:12:55 -0600 Subject: [PATCH 01/13] testing iceberg with s3 tables --- .../IcebergDataStreamSink}/README.md | 0 .../IcebergDataStreamSink}/pom.xml | 0 .../amazonaws/services/msf/StreamingJob.java | 0 .../services/msf/avro/AvroSchemaUtils.java | 0 ...vroGenericStockTradeGeneratorFunction.java | 0 .../msf/iceberg/IcebergSinkBuilder.java | 0 .../flink-application-properties-dev.json | 20 ++ .../src/main/resources/log4j2.properties | 0 .../src/main/resources/price.avsc | 0 ...enericStockTradeGeneratorFunctionTest.java | 0 .../Iceberg/IcebergDataStreamSource/README.md | 86 +++++++ java/Iceberg/IcebergDataStreamSource/pom.xml | 217 ++++++++++++++++++ .../amazonaws/services/msf/StreamingJob.java | 109 +++++++++ .../services/msf/avro/AvroSchemaUtils.java | 22 ++ ...vroGenericStockTradeGeneratorFunction.java | 46 ++++ .../msf/iceberg/IcebergSinkBuilder.java | 154 +++++++++++++ .../msf/iceberg/IcebergSourceBuilder.java | 4 + .../flink-application-properties-dev.json | 0 .../src/main/resources/log4j2.properties | 7 + .../src/main/resources/price.avsc | 23 ++ ...enericStockTradeGeneratorFunctionTest.java | 26 +++ 21 files changed, 714 insertions(+) rename java/{IcebergDatastreamSink => Iceberg/IcebergDataStreamSink}/README.md (100%) rename java/{IcebergDatastreamSink => Iceberg/IcebergDataStreamSink}/pom.xml (100%) rename java/{IcebergDatastreamSink => Iceberg/IcebergDataStreamSink}/src/main/java/com/amazonaws/services/msf/StreamingJob.java (100%) rename java/{IcebergDatastreamSink => Iceberg/IcebergDataStreamSink}/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java (100%) rename java/{IcebergDatastreamSink => Iceberg/IcebergDataStreamSink}/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java (100%) rename java/{IcebergDatastreamSink => Iceberg/IcebergDataStreamSink}/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java (100%) create mode 100644 java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json rename java/{IcebergDatastreamSink => Iceberg/IcebergDataStreamSink}/src/main/resources/log4j2.properties (100%) rename java/{IcebergDatastreamSink => Iceberg/IcebergDataStreamSink}/src/main/resources/price.avsc (100%) rename java/{IcebergDatastreamSink => Iceberg/IcebergDataStreamSink}/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java (100%) create mode 100644 java/Iceberg/IcebergDataStreamSource/README.md create mode 100644 java/Iceberg/IcebergDataStreamSource/pom.xml create mode 100644 java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java create mode 100644 java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java create mode 100644 java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java create mode 100644 java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java create mode 100644 java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSourceBuilder.java rename java/{IcebergDatastreamSink => Iceberg/IcebergDataStreamSource}/src/main/resources/flink-application-properties-dev.json (100%) create mode 100644 java/Iceberg/IcebergDataStreamSource/src/main/resources/log4j2.properties create mode 
100644 java/Iceberg/IcebergDataStreamSource/src/main/resources/price.avsc create mode 100644 java/Iceberg/IcebergDataStreamSource/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java diff --git a/java/IcebergDatastreamSink/README.md b/java/Iceberg/IcebergDataStreamSink/README.md similarity index 100% rename from java/IcebergDatastreamSink/README.md rename to java/Iceberg/IcebergDataStreamSink/README.md diff --git a/java/IcebergDatastreamSink/pom.xml b/java/Iceberg/IcebergDataStreamSink/pom.xml similarity index 100% rename from java/IcebergDatastreamSink/pom.xml rename to java/Iceberg/IcebergDataStreamSink/pom.xml diff --git a/java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java similarity index 100% rename from java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java rename to java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java diff --git a/java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java similarity index 100% rename from java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java rename to java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java diff --git a/java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java similarity index 100% rename from java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java rename to java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java diff --git a/java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java similarity index 100% rename from java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java rename to java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java diff --git a/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..eec7dfd --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,20 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.sec": 10.0 + } + }, + { + "PropertyGroupId": "Iceberg", + "PropertyMap": { + "bucket.prefix": "arn:aws:s3tables:us-east-1:143479883528:bucket/my-table-bucket", + "catalog.db": "iceberg", + "catalog.table": "prices_iceberg", + "partition.fields": "symbol", + "sort.field": "timestamp", + "operation": "upsert", + "upsert.equality.fields": "symbol" + } + } +] \ No newline at end of file diff --git a/java/IcebergDatastreamSink/src/main/resources/log4j2.properties b/java/Iceberg/IcebergDataStreamSink/src/main/resources/log4j2.properties 
similarity index 100% rename from java/IcebergDatastreamSink/src/main/resources/log4j2.properties rename to java/Iceberg/IcebergDataStreamSink/src/main/resources/log4j2.properties diff --git a/java/IcebergDatastreamSink/src/main/resources/price.avsc b/java/Iceberg/IcebergDataStreamSink/src/main/resources/price.avsc similarity index 100% rename from java/IcebergDatastreamSink/src/main/resources/price.avsc rename to java/Iceberg/IcebergDataStreamSink/src/main/resources/price.avsc diff --git a/java/IcebergDatastreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java b/java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java similarity index 100% rename from java/IcebergDatastreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java rename to java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java diff --git a/java/Iceberg/IcebergDataStreamSource/README.md b/java/Iceberg/IcebergDataStreamSource/README.md new file mode 100644 index 0000000..7da74d0 --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/README.md @@ -0,0 +1,86 @@ +# Flink Iceberg Sink using DataStream API + +* Flink version: 1.20.0 +* Flink API: DataStream API +* Iceberg 1.6.1 +* Language: Java (11) +* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) + and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) + +This example demonstrate how to use +[Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with the Glue Data Catalog. + +For simplicity, the application generates synthetic data, random stock prices, internally. +Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records +serialized with AVRO. + +### Prerequisites + +The application expects the following resources: +* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default"). + The application creates the Table, but the Catalog must exist already. +* An S3 bucket to write the Iceberg table. + +#### IAM Permissions + +The application must have IAM permissions to: +* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables. + See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html). +* Read and Write from the S3 bucket. + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties. + +When running locally, the configuration is read from the +[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. + +Runtime parameters: + +| Group ID | Key | Default | Description | +|-----------|--------------------------|-------------------|---------------------------------------------------------------------------------------------------------------------| +| `DataGen` | `records.per.sec` | `10.0` | Records per second generated. | +| `Iceberg` | `bucket.prefix` | (mandatory) | S3 bucket prefix, for example `s3://my-bucket/iceberg`. | +| `Iceberg` | `catalog.db` | `default` | Name of the Glue Data Catalog database. 
| +| `Iceberg` | `catalog.table` | `prices_iceberg` | Name of the Glue Data Catalog table. | +| `Iceberg` | `partition.fields` | `symbol` | Comma-separated list of partition fields. | +| `Iceberg` | `sort.field` | `timestamp` | Sort field. | +| `Iceberg` | `operation` | `upsert` | Iceberg operation. One of `upsert`, `append` or `overwrite`. | +| `Iceberg` | `upsert.equality.fields` | `symbol` | Comma-separated list of fields used for upsert. It must match the partition fields. Required if `operation` = `upsert`. | + + +### Checkpoints + +Checkpointing must be enabled. Iceberg commits writes on checkpoint. + +When running locally, the application enables checkpointing programmatically, every 10 seconds. +When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. + + +### Known limitations + +There are currently some limitations in the Flink Iceberg integration: +* Doesn't support Iceberg tables with hidden partitioning +* Doesn't support adding columns, removing columns, renaming columns, or changing column types. + +### Schema and schema evolution + +The application must "know" the AVRO schema on start. +The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry. +This is due to a limitation of the Flink Iceberg integration, which requires knowing the table schema upfront. + +This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible. +Schema changes are not propagated to Iceberg. +As long as the schema of incoming records is FORWARD compatible, the application deserializes incoming records using +the schema it knows. Any new field in the incoming record is discarded. + +In this example, the schema is loaded from a schema definition file, [price.avsc](./src/main/resources/price.avsc), embedded +with the application (see the loading sketch at the end of this README). +It is technically possible to fetch the schema on application start from an external source, like a schema registry or a +schema definition file in an S3 bucket. This is beyond the scope of this example. + +### Running locally, in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. + +See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
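For illustration, here is a minimal sketch of how the bundled schema can be loaded from the classpath. It mirrors the `AvroSchemaUtils.loadSchema()` helper added in this patch; the wrapper class name used here is only illustrative.

```java
import org.apache.avro.Schema;

import java.io.IOException;
import java.io.InputStream;

// Illustrative wrapper class; the patch implements the same logic in AvroSchemaUtils.
public class AvroSchemaLoaderSketch {

    // Loads the AVRO schema definition (price.avsc) bundled with the application resources.
    public static Schema loadSchema() throws IOException {
        try (InputStream in = AvroSchemaLoaderSketch.class.getClassLoader()
                .getResourceAsStream("price.avsc")) {
            return new Schema.Parser().parse(in);
        }
    }
}
```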
diff --git a/java/Iceberg/IcebergDataStreamSource/pom.xml b/java/Iceberg/IcebergDataStreamSource/pom.xml new file mode 100644 index 0000000..0a1b7e6 --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/pom.xml @@ -0,0 +1,217 @@ + + + 4.0.0 + + com.amazonaws + iceberg-datastream-sink + 1.0 + jar + + + UTF-8 + 11 + ${target.java.version} + ${target.java.version} + + 1.20.0 + 1.11.3 + 3.4.0 + 1.6.1 + 1.2.0 + 2.23.1 + 5.8.1 + + + + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + org.apache.flink + flink-avro + ${flink.version} + + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink-1.19 + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws-bundle + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws + ${iceberg.version} + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + + org.slf4j + slf4j-reload4j + + + + + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.StreamingJob + + + + + + + + + \ No newline at end of file diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java new file mode 100644 index 0000000..bbeffec --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java @@ -0,0 +1,109 @@ +package com.amazonaws.services.msf; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Preconditions; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import com.amazonaws.services.msf.avro.AvroSchemaUtils; +import com.amazonaws.services.msf.datagen.AvroGenericStockTradeGeneratorFunction; +import com.amazonaws.services.msf.iceberg.IcebergSinkBuilder; +import org.apache.avro.Schema; +import 
org.apache.avro.generic.GenericRecord; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + + +public class StreamingJob { + private final static Logger LOG = LoggerFactory.getLogger(StreamingJob.class); + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(StreamingJob.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + + // Data Generator source generating random trades as AVRO GenericRecords + private static DataGeneratorSource createDataGenerator(Properties generatorProperties, Schema avroSchema) { + double recordsPerSecond = Double.parseDouble(generatorProperties.getProperty("records.per.sec", "10.0")); + Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); + + LOG.info("Data generator: {} record/sec", recordsPerSecond); + return new DataGeneratorSource<>( + new AvroGenericStockTradeGeneratorFunction(avroSchema), + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(recordsPerSecond), + new GenericRecordAvroTypeInfo(avroSchema) + ); + } + + + public static void main(String[] args) throws Exception { + // Set up the streaming execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Map applicationProperties = loadApplicationProperties(env); + + + // Get AVRO Schema from the definition bundled with the application + // Note that the application must "knows" the AVRO schema upfront, i.e. the schema must be either embedded + // with the application or fetched at start time. + // If the schema of the records received by the source changes, all changes must be FORWARD compatible. + // This way, the application will be able to write the data with the new schema into the old schema, but schema + // changes are not propagated to the Iceberg table. + Schema avroSchema = AvroSchemaUtils.loadSchema(); + + // Create Generic Record TypeInfo from schema. + GenericRecordAvroTypeInfo avroTypeInfo = new GenericRecordAvroTypeInfo(avroSchema); + + // Local dev specific settings + if (isLocal(env)) { + // Checkpointing and parallelism are set by Amazon Managed Service for Apache Flink when running on AWS + env.enableCheckpointing(10000); + env.setParallelism(2); + } + + // Data Generator Source. 
+ // Simulates an external source that receives AVRO Generic Records + Properties dataGeneratorProperties = applicationProperties.get("DataGen"); + DataStream genericRecordDataStream = env.fromSource( + createDataGenerator(dataGeneratorProperties, avroSchema), + WatermarkStrategy.noWatermarks(), + "DataGen"); + + // Flink Sink Builder + Properties icebergProperties = applicationProperties.get("Iceberg"); + FlinkSink.Builder icebergSinkBuilder = IcebergSinkBuilder.createBuilder( + icebergProperties, + genericRecordDataStream, avroSchema); + // Sink to Iceberg Table + icebergSinkBuilder.append(); + + env.execute("Flink DataStream Sink"); + } +} \ No newline at end of file diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java new file mode 100644 index 0000000..de34c5e --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java @@ -0,0 +1,22 @@ +package com.amazonaws.services.msf.avro; + +import com.amazonaws.services.msf.StreamingJob; +import org.apache.avro.Schema; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; + +public class AvroSchemaUtils implements Serializable { + + private static final String AVRO_SCHEMA_RESOURCE = "price.avsc"; + + /** + * Load the AVRO Schema from the resources folder + */ + public static Schema loadSchema() throws IOException { + // Get AVRO Schema from the definition bundled with the application + InputStream inputStream = StreamingJob.class.getClassLoader().getResourceAsStream(AVRO_SCHEMA_RESOURCE); + return new org.apache.avro.Schema.Parser().parse(inputStream); + } +} diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java new file mode 100644 index 0000000..5b1cf34 --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java @@ -0,0 +1,46 @@ +package com.amazonaws.services.msf.datagen; + +import org.apache.flink.connector.datagen.source.GeneratorFunction; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.RandomUtils; + +import java.time.Instant; + +/** + * Function used by DataGen source to generate random records as AVRO GenericRecord. + *

+ * The generator assumes that the AVRO schema provided contains the generated fields + */ +public class AvroGenericStockTradeGeneratorFunction implements GeneratorFunction { + + private static final String[] SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"}; + + private Schema avroSchema; + + public AvroGenericStockTradeGeneratorFunction(Schema avroSchema) { + this.avroSchema = avroSchema; + } + + /** + * Generates a random trade + */ + @Override + public GenericRecord map(Long value) { + GenericData.Record record = new GenericData.Record(avroSchema); + + String symbol = SYMBOLS[RandomUtils.nextInt(0, SYMBOLS.length)]; + float price = RandomUtils.nextFloat(0, 10); + int volumes = RandomUtils.nextInt(0, 1000000); + String timestamp = Instant.now().toString(); + + record.put("symbol", symbol); + record.put("price", price); + record.put("volumes", volumes); + record.put("timestamp", timestamp); + + return record; + } +} diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java new file mode 100644 index 0000000..8dbbcdc --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java @@ -0,0 +1,154 @@ +package com.amazonaws.services.msf.iceberg; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.generic.GenericRecord; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Wraps the code to initialize an Iceberg sink that uses Glue Data Catalog as catalog + */ +public class IcebergSinkBuilder { + private static final String DEFAULT_GLUE_DB = "default"; + private static final String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg"; + private static final String DEFAULT_ICEBERG_SORT_ORDER_FIELD = "accountNr"; + private static final String DEFAULT_ICEBERG_PARTITION_FIELDS = "symbol"; + private static final String DEFAULT_ICEBERG_OPERATION = "upsert"; + private static final String DEFAULT_ICEBERG_UPSERT_FIELDS = "symbol"; + + + /** + * If Iceberg Table has not been previously created, we will create it using the Partition Fields specified in the + * Properties, as well as add a Sort Field to improve query performance + */ + private static void createTable(Catalog catalog, TableIdentifier outputTable, org.apache.iceberg.Schema icebergSchema, PartitionSpec partitionSpec, String sortField) { + // If table has been previously created, we do not do 
any operation or modification + if (!catalog.tableExists(outputTable)) { + Table icebergTable = catalog.createTable(outputTable, icebergSchema, partitionSpec); + // Modifying newly created iceberg table to have a sort field + icebergTable.replaceSortOrder() + .asc(sortField, NullOrder.NULLS_LAST) + .commit(); + // The catalog.create table creates an Iceberg V1 table. If we want to perform upserts, we need to upgrade the table version to 2. + TableOperations tableOperations = ((BaseTable) icebergTable).operations(); + TableMetadata appendTableMetadata = tableOperations.current(); + tableOperations.commit(appendTableMetadata, appendTableMetadata.upgradeToFormatVersion(2)); + } + } + + /** + * Generate the PartitionSpec, if when creating table, you want it to be partitioned. + * If you are doing Upserts in your Iceberg Table, your Equality Fields must be the same as the fields used for Partitioning. + */ + private static PartitionSpec getPartitionSpec(org.apache.iceberg.Schema icebergSchema, List partitionFieldsList) { + PartitionSpec.Builder partitionBuilder = PartitionSpec.builderFor(icebergSchema); + for (String s : partitionFieldsList) { + partitionBuilder = partitionBuilder.identity(s); + } + return partitionBuilder.build(); + } + + // Iceberg Flink Sink Builder + public static FlinkSink.Builder createBuilder(Properties icebergProperties, DataStream dataStream, org.apache.avro.Schema avroSchema) { + // Retrieve configuration from application parameters + String s3BucketPrefix = Preconditions.checkNotNull(icebergProperties.getProperty("bucket.prefix"), "Iceberg S3 bucket prefix not defined"); + + String glueDatabase = icebergProperties.getProperty("catalog.db", DEFAULT_GLUE_DB); + String glueTable = icebergProperties.getProperty("catalog.table", DEFAULT_ICEBERG_TABLE_NAME); + + String partitionFields = icebergProperties.getProperty("partition.fields", DEFAULT_ICEBERG_PARTITION_FIELDS); + List partitionFieldList = Arrays.asList(partitionFields.split("\\s*,\\s*")); + + String sortField = icebergProperties.getProperty("sort.field", DEFAULT_ICEBERG_SORT_ORDER_FIELD); + + // Iceberg you can perform Appends, Upserts and Overwrites. 
+ String icebergOperation = icebergProperties.getProperty("operation", DEFAULT_ICEBERG_OPERATION); + Preconditions.checkArgument(icebergOperation.equals("append") || icebergOperation.equals("upsert") || icebergOperation.equals("overwrite"), "Invalid Iceberg Operation"); + + // If operation is upsert, we need to specify the fields that will be used for equality in the upsert operation + // If the table is partitioned, we must include the partition fields + // This is a comma-separated list of fields + String upsertEqualityFields = icebergProperties.getProperty("upsert.equality.fields", DEFAULT_ICEBERG_UPSERT_FIELDS); + List equalityFieldsList = Arrays.asList(upsertEqualityFields.split("[, ]+")); + + + // Convert Avro Schema to Iceberg Schema, this will be used for creating the table + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + // Avro Generic Record to Row Data Mapper + MapFunction avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema); + + + // Catalog properties for using Glue Data Catalog + Map catalogProperties = new HashMap<>(); + catalogProperties.put("type", "iceberg"); + catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + catalogProperties.put("warehouse", s3BucketPrefix); + + // Load Glue Data Catalog + CatalogLoader glueCatalogLoader = CatalogLoader.custom( + "glue", + catalogProperties, + new org.apache.hadoop.conf.Configuration(), + "org.apache.iceberg.aws.glue.GlueCatalog"); + // Table Object that represents the table in the Glue Data Catalog + TableIdentifier outputTable = TableIdentifier.of(glueDatabase, glueTable); + // Load created Iceberg Catalog to perform table operations + Catalog catalog = glueCatalogLoader.loadCatalog(); + + + // Based on how many fields we want to partition, we create the Partition Spec + PartitionSpec partitionSpec = getPartitionSpec(icebergSchema, partitionFieldList); + // We create the Iceberg Table, using the Iceberg Catalog, Table Identifier, Schema parsed in Iceberg Schema Format and the partition spec + createTable(catalog, outputTable, icebergSchema, partitionSpec, sortField); + // Once the table has been created in the job or before, we load it + TableLoader tableLoader = TableLoader.fromCatalog(glueCatalogLoader, outputTable); + // Get RowType Schema from Iceberg Schema + RowType rowType = FlinkSchemaUtil.convert(icebergSchema); + + // Iceberg DataStream sink builder + FlinkSink.Builder flinkSinkBuilder = FlinkSink.builderFor( + dataStream, + avroGenericRecordToRowDataMapper, + FlinkCompatibilityUtil.toTypeInfo(rowType)) + .tableLoader(tableLoader); + + // Returns the builder for the selected operation + switch (icebergOperation) { + case "upsert": + // If operation is "upsert" we need to set up the equality fields + return flinkSinkBuilder + .equalityFieldColumns(equalityFieldsList) + .upsert(true); + case "overwrite": + return flinkSinkBuilder + .overwrite(true); + default: + return flinkSinkBuilder; + } + } + +} diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSourceBuilder.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSourceBuilder.java new file mode 100644 index 0000000..005e729 --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSourceBuilder.java @@ -0,0 +1,4 @@ +package com.amazonaws.services.msf.iceberg; + +public class IcebergSourceBuilder { +} diff --git 
a/java/IcebergDatastreamSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json similarity index 100% rename from java/IcebergDatastreamSink/src/main/resources/flink-application-properties-dev.json rename to java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/resources/log4j2.properties b/java/Iceberg/IcebergDataStreamSource/src/main/resources/log4j2.properties new file mode 100644 index 0000000..c7d36aa --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/resources/log4j2.properties @@ -0,0 +1,7 @@ +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender \ No newline at end of file diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/resources/price.avsc b/java/Iceberg/IcebergDataStreamSource/src/main/resources/price.avsc new file mode 100644 index 0000000..4603eb2 --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/resources/price.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "name": "Price", + "namespace": "com.amazonaws.services.msf.avro", + "fields": [ + { + "name": "timestamp", + "type": "string" + }, + { + "name": "symbol", + "type": "string" + }, + { + "name": "price", + "type": "float" + }, + { + "name": "volumes", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/java/Iceberg/IcebergDataStreamSource/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java b/java/Iceberg/IcebergDataStreamSource/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java new file mode 100644 index 0000000..5593ca1 --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java @@ -0,0 +1,26 @@ +package com.amazonaws.services.msf.datagen; + +import com.amazonaws.services.msf.avro.AvroSchemaUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +class AvroGenericStockTradeGeneratorFunctionTest { + + @Test + void generateRecord() throws Exception { + Schema avroSchema = AvroSchemaUtils.loadSchema(); + AvroGenericStockTradeGeneratorFunction generatorFunction = new AvroGenericStockTradeGeneratorFunction(avroSchema); + + GenericRecord record = generatorFunction.map(42L); + + + assertInstanceOf(String.class, record.get("timestamp")); + assertInstanceOf(String.class, record.get("symbol")); + assertInstanceOf(Float.class, record.get("price")); + assertInstanceOf(Integer.class, record.get("volumes")); + } + +} \ No newline at end of file From b700f8f67e01a6c0f53345afb62b36a61697a02d Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Mon, 6 Jan 2025 09:13:45 -0600 Subject: [PATCH 02/13] iceberg with s3 tables --- java/Iceberg/IcebergDataStreamSource/pom.xml | 11 ++ .../amazonaws/services/msf/StreamingJob.java | 117 ++++++++++++------ .../msf/iceberg/IcebergSourceBuilder.java | 4 - .../flink-application-properties-dev.json | 6 +- .../src/main/resources/log4j2.properties | 2 +- 5 files changed, 92 insertions(+), 48 deletions(-) diff --git 
a/java/Iceberg/IcebergDataStreamSource/pom.xml b/java/Iceberg/IcebergDataStreamSource/pom.xml index 0a1b7e6..f9195cc 100644 --- a/java/Iceberg/IcebergDataStreamSource/pom.xml +++ b/java/Iceberg/IcebergDataStreamSource/pom.xml @@ -107,6 +107,17 @@ ${iceberg.version} + + + software.amazon.awssdk + s3tables + 2.29.26 + + + software.amazon.s3tables + s3-tables-catalog-for-iceberg + 0.1.3 + org.apache.hadoop hadoop-client diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java index bbeffec..de014db 100644 --- a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java @@ -1,25 +1,34 @@ package com.amazonaws.services.msf; +import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; -import org.apache.flink.connector.datagen.source.DataGeneratorSource; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import com.amazonaws.services.msf.avro.AvroSchemaUtils; -import com.amazonaws.services.msf.datagen.AvroGenericStockTradeGeneratorFunction; -import com.amazonaws.services.msf.iceberg.IcebergSinkBuilder; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.source.FlinkSource; +import org.apache.iceberg.flink.source.IcebergSource; +import org.apache.iceberg.flink.source.StreamingStartingStrategy; +import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory; +import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -48,22 +57,6 @@ private static Map loadApplicationProperties(StreamExecution } } - - // Data Generator source generating random trades as AVRO GenericRecords - private static DataGeneratorSource createDataGenerator(Properties generatorProperties, Schema avroSchema) { - double recordsPerSecond = Double.parseDouble(generatorProperties.getProperty("records.per.sec", "10.0")); - Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); - - LOG.info("Data generator: {} record/sec", recordsPerSecond); - return new DataGeneratorSource<>( - new AvroGenericStockTradeGeneratorFunction(avroSchema), - Long.MAX_VALUE, - RateLimiterStrategy.perSecond(recordsPerSecond), - new GenericRecordAvroTypeInfo(avroSchema) - 
); - } - - public static void main(String[] args) throws Exception { // Set up the streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -78,32 +71,76 @@ public static void main(String[] args) throws Exception { // changes are not propagated to the Iceberg table. Schema avroSchema = AvroSchemaUtils.loadSchema(); - // Create Generic Record TypeInfo from schema. - GenericRecordAvroTypeInfo avroTypeInfo = new GenericRecordAvroTypeInfo(avroSchema); + // Local dev specific settings if (isLocal(env)) { + org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration(); + env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); + env.disableOperatorChaining(); + // Checkpointing and parallelism are set by Amazon Managed Service for Apache Flink when running on AWS - env.enableCheckpointing(10000); + env.enableCheckpointing(60000); env.setParallelism(2); - } - // Data Generator Source. - // Simulates an external source that receives AVRO Generic Records - Properties dataGeneratorProperties = applicationProperties.get("DataGen"); - DataStream genericRecordDataStream = env.fromSource( - createDataGenerator(dataGeneratorProperties, avroSchema), - WatermarkStrategy.noWatermarks(), - "DataGen"); - // Flink Sink Builder + } + Properties icebergProperties = applicationProperties.get("Iceberg"); - FlinkSink.Builder icebergSinkBuilder = IcebergSinkBuilder.createBuilder( - icebergProperties, - genericRecordDataStream, avroSchema); - // Sink to Iceberg Table - icebergSinkBuilder.append(); - env.execute("Flink DataStream Sink"); + // TODO: Call Iceberg Data Source + // Creates an Iceberg Data Source + + String s3BucketPrefix = Preconditions.checkNotNull(icebergProperties.getProperty("bucket.prefix"), "Iceberg S3 bucket prefix not defined"); + + + // Catalog properties for using Glue Data Catalog + Map catalogProperties = new HashMap<>(); + catalogProperties.put("type", "iceberg"); + catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + catalogProperties.put("warehouse", s3BucketPrefix); + + CatalogLoader glueCatalogLoader = + CatalogLoader.custom( + "glue", + catalogProperties, + new Configuration(), + "org.apache.iceberg.aws.glue.GlueCatalog"); + + String DEFAULT_GLUE_DB = "iceberg"; + String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg"; + + String glueDatabase = icebergProperties.getProperty("catalog.db", DEFAULT_GLUE_DB); + String glueTable = icebergProperties.getProperty("catalog.table", DEFAULT_ICEBERG_TABLE_NAME); + TableIdentifier inputTable = TableIdentifier.of(glueDatabase, glueTable); + + TableLoader tableLoader = TableLoader.fromCatalog(glueCatalogLoader, inputTable); + Table table; + try(TableLoader loader = tableLoader) { + loader.open(); + table = loader.loadTable(); + } + + AvroGenericRecordReaderFunction readerFunction = AvroGenericRecordReaderFunction.fromTable(table); + + + + IcebergSource source = + IcebergSource.builder() + .tableLoader(tableLoader) + .readerFunction(readerFunction) + .assignerFactory(new SimpleSplitAssignerFactory()) + .monitorInterval(Duration.ofSeconds(60)) + .streaming(true) + .build(); + + + DataStreamSource stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), + "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema)); + + + stream.print(); + + env.execute("Flink DataStream Source"); } } \ No newline at end of file diff --git 
a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSourceBuilder.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSourceBuilder.java index 005e729..e69de29 100644 --- a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSourceBuilder.java +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSourceBuilder.java @@ -1,4 +0,0 @@ -package com.amazonaws.services.msf.iceberg; - -public class IcebergSourceBuilder { -} diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json index 2bd46f0..6feeeef 100644 --- a/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json +++ b/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json @@ -8,12 +8,12 @@ { "PropertyGroupId": "Iceberg", "PropertyMap": { - "bucket.prefix": "s3:///iceberg", - "catalog.db": "default", + "bucket.prefix": "s3://my-iceberg-bucket-jeremy/iceberg-example", + "catalog.db": "iceberg", "catalog.table": "prices_iceberg", "partition.fields": "symbol", "sort.field": "timestamp", - "operation": "upsert", + "operation": "append", "upsert.equality.fields": "symbol" } } diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/resources/log4j2.properties b/java/Iceberg/IcebergDataStreamSource/src/main/resources/log4j2.properties index c7d36aa..3c0515a 100644 --- a/java/Iceberg/IcebergDataStreamSource/src/main/resources/log4j2.properties +++ b/java/Iceberg/IcebergDataStreamSource/src/main/resources/log4j2.properties @@ -3,5 +3,5 @@ appender.console.type = CONSOLE appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n -rootLogger.level = INFO +rootLogger.level = WARN rootLogger.appenderRef.console.ref = ConsoleAppender \ No newline at end of file From f9404180e52dd04425b21dd7a8d8f7cd19eeb5f8 Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Fri, 10 Jan 2025 13:35:04 -0600 Subject: [PATCH 03/13] s3 table sink close to working --- .gitignore | 1 + java/Iceberg/S3TableSink/README.md | 89 +++++++ java/Iceberg/S3TableSink/pom.xml | 247 ++++++++++++++++++ .../amazonaws/services/msf/StreamingJob.java | 132 ++++++++++ .../services/msf/avro/AvroSchemaUtils.java | 22 ++ ...vroGenericStockTradeGeneratorFunction.java | 46 ++++ .../msf/iceberg/IcebergSinkBuilder.java | 139 ++++++++++ .../src/main/resources/log4j2.properties | 7 + .../S3TableSink/src/main/resources/price.avsc | 23 ++ ...enericStockTradeGeneratorFunctionTest.java | 26 ++ 10 files changed, 732 insertions(+) create mode 100644 java/Iceberg/S3TableSink/README.md create mode 100644 java/Iceberg/S3TableSink/pom.xml create mode 100644 java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java create mode 100644 java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java create mode 100644 java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java create mode 100644 java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java create mode 100644 java/Iceberg/S3TableSink/src/main/resources/log4j2.properties create mode 100644 java/Iceberg/S3TableSink/src/main/resources/price.avsc create mode 
100644 java/Iceberg/S3TableSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java diff --git a/.gitignore b/.gitignore index d97a443..d3c2508 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ venv/ .java-version /pyflink/ /.run/ +*-dev.json diff --git a/java/Iceberg/S3TableSink/README.md b/java/Iceberg/S3TableSink/README.md new file mode 100644 index 0000000..9cd23e8 --- /dev/null +++ b/java/Iceberg/S3TableSink/README.md @@ -0,0 +1,89 @@ +# Flink Iceberg Sink using DataStream API + +* Flink version: 1.19.0 +* Flink API: DataStream API +* Iceberg 1.6.1 +* Language: Java (11) +* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) + and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) + +This example demonstrates how to use the +[Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with S3 Tables. + +For simplicity, the application generates synthetic data, random stock prices, internally. +Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records +serialized with AVRO. + +### Prerequisites + +#### Create a Table Bucket +The sample application expects the S3 table bucket to already exist, and requires its ARN for the runtime configuration: +```bash +aws s3tables create-table-bucket --name flink-example +{ + "arn": "arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example" +} +``` + +#### IAM Permissions + +The application must have IAM permissions to: +* Read from and write to the S3 table bucket + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties. + +When running locally, the configuration is read from the +[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file (see the example after the Known limitations section below). + +Runtime parameters: + +| Group ID | Key | Default | Description | +|-----------|--------------------------|-------------------|---------------------------------------------------------------------------------------------------------------------| +| `DataGen` | `records.per.sec` | `100.0` | Records per second generated. | +| `Iceberg` | `table.bucket.arn` | (mandatory) | ARN of the S3 table bucket, e.g., `arn:aws:s3tables:region:account-id:bucket/bucket-name` | +| `Iceberg` | `catalog.db` | `test_from_flink` | Name of the S3 table database. | +| `Iceberg` | `catalog.table` | `test_table` | Name of the S3 table. | +| `Iceberg` | `partition.fields` | `symbol` | Comma-separated list of partition fields. | +| `Iceberg` | `sort.field` | `timestamp` | Sort field. | +| `Iceberg` | `operation` | `upsert` | Iceberg operation. One of `upsert`, `append` or `overwrite`. | +| `Iceberg` | `upsert.equality.fields` | `symbol` | Comma-separated list of fields used for upsert. It must match the partition fields. Required if `operation` = `upsert`. | + +### Checkpoints + +Checkpointing must be enabled. Iceberg commits writes on checkpoint. + +When running locally, the application enables checkpoints programmatically, every 10 seconds. +When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. + + +### Known limitations + +There are currently some limitations in the Flink Iceberg integration: +* Doesn't support Iceberg tables with hidden partitioning +* Doesn't support adding columns, removing columns, renaming columns, or changing column types.
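For local development, a `flink-application-properties-dev.json` along the following lines would match the parameters described above. The bucket ARN, database, and table names are placeholders taken from the defaults and the CLI example; adjust them to your environment:

```json
[
  {
    "PropertyGroupId": "DataGen",
    "PropertyMap": {
      "records.per.sec": 100.0
    }
  },
  {
    "PropertyGroupId": "Iceberg",
    "PropertyMap": {
      "table.bucket.arn": "arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example",
      "catalog.db": "test_from_flink",
      "catalog.table": "test_table",
      "partition.fields": "symbol",
      "sort.field": "timestamp",
      "operation": "upsert",
      "upsert.equality.fields": "symbol"
    }
  }
]
```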
+ +### Schema and schema evolution + +The application must "know" the AVRO schema on start. +The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry. +This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront. + +This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible. +Schema changes are not propagated to Iceberg. +As long as the schema of incoming records is FORWARD compatible, the application deserializes incoming records using +the schema it knows. Any new field in the incoming record is discarded. + +In this example, the schema is loaded from a schema definition file, [price.avsc](./src/main/resources/price.avsc) embedded +with the application. +It is technically possible to fetch the schema on application start from an external source, like a schema registry or a +schema definition file in an S3 bucket. This is beyond the scope of this example. + +### Running locally, in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. + +See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details. diff --git a/java/Iceberg/S3TableSink/pom.xml b/java/Iceberg/S3TableSink/pom.xml new file mode 100644 index 0000000..f43c091 --- /dev/null +++ b/java/Iceberg/S3TableSink/pom.xml @@ -0,0 +1,247 @@ + + + 4.0.0 + + com.amazonaws + s3-table-flink + 1.0 + jar + + + UTF-8 + 11 + ${target.java.version} + ${target.java.version} + + 1.19.0 + 1.11.3 + 2.12 + 3.4.0 + 1.6.1 + 1.2.0 + 2.23.1 + 5.8.1 + + + + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + + + org.apache.flink + flink-table-common + ${flink.version} + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + + + org.apache.flink + flink-avro + ${flink.version} + + + + + + org.apache.flink + flink-table-planner_${scala.version} + ${flink.version} + provided + + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + software.amazon.awssdk + s3tables + 2.29.26 + + + software.amazon.s3tables + s3-tables-catalog-for-iceberg + 0.1.3 + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + org.slf4j + slf4j-reload4j + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws-bundle + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws + ${iceberg.version} + + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + org.apache.iceberg + iceberg-flink-1.19 + 1.7.0 + compile + + + + 
+ + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.StreamingJob + + + + + + + + + diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java new file mode 100644 index 0000000..f0835e7 --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java @@ -0,0 +1,132 @@ +package com.amazonaws.services.msf; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.util.Preconditions; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import com.amazonaws.services.msf.avro.AvroSchemaUtils; +import com.amazonaws.services.msf.datagen.AvroGenericStockTradeGeneratorFunction; +import com.amazonaws.services.msf.iceberg.IcebergSinkBuilder; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class StreamingJob { + private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class); + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(StreamingJob.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + // Data Generator source generating random trades as AVRO GenericRecords + private static DataGeneratorSource createDataGenerator(Properties generatorProperties, Schema avroSchema) { + double recordsPerSecond = 
Double.parseDouble(generatorProperties.getProperty("records.per.sec", "10.0")); + Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); + + LOG.info("Data generator: {} record/sec", recordsPerSecond); + return new DataGeneratorSource<>( + new AvroGenericStockTradeGeneratorFunction(avroSchema), + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(recordsPerSecond), + new GenericRecordAvroTypeInfo(avroSchema) + ); + } + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + // Local dev specific settings + if (isLocal(env)) { + // Checkpointing and parallelism are set by Amazon Managed Service for Apache Flink when running on AWS + env.enableCheckpointing(30000); + env.setParallelism(2); + } + + Map applicationProperties = loadApplicationProperties(env); + Properties icebergProperties = applicationProperties.get("Iceberg"); + + Catalog s3 = createCatalog(tableEnv, createIcebergConfiguration(icebergProperties)); + s3.createDatabase("test_from_flink", new CatalogDatabaseImpl(Map.of(), "Sample Database"), true); + + // Get AVRO Schema from the definition bundled with the application + // Note that the application must "knows" the AVRO schema upfront, i.e. the schema must be either embedded + // with the application or fetched at start time. + // If the schema of the records received by the source changes, all changes must be FORWARD compatible. + // This way, the application will be able to write the data with the new schema into the old schema, but schema + // changes are not propagated to the Iceberg table. + Schema avroSchema = AvroSchemaUtils.loadSchema(); + + // Create Generic Record TypeInfo from schema. + new GenericRecordAvroTypeInfo(avroSchema); + + // Data Generator Source. 
+ // Simulates an external source that receives AVRO Generic Records + DataStream genericRecordDataStream = createDataStream(env, applicationProperties, avroSchema); + + // Flink Sink Builder + FlinkSink.Builder icebergSinkBuilder = IcebergSinkBuilder.createBuilder(icebergProperties, genericRecordDataStream, avroSchema); + // Sink to Iceberg Table + icebergSinkBuilder.append(); + + env.execute("Flink S3 Table Sink"); + } + + private static Catalog createCatalog(StreamTableEnvironment tableEnv, Configuration conf) { + final String catalogName = "s3"; + CatalogDescriptor descriptor = CatalogDescriptor.of(catalogName, conf); + tableEnv.createCatalog(catalogName, descriptor); + return tableEnv.getCatalog(catalogName).get(); + } + + private static Configuration createIcebergConfiguration(Properties icebergProperties) { + Configuration conf = new Configuration(); + conf.setString("warehouse", icebergProperties.getProperty("table.bucket.arn")); + conf.setString("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog"); + conf.setString("type", "iceberg"); + conf.setString("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + return conf; + } + + private static DataStream createDataStream(StreamExecutionEnvironment env, Map applicationProperties, Schema avroSchema) { + Properties dataGeneratorProperties = applicationProperties.get("DataGen"); + return env.fromSource( + createDataGenerator(dataGeneratorProperties, avroSchema), + WatermarkStrategy.noWatermarks(), + "DataGen"); + } +} diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java new file mode 100644 index 0000000..de34c5e --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java @@ -0,0 +1,22 @@ +package com.amazonaws.services.msf.avro; + +import com.amazonaws.services.msf.StreamingJob; +import org.apache.avro.Schema; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; + +public class AvroSchemaUtils implements Serializable { + + private static final String AVRO_SCHEMA_RESOURCE = "price.avsc"; + + /** + * Load the AVRO Schema from the resources folder + */ + public static Schema loadSchema() throws IOException { + // Get AVRO Schema from the definition bundled with the application + InputStream inputStream = StreamingJob.class.getClassLoader().getResourceAsStream(AVRO_SCHEMA_RESOURCE); + return new org.apache.avro.Schema.Parser().parse(inputStream); + } +} diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java new file mode 100644 index 0000000..5b1cf34 --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java @@ -0,0 +1,46 @@ +package com.amazonaws.services.msf.datagen; + +import org.apache.flink.connector.datagen.source.GeneratorFunction; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.RandomUtils; + +import java.time.Instant; + +/** + * Function used by DataGen source to generate random records as AVRO GenericRecord. + *

+ * The generator assumes that the AVRO schema provided contains the generated fields + */ +public class AvroGenericStockTradeGeneratorFunction implements GeneratorFunction { + + private static final String[] SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"}; + + private Schema avroSchema; + + public AvroGenericStockTradeGeneratorFunction(Schema avroSchema) { + this.avroSchema = avroSchema; + } + + /** + * Generates a random trade + */ + @Override + public GenericRecord map(Long value) { + GenericData.Record record = new GenericData.Record(avroSchema); + + String symbol = SYMBOLS[RandomUtils.nextInt(0, SYMBOLS.length)]; + float price = RandomUtils.nextFloat(0, 10); + int volumes = RandomUtils.nextInt(0, 1000000); + String timestamp = Instant.now().toString(); + + record.put("symbol", symbol); + record.put("price", price); + record.put("volumes", volumes); + record.put("timestamp", timestamp); + + return record; + } +} diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java new file mode 100644 index 0000000..abf2a5f --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java @@ -0,0 +1,139 @@ +package com.amazonaws.services.msf.iceberg; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.generic.GenericRecord; +import org.apache.iceberg.*; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.*; +import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; + +import java.util.*; + +/** + * Wraps the code to initialize an Iceberg sink that uses Glue Data Catalog as catalog + */ +public class IcebergSinkBuilder { + private static final String DEFAULT_GLUE_DB = "default"; + private static final String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg"; + private static final String DEFAULT_ICEBERG_SORT_ORDER_FIELD = "accountNr"; + private static final String DEFAULT_ICEBERG_PARTITION_FIELDS = "symbol"; + private static final String DEFAULT_ICEBERG_OPERATION = "upsert"; + private static final String DEFAULT_ICEBERG_UPSERT_FIELDS = "symbol"; + + /** + * If Iceberg Table has not been previously created, we will create it using the Partition Fields specified in the + * Properties, as well as add a Sort Field to improve query performance + */ + private static void createTable(Catalog catalog, TableIdentifier outputTable, org.apache.iceberg.Schema icebergSchema, PartitionSpec partitionSpec, String sortField) { + // If table has been previously created, we do not do any operation or modification + if (!catalog.tableExists(outputTable)) { + Table icebergTable = catalog.createTable(outputTable, icebergSchema, partitionSpec); + // Modifying newly created iceberg table to have a sort field + icebergTable.replaceSortOrder() + .asc(sortField, NullOrder.NULLS_LAST) + .commit(); + // The catalog.create table creates an Iceberg V1 table. If we want to perform upserts, we need to upgrade the table version to 2. 
+ TableOperations tableOperations = ((BaseTable) icebergTable).operations(); + TableMetadata appendTableMetadata = tableOperations.current(); + tableOperations.commit(appendTableMetadata, appendTableMetadata.upgradeToFormatVersion(2)); + } + } + + /** + * Generate the PartitionSpec, if when creating table, you want it to be partitioned. + * If you are doing Upserts in your Iceberg Table, your Equality Fields must be the same as the fields used for Partitioning. + */ + private static PartitionSpec getPartitionSpec(org.apache.iceberg.Schema icebergSchema, List partitionFieldsList) { + PartitionSpec.Builder partitionBuilder = PartitionSpec.builderFor(icebergSchema); + for (String s : partitionFieldsList) { + partitionBuilder = partitionBuilder.identity(s); + } + return partitionBuilder.build(); + } + + // Iceberg Flink Sink Builder + public static FlinkSink.Builder createBuilder(Properties icebergProperties, DataStream dataStream, org.apache.avro.Schema avroSchema) { + // Retrieve configuration from application parameters + String s3BucketPrefix = Preconditions.checkNotNull(icebergProperties.getProperty("table.bucket.arn"), "Iceberg S3 bucket prefix not defined"); + + String s3_table_db = icebergProperties.getProperty("catalog.db", DEFAULT_GLUE_DB); + String s3_table_name = icebergProperties.getProperty("catalog.table", DEFAULT_ICEBERG_TABLE_NAME); + + String partitionFields = icebergProperties.getProperty("partition.fields", DEFAULT_ICEBERG_PARTITION_FIELDS); + List partitionFieldList = Arrays.asList(partitionFields.split("\\s*,\\s*")); + + String sortField = icebergProperties.getProperty("sort.field", DEFAULT_ICEBERG_SORT_ORDER_FIELD); + + // Iceberg you can perform Appends, Upserts and Overwrites. + String icebergOperation = icebergProperties.getProperty("operation", DEFAULT_ICEBERG_OPERATION); + Preconditions.checkArgument(icebergOperation.equals("append") || icebergOperation.equals("upsert") || icebergOperation.equals("overwrite"), "Invalid Iceberg Operation"); + + // If operation is upsert, we need to specify the fields that will be used for equality in the upsert operation + // If the table is partitioned, we must include the partition fields + // This is a comma-separated list of fields + String upsertEqualityFields = icebergProperties.getProperty("upsert.equality.fields", DEFAULT_ICEBERG_UPSERT_FIELDS); + List equalityFieldsList = Arrays.asList(upsertEqualityFields.split("[, ]+")); + + // Convert Avro Schema to Iceberg Schema, this will be used for creating the table + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + // Avro Generic Record to Row Data Mapper + MapFunction avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema); + + // Catalog properties for using Glue Data Catalog + Map catalogProperties = new HashMap<>(); + catalogProperties.put("type", "iceberg"); + catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + catalogProperties.put("warehouse", s3BucketPrefix); + catalogProperties.put("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog"); + + // Load S3 Table Data Catalog + CatalogLoader icebergCatalogLoader = CatalogLoader.custom( + "flink", + catalogProperties, + new org.apache.hadoop.conf.Configuration(), + "software.amazon.s3tables.iceberg.S3TablesCatalog"); + // Table Object that represents the table in the Glue Data Catalog + TableIdentifier outputTable = TableIdentifier.of(s3_table_db, s3_table_name); + // Load created Iceberg Catalog to perform table operations 
+ Catalog catalog = icebergCatalogLoader.loadCatalog(); + + // Based on how many fields we want to partition, we create the Partition Spec + PartitionSpec partitionSpec = getPartitionSpec(icebergSchema, partitionFieldList); + // We create the Iceberg Table, using the Iceberg Catalog, Table Identifier, Schema parsed in Iceberg Schema Format and the partition spec + createTable(catalog, outputTable, icebergSchema, partitionSpec, sortField); + // Once the table has been created in the job or before, we load it + TableLoader tableLoader = TableLoader.fromCatalog(icebergCatalogLoader, outputTable); + // Get RowType Schema from Iceberg Schema + RowType rowType = FlinkSchemaUtil.convert(icebergSchema); + + // Iceberg DataStream sink builder + FlinkSink.Builder flinkSinkBuilder = FlinkSink.builderFor( + dataStream, + avroGenericRecordToRowDataMapper, + FlinkCompatibilityUtil.toTypeInfo(rowType)) + .tableLoader(tableLoader); + + // Returns the builder for the selected operation + switch (icebergOperation) { + case "upsert": + // If operation is "upsert" we need to set up the equality fields + return flinkSinkBuilder + .equalityFieldColumns(equalityFieldsList) + .upsert(true); + case "overwrite": + return flinkSinkBuilder + .overwrite(true); + default: + return flinkSinkBuilder; + } + } +} diff --git a/java/Iceberg/S3TableSink/src/main/resources/log4j2.properties b/java/Iceberg/S3TableSink/src/main/resources/log4j2.properties new file mode 100644 index 0000000..3c0515a --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/resources/log4j2.properties @@ -0,0 +1,7 @@ +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +rootLogger.level = WARN +rootLogger.appenderRef.console.ref = ConsoleAppender \ No newline at end of file diff --git a/java/Iceberg/S3TableSink/src/main/resources/price.avsc b/java/Iceberg/S3TableSink/src/main/resources/price.avsc new file mode 100644 index 0000000..4603eb2 --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/resources/price.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "name": "Price", + "namespace": "com.amazonaws.services.msf.avro", + "fields": [ + { + "name": "timestamp", + "type": "string" + }, + { + "name": "symbol", + "type": "string" + }, + { + "name": "price", + "type": "float" + }, + { + "name": "volumes", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/java/Iceberg/S3TableSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java b/java/Iceberg/S3TableSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java new file mode 100644 index 0000000..5593ca1 --- /dev/null +++ b/java/Iceberg/S3TableSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java @@ -0,0 +1,26 @@ +package com.amazonaws.services.msf.datagen; + +import com.amazonaws.services.msf.avro.AvroSchemaUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +class AvroGenericStockTradeGeneratorFunctionTest { + + @Test + void generateRecord() throws Exception { + Schema avroSchema = AvroSchemaUtils.loadSchema(); + AvroGenericStockTradeGeneratorFunction generatorFunction = new AvroGenericStockTradeGeneratorFunction(avroSchema); + + GenericRecord record = 
generatorFunction.map(42L); + + + assertInstanceOf(String.class, record.get("timestamp")); + assertInstanceOf(String.class, record.get("symbol")); + assertInstanceOf(Float.class, record.get("price")); + assertInstanceOf(Integer.class, record.get("volumes")); + } + +} \ No newline at end of file From 74603351faa26e0c5e95a002a734406943b4c84c Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Fri, 10 Jan 2025 13:40:34 -0600 Subject: [PATCH 04/13] cleaned up sample --- .../amazonaws/services/msf/StreamingJob.java | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java index f0835e7..d8e77c4 100644 --- a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java @@ -32,6 +32,7 @@ public class StreamingJob { private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class); private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + private static Properties icebergProperties; private static boolean isLocal(StreamExecutionEnvironment env) { return env instanceof LocalStreamEnvironment; @@ -78,10 +79,12 @@ public static void main(String[] args) throws Exception { } Map applicationProperties = loadApplicationProperties(env); - Properties icebergProperties = applicationProperties.get("Iceberg"); + icebergProperties = applicationProperties.get("Iceberg"); - Catalog s3 = createCatalog(tableEnv, createIcebergConfiguration(icebergProperties)); - s3.createDatabase("test_from_flink", new CatalogDatabaseImpl(Map.of(), "Sample Database"), true); + Catalog s3 = createCatalog(tableEnv); + s3.createDatabase(icebergProperties.getProperty("catalog.db"), + new CatalogDatabaseImpl(Map.of(), + "Sample Database"), true); // Get AVRO Schema from the definition bundled with the application // Note that the application must "knows" the AVRO schema upfront, i.e. 
the schema must be either embedded @@ -106,22 +109,21 @@ public static void main(String[] args) throws Exception { env.execute("Flink S3 Table Sink"); } - private static Catalog createCatalog(StreamTableEnvironment tableEnv, Configuration conf) { - final String catalogName = "s3"; - CatalogDescriptor descriptor = CatalogDescriptor.of(catalogName, conf); - tableEnv.createCatalog(catalogName, descriptor); - return tableEnv.getCatalog(catalogName).get(); - } + private static Catalog createCatalog(StreamTableEnvironment tableEnv) { - private static Configuration createIcebergConfiguration(Properties icebergProperties) { Configuration conf = new Configuration(); conf.setString("warehouse", icebergProperties.getProperty("table.bucket.arn")); conf.setString("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog"); conf.setString("type", "iceberg"); conf.setString("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); - return conf; + + final String catalogName = "s3"; + CatalogDescriptor descriptor = CatalogDescriptor.of(catalogName, conf); + tableEnv.createCatalog(catalogName, descriptor); + return tableEnv.getCatalog(catalogName).get(); } + private static DataStream createDataStream(StreamExecutionEnvironment env, Map applicationProperties, Schema avroSchema) { Properties dataGeneratorProperties = applicationProperties.get("DataGen"); return env.fromSource( From bbce7148a122cbf48b9d97d04cf76bc8401eeb29 Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Tue, 21 Jan 2025 11:03:59 -0600 Subject: [PATCH 05/13] auto-redact ARN --- .gitattributes | 1 + .gitignore | 4 +++- java/Iceberg/S3TableSink/README.md | 8 +++++++ .../S3TableSink/src/main/resources/clean.sh | 0 .../flink-application-properties-dev.json | 21 +++++++++++++++++++ .../S3TableSink/src/main/resources/smudge.sh | 0 6 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 .gitattributes create mode 100644 java/Iceberg/S3TableSink/src/main/resources/clean.sh create mode 100644 java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json create mode 100644 java/Iceberg/S3TableSink/src/main/resources/smudge.sh diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..b858c86 --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +flink-application-properties-dev.json filter=arn-filter diff --git a/.gitignore b/.gitignore index 2487b70..6be6265 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,6 @@ venv/ /pyflink/ /.run/ -*-dev.json + +clean.sh +smudge.sh \ No newline at end of file diff --git a/java/Iceberg/S3TableSink/README.md b/java/Iceberg/S3TableSink/README.md index 9cd23e8..ab105ff 100644 --- a/java/Iceberg/S3TableSink/README.md +++ b/java/Iceberg/S3TableSink/README.md @@ -26,6 +26,14 @@ aws s3tables create-table-bucket --name flink-example } ``` +If you already did this, you can query to get the ARN like this: + +```bash +aws s3tables list-table-buckets +``` + +This will show you the list of table buckets. Select the one you wish to write to and paste it into the config file in this project. 
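
As an alternative to the CLI, you can resolve the table bucket ARN programmatically with the AWS SDK for Java v2 `s3tables` module that this project already declares in its `pom.xml`. The sketch below is illustrative only: the client and model accessor names (`S3TablesClient`, `listTableBuckets`, `tableBuckets()`, `name()`, `arn()`) are assumed to follow the SDK's standard naming, and `flink-example` is the bucket name used earlier in this README.

```java
import software.amazon.awssdk.services.s3tables.S3TablesClient;
import software.amazon.awssdk.services.s3tables.model.ListTableBucketsResponse;

public class ResolveTableBucketArn {
    public static void main(String[] args) {
        // Bucket name assumed from the earlier `aws s3tables create-table-bucket --name flink-example` step
        String bucketName = "flink-example";
        try (S3TablesClient s3Tables = S3TablesClient.create()) {
            // List the table buckets in this account/region and pick the one created above
            ListTableBucketsResponse response = s3Tables.listTableBuckets(req -> { });
            String arn = response.tableBuckets().stream()
                    .filter(bucket -> bucketName.equals(bucket.name()))
                    .map(bucket -> bucket.arn())
                    .findFirst()
                    .orElseThrow(() -> new IllegalStateException("Table bucket not found: " + bucketName));
            // Paste this value into `table.bucket.arn` in flink-application-properties-dev.json
            System.out.println(arn);
        }
    }
}
```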
+ #### IAM Permissions diff --git a/java/Iceberg/S3TableSink/src/main/resources/clean.sh b/java/Iceberg/S3TableSink/src/main/resources/clean.sh new file mode 100644 index 0000000..e69de29 diff --git a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..207377c --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,21 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.sec": 100.0 + } + }, + { + "PropertyGroupId": "Iceberg", + "PropertyMap": { + + "table.bucket.arn": "REDACTED_ARN", + "catalog.db": "test_from_flink", + "catalog.table": "test_table", + "partition.fields": "symbol", + "sort.field": "timestamp", + "operation": "upsert", + "upsert.equality.fields": "symbol" + } + } +] \ No newline at end of file diff --git a/java/Iceberg/S3TableSink/src/main/resources/smudge.sh b/java/Iceberg/S3TableSink/src/main/resources/smudge.sh new file mode 100644 index 0000000..e69de29 From af7efd84d27c67e291d4064f8b19b492f5d7b9af Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Tue, 21 Jan 2025 11:11:53 -0600 Subject: [PATCH 06/13] removed superflous files --- java/Iceberg/S3TableSink/src/main/resources/clean.sh | 0 java/Iceberg/S3TableSink/src/main/resources/smudge.sh | 0 2 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 java/Iceberg/S3TableSink/src/main/resources/clean.sh delete mode 100644 java/Iceberg/S3TableSink/src/main/resources/smudge.sh diff --git a/java/Iceberg/S3TableSink/src/main/resources/clean.sh b/java/Iceberg/S3TableSink/src/main/resources/clean.sh deleted file mode 100644 index e69de29..0000000 diff --git a/java/Iceberg/S3TableSink/src/main/resources/smudge.sh b/java/Iceberg/S3TableSink/src/main/resources/smudge.sh deleted file mode 100644 index e69de29..0000000 From 329a8efc95bb73d329eda02dbe9ec9444a9c65f5 Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Tue, 21 Jan 2025 11:26:32 -0600 Subject: [PATCH 07/13] fixed files --- .../src/main/resources/flink-application-properties-dev.json | 4 ++-- .../src/main/resources/flink-application-properties-dev.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json index eec7dfd..2bd46f0 100644 --- a/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json +++ b/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json @@ -8,8 +8,8 @@ { "PropertyGroupId": "Iceberg", "PropertyMap": { - "bucket.prefix": "arn:aws:s3tables:us-east-1:143479883528:bucket/my-table-bucket", - "catalog.db": "iceberg", + "bucket.prefix": "s3:///iceberg", + "catalog.db": "default", "catalog.table": "prices_iceberg", "partition.fields": "symbol", "sort.field": "timestamp", diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json index 6feeeef..28fe270 100644 --- a/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json +++ b/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json @@ -8,7 +8,7 @@ { "PropertyGroupId": "Iceberg", "PropertyMap": { - 
"bucket.prefix": "s3://my-iceberg-bucket-jeremy/iceberg-example", + "bucket.prefix": "s3:///iceberg", "catalog.db": "iceberg", "catalog.table": "prices_iceberg", "partition.fields": "symbol", From e79cdc746e7bd2ff1a0ce061c253aaadff5f2d16 Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Thu, 23 Jan 2025 14:49:18 -0600 Subject: [PATCH 08/13] cleaned up iceberg source --- .../Iceberg/IcebergDataStreamSource/README.md | 4 ++-- java/Iceberg/IcebergDataStreamSource/pom.xml | 20 ++----------------- .../amazonaws/services/msf/StreamingJob.java | 3 --- 3 files changed, 4 insertions(+), 23 deletions(-) diff --git a/java/Iceberg/IcebergDataStreamSource/README.md b/java/Iceberg/IcebergDataStreamSource/README.md index 7da74d0..54e16fb 100644 --- a/java/Iceberg/IcebergDataStreamSource/README.md +++ b/java/Iceberg/IcebergDataStreamSource/README.md @@ -1,4 +1,4 @@ -# Flink Iceberg Sink using DataStream API +# Flink Iceberg Source using DataStream API * Flink version: 1.20.0 * Flink API: DataStream API @@ -8,7 +8,7 @@ and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) This example demonstrate how to use -[Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with the Glue Data Catalog. +[Flink Iceberg Source Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with the Glue Data Catalog. For simplicity, the application generates synthetic data, random stock prices, internally. Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records diff --git a/java/Iceberg/IcebergDataStreamSource/pom.xml b/java/Iceberg/IcebergDataStreamSource/pom.xml index f9195cc..2b97e4d 100644 --- a/java/Iceberg/IcebergDataStreamSource/pom.xml +++ b/java/Iceberg/IcebergDataStreamSource/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.amazonaws - iceberg-datastream-sink + iceberg-datastream-source 1.0 jar @@ -38,12 +38,7 @@ ${flink.version} provided - - org.apache.flink - flink-clients - ${flink.version} - provided - + org.apache.flink flink-connector-files @@ -107,17 +102,6 @@ ${iceberg.version} - - - software.amazon.awssdk - s3tables - 2.29.26 - - - software.amazon.s3tables - s3-tables-catalog-for-iceberg - 0.1.3 - org.apache.hadoop hadoop-client diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java index de014db..a9e0ea0 100644 --- a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java @@ -88,9 +88,6 @@ public static void main(String[] args) throws Exception { Properties icebergProperties = applicationProperties.get("Iceberg"); - // TODO: Call Iceberg Data Source - // Creates an Iceberg Data Source - String s3BucketPrefix = Preconditions.checkNotNull(icebergProperties.getProperty("bucket.prefix"), "Iceberg S3 bucket prefix not defined"); From 05437aa8c7a0233ebec546d9c3ad49a2a75d1a19 Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Thu, 23 Jan 2025 14:59:13 -0600 Subject: [PATCH 09/13] add properties with redacted ARN --- .../src/main/resources/flink-application-properties-dev.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json index 207377c..96ed0be 100644 --- 
a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json +++ b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json @@ -9,7 +9,7 @@ "PropertyGroupId": "Iceberg", "PropertyMap": { - "table.bucket.arn": "REDACTED_ARN", + "table.bucket.arn": "arn1:aws:s3tables:us-east-1:143479883528:bucket/flink-example", "catalog.db": "test_from_flink", "catalog.table": "test_table", "partition.fields": "symbol", From fa4cb466cc5b5e35617c48fd7ab02d337271f4b3 Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Thu, 23 Jan 2025 15:34:34 -0600 Subject: [PATCH 10/13] changed from REDACTED ARN to ARN --- .../msf/iceberg/IcebergSinkBuilder.java | 17 ++++++++++------- .../flink-application-properties-dev.json | 2 +- java/pom.xml | 4 +++- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java index abf2a5f..c5ed0a0 100644 --- a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java @@ -19,10 +19,10 @@ import java.util.*; /** - * Wraps the code to initialize an Iceberg sink that uses Glue Data Catalog as catalog - */ + * Wraps the code to initialize an Iceberg sink that uses S3 Tables Internal catalog + * */ public class IcebergSinkBuilder { - private static final String DEFAULT_GLUE_DB = "default"; + private static final String DEFAULT_S3_CATALOG_DB = "default"; private static final String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg"; private static final String DEFAULT_ICEBERG_SORT_ORDER_FIELD = "accountNr"; private static final String DEFAULT_ICEBERG_PARTITION_FIELDS = "symbol"; @@ -60,12 +60,15 @@ private static PartitionSpec getPartitionSpec(org.apache.iceberg.Schema icebergS return partitionBuilder.build(); } - // Iceberg Flink Sink Builder + /** + * S3 Table Sink Builder - It is unique in that it leverages the S3 Table Catalog as opposed to an external catalog like Glue or Hive. 
+ * catalog-impl = software.amazon.s3tables.iceberg.S3TablesCatalog + */ public static FlinkSink.Builder createBuilder(Properties icebergProperties, DataStream dataStream, org.apache.avro.Schema avroSchema) { // Retrieve configuration from application parameters String s3BucketPrefix = Preconditions.checkNotNull(icebergProperties.getProperty("table.bucket.arn"), "Iceberg S3 bucket prefix not defined"); - String s3_table_db = icebergProperties.getProperty("catalog.db", DEFAULT_GLUE_DB); + String s3_table_db = icebergProperties.getProperty("catalog.db", DEFAULT_S3_CATALOG_DB); String s3_table_name = icebergProperties.getProperty("catalog.table", DEFAULT_ICEBERG_TABLE_NAME); String partitionFields = icebergProperties.getProperty("partition.fields", DEFAULT_ICEBERG_PARTITION_FIELDS); @@ -88,7 +91,7 @@ public static FlinkSink.Builder createBuilder(Properties icebergProperties, Data // Avro Generic Record to Row Data Mapper MapFunction avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema); - // Catalog properties for using Glue Data Catalog + // Catalog properties for using S3 Tables Map catalogProperties = new HashMap<>(); catalogProperties.put("type", "iceberg"); catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); @@ -101,7 +104,7 @@ public static FlinkSink.Builder createBuilder(Properties icebergProperties, Data catalogProperties, new org.apache.hadoop.conf.Configuration(), "software.amazon.s3tables.iceberg.S3TablesCatalog"); - // Table Object that represents the table in the Glue Data Catalog + // Table Object that represents the table in S3 Tables TableIdentifier outputTable = TableIdentifier.of(s3_table_db, s3_table_name); // Load created Iceberg Catalog to perform table operations Catalog catalog = icebergCatalogLoader.loadCatalog(); diff --git a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json index 96ed0be..fd94ccb 100644 --- a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json +++ b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json @@ -9,7 +9,7 @@ "PropertyGroupId": "Iceberg", "PropertyMap": { - "table.bucket.arn": "arn1:aws:s3tables:us-east-1:143479883528:bucket/flink-example", + "table.bucket.arn": "<>", "catalog.db": "test_from_flink", "catalog.table": "test_table", "partition.fields": "symbol", diff --git a/java/pom.xml b/java/pom.xml index 29e281b..8c57051 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -21,7 +21,9 @@ CustomMetrics GettingStarted GettingStartedTable - IcebergDatastreamSink + Iceberg/IcebergDataStreamSink + Iceberg/IcebergDataStreamSource + Iceberg/S3TableSink KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders From 2ec8ac9e748335bf38c82186e2cb1c3df96cb85f Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Thu, 23 Jan 2025 15:40:38 -0600 Subject: [PATCH 11/13] fixed the remaining comments in PR --- java/Iceberg/S3TableSink/README.md | 3 ++- .../services/msf/iceberg/IcebergSinkBuilder.java | 8 ++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/java/Iceberg/S3TableSink/README.md b/java/Iceberg/S3TableSink/README.md index ab105ff..7b24105 100644 --- a/java/Iceberg/S3TableSink/README.md +++ b/java/Iceberg/S3TableSink/README.md @@ -70,7 +70,8 @@ When deployed to Managed Service for Apache Flink, 
checkpointing is controlled b ### Known limitations -At the moment there are current limitations concerning Flink Iceberg integration: +At the moment there are current limitations concerning Flink Iceberg integration with S3 Tables: +* * Currently, this example needs to be in Flink v1.19, v1.20 isn't supported with the S3 Table Sink yet. * Doesn't support Iceberg Table with hidden partitioning * Doesn't support adding columns, removing columns, renaming columns or changing columns. diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java index c5ed0a0..91ccac0 100644 --- a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java @@ -66,7 +66,11 @@ private static PartitionSpec getPartitionSpec(org.apache.iceberg.Schema icebergS */ public static FlinkSink.Builder createBuilder(Properties icebergProperties, DataStream dataStream, org.apache.avro.Schema avroSchema) { // Retrieve configuration from application parameters - String s3BucketPrefix = Preconditions.checkNotNull(icebergProperties.getProperty("table.bucket.arn"), "Iceberg S3 bucket prefix not defined"); + + /** + * This table bucket ARN will be used as the Table Catalog for Iceberg, this is unique compared to the standard Iceberg table. + */ + String s3TableBucketARN = Preconditions.checkNotNull(icebergProperties.getProperty("table.bucket.arn"), "Iceberg S3 table bucket ARN not defined"); String s3_table_db = icebergProperties.getProperty("catalog.db", DEFAULT_S3_CATALOG_DB); String s3_table_name = icebergProperties.getProperty("catalog.table", DEFAULT_ICEBERG_TABLE_NAME); @@ -95,7 +99,7 @@ public static FlinkSink.Builder createBuilder(Properties icebergProperties, Data Map catalogProperties = new HashMap<>(); catalogProperties.put("type", "iceberg"); catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); - catalogProperties.put("warehouse", s3BucketPrefix); + catalogProperties.put("warehouse", s3TableBucketARN); catalogProperties.put("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog"); // Load S3 Table Data Catalog From e3bca796b6248fff0f917b99db8e73b816b02cce Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Fri, 7 Feb 2025 11:00:15 -0600 Subject: [PATCH 12/13] iceberg-sql-json --- java/Iceberg/IcebergSQLJSONGlue/README.md | 86 ++++++ java/Iceberg/IcebergSQLJSONGlue/pom.xml | 247 ++++++++++++++++++ .../main/java/GlueTableSQLJSONExample.java | 181 +++++++++++++ .../src/main/java/StockPrice.java | 51 ++++ .../java/StockPriceGeneratorFunction.java | 22 ++ .../flink-application-properties-dev.json | 20 ++ java/Iceberg/S3TableSQLJSON/README.md | 86 ++++++ java/Iceberg/S3TableSQLJSON/pom.xml | 247 ++++++++++++++++++ .../src/main/java/S3TableSQLJSONExample.java | 176 +++++++++++++ .../src/main/java/StockPrice.java | 51 ++++ .../java/StockPriceGeneratorFunction.java | 22 ++ .../flink-application-properties-dev.json | 20 ++ 12 files changed, 1209 insertions(+) create mode 100644 java/Iceberg/IcebergSQLJSONGlue/README.md create mode 100644 java/Iceberg/IcebergSQLJSONGlue/pom.xml create mode 100644 java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java create mode 100644 java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java create mode 100644 
java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java create mode 100644 java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json create mode 100644 java/Iceberg/S3TableSQLJSON/README.md create mode 100644 java/Iceberg/S3TableSQLJSON/pom.xml create mode 100644 java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java create mode 100644 java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java create mode 100644 java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java create mode 100644 java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json diff --git a/java/Iceberg/IcebergSQLJSONGlue/README.md b/java/Iceberg/IcebergSQLJSONGlue/README.md new file mode 100644 index 0000000..7da74d0 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/README.md @@ -0,0 +1,86 @@ +# Flink Iceberg Sink using DataStream API + +* Flink version: 1.20.0 +* Flink API: DataStream API +* Iceberg 1.6.1 +* Language: Java (11) +* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) + and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) + +This example demonstrate how to use +[Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with the Glue Data Catalog. + +For simplicity, the application generates synthetic data, random stock prices, internally. +Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records +serialized with AVRO. + +### Prerequisites + +The application expects the following resources: +* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default"). + The application creates the Table, but the Catalog must exist already. +* An S3 bucket to write the Iceberg table. + +#### IAM Permissions + +The application must have IAM permissions to: +* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables. + See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html). +* Read and Write from the S3 bucket. + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties. + +When running locally, the configuration is read from the +[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. + +Runtime parameters: + +| Group ID | Key | Default | Description | +|-----------|--------------------------|-------------------|---------------------------------------------------------------------------------------------------------------------| +| `DataGen` | `records.per.sec` | `10.0` | Records per second generated. | +| `Iceberg` | `bucket.prefix` | (mandatory) | S3 bucket prefix, for example `s3://my-bucket/iceberg`. | +| `Iceberg` | `catalog.db` | `default` | Name of the Glue Data Catalog database. | +| `Iceberg` | `catalog.table` | `prices_iceberg` | Name of the Glue Data Catalog table. | +| `Iceberg` | `partition.fields` | `symbol` | Comma separated list of partition fields. | +| `Iceberg` | `sort.field` | `timestamp` | Sort field. | +| `Iceberg` | `operation` | `updsert` | Iceberg operation. One of `upsert`, `append` or `overwrite`. | +| `Iceberg` | `upsert.equality.fields` | `symbol` | Comma separated list of fields used for upsert. It must match partition fields. 
Required if `operation` = `upsert`. | + + +### Checkpoints + +Checkpointing must be enabled. Iceberg commits writes on checkpoint. + +When running locally, the application enables checkpoints programmatically, every 10 seconds. +When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. + + +### Known limitations + +At the moment there are current limitations concerning Flink Iceberg integration: +* Doesn't support Iceberg Table with hidden partitioning +* Doesn't support adding columns, removing columns, renaming columns or changing columns. + +### Schema and schema evolution + +The application must "know" the AVRO schema on start. +The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry. +This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront. + +This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible. +Schema changes are not propagated to Iceberg. +As long as the schema of incoming records is FORWARD compatible, the application deserializes incoming records using +the schema it knows. Any new field in the incoming record is discarded. + +In this example, the schema is loaded from a schema definition file, [price.avsc](./src/main/resources/price.avsc) embedded +with the application. +It is technically possible to fetch the schema on application start from an external source, like a schema registry or a +schema definition file in an S3 bucket. This is beyond the scope of this example. + +### Running locally, in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. + +See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details. 
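
For orientation before reading the full sources, the example in this module registers an Iceberg catalog backed by the Glue Data Catalog through the Table API and then creates and writes the table with plain SQL. The following is a condensed sketch of that flow, not the complete application: the warehouse path, database and table names are placeholders that the real job reads from the `Iceberg` runtime property group.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDescriptor;

public class GlueIcebergSqlSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Placeholder: in the example this comes from the "Iceberg" runtime property group
        String warehouse = "s3://<your-bucket>/glue-example/";

        // Register an Iceberg catalog that uses the Glue Data Catalog and S3FileIO
        Configuration conf = new Configuration();
        conf.setString("type", "iceberg");
        conf.setString("catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog");
        conf.setString("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        conf.setString("warehouse", warehouse);
        conf.setString("catalog-name", "glue");
        tableEnv.createCatalog("glue", CatalogDescriptor.of("glue", conf));

        // Create the target database and Iceberg table, then write with SQL.
        // With a streaming INSERT, Iceberg commits data files on Flink checkpoints.
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS `glue`.`iceberg`");
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS `glue`.`iceberg`.`stock_data` "
                + "(price DOUBLE, ticker STRING, eventtime TIMESTAMP(3))");
        tableEnv.executeSql("INSERT INTO `glue`.`iceberg`.`stock_data` "
                + "VALUES (10.5, 'AMZN', TIMESTAMP '2024-01-01 00:00:00')").await();
    }
}
```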
diff --git a/java/Iceberg/IcebergSQLJSONGlue/pom.xml b/java/Iceberg/IcebergSQLJSONGlue/pom.xml new file mode 100644 index 0000000..90100fb --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/pom.xml @@ -0,0 +1,247 @@ + + + 4.0.0 + + com.amazonaws + iceberg-sql-flink + 1.0 + jar + + + UTF-8 + 11 + ${target.java.version} + ${target.java.version} + + 1.19.0 + 1.11.3 + 2.12 + 3.4.0 + 1.6.1 + 1.2.0 + 2.23.1 + 5.8.1 + + + + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + + + org.apache.flink + flink-table-common + ${flink.version} + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + + + org.apache.flink + flink-avro + ${flink.version} + + + + + + org.apache.flink + flink-table-planner_${scala.version} + ${flink.version} + provided + + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + software.amazon.awssdk + s3tables + 2.29.26 + + + software.amazon.s3tables + s3-tables-catalog-for-iceberg + 0.1.3 + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + org.slf4j + slf4j-reload4j + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws-bundle + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws + ${iceberg.version} + + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + org.apache.iceberg + iceberg-flink-1.19 + 1.7.0 + compile + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.StreamingJob + + + + + + + + + diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java new file mode 100644 index 0000000..6a86d53 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java @@ -0,0 +1,181 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: MIT-0 + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this + * software and associated documentation files (the "Software"), to deal in the Software + * without restriction, including without limitation the rights to use, copy, modify, + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class GlueTableSQLJSONExample { + // Constants + private static final String CATALOG_NAME = "glue"; + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + private static final Logger LOG = LoggerFactory.getLogger(GlueTableSQLJSONExample.class); + + // Configuration properties + private static String s3BucketPrefix; + private static String glueDatabase; + private static String glueTable; + + public static void main(String[] args) throws Exception { + // 1. Initialize environments + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + // 2. Load properties and configure environment + Map applicationProperties = loadApplicationProperties(env); + Properties icebergProperties = applicationProperties.get("Iceberg"); + + // Configure local development settings if needed + if (isLocal(env)) { + env.enableCheckpointing(30000); + env.setParallelism(2); + } + + // 3. Setup S3 configuration + setupS3TableProperties(icebergProperties); + Catalog glueCatalog = createGlueCatalog(tableEnv); + + // 4. Create data generator source + Properties dataGenProperties = applicationProperties.get("DataGen"); + DataStream stockPriceDataStream = env.fromSource( + createDataGenerator(dataGenProperties), + WatermarkStrategy.noWatermarks(), + "DataGen"); + + // 5. Convert DataStream to Table and create view + Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream); + tableEnv.createTemporaryView("stockPriceTable", stockPriceTable); + + // 6. 
Create database and define table structure + glueCatalog.createDatabase(glueDatabase, + new CatalogDatabaseImpl(Map.of(), "Glue Database"), true); + + String sinkTableName = CATALOG_NAME + "." + glueDatabase + "." + glueTable; + + // Define and create table + String createTableStatement = "CREATE TABLE IF NOT EXISTS " + sinkTableName + "(" + + "price DOUBLE, " + + "ticker STRING," + + "eventtime TIMESTAMP(3)" + + ");"; + tableEnv.executeSql(createTableStatement); + + // 7. Execute SQL operations + // Insert data from stock price stream + String insertQuery = "INSERT INTO " + sinkTableName + + " SELECT price, ticker, eventtime FROM stockPriceTable"; + TableResult insertResult = tableEnv.executeSql(insertQuery); + insertResult.await(); + + // Query the results + String selectQuery = "SELECT * from " + sinkTableName + ";"; + TableResult selectResults = tableEnv.executeSql(selectQuery); + selectResults.print(); + + // 8. Optionally Cleanup resources +// glueCatalog.dropTable(new ObjectPath(glueDatabase, glueTable), false); +// glueCatalog.dropDatabase(glueDatabase, false); + } + + private static void setupS3TableProperties(Properties icebergProperties) { + s3BucketPrefix = icebergProperties.getProperty("bucket.prefix"); + glueDatabase = icebergProperties.getProperty("catalog.db"); + glueTable = icebergProperties.getProperty("catalog.table"); + Preconditions.checkNotNull(s3BucketPrefix, "You must supply an s3 bucket ARN for the warehouse."); + Preconditions.checkNotNull(glueDatabase, "You must supply a database name"); + Preconditions.checkNotNull(glueTable, "You must supply a table name"); + // check if it's a valid ARN + validateURI(s3BucketPrefix); + } + + private static DataGeneratorSource createDataGenerator(Properties dataGeneratorProperties) { + double recordsPerSecond = Double.parseDouble(dataGeneratorProperties.getProperty("records.per.sec", "10.0")); + Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); + + return new DataGeneratorSource(new StockPriceGeneratorFunction(), + 100, + RateLimiterStrategy.perSecond(recordsPerSecond), + TypeInformation.of(StockPrice.class)); + } + + /** + * Defines a config object with Glue specific catalog and io implementations + * Then, uses that to create the Flink catalog + */ + private static Catalog createGlueCatalog(StreamTableEnvironment tableEnv) { + + Configuration conf = new Configuration(); + conf.setString("warehouse", s3BucketPrefix); + conf.setString("catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog"); + conf.setString("type", "iceberg"); + conf.setString("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + conf.setString("catalog-name", CATALOG_NAME); + + CatalogDescriptor descriptor = CatalogDescriptor.of(CATALOG_NAME, conf); + + tableEnv.createCatalog(CATALOG_NAME, descriptor); + return tableEnv.getCatalog(CATALOG_NAME).get(); + } + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime + * or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(GlueTableSQLJSONExample.class.getClassLoader() + 
.getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + public static void validateURI(String uri) { + String s3UriPattern = "^s3://([a-z0-9.-]+)(/[a-z0-9-_/]+/?)$"; + Preconditions.checkArgument(uri != null && uri.matches(s3UriPattern), + "Invalid S3 URI format: %s. URI must match pattern: s3://bucket-name/path/", uri); + } +} diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java new file mode 100644 index 0000000..add5f68 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java @@ -0,0 +1,51 @@ + +import java.sql.Timestamp; + +public class StockPrice { + private Timestamp eventtime; + private String ticker; + private Double price; + + + public StockPrice() { + } + + public StockPrice(Timestamp eventtime, String ticker, Double price) { + this.eventtime = eventtime; + this.ticker = ticker; + this.price = price; + } + + public Timestamp getEventtime() { + return eventtime; + } + + public void setEventtime(Timestamp eventtime) { + this.eventtime = eventtime; + } + + public String getTicker() { + return ticker; + } + + public void setTicker(String ticker) { + this.ticker = ticker; + } + + public Double getPrice() { + return price; + } + + public void setPrice(Double price) { + this.price = price; + } + + @Override + public String toString() { + return "StockPrice{" + + "eventtime=" + eventtime + + ", ticker='" + ticker + '\'' + + ", price=" + price + + '}'; + } +} diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java new file mode 100644 index 0000000..0a150d7 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java @@ -0,0 +1,22 @@ +import org.apache.commons.lang3.RandomUtils; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import java.sql.Timestamp; + +/** + * Generates random stock price data for simulation purposes. 
+ */ +public class StockPriceGeneratorFunction implements GeneratorFunction { + + private static final String[] STOCK_SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"}; + private static final double MAX_PRICE = 100.0; + private static final double MIN_PRICE = 1.0; + + @Override + public StockPrice map(Long sequence) throws Exception { + Timestamp currentTimestamp = new Timestamp(System.currentTimeMillis()); + String randomSymbol = STOCK_SYMBOLS[RandomUtils.nextInt(0, STOCK_SYMBOLS.length)]; + double randomPrice = RandomUtils.nextDouble(MIN_PRICE, MAX_PRICE); + + return new StockPrice(currentTimestamp, randomSymbol, randomPrice); + } +} diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..c4bde33 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,20 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.sec": 100.0 + } + }, + { + "PropertyGroupId": "Iceberg", + "PropertyMap": { + "catalog.db": "iceberg", + "catalog.table": "stock_data", + "bucket.prefix": "s3://<>/glue-example/", + "partition.fields": "symbol", + "sort.field": "timestamp", + "operation": "upsert", + "upsert.equality.fields": "symbol" + } + } +] \ No newline at end of file diff --git a/java/Iceberg/S3TableSQLJSON/README.md b/java/Iceberg/S3TableSQLJSON/README.md new file mode 100644 index 0000000..7da74d0 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/README.md @@ -0,0 +1,86 @@ +# Flink Iceberg Sink using DataStream API + +* Flink version: 1.20.0 +* Flink API: DataStream API +* Iceberg 1.6.1 +* Language: Java (11) +* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) + and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) + +This example demonstrate how to use +[Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with the Glue Data Catalog. + +For simplicity, the application generates synthetic data, random stock prices, internally. +Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records +serialized with AVRO. + +### Prerequisites + +The application expects the following resources: +* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default"). + The application creates the Table, but the Catalog must exist already. +* An S3 bucket to write the Iceberg table. + +#### IAM Permissions + +The application must have IAM permissions to: +* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables. + See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html). +* Read and Write from the S3 bucket. + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties. + +When running locally, the configuration is read from the +[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. 
+ +Runtime parameters: + +| Group ID | Key | Default | Description | +|-----------|--------------------------|-------------------|---------------------------------------------------------------------------------------------------------------------| +| `DataGen` | `records.per.sec` | `10.0` | Records per second generated. | +| `Iceberg` | `bucket.prefix` | (mandatory) | S3 bucket prefix, for example `s3://my-bucket/iceberg`. | +| `Iceberg` | `catalog.db` | `default` | Name of the Glue Data Catalog database. | +| `Iceberg` | `catalog.table` | `prices_iceberg` | Name of the Glue Data Catalog table. | +| `Iceberg` | `partition.fields` | `symbol` | Comma separated list of partition fields. | +| `Iceberg` | `sort.field` | `timestamp` | Sort field. | +| `Iceberg` | `operation` | `upsert` | Iceberg operation. One of `upsert`, `append` or `overwrite`. | +| `Iceberg` | `upsert.equality.fields` | `symbol` | Comma separated list of fields used for upsert. It must match partition fields. Required if `operation` = `upsert`. | + + +### Checkpoints + +Checkpointing must be enabled. Iceberg commits writes on checkpoint. + +When running locally, the application enables checkpoints programmatically, every 10 seconds. +When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. + + +### Known limitations + +There are currently some limitations in the Flink Iceberg integration: +* Doesn't support Iceberg Table with hidden partitioning +* Doesn't support adding columns, removing columns, renaming columns or changing columns. + +### Schema and schema evolution + +The application must "know" the AVRO schema on start. +The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry. +This is due to a limitation of the Flink Iceberg integration, which requires knowing the table schema upfront. + +This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible. +Schema changes are not propagated to Iceberg. +As long as the schema of incoming records is FORWARD compatible, the application deserializes incoming records using +the schema it knows. Any new field in the incoming record is discarded. + +In this example, the schema is loaded from a schema definition file, [price.avsc](./src/main/resources/price.avsc) embedded +with the application. +It is technically possible to fetch the schema on application start from an external source, like a schema registry or a +schema definition file in an S3 bucket. This is beyond the scope of this example. + +### Running locally, in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. + +See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
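A minimal sketch of the local checkpoint setup described in the Checkpoints section above (the 10-second interval matches the description; the class and method names are illustrative only):

```java
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class LocalCheckpointSetupSketch {

    // Enable checkpointing only for local runs; when deployed to Managed Service for
    // Apache Flink, the checkpoint interval comes from the application configuration.
    static void configureLocalCheckpoints(StreamExecutionEnvironment env) {
        if (env instanceof LocalStreamEnvironment) {
            env.enableCheckpointing(10_000L); // Iceberg commits data files on every checkpoint
        }
    }
}
```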
diff --git a/java/Iceberg/S3TableSQLJSON/pom.xml b/java/Iceberg/S3TableSQLJSON/pom.xml new file mode 100644 index 0000000..803f017 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/pom.xml @@ -0,0 +1,247 @@ + + + 4.0.0 + + com.amazonaws + s3-table-sql-flink + 1.0 + jar + + + UTF-8 + 11 + ${target.java.version} + ${target.java.version} + + 1.19.0 + 1.11.3 + 2.12 + 3.4.0 + 1.6.1 + 1.2.0 + 2.23.1 + 5.8.1 + + + + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + + + org.apache.flink + flink-table-common + ${flink.version} + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + + + org.apache.flink + flink-avro + ${flink.version} + + + + + + org.apache.flink + flink-table-planner_${scala.version} + ${flink.version} + provided + + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + software.amazon.awssdk + s3tables + 2.29.26 + + + software.amazon.s3tables + s3-tables-catalog-for-iceberg + 0.1.3 + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + org.slf4j + slf4j-reload4j + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws-bundle + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws + ${iceberg.version} + + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + org.apache.iceberg + iceberg-flink-1.19 + 1.7.0 + compile + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.StreamingJob + + + + + + + + + diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java b/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java new file mode 100644 index 0000000..a53ff23 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java @@ -0,0 +1,176 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: MIT-0 + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this + * software and associated documentation files (the "Software"), to deal in the Software + * without restriction, including without limitation the rights to use, copy, modify, + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.*; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class S3TableSQLJSONExample { + // Constants + private static final String CATALOG_NAME = "s3"; + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + private static final Logger LOG = LoggerFactory.getLogger(S3TableSQLJSONExample.class); + + // Configuration properties + private static String tableBucketArn; + private static String s3TableDatabase; + private static String s3Table; + + public static void main(String[] args) throws Exception { + // 1. Initialize environments + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + // 2. Load properties and configure environment + Map applicationProperties = loadApplicationProperties(env); + Properties icebergProperties = applicationProperties.get("Iceberg"); + + // Configure local development settings if needed + if (isLocal(env)) { + env.enableCheckpointing(30000); + env.setParallelism(2); + } + + // 3. Setup S3 configuration + setupS3TableProperties(icebergProperties); + Catalog s3Catalog = createS3Catalog(tableEnv); + + // 4. Create data generator source + Properties dataGenProperties = applicationProperties.get("DataGen"); + DataStream stockPriceDataStream = env.fromSource( + createDataGenerator(dataGenProperties), + WatermarkStrategy.noWatermarks(), + "DataGen"); + + // 5. Convert DataStream to Table and create view + Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream); + tableEnv.createTemporaryView("stockPriceTable", stockPriceTable); + + // 6. Create database and define table structure + s3Catalog.createDatabase(s3TableDatabase, + new CatalogDatabaseImpl(Map.of(), "Sample Database"), true); + + String sinkTableName = CATALOG_NAME + "." + s3TableDatabase + "." 
+ s3Table; + + // Define and create table + String createTableStatement = "CREATE TABLE IF NOT EXISTS " + sinkTableName + "(" + + "price DOUBLE, " + + "ticker STRING," + + "eventtime TIMESTAMP(3)" + + ");"; + tableEnv.executeSql(createTableStatement); + + // 7. Execute SQL operations + // Insert data from stock price stream + String insertQuery = "INSERT INTO " + sinkTableName + + " SELECT price, ticker, eventtime FROM stockPriceTable"; + TableResult insertResult = tableEnv.executeSql(insertQuery); + insertResult.await(); + + // Query the results + String selectQuery = "SELECT * from " + sinkTableName + ";"; + TableResult selectResults = tableEnv.executeSql(selectQuery); + selectResults.print(); + + // 8. Optionally Cleanup resources +// s3Catalog.dropTable(new ObjectPath(s3TableDatabase, s3Table), false); +// s3Catalog.dropDatabase(s3TableDatabase, false); + } + + private static void setupS3TableProperties(Properties icebergProperties) { + tableBucketArn = icebergProperties.getProperty("table.bucket.arn"); + s3TableDatabase = icebergProperties.getProperty("catalog.db"); + s3Table = icebergProperties.getProperty("catalog.table"); + Preconditions.checkNotNull(tableBucketArn, "You must supply a table bucket ARN."); + Preconditions.checkNotNull(s3TableDatabase, "You must supply a database name"); + Preconditions.checkNotNull(s3Table, "You must supply a table name"); + // check if it's a valid ARN + validateArn(tableBucketArn); + } + + private static DataGeneratorSource createDataGenerator(Properties dataGeneratorProperties) { + double recordsPerSecond = Double.parseDouble(dataGeneratorProperties.getProperty("records.per.sec", "10.0")); + Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); + + return new DataGeneratorSource(new StockPriceGeneratorFunction(), + 100, + RateLimiterStrategy.perSecond(recordsPerSecond), + TypeInformation.of(StockPrice.class)); + } + + /** + * Defines a config object with S3 Table specific catalog and io implementations + * Then, uses that to create the Flink catalog + */ + private static Catalog createS3Catalog(StreamTableEnvironment tableEnv) { + Configuration conf = new Configuration(); + conf.setString("warehouse", tableBucketArn); + conf.setString("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog"); + conf.setString("type", "iceberg"); + conf.setString("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + + CatalogDescriptor descriptor = CatalogDescriptor.of(CATALOG_NAME, conf); + + tableEnv.createCatalog(CATALOG_NAME, descriptor); + return tableEnv.getCatalog(CATALOG_NAME).get(); + } + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime + * or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(S3TableSQLJSONExample.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + public static void validateArn(String arn) { + String arnPattern = 
"^arn:aws[a-zA-Z-]*:[a-zA-Z0-9-]+:[a-zA-Z0-9-]*:[0-9]{12}:[a-zA-Z0-9-_/:.]+$"; + Preconditions.checkArgument(arn != null && arn.matches(arnPattern), + "Invalid ARN format: %s. ARN must match pattern: arn:partition:service:region:account-id:resource", arn); + } +} diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java new file mode 100644 index 0000000..add5f68 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java @@ -0,0 +1,51 @@ + +import java.sql.Timestamp; + +public class StockPrice { + private Timestamp eventtime; + private String ticker; + private Double price; + + + public StockPrice() { + } + + public StockPrice(Timestamp eventtime, String ticker, Double price) { + this.eventtime = eventtime; + this.ticker = ticker; + this.price = price; + } + + public Timestamp getEventtime() { + return eventtime; + } + + public void setEventtime(Timestamp eventtime) { + this.eventtime = eventtime; + } + + public String getTicker() { + return ticker; + } + + public void setTicker(String ticker) { + this.ticker = ticker; + } + + public Double getPrice() { + return price; + } + + public void setPrice(Double price) { + this.price = price; + } + + @Override + public String toString() { + return "StockPrice{" + + "eventtime=" + eventtime + + ", ticker='" + ticker + '\'' + + ", price=" + price + + '}'; + } +} diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java new file mode 100644 index 0000000..0a150d7 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java @@ -0,0 +1,22 @@ +import org.apache.commons.lang3.RandomUtils; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import java.sql.Timestamp; + +/** + * Generates random stock price data for simulation purposes. 
+ */ +public class StockPriceGeneratorFunction implements GeneratorFunction { + + private static final String[] STOCK_SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"}; + private static final double MAX_PRICE = 100.0; + private static final double MIN_PRICE = 1.0; + + @Override + public StockPrice map(Long sequence) throws Exception { + Timestamp currentTimestamp = new Timestamp(System.currentTimeMillis()); + String randomSymbol = STOCK_SYMBOLS[RandomUtils.nextInt(0, STOCK_SYMBOLS.length)]; + double randomPrice = RandomUtils.nextDouble(MIN_PRICE, MAX_PRICE); + + return new StockPrice(currentTimestamp, randomSymbol, randomPrice); + } +} diff --git a/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..ecadcd3 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,20 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.sec": 100.0 + } + }, + { + "PropertyGroupId": "Iceberg", + "PropertyMap": { + "table.bucket.arn": "<>", + "catalog.db": "test_from_flink", + "catalog.table": "test_table", + "partition.fields": "symbol", + "sort.field": "timestamp", + "operation": "upsert", + "upsert.equality.fields": "symbol" + } + } +] \ No newline at end of file From a53d1318b28764ddf462cd4f25818913c5e58c5d Mon Sep 17 00:00:00 2001 From: Jeremy Ber Date: Mon, 10 Feb 2025 10:59:36 -0600 Subject: [PATCH 13/13] fixed readme and cleaned up pom --- java/Iceberg/IcebergSQLJSONGlue/README.md | 86 +++++++----------- java/Iceberg/IcebergSQLJSONGlue/pom.xml | 12 --- .../main/java/GlueTableSQLJSONExample.java | 5 +- .../flink-application-properties-dev.json | 2 +- java/Iceberg/S3TableSQLJSON/README.md | 91 +++++++------------ 5 files changed, 72 insertions(+), 124 deletions(-) diff --git a/java/Iceberg/IcebergSQLJSONGlue/README.md b/java/Iceberg/IcebergSQLJSONGlue/README.md index 7da74d0..a22ff45 100644 --- a/java/Iceberg/IcebergSQLJSONGlue/README.md +++ b/java/Iceberg/IcebergSQLJSONGlue/README.md @@ -1,30 +1,24 @@ -# Flink Iceberg Sink using DataStream API +# Flink SQL Example with Glue Catalog and Iceberg * Flink version: 1.20.0 -* Flink API: DataStream API +* Flink API: SQL API * Iceberg 1.6.1 * Language: Java (11) -* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) - and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) - -This example demonstrate how to use -[Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with the Glue Data Catalog. +* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) + and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) -For simplicity, the application generates synthetic data, random stock prices, internally. -Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records -serialized with AVRO. +This example demonstrates how to use Flink SQL API with Apache Iceberg and AWS Glue Catalog. The application generates synthetic stock price data using Flink's DataGen connector and writes it to an Iceberg table in the Glue Catalog.
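As a rough orientation, the snippet below sketches what the Glue catalog registration amounts to in plain Flink SQL DDL, with option keys taken from the Iceberg Flink/AWS documentation. The catalog name and warehouse prefix are placeholders, the Iceberg AWS and Glue classes are assumed to be on the classpath, and the application in this patch registers the catalog programmatically through a `CatalogDescriptor` rather than with DDL.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class GlueIcebergCatalogSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Iceberg catalog backed by the Glue Data Catalog; the warehouse prefix is a placeholder
        tableEnv.executeSql(
                "CREATE CATALOG glue_catalog WITH ("
                        + " 'type' = 'iceberg',"
                        + " 'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',"
                        + " 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',"
                        + " 'warehouse' = 's3://my-example-bucket/glue-example/'"
                        + ")");
        tableEnv.useCatalog("glue_catalog");
    }
}
```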
### Prerequisites The application expects the following resources: -* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default"). - The application creates the Table, but the Catalog must exist already. -* An S3 bucket to write the Iceberg table. +* A Glue Data Catalog database in the current AWS region. The database name is configurable. +* An S3 bucket to store the Iceberg table data. #### IAM Permissions The application must have IAM permissions to: -* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables. +* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables. See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html). * Read and Write from the S3 bucket. @@ -33,54 +29,40 @@ The application must have IAM permissions to: When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties. When running locally, the configuration is read from the -[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. +`resources/flink-application-properties-dev.json` file. Runtime parameters: -| Group ID | Key | Default | Description | -|-----------|--------------------------|-------------------|---------------------------------------------------------------------------------------------------------------------| -| `DataGen` | `records.per.sec` | `10.0` | Records per second generated. | -| `Iceberg` | `bucket.prefix` | (mandatory) | S3 bucket prefix, for example `s3://my-bucket/iceberg`. | -| `Iceberg` | `catalog.db` | `default` | Name of the Glue Data Catalog database. | -| `Iceberg` | `catalog.table` | `prices_iceberg` | Name of the Glue Data Catalog table. | -| `Iceberg` | `partition.fields` | `symbol` | Comma separated list of partition fields. | -| `Iceberg` | `sort.field` | `timestamp` | Sort field. | -| `Iceberg` | `operation` | `updsert` | Iceberg operation. One of `upsert`, `append` or `overwrite`. | -| `Iceberg` | `upsert.equality.fields` | `symbol` | Comma separated list of fields used for upsert. It must match partition fields. Required if `operation` = `upsert`. | - +| Group ID | Key | Default | Description | +|-----------|-----------------|--------------------------------------|------------------------------------------------| +| `DataGen` | `records.per.sec` | `10.0` | Records per second generated | +| `Iceberg` | `bucket.prefix` | `s3://<>/glue-example/` | S3 bucket prefix for Iceberg table storage | +| `Iceberg` | `catalog.db` | `iceberg` | Name of the Glue Data Catalog database | +| `Iceberg` | `catalog.table` | `stock_data` | Name of the Glue Data Catalog table | ### Checkpoints -Checkpointing must be enabled. Iceberg commits writes on checkpoint. - -When running locally, the application enables checkpoints programmatically, every 10 seconds. -When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. - - -### Known limitations - -At the moment there are current limitations concerning Flink Iceberg integration: -* Doesn't support Iceberg Table with hidden partitioning -* Doesn't support adding columns, removing columns, renaming columns or changing columns. +Checkpointing must be enabled for proper operation. When running locally, the application enables checkpoints programmatically every 30 seconds with a parallelism of 2. 
When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. -### Schema and schema evolution +### Schema -The application must "know" the AVRO schema on start. -The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry. -This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront. +The application creates a table with the following schema: +- price: DOUBLE +- ticker: STRING +- eventtime: TIMESTAMP(3) -This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible. -Schema changes are not propagated to Iceberg. -As long as the schema of incoming records is FORWARD compatible, the application deserializes incoming records using -the schema it knows. Any new field in the incoming record is discarded. +The table is created using Flink SQL DDL and data is inserted using SQL INSERT statements. -In this example, the schema is loaded from a schema definition file, [price.avsc](./src/main/resources/price.avsc) embedded -with the application. -It is technically possible to fetch the schema on application start from an external source, like a schema registry or a -schema definition file in an S3 bucket. This is beyond the scope of this example. +### Running locally -### Running locally, in IntelliJ +You can run this example directly in a local environment with a web UI. The application will: +1. Create a local Flink environment with web UI +2. Set up the Glue catalog configuration +3. Generate synthetic stock price data +4. Create the database and table in Glue +5. Insert the generated data into the Iceberg table +6. Query and display the results -You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. +Make sure to configure the appropriate AWS credentials and region when running locally. -See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details. +See [Running examples locally](../../running-examples-locally.md) for details. diff --git a/java/Iceberg/IcebergSQLJSONGlue/pom.xml b/java/Iceberg/IcebergSQLJSONGlue/pom.xml index 90100fb..10a0660 100644 --- a/java/Iceberg/IcebergSQLJSONGlue/pom.xml +++ b/java/Iceberg/IcebergSQLJSONGlue/pom.xml @@ -83,18 +83,6 @@ ${kda.runtime.version} provided - - - - software.amazon.awssdk - s3tables - 2.29.26 - - - software.amazon.s3tables - s3-tables-catalog-for-iceberg - 0.1.3 - org.apache.flink flink-connector-files diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java index 6a86d53..0881179 100644 --- a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java @@ -31,7 +31,6 @@ import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogDescriptor; -import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +67,7 @@ public static void main(String[] args) throws Exception { } // 3. 
Setup S3 configuration - setupS3TableProperties(icebergProperties); + setupGlueTableProperties(icebergProperties); Catalog glueCatalog = createGlueCatalog(tableEnv); // 4. Create data generator source @@ -113,7 +112,7 @@ public static void main(String[] args) throws Exception { // glueCatalog.dropDatabase(glueDatabase, false); } - private static void setupS3TableProperties(Properties icebergProperties) { + private static void setupGlueTableProperties(Properties icebergProperties) { s3BucketPrefix = icebergProperties.getProperty("bucket.prefix"); glueDatabase = icebergProperties.getProperty("catalog.db"); glueTable = icebergProperties.getProperty("catalog.table"); diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json index c4bde33..6e9ede0 100644 --- a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json @@ -10,7 +10,7 @@ "PropertyMap": { "catalog.db": "iceberg", "catalog.table": "stock_data", - "bucket.prefix": "s3://<>/glue-example/", + "bucket.prefix": "s3://my-iceberg-bucket-jeremy/glue-example/", "partition.fields": "symbol", "sort.field": "timestamp", "operation": "upsert", diff --git a/java/Iceberg/S3TableSQLJSON/README.md b/java/Iceberg/S3TableSQLJSON/README.md index 7da74d0..3139892 100644 --- a/java/Iceberg/S3TableSQLJSON/README.md +++ b/java/Iceberg/S3TableSQLJSON/README.md @@ -1,86 +1,65 @@ -# Flink Iceberg Sink using DataStream API +# Flink SQL Example with S3 Tables Catalog * Flink version: 1.20.0 -* Flink API: DataStream API -* Iceberg 1.6.1 +* Flink API: SQL API * Language: Java (11) -* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) - and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) +* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) + and S3 Tables -This example demonstrate how to use -[Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with the Glue Data Catalog. - -For simplicity, the application generates synthetic data, random stock prices, internally. -Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records -serialized with AVRO. +This example demonstrates how to use Flink SQL API with Amazon S3 Tables Catalog. The application generates synthetic stock price data using Flink's DataGen connector and writes it to a table in the S3 Tables Catalog. ### Prerequisites The application expects the following resources: -* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default"). - The application creates the Table, but the Catalog must exist already. -* An S3 bucket to write the Iceberg table. +* An S3 bucket configured for S3 Tables +* Appropriate AWS account and region configuration +* S3 Tables Catalog setup in your AWS account #### IAM Permissions The application must have IAM permissions to: -* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables. - See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html). -* Read and Write from the S3 bucket. 
+* Create and manage S3 Tables Catalog databases and tables +* Read and Write from the specified S3 bucket +* Access S3 Tables service endpoints ### Runtime configuration When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties. When running locally, the configuration is read from the -[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. +`resources/flink-application-properties-dev.json` file. Runtime parameters: -| Group ID | Key | Default | Description | -|-----------|--------------------------|-------------------|---------------------------------------------------------------------------------------------------------------------| -| `DataGen` | `records.per.sec` | `10.0` | Records per second generated. | -| `Iceberg` | `bucket.prefix` | (mandatory) | S3 bucket prefix, for example `s3://my-bucket/iceberg`. | -| `Iceberg` | `catalog.db` | `default` | Name of the Glue Data Catalog database. | -| `Iceberg` | `catalog.table` | `prices_iceberg` | Name of the Glue Data Catalog table. | -| `Iceberg` | `partition.fields` | `symbol` | Comma separated list of partition fields. | -| `Iceberg` | `sort.field` | `timestamp` | Sort field. | -| `Iceberg` | `operation` | `updsert` | Iceberg operation. One of `upsert`, `append` or `overwrite`. | -| `Iceberg` | `upsert.equality.fields` | `symbol` | Comma separated list of fields used for upsert. It must match partition fields. Required if `operation` = `upsert`. | - +| Group ID | Key | Default | Description | +|-----------|-------------------|------------------------------------------------------------|-------------------------------------------------| +| `DataGen` | `records.per.sec` | `10.0` | Records per second generated | +| `Iceberg` | `table.bucket.arn` | `arn:aws:s3tables:<>:>:bucket/my-table-bucket` | ARN of the S3 Tables bucket | +| `Iceberg` | `catalog.db` | `test_from_flink` | Name of the S3 Tables Catalog database | +| `Iceberg` | `catalog.table` | `test_table` | Name of the table to be created | ### Checkpoints -Checkpointing must be enabled. Iceberg commits writes on checkpoint. - -When running locally, the application enables checkpoints programmatically, every 10 seconds. -When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. - - -### Known limitations - -At the moment there are current limitations concerning Flink Iceberg integration: -* Doesn't support Iceberg Table with hidden partitioning -* Doesn't support adding columns, removing columns, renaming columns or changing columns. - -### Schema and schema evolution +Checkpointing must be enabled for proper operation. When running locally, the application enables checkpoints programmatically every 30 seconds with a parallelism of 2. When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. -The application must "know" the AVRO schema on start. -The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry. -This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront. +### Schema -This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible. -Schema changes are not propagated to Iceberg. 
-As long as the schema of incoming records is FORWARD compatible, the application deserializes incoming records using -the schema it knows. Any new field in the incoming record is discarded. +The application creates a table with the following schema: +- price: DOUBLE +- ticker: STRING +- eventtime: TIMESTAMP(3) -In this example, the schema is loaded from a schema definition file, [price.avsc](./src/main/resources/price.avsc) embedded -with the application. -It is technically possible to fetch the schema on application start from an external source, like a schema registry or a -schema definition file in an S3 bucket. This is beyond the scope of this example. +The table is created using Flink SQL DDL and data is inserted using SQL INSERT statements. -### Running locally, in IntelliJ +### Running locally -You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. +You can run this example directly in a local environment with a web UI. The application will: +1. Create a local Flink environment with web UI +2. Set up the S3 Tables catalog configuration +3. Generate synthetic stock price data +4. Create the database and table in S3 Tables Catalog +5. Insert the generated data into the table +6. Query and display the results +7. Optionally cleanup resources (database and table) -See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details. +Make sure to configure the appropriate AWS credentials and region when running locally, and ensure the provided S3 Tables bucket ARN is valid and accessible. \ No newline at end of file
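For orientation, here is a condensed, hedged sketch of steps 2, 4 and 5 above. The catalog options mirror the `CatalogDescriptor` built by `S3TableSQLJSONExample` in this patch; the table bucket ARN is a placeholder, the database and table names come from the sample configuration, and the single-row `INSERT` stands in for the DataGen stream.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDescriptor;

public class S3TablesCatalogSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Same catalog options as the application; replace the ARN with your own S3 Tables bucket
        Configuration catalogConf = new Configuration();
        catalogConf.setString("type", "iceberg");
        catalogConf.setString("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog");
        catalogConf.setString("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        catalogConf.setString("warehouse", "arn:aws:s3tables:us-east-1:111122223333:bucket/my-table-bucket");
        tableEnv.createCatalog("s3", CatalogDescriptor.of("s3", catalogConf));

        // Database, table and a single test row, all through SQL
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS `s3`.`test_from_flink`");
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS `s3`.`test_from_flink`.`test_table` ("
                        + " price DOUBLE, ticker STRING, eventtime TIMESTAMP(3))");
        tableEnv.executeSql(
                "INSERT INTO `s3`.`test_from_flink`.`test_table` VALUES "
                        + "(10.5, 'AMZN', TIMESTAMP '2025-01-06 09:12:55.000')")
                .await();
    }
}
```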