diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..b858c86 --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +flink-application-properties-dev.json filter=arn-filter diff --git a/.gitignore b/.gitignore index b35d78c..6be6265 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,6 @@ venv/ /pyflink/ /.run/ + +clean.sh +smudge.sh \ No newline at end of file diff --git a/java/IcebergDatastreamSink/README.md b/java/Iceberg/IcebergDataStreamSink/README.md similarity index 100% rename from java/IcebergDatastreamSink/README.md rename to java/Iceberg/IcebergDataStreamSink/README.md diff --git a/java/IcebergDatastreamSink/pom.xml b/java/Iceberg/IcebergDataStreamSink/pom.xml similarity index 100% rename from java/IcebergDatastreamSink/pom.xml rename to java/Iceberg/IcebergDataStreamSink/pom.xml diff --git a/java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java similarity index 100% rename from java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java rename to java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java diff --git a/java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java similarity index 100% rename from java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java rename to java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java diff --git a/java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java similarity index 100% rename from java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java rename to java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java diff --git a/java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java similarity index 100% rename from java/IcebergDatastreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java rename to java/Iceberg/IcebergDataStreamSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java diff --git a/java/IcebergDatastreamSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json similarity index 100% rename from java/IcebergDatastreamSink/src/main/resources/flink-application-properties-dev.json rename to java/Iceberg/IcebergDataStreamSink/src/main/resources/flink-application-properties-dev.json diff --git a/java/IcebergDatastreamSink/src/main/resources/log4j2.properties b/java/Iceberg/IcebergDataStreamSink/src/main/resources/log4j2.properties similarity index 100% rename from java/IcebergDatastreamSink/src/main/resources/log4j2.properties rename to java/Iceberg/IcebergDataStreamSink/src/main/resources/log4j2.properties diff --git a/java/IcebergDatastreamSink/src/main/resources/price.avsc 
b/java/Iceberg/IcebergDataStreamSink/src/main/resources/price.avsc similarity index 100% rename from java/IcebergDatastreamSink/src/main/resources/price.avsc rename to java/Iceberg/IcebergDataStreamSink/src/main/resources/price.avsc diff --git a/java/IcebergDatastreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java b/java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java similarity index 100% rename from java/IcebergDatastreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java rename to java/Iceberg/IcebergDataStreamSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java diff --git a/java/Iceberg/IcebergDataStreamSource/README.md b/java/Iceberg/IcebergDataStreamSource/README.md new file mode 100644 index 0000000..54e16fb --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/README.md @@ -0,0 +1,86 @@ +# Flink Iceberg Source using DataStream API + +* Flink version: 1.20.0 +* Flink API: DataStream API +* Iceberg 1.6.1 +* Language: Java (11) +* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) + and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) + +This example demonstrates how to use the +[Flink Iceberg Source Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with the Glue Data Catalog. + +For simplicity, the application generates synthetic data (random stock prices) internally. +Data is generated as AVRO Generic Records, simulating a real source (for example, a Kafka source) that receives records +serialized with AVRO. + +### Prerequisites + +The application expects the following resources: +* A Glue Data Catalog database in the current AWS region. The database name is configurable (default: "default"). + The application creates the Table, but the Catalog must exist already. +* An S3 bucket to write the Iceberg table. + +#### IAM Permissions + +The application must have IAM permissions to: +* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables. + See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html). +* Read and Write from the S3 bucket. + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties. + +When running locally, the configuration is read from the +[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file. + +Runtime parameters: + +| Group ID | Key | Default | Description | |-----------|--------------------------|-------------------|---------------------------------------------------------------------------------------------------------------------| | `DataGen` | `records.per.sec` | `10.0` | Records per second generated. | | `Iceberg` | `bucket.prefix` | (mandatory) | S3 bucket prefix, for example `s3://my-bucket/iceberg`. | | `Iceberg` | `catalog.db` | `default` | Name of the Glue Data Catalog database. | | `Iceberg` | `catalog.table` | `prices_iceberg` | Name of the Glue Data Catalog table. | | `Iceberg` | `partition.fields` | `symbol` | Comma-separated list of partition fields. | | `Iceberg` | `sort.field` | `timestamp` | Sort field. | | `Iceberg` | `operation` | `upsert` | Iceberg operation. One of `upsert`, `append` or `overwrite`. | | `Iceberg` | `upsert.equality.fields` | `symbol` | Comma-separated list of fields used for upsert. It must match the partition fields. Required if `operation` = `upsert`. | + + +### Checkpoints + +Checkpointing must be enabled. Iceberg commits writes on checkpoint. + +When running locally, the application enables checkpointing programmatically, every 10 seconds. +When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. + + +### Known limitations + +The Flink Iceberg integration currently has the following limitations: +* Doesn't support Iceberg Tables with hidden partitioning. +* Doesn't support adding, removing, renaming, or changing columns. + +### Schema and schema evolution + +The application must "know" the AVRO schema on start. +The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry. +This is due to a limitation of the Flink Iceberg integration, which requires knowing the table schema upfront. + +This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible. +Schema changes are not propagated to Iceberg. +As long as the schema of incoming records is FORWARD compatible, the application deserializes incoming records using +the schema it knows. Any new field in the incoming record is discarded (see the standalone snippet at the end of this README). + +In this example, the schema is loaded from a schema definition file, [price.avsc](./src/main/resources/price.avsc), embedded +with the application. +It is technically possible to fetch the schema on application start from an external source, like a schema registry or a +schema definition file in an S3 bucket. This is beyond the scope of this example, but a minimal sketch is shown at the end of this README. + +### Running locally, in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. + +See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details.
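The FORWARD-compatibility behaviour described above can be illustrated with a small, standalone Avro snippet (not part of the application): a record encoded with a newer writer schema, which has one extra field, is decoded with an older reader schema, and the extra field is silently dropped. The reduced two- and three-field schemas below are hypothetical, and the snippet knows both schemas only so that it is self-contained; the application itself only knows its own (reader) schema.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;

public class ForwardCompatibilityDemo {
    public static void main(String[] args) throws Exception {
        // Older schema: the one the application "knows"
        Schema readerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Price\",\"fields\":["
                        + "{\"name\":\"symbol\",\"type\":\"string\"},"
                        + "{\"name\":\"price\",\"type\":\"float\"}]}");
        // Newer schema used by the producer: adds an "exchange" field
        Schema writerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Price\",\"fields\":["
                        + "{\"name\":\"symbol\",\"type\":\"string\"},"
                        + "{\"name\":\"price\",\"type\":\"float\"},"
                        + "{\"name\":\"exchange\",\"type\":\"string\"}]}");

        // Encode a record with the newer writer schema
        GenericRecord newer = new GenericData.Record(writerSchema);
        newer.put("symbol", "AMZN");
        newer.put("price", 42.0f);
        newer.put("exchange", "NASDAQ");

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(newer, encoder);
        encoder.flush();

        // Decode it with the older reader schema: the "exchange" field is discarded
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
                .read(null, decoder);
        System.out.println(decoded); // {"symbol": "AMZN", "price": 42.0}
    }
}
```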
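As mentioned under "Schema and schema evolution", the schema could also be fetched at application start from an external location instead of the bundled `price.avsc`. The class below is a minimal, hypothetical sketch of loading an `.avsc` definition from an S3 object, assuming the AWS SDK for Java v2 S3 client is on the classpath; the bucket and key names are placeholders and this class is not part of the example.

```java
import org.apache.avro.Schema;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

import java.io.IOException;
import java.io.InputStream;

public class S3SchemaLoader {

    /**
     * Downloads the .avsc schema definition from S3 and parses it,
     * mirroring what AvroSchemaUtils.loadSchema() does with the bundled resource.
     */
    public static Schema loadSchemaFromS3(String bucket, String key) throws IOException {
        try (S3Client s3 = S3Client.create();
             InputStream in = s3.getObject(GetObjectRequest.builder()
                     .bucket(bucket)
                     .key(key)
                     .build())) {
            return new Schema.Parser().parse(in);
        }
    }

    public static void main(String[] args) throws IOException {
        // Placeholder bucket and key, for illustration only
        Schema schema = loadSchemaFromS3("my-schema-bucket", "schemas/price.avsc");
        System.out.println(schema.toString(true));
    }
}
```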
diff --git a/java/Iceberg/IcebergDataStreamSource/pom.xml b/java/Iceberg/IcebergDataStreamSource/pom.xml new file mode 100644 index 0000000..2b97e4d --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/pom.xml @@ -0,0 +1,212 @@ + + + 4.0.0 + + com.amazonaws + iceberg-datastream-source + 1.0 + jar + + + UTF-8 + 11 + ${target.java.version} + ${target.java.version} + + 1.20.0 + 1.11.3 + 3.4.0 + 1.6.1 + 1.2.0 + 2.23.1 + 5.8.1 + + + + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + org.apache.flink + flink-avro + ${flink.version} + + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink-1.19 + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws-bundle + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws + ${iceberg.version} + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + + org.slf4j + slf4j-reload4j + + + + + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.StreamingJob + + + + + + + + + \ No newline at end of file diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java new file mode 100644 index 0000000..a9e0ea0 --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/StreamingJob.java @@ -0,0 +1,143 @@ +package com.amazonaws.services.msf; + +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Preconditions; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import com.amazonaws.services.msf.avro.AvroSchemaUtils; +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; +import 
org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.source.FlinkSource; +import org.apache.iceberg.flink.source.IcebergSource; +import org.apache.iceberg.flink.source.StreamingStartingStrategy; +import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory; +import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + + +public class StreamingJob { + private final static Logger LOG = LoggerFactory.getLogger(StreamingJob.class); + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(StreamingJob.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + public static void main(String[] args) throws Exception { + // Set up the streaming execution environment + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Map applicationProperties = loadApplicationProperties(env); + + + // Get AVRO Schema from the definition bundled with the application + // Note that the application must "knows" the AVRO schema upfront, i.e. the schema must be either embedded + // with the application or fetched at start time. + // If the schema of the records received by the source changes, all changes must be FORWARD compatible. + // This way, the application will be able to write the data with the new schema into the old schema, but schema + // changes are not propagated to the Iceberg table. 
+ Schema avroSchema = AvroSchemaUtils.loadSchema(); + + + + // Local dev specific settings + if (isLocal(env)) { + org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration(); + env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); + env.disableOperatorChaining(); + + // Checkpointing and parallelism are set by Amazon Managed Service for Apache Flink when running on AWS + env.enableCheckpointing(60000); + env.setParallelism(2); + + + } + + Properties icebergProperties = applicationProperties.get("Iceberg"); + + String s3BucketPrefix = Preconditions.checkNotNull(icebergProperties.getProperty("bucket.prefix"), "Iceberg S3 bucket prefix not defined"); + + + // Catalog properties for using Glue Data Catalog + Map catalogProperties = new HashMap<>(); + catalogProperties.put("type", "iceberg"); + catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + catalogProperties.put("warehouse", s3BucketPrefix); + + CatalogLoader glueCatalogLoader = + CatalogLoader.custom( + "glue", + catalogProperties, + new Configuration(), + "org.apache.iceberg.aws.glue.GlueCatalog"); + + String DEFAULT_GLUE_DB = "iceberg"; + String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg"; + + String glueDatabase = icebergProperties.getProperty("catalog.db", DEFAULT_GLUE_DB); + String glueTable = icebergProperties.getProperty("catalog.table", DEFAULT_ICEBERG_TABLE_NAME); + TableIdentifier inputTable = TableIdentifier.of(glueDatabase, glueTable); + + TableLoader tableLoader = TableLoader.fromCatalog(glueCatalogLoader, inputTable); + Table table; + try(TableLoader loader = tableLoader) { + loader.open(); + table = loader.loadTable(); + } + + AvroGenericRecordReaderFunction readerFunction = AvroGenericRecordReaderFunction.fromTable(table); + + + + IcebergSource source = + IcebergSource.builder() + .tableLoader(tableLoader) + .readerFunction(readerFunction) + .assignerFactory(new SimpleSplitAssignerFactory()) + .monitorInterval(Duration.ofSeconds(60)) + .streaming(true) + .build(); + + + DataStreamSource stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), + "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema)); + + + stream.print(); + + env.execute("Flink DataStream Source"); + } +} \ No newline at end of file diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java new file mode 100644 index 0000000..de34c5e --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java @@ -0,0 +1,22 @@ +package com.amazonaws.services.msf.avro; + +import com.amazonaws.services.msf.StreamingJob; +import org.apache.avro.Schema; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; + +public class AvroSchemaUtils implements Serializable { + + private static final String AVRO_SCHEMA_RESOURCE = "price.avsc"; + + /** + * Load the AVRO Schema from the resources folder + */ + public static Schema loadSchema() throws IOException { + // Get AVRO Schema from the definition bundled with the application + InputStream inputStream = StreamingJob.class.getClassLoader().getResourceAsStream(AVRO_SCHEMA_RESOURCE); + return new org.apache.avro.Schema.Parser().parse(inputStream); + } +} diff --git 
a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java new file mode 100644 index 0000000..5b1cf34 --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java @@ -0,0 +1,46 @@ +package com.amazonaws.services.msf.datagen; + +import org.apache.flink.connector.datagen.source.GeneratorFunction; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.RandomUtils; + +import java.time.Instant; + +/** + * Function used by DataGen source to generate random records as AVRO GenericRecord. + *

+ * The generator assumes that the AVRO schema provided contains the generated fields + */ +public class AvroGenericStockTradeGeneratorFunction implements GeneratorFunction { + + private static final String[] SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"}; + + private Schema avroSchema; + + public AvroGenericStockTradeGeneratorFunction(Schema avroSchema) { + this.avroSchema = avroSchema; + } + + /** + * Generates a random trade + */ + @Override + public GenericRecord map(Long value) { + GenericData.Record record = new GenericData.Record(avroSchema); + + String symbol = SYMBOLS[RandomUtils.nextInt(0, SYMBOLS.length)]; + float price = RandomUtils.nextFloat(0, 10); + int volumes = RandomUtils.nextInt(0, 1000000); + String timestamp = Instant.now().toString(); + + record.put("symbol", symbol); + record.put("price", price); + record.put("volumes", volumes); + record.put("timestamp", timestamp); + + return record; + } +} diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java new file mode 100644 index 0000000..8dbbcdc --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java @@ -0,0 +1,154 @@ +package com.amazonaws.services.msf.iceberg; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.generic.GenericRecord; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Wraps the code to initialize an Iceberg sink that uses Glue Data Catalog as catalog + */ +public class IcebergSinkBuilder { + private static final String DEFAULT_GLUE_DB = "default"; + private static final String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg"; + private static final String DEFAULT_ICEBERG_SORT_ORDER_FIELD = "accountNr"; + private static final String DEFAULT_ICEBERG_PARTITION_FIELDS = "symbol"; + private static final String DEFAULT_ICEBERG_OPERATION = "upsert"; + private static final String DEFAULT_ICEBERG_UPSERT_FIELDS = "symbol"; + + + /** + * If Iceberg Table has not been previously created, we will create it using the Partition Fields specified in the + * Properties, as well as add a Sort Field to improve query performance + */ + private static void createTable(Catalog catalog, TableIdentifier outputTable, org.apache.iceberg.Schema icebergSchema, PartitionSpec partitionSpec, String sortField) { + // If table has been previously created, we do not do 
any operation or modification + if (!catalog.tableExists(outputTable)) { + Table icebergTable = catalog.createTable(outputTable, icebergSchema, partitionSpec); + // Modifying newly created iceberg table to have a sort field + icebergTable.replaceSortOrder() + .asc(sortField, NullOrder.NULLS_LAST) + .commit(); + // The catalog.create table creates an Iceberg V1 table. If we want to perform upserts, we need to upgrade the table version to 2. + TableOperations tableOperations = ((BaseTable) icebergTable).operations(); + TableMetadata appendTableMetadata = tableOperations.current(); + tableOperations.commit(appendTableMetadata, appendTableMetadata.upgradeToFormatVersion(2)); + } + } + + /** + * Generate the PartitionSpec, if when creating table, you want it to be partitioned. + * If you are doing Upserts in your Iceberg Table, your Equality Fields must be the same as the fields used for Partitioning. + */ + private static PartitionSpec getPartitionSpec(org.apache.iceberg.Schema icebergSchema, List partitionFieldsList) { + PartitionSpec.Builder partitionBuilder = PartitionSpec.builderFor(icebergSchema); + for (String s : partitionFieldsList) { + partitionBuilder = partitionBuilder.identity(s); + } + return partitionBuilder.build(); + } + + // Iceberg Flink Sink Builder + public static FlinkSink.Builder createBuilder(Properties icebergProperties, DataStream dataStream, org.apache.avro.Schema avroSchema) { + // Retrieve configuration from application parameters + String s3BucketPrefix = Preconditions.checkNotNull(icebergProperties.getProperty("bucket.prefix"), "Iceberg S3 bucket prefix not defined"); + + String glueDatabase = icebergProperties.getProperty("catalog.db", DEFAULT_GLUE_DB); + String glueTable = icebergProperties.getProperty("catalog.table", DEFAULT_ICEBERG_TABLE_NAME); + + String partitionFields = icebergProperties.getProperty("partition.fields", DEFAULT_ICEBERG_PARTITION_FIELDS); + List partitionFieldList = Arrays.asList(partitionFields.split("\\s*,\\s*")); + + String sortField = icebergProperties.getProperty("sort.field", DEFAULT_ICEBERG_SORT_ORDER_FIELD); + + // Iceberg you can perform Appends, Upserts and Overwrites. 
+ String icebergOperation = icebergProperties.getProperty("operation", DEFAULT_ICEBERG_OPERATION); + Preconditions.checkArgument(icebergOperation.equals("append") || icebergOperation.equals("upsert") || icebergOperation.equals("overwrite"), "Invalid Iceberg Operation"); + + // If operation is upsert, we need to specify the fields that will be used for equality in the upsert operation + // If the table is partitioned, we must include the partition fields + // This is a comma-separated list of fields + String upsertEqualityFields = icebergProperties.getProperty("upsert.equality.fields", DEFAULT_ICEBERG_UPSERT_FIELDS); + List equalityFieldsList = Arrays.asList(upsertEqualityFields.split("[, ]+")); + + + // Convert Avro Schema to Iceberg Schema, this will be used for creating the table + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + // Avro Generic Record to Row Data Mapper + MapFunction avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema); + + + // Catalog properties for using Glue Data Catalog + Map catalogProperties = new HashMap<>(); + catalogProperties.put("type", "iceberg"); + catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + catalogProperties.put("warehouse", s3BucketPrefix); + + // Load Glue Data Catalog + CatalogLoader glueCatalogLoader = CatalogLoader.custom( + "glue", + catalogProperties, + new org.apache.hadoop.conf.Configuration(), + "org.apache.iceberg.aws.glue.GlueCatalog"); + // Table Object that represents the table in the Glue Data Catalog + TableIdentifier outputTable = TableIdentifier.of(glueDatabase, glueTable); + // Load created Iceberg Catalog to perform table operations + Catalog catalog = glueCatalogLoader.loadCatalog(); + + + // Based on how many fields we want to partition, we create the Partition Spec + PartitionSpec partitionSpec = getPartitionSpec(icebergSchema, partitionFieldList); + // We create the Iceberg Table, using the Iceberg Catalog, Table Identifier, Schema parsed in Iceberg Schema Format and the partition spec + createTable(catalog, outputTable, icebergSchema, partitionSpec, sortField); + // Once the table has been created in the job or before, we load it + TableLoader tableLoader = TableLoader.fromCatalog(glueCatalogLoader, outputTable); + // Get RowType Schema from Iceberg Schema + RowType rowType = FlinkSchemaUtil.convert(icebergSchema); + + // Iceberg DataStream sink builder + FlinkSink.Builder flinkSinkBuilder = FlinkSink.builderFor( + dataStream, + avroGenericRecordToRowDataMapper, + FlinkCompatibilityUtil.toTypeInfo(rowType)) + .tableLoader(tableLoader); + + // Returns the builder for the selected operation + switch (icebergOperation) { + case "upsert": + // If operation is "upsert" we need to set up the equality fields + return flinkSinkBuilder + .equalityFieldColumns(equalityFieldsList) + .upsert(true); + case "overwrite": + return flinkSinkBuilder + .overwrite(true); + default: + return flinkSinkBuilder; + } + } + +} diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSourceBuilder.java b/java/Iceberg/IcebergDataStreamSource/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSourceBuilder.java new file mode 100644 index 0000000..e69de29 diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..28fe270 --- 
/dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,20 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.sec": 10.0 + } + }, + { + "PropertyGroupId": "Iceberg", + "PropertyMap": { + "bucket.prefix": "s3:///iceberg", + "catalog.db": "iceberg", + "catalog.table": "prices_iceberg", + "partition.fields": "symbol", + "sort.field": "timestamp", + "operation": "append", + "upsert.equality.fields": "symbol" + } + } +] \ No newline at end of file diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/resources/log4j2.properties b/java/Iceberg/IcebergDataStreamSource/src/main/resources/log4j2.properties new file mode 100644 index 0000000..3c0515a --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/resources/log4j2.properties @@ -0,0 +1,7 @@ +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +rootLogger.level = WARN +rootLogger.appenderRef.console.ref = ConsoleAppender \ No newline at end of file diff --git a/java/Iceberg/IcebergDataStreamSource/src/main/resources/price.avsc b/java/Iceberg/IcebergDataStreamSource/src/main/resources/price.avsc new file mode 100644 index 0000000..4603eb2 --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/main/resources/price.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "name": "Price", + "namespace": "com.amazonaws.services.msf.avro", + "fields": [ + { + "name": "timestamp", + "type": "string" + }, + { + "name": "symbol", + "type": "string" + }, + { + "name": "price", + "type": "float" + }, + { + "name": "volumes", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/java/Iceberg/IcebergDataStreamSource/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java b/java/Iceberg/IcebergDataStreamSource/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java new file mode 100644 index 0000000..5593ca1 --- /dev/null +++ b/java/Iceberg/IcebergDataStreamSource/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java @@ -0,0 +1,26 @@ +package com.amazonaws.services.msf.datagen; + +import com.amazonaws.services.msf.avro.AvroSchemaUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +class AvroGenericStockTradeGeneratorFunctionTest { + + @Test + void generateRecord() throws Exception { + Schema avroSchema = AvroSchemaUtils.loadSchema(); + AvroGenericStockTradeGeneratorFunction generatorFunction = new AvroGenericStockTradeGeneratorFunction(avroSchema); + + GenericRecord record = generatorFunction.map(42L); + + + assertInstanceOf(String.class, record.get("timestamp")); + assertInstanceOf(String.class, record.get("symbol")); + assertInstanceOf(Float.class, record.get("price")); + assertInstanceOf(Integer.class, record.get("volumes")); + } + +} \ No newline at end of file diff --git a/java/Iceberg/IcebergSQLJSONGlue/README.md b/java/Iceberg/IcebergSQLJSONGlue/README.md new file mode 100644 index 0000000..a22ff45 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/README.md @@ -0,0 +1,68 @@ +# Flink SQL Example with Glue Catalog and Iceberg + +* Flink version: 1.20.0 +* Flink API: SQL API
+* Iceberg 1.6.1 +* Language: Java (11) +* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) + and [Iceberg](https://iceberg.apache.org/docs/latest/flink/) + +This example demonstrates how to use Flink SQL API with Apache Iceberg and AWS Glue Catalog. The application generates synthetic stock price data using Flink's DataGen connector and writes it to an Iceberg table in the Glue Catalog. + +### Prerequisites + +The application expects the following resources: +* A Glue Data Catalog database in the current AWS region. The database name is configurable. +* An S3 bucket to store the Iceberg table data. + +#### IAM Permissions + +The application must have IAM permissions to: +* Show and alter Glue Data Catalog databases, show and create Glue Data Catalog tables. + See [Glue Data Catalog permissions](https://docs.aws.amazon.com/athena/latest/ug/fine-grained-access-to-glue-resources.html). +* Read and Write from the S3 bucket. + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties. + +When running locally, the configuration is read from the +`resources/flink-application-properties-dev.json` file. + +Runtime parameters: + +| Group ID | Key | Default | Description | +|-----------|-----------------|--------------------------------------|------------------------------------------------| +| `DataGen` | `records.per.sec` | `10.0` | Records per second generated | +| `Iceberg` | `bucket.prefix` | `s3://<>/glue-example/` | S3 bucket prefix for Iceberg table storage | +| `Iceberg` | `catalog.db` | `iceberg` | Name of the Glue Data Catalog database | +| `Iceberg` | `catalog.table` | `stock_data` | Name of the Glue Data Catalog table | + +### Checkpoints + +Checkpointing must be enabled for proper operation. When running locally, the application enables checkpoints programmatically every 30 seconds with a parallelism of 2. When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. + +### Schema + +The application creates a table with the following schema: +- price: DOUBLE +- ticker: STRING +- eventtime: TIMESTAMP(3) + +The table is created using Flink SQL DDL and data is inserted using SQL INSERT statements. + +### Running locally + +You can run this example directly in a local environment with a web UI. The application will: +1. Create a local Flink environment with web UI +2. Set up the Glue catalog configuration +3. Generate synthetic stock price data +4. Create the database and table in Glue +5. Insert the generated data into the Iceberg table +6. Query and display the results + +Make sure to configure the appropriate AWS credentials and region when running locally. + +See [Running examples locally](../../running-examples-locally.md) for details. 
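The example registers the Glue-backed Iceberg catalog programmatically through a `CatalogDescriptor`. As a rough sketch of an equivalent SQL-only setup, the snippet below registers the same catalog and creates the database and table with DDL statements; the warehouse path is a placeholder, the catalog, database and table names follow the defaults above, and this class is an illustration rather than part of the example code.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class GlueCatalogDdlSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the Iceberg catalog backed by the Glue Data Catalog
        // (equivalent to the CatalogDescriptor used in GlueTableSQLJSONExample)
        tableEnv.executeSql(
                "CREATE CATALOG glue WITH ("
                        + " 'type' = 'iceberg',"
                        + " 'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',"
                        + " 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',"
                        + " 'warehouse' = 's3://<your-bucket>/glue-example/'"
                        + ")");

        // Create the target database and table with the schema described above
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS glue.iceberg");
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS glue.iceberg.stock_data ("
                        + " price DOUBLE,"
                        + " ticker STRING,"
                        + " eventtime TIMESTAMP(3)"
                        + ")");
    }
}
```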
diff --git a/java/Iceberg/IcebergSQLJSONGlue/pom.xml b/java/Iceberg/IcebergSQLJSONGlue/pom.xml new file mode 100644 index 0000000..10a0660 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/pom.xml @@ -0,0 +1,235 @@ + + + 4.0.0 + + com.amazonaws + iceberg-sql-flink + 1.0 + jar + + + UTF-8 + 11 + ${target.java.version} + ${target.java.version} + + 1.19.0 + 1.11.3 + 2.12 + 3.4.0 + 1.6.1 + 1.2.0 + 2.23.1 + 5.8.1 + + + + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + + + org.apache.flink + flink-table-common + ${flink.version} + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + + + org.apache.flink + flink-avro + ${flink.version} + + + + + + org.apache.flink + flink-table-planner_${scala.version} + ${flink.version} + provided + + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + org.slf4j + slf4j-reload4j + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws-bundle + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws + ${iceberg.version} + + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + org.apache.iceberg + iceberg-flink-1.19 + 1.7.0 + compile + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.StreamingJob + + + + + + + + + diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java new file mode 100644 index 0000000..0881179 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/GlueTableSQLJSONExample.java @@ -0,0 +1,180 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: MIT-0 + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this + * software and associated documentation files (the "Software"), to deal in the Software + * without restriction, including without limitation the rights to use, copy, modify, + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class GlueTableSQLJSONExample { + // Constants + private static final String CATALOG_NAME = "glue"; + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + private static final Logger LOG = LoggerFactory.getLogger(GlueTableSQLJSONExample.class); + + // Configuration properties + private static String s3BucketPrefix; + private static String glueDatabase; + private static String glueTable; + + public static void main(String[] args) throws Exception { + // 1. Initialize environments + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + // 2. Load properties and configure environment + Map applicationProperties = loadApplicationProperties(env); + Properties icebergProperties = applicationProperties.get("Iceberg"); + + // Configure local development settings if needed + if (isLocal(env)) { + env.enableCheckpointing(30000); + env.setParallelism(2); + } + + // 3. Setup S3 configuration + setupGlueTableProperties(icebergProperties); + Catalog glueCatalog = createGlueCatalog(tableEnv); + + // 4. Create data generator source + Properties dataGenProperties = applicationProperties.get("DataGen"); + DataStream stockPriceDataStream = env.fromSource( + createDataGenerator(dataGenProperties), + WatermarkStrategy.noWatermarks(), + "DataGen"); + + // 5. Convert DataStream to Table and create view + Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream); + tableEnv.createTemporaryView("stockPriceTable", stockPriceTable); + + // 6. Create database and define table structure + glueCatalog.createDatabase(glueDatabase, + new CatalogDatabaseImpl(Map.of(), "Glue Database"), true); + + String sinkTableName = CATALOG_NAME + "." + glueDatabase + "." 
+ glueTable; + + // Define and create table + String createTableStatement = "CREATE TABLE IF NOT EXISTS " + sinkTableName + "(" + + "price DOUBLE, " + + "ticker STRING," + + "eventtime TIMESTAMP(3)" + + ");"; + tableEnv.executeSql(createTableStatement); + + // 7. Execute SQL operations + // Insert data from stock price stream + String insertQuery = "INSERT INTO " + sinkTableName + + " SELECT price, ticker, eventtime FROM stockPriceTable"; + TableResult insertResult = tableEnv.executeSql(insertQuery); + insertResult.await(); + + // Query the results + String selectQuery = "SELECT * from " + sinkTableName + ";"; + TableResult selectResults = tableEnv.executeSql(selectQuery); + selectResults.print(); + + // 8. Optionally Cleanup resources +// glueCatalog.dropTable(new ObjectPath(glueDatabase, glueTable), false); +// glueCatalog.dropDatabase(glueDatabase, false); + } + + private static void setupGlueTableProperties(Properties icebergProperties) { + s3BucketPrefix = icebergProperties.getProperty("bucket.prefix"); + glueDatabase = icebergProperties.getProperty("catalog.db"); + glueTable = icebergProperties.getProperty("catalog.table"); + Preconditions.checkNotNull(s3BucketPrefix, "You must supply an s3 bucket ARN for the warehouse."); + Preconditions.checkNotNull(glueDatabase, "You must supply a database name"); + Preconditions.checkNotNull(glueTable, "You must supply a table name"); + // check if it's a valid ARN + validateURI(s3BucketPrefix); + } + + private static DataGeneratorSource createDataGenerator(Properties dataGeneratorProperties) { + double recordsPerSecond = Double.parseDouble(dataGeneratorProperties.getProperty("records.per.sec", "10.0")); + Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); + + return new DataGeneratorSource(new StockPriceGeneratorFunction(), + 100, + RateLimiterStrategy.perSecond(recordsPerSecond), + TypeInformation.of(StockPrice.class)); + } + + /** + * Defines a config object with Glue specific catalog and io implementations + * Then, uses that to create the Flink catalog + */ + private static Catalog createGlueCatalog(StreamTableEnvironment tableEnv) { + + Configuration conf = new Configuration(); + conf.setString("warehouse", s3BucketPrefix); + conf.setString("catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog"); + conf.setString("type", "iceberg"); + conf.setString("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + conf.setString("catalog-name", CATALOG_NAME); + + CatalogDescriptor descriptor = CatalogDescriptor.of(CATALOG_NAME, conf); + + tableEnv.createCatalog(CATALOG_NAME, descriptor); + return tableEnv.getCatalog(CATALOG_NAME).get(); + } + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime + * or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(GlueTableSQLJSONExample.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + public static void 
validateURI(String uri) { + String s3UriPattern = "^s3://([a-z0-9.-]+)(/[a-z0-9-_/]+/?)$"; + Preconditions.checkArgument(uri != null && uri.matches(s3UriPattern), + "Invalid S3 URI format: %s. URI must match pattern: s3://bucket-name/path/", uri); + } +} diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java new file mode 100644 index 0000000..add5f68 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPrice.java @@ -0,0 +1,51 @@ + +import java.sql.Timestamp; + +public class StockPrice { + private Timestamp eventtime; + private String ticker; + private Double price; + + + public StockPrice() { + } + + public StockPrice(Timestamp eventtime, String ticker, Double price) { + this.eventtime = eventtime; + this.ticker = ticker; + this.price = price; + } + + public Timestamp getEventtime() { + return eventtime; + } + + public void setEventtime(Timestamp eventtime) { + this.eventtime = eventtime; + } + + public String getTicker() { + return ticker; + } + + public void setTicker(String ticker) { + this.ticker = ticker; + } + + public Double getPrice() { + return price; + } + + public void setPrice(Double price) { + this.price = price; + } + + @Override + public String toString() { + return "StockPrice{" + + "eventtime=" + eventtime + + ", ticker='" + ticker + '\'' + + ", price=" + price + + '}'; + } +} diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java new file mode 100644 index 0000000..0a150d7 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/java/StockPriceGeneratorFunction.java @@ -0,0 +1,22 @@ +import org.apache.commons.lang3.RandomUtils; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import java.sql.Timestamp; + +/** + * Generates random stock price data for simulation purposes. 
+ */ +public class StockPriceGeneratorFunction implements GeneratorFunction { + + private static final String[] STOCK_SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"}; + private static final double MAX_PRICE = 100.0; + private static final double MIN_PRICE = 1.0; + + @Override + public StockPrice map(Long sequence) throws Exception { + Timestamp currentTimestamp = new Timestamp(System.currentTimeMillis()); + String randomSymbol = STOCK_SYMBOLS[RandomUtils.nextInt(0, STOCK_SYMBOLS.length)]; + double randomPrice = RandomUtils.nextDouble(MIN_PRICE, MAX_PRICE); + + return new StockPrice(currentTimestamp, randomSymbol, randomPrice); + } +} diff --git a/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..6e9ede0 --- /dev/null +++ b/java/Iceberg/IcebergSQLJSONGlue/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,20 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.sec": 100.0 + } + }, + { + "PropertyGroupId": "Iceberg", + "PropertyMap": { + "catalog.db": "iceberg", + "catalog.table": "stock_data", + "bucket.prefix": "s3://my-iceberg-bucket-jeremy/glue-example/", + "partition.fields": "symbol", + "sort.field": "timestamp", + "operation": "upsert", + "upsert.equality.fields": "symbol" + } + } +] \ No newline at end of file diff --git a/java/Iceberg/S3TableSQLJSON/README.md b/java/Iceberg/S3TableSQLJSON/README.md new file mode 100644 index 0000000..3139892 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/README.md @@ -0,0 +1,65 @@ +# Flink SQL Example with S3 Tables Catalog + +* Flink version: 1.20.0 +* Flink API: SQL API +* Language: Java (11) +* Flink connectors: [DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/datagen/) + and S3 Tables + +This example demonstrates how to use Flink SQL API with Amazon S3 Tables Catalog. The application generates synthetic stock price data using Flink's DataGen connector and writes it to a table in the S3 Tables Catalog. + +### Prerequisites + +The application expects the following resources: +* An S3 bucket configured for S3 Tables +* Appropriate AWS account and region configuration +* S3 Tables Catalog setup in your AWS account + +#### IAM Permissions + +The application must have IAM permissions to: +* Create and manage S3 Tables Catalog databases and tables +* Read and Write from the specified S3 bucket +* Access S3 Tables service endpoints + +### Runtime configuration + +When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties. + +When running locally, the configuration is read from the +`resources/flink-application-properties-dev.json` file. + +Runtime parameters: + +| Group ID | Key | Default | Description | +|-----------|-------------------|------------------------------------------------------------|-------------------------------------------------| +| `DataGen` | `records.per.sec` | `10.0` | Records per second generated | +| `Iceberg` | `table.bucket.arn` | `arn:aws:s3tables:<>:>:bucket/my-table-bucket` | ARN of the S3 Tables bucket | +| `Iceberg` | `catalog.db` | `test_from_flink` | Name of the S3 Tables Catalog database | +| `Iceberg` | `catalog.table` | `test_table` | Name of the table to be created | + +### Checkpoints + +Checkpointing must be enabled for proper operation. 
When running locally, the application enables checkpoints programmatically every 30 seconds with a parallelism of 2. When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration. + +### Schema + +The application creates a table with the following schema: +- price: DOUBLE +- ticker: STRING +- eventtime: TIMESTAMP(3) + +The table is created using Flink SQL DDL and data is inserted using SQL INSERT statements. + +### Running locally + +You can run this example directly in a local environment with a web UI. The application will: +1. Create a local Flink environment with web UI +2. Set up the S3 Tables catalog configuration +3. Generate synthetic stock price data +4. Create the database and table in S3 Tables Catalog +5. Insert the generated data into the table +6. Query and display the results +7. Optionally cleanup resources (database and table) + +Make sure to configure the appropriate AWS credentials and region when running locally, and ensure the provided S3 Tables bucket ARN is valid and accessible. \ No newline at end of file diff --git a/java/Iceberg/S3TableSQLJSON/pom.xml b/java/Iceberg/S3TableSQLJSON/pom.xml new file mode 100644 index 0000000..803f017 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/pom.xml @@ -0,0 +1,247 @@ + + + 4.0.0 + + com.amazonaws + s3-table-sql-flink + 1.0 + jar + + + UTF-8 + 11 + ${target.java.version} + ${target.java.version} + + 1.19.0 + 1.11.3 + 2.12 + 3.4.0 + 1.6.1 + 1.2.0 + 2.23.1 + 5.8.1 + + + + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + + + org.apache.flink + flink-table-common + ${flink.version} + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + + + org.apache.flink + flink-avro + ${flink.version} + + + + + + org.apache.flink + flink-table-planner_${scala.version} + ${flink.version} + provided + + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + software.amazon.awssdk + s3tables + 2.29.26 + + + software.amazon.s3tables + s3-tables-catalog-for-iceberg + 0.1.3 + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + org.slf4j + slf4j-reload4j + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws-bundle + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws + ${iceberg.version} + + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + org.apache.iceberg + iceberg-flink-1.19 + 1.7.0 + compile + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + shade + + + + + 
org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.StreamingJob + + + + + + + + + diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java b/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java new file mode 100644 index 0000000..a53ff23 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/java/S3TableSQLJSONExample.java @@ -0,0 +1,176 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: MIT-0 + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this + * software and associated documentation files (the "Software"), to deal in the Software + * without restriction, including without limitation the rights to use, copy, modify, + * merge, publish, distribute, sublicense, and/or sell copies of the Software, and to + * permit persons to whom the Software is furnished to do so. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A + * PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.*; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class S3TableSQLJSONExample { + // Constants + private static final String CATALOG_NAME = "s3"; + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + private static final Logger LOG = LoggerFactory.getLogger(S3TableSQLJSONExample.class); + + // Configuration properties + private static String tableBucketArn; + private static String s3TableDatabase; + private static String s3Table; + + public static void main(String[] args) throws Exception { + // 1. Initialize environments + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); + final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + // 2. 
Load properties and configure environment + Map applicationProperties = loadApplicationProperties(env); + Properties icebergProperties = applicationProperties.get("Iceberg"); + + // Configure local development settings if needed + if (isLocal(env)) { + env.enableCheckpointing(30000); + env.setParallelism(2); + } + + // 3. Setup S3 configuration + setupS3TableProperties(icebergProperties); + Catalog s3Catalog = createS3Catalog(tableEnv); + + // 4. Create data generator source + Properties dataGenProperties = applicationProperties.get("DataGen"); + DataStream stockPriceDataStream = env.fromSource( + createDataGenerator(dataGenProperties), + WatermarkStrategy.noWatermarks(), + "DataGen"); + + // 5. Convert DataStream to Table and create view + Table stockPriceTable = tableEnv.fromDataStream(stockPriceDataStream); + tableEnv.createTemporaryView("stockPriceTable", stockPriceTable); + + // 6. Create database and define table structure + s3Catalog.createDatabase(s3TableDatabase, + new CatalogDatabaseImpl(Map.of(), "Sample Database"), true); + + String sinkTableName = CATALOG_NAME + "." + s3TableDatabase + "." + s3Table; + + // Define and create table + String createTableStatement = "CREATE TABLE IF NOT EXISTS " + sinkTableName + "(" + + "price DOUBLE, " + + "ticker STRING," + + "eventtime TIMESTAMP(3)" + + ");"; + tableEnv.executeSql(createTableStatement); + + // 7. Execute SQL operations + // Insert data from stock price stream + String insertQuery = "INSERT INTO " + sinkTableName + + " SELECT price, ticker, eventtime FROM stockPriceTable"; + TableResult insertResult = tableEnv.executeSql(insertQuery); + insertResult.await(); + + // Query the results + String selectQuery = "SELECT * from " + sinkTableName + ";"; + TableResult selectResults = tableEnv.executeSql(selectQuery); + selectResults.print(); + + // 8. 
Optionally Cleanup resources +// s3Catalog.dropTable(new ObjectPath(s3TableDatabase, s3Table), false); +// s3Catalog.dropDatabase(s3TableDatabase, false); + } + + private static void setupS3TableProperties(Properties icebergProperties) { + tableBucketArn = icebergProperties.getProperty("table.bucket.arn"); + s3TableDatabase = icebergProperties.getProperty("catalog.db"); + s3Table = icebergProperties.getProperty("catalog.table"); + Preconditions.checkNotNull(tableBucketArn, "You must supply a table bucket ARN."); + Preconditions.checkNotNull(s3TableDatabase, "You must supply a database name"); + Preconditions.checkNotNull(s3Table, "You must supply a table name"); + // check if it's a valid ARN + validateArn(tableBucketArn); + } + + private static DataGeneratorSource createDataGenerator(Properties dataGeneratorProperties) { + double recordsPerSecond = Double.parseDouble(dataGeneratorProperties.getProperty("records.per.sec", "10.0")); + Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec must be > 0"); + + return new DataGeneratorSource(new StockPriceGeneratorFunction(), + 100, + RateLimiterStrategy.perSecond(recordsPerSecond), + TypeInformation.of(StockPrice.class)); + } + + /** + * Defines a config object with S3 Table specific catalog and io implementations + * Then, uses that to create the Flink catalog + */ + private static Catalog createS3Catalog(StreamTableEnvironment tableEnv) { + Configuration conf = new Configuration(); + conf.setString("warehouse", tableBucketArn); + conf.setString("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog"); + conf.setString("type", "iceberg"); + conf.setString("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + + CatalogDescriptor descriptor = CatalogDescriptor.of(CATALOG_NAME, conf); + + tableEnv.createCatalog(CATALOG_NAME, descriptor); + return tableEnv.getCatalog(CATALOG_NAME).get(); + } + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime + * or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(S3TableSQLJSONExample.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + public static void validateArn(String arn) { + String arnPattern = "^arn:aws[a-zA-Z-]*:[a-zA-Z0-9-]+:[a-zA-Z0-9-]*:[0-9]{12}:[a-zA-Z0-9-_/:.]+$"; + Preconditions.checkArgument(arn != null && arn.matches(arnPattern), + "Invalid ARN format: %s. 
ARN must match pattern: arn:partition:service:region:account-id:resource", arn); + } +} diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java new file mode 100644 index 0000000..add5f68 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPrice.java @@ -0,0 +1,51 @@ + +import java.sql.Timestamp; + +public class StockPrice { + private Timestamp eventtime; + private String ticker; + private Double price; + + + public StockPrice() { + } + + public StockPrice(Timestamp eventtime, String ticker, Double price) { + this.eventtime = eventtime; + this.ticker = ticker; + this.price = price; + } + + public Timestamp getEventtime() { + return eventtime; + } + + public void setEventtime(Timestamp eventtime) { + this.eventtime = eventtime; + } + + public String getTicker() { + return ticker; + } + + public void setTicker(String ticker) { + this.ticker = ticker; + } + + public Double getPrice() { + return price; + } + + public void setPrice(Double price) { + this.price = price; + } + + @Override + public String toString() { + return "StockPrice{" + + "eventtime=" + eventtime + + ", ticker='" + ticker + '\'' + + ", price=" + price + + '}'; + } +} diff --git a/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java new file mode 100644 index 0000000..0a150d7 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/java/StockPriceGeneratorFunction.java @@ -0,0 +1,22 @@ +import org.apache.commons.lang3.RandomUtils; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import java.sql.Timestamp; + +/** + * Generates random stock price data for simulation purposes. + */ +public class StockPriceGeneratorFunction implements GeneratorFunction { + + private static final String[] STOCK_SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"}; + private static final double MAX_PRICE = 100.0; + private static final double MIN_PRICE = 1.0; + + @Override + public StockPrice map(Long sequence) throws Exception { + Timestamp currentTimestamp = new Timestamp(System.currentTimeMillis()); + String randomSymbol = STOCK_SYMBOLS[RandomUtils.nextInt(0, STOCK_SYMBOLS.length)]; + double randomPrice = RandomUtils.nextDouble(MIN_PRICE, MAX_PRICE); + + return new StockPrice(currentTimestamp, randomSymbol, randomPrice); + } +} diff --git a/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..ecadcd3 --- /dev/null +++ b/java/Iceberg/S3TableSQLJSON/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,20 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.sec": 100.0 + } + }, + { + "PropertyGroupId": "Iceberg", + "PropertyMap": { + "table.bucket.arn": "<>", + "catalog.db": "test_from_flink", + "catalog.table": "test_table", + "partition.fields": "symbol", + "sort.field": "timestamp", + "operation": "upsert", + "upsert.equality.fields": "symbol" + } + } +] \ No newline at end of file diff --git a/java/Iceberg/S3TableSink/README.md b/java/Iceberg/S3TableSink/README.md new file mode 100644 index 0000000..7b24105 --- /dev/null +++ b/java/Iceberg/S3TableSink/README.md @@ -0,0 +1,98 @@ +# Flink Iceberg Sink using DataStream API + +* Flink version: 1.19.0 +* Flink API: DataStream API +* Iceberg 1.6.1 +* Language: Java (11) +* Flink connectors: 
[DataGen](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/)
+  and [Iceberg](https://iceberg.apache.org/docs/latest/flink/)
+
+This example demonstrates how to use the
+[Flink Iceberg Sink Connector](https://iceberg.apache.org/docs/latest/flink-writes/) with S3 Tables.
+
+For simplicity, the application generates synthetic data, random stock prices, internally.
+Data is generated as AVRO Generic Record, simulating a real source, for example a Kafka Source, that receives records
+serialized with AVRO.
+
+### Prerequisites
+
+#### Create a Table Bucket
+The sample application expects the S3 Table Bucket to already exist; its ARN must be provided in the application configuration:
+```bash
+aws s3tables create-table-bucket --name flink-example
+{
+    "arn": "arn:aws:s3tables:us-east-1:111122223333:bucket/flink-example"
+}
+```
+
+If you already created the table bucket, you can retrieve its ARN with:
+
+```bash
+aws s3tables list-table-buckets
+```
+
+This lists your table buckets. Select the ARN of the bucket you wish to write to and paste it into the configuration file of this project.
+
+#### IAM Permissions
+
+The application must have IAM permissions to:
+* Read from and write to the S3 Table Bucket
+
+### Runtime configuration
+
+When running on Amazon Managed Service for Apache Flink the runtime configuration is read from Runtime Properties.
+
+When running locally, the configuration is read from the
+[resources/flink-application-properties-dev.json](./src/main/resources/flink-application-properties-dev.json) file.
+
+Runtime parameters:
+
+| Group ID  | Key                      | Default           | Description                                                                                                          |
+|-----------|--------------------------|-------------------|----------------------------------------------------------------------------------------------------------------------|
+| `DataGen` | `records.per.sec`        | `10.0`            | Records per second generated.                                                                                          |
+| `Iceberg` | `table.bucket.arn`       | (mandatory)       | ARN of the S3 Table Bucket, e.g. `arn:aws:s3tables:region:account-id:bucket/bucket-name`.                              |
+| `Iceberg` | `catalog.db`             | `test_from_flink` | Name of the S3 table database.                                                                                          |
+| `Iceberg` | `catalog.table`          | `test_table`      | Name of the S3 table.                                                                                                   |
+| `Iceberg` | `partition.fields`       | `symbol`          | Comma separated list of partition fields.                                                                               |
+| `Iceberg` | `sort.field`             | `timestamp`       | Sort field.                                                                                                             |
+| `Iceberg` | `operation`              | `upsert`          | Iceberg operation. One of `upsert`, `append` or `overwrite`.                                                            |
+| `Iceberg` | `upsert.equality.fields` | `symbol`          | Comma separated list of fields used for upsert. It must match partition fields. Required if `operation` = `upsert`.     |
+
+### Checkpoints
+
+Checkpointing must be enabled. Iceberg commits writes on checkpoint.
+
+When running locally, the application enables checkpoints programmatically, every 30 seconds.
+When deployed to Managed Service for Apache Flink, checkpointing is controlled by the application configuration.
+
+### Known limitations
+
+There are currently some limitations in the Flink Iceberg integration with S3 Tables:
+* This example must use Flink 1.19; Flink 1.20 is not yet supported by the S3 Table Sink.
+* Iceberg tables with hidden partitioning are not supported.
+* Adding, removing, renaming, or changing columns is not supported.
+
+### Schema and schema evolution
+
+The application must "know" the AVRO schema on start.
+The schema cannot be dynamically inferred based on the incoming records, for example using a schema registry.
+This is due to a limitation of the Flink Iceberg integration, that requires knowing the table schema upfront. + +This implementation does support schema evolution in the incoming data, as long as new schema versions are FORWARD compatible. +Schema changes are not propagated to Iceberg. +As long as the schema of incoming records is FORWARD compatible, the application deserializes incoming records using +the schema it knows. Any new field in the incoming record is discarded. + +In this example, the schema is loaded from a schema definition file, [price.avsc](./src/main/resources/price.avsc) embedded +with the application. +It is technically possible to fetch the schema on application start from an external source, like a schema registry or a +schema definition file in an S3 bucket. This is beyond the scope of this example. + +### Running locally, in IntelliJ + +You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation. + +See [Running examples locally](https://github.com/nicusX/amazon-managed-service-for-apache-flink-examples/blob/main/java/running-examples-locally.md) for details. diff --git a/java/Iceberg/S3TableSink/pom.xml b/java/Iceberg/S3TableSink/pom.xml new file mode 100644 index 0000000..f43c091 --- /dev/null +++ b/java/Iceberg/S3TableSink/pom.xml @@ -0,0 +1,247 @@ + + + 4.0.0 + + com.amazonaws + s3-table-flink + 1.0 + jar + + + UTF-8 + 11 + ${target.java.version} + ${target.java.version} + + 1.19.0 + 1.11.3 + 2.12 + 3.4.0 + 1.6.1 + 1.2.0 + 2.23.1 + 5.8.1 + + + + + + org.apache.flink + flink-runtime-web + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-table-runtime + ${flink.version} + provided + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + + + org.apache.flink + flink-table-common + ${flink.version} + + + org.apache.flink + flink-metrics-dropwizard + ${flink.version} + + + org.apache.flink + flink-avro + ${flink.version} + + + + + + org.apache.flink + flink-table-planner_${scala.version} + ${flink.version} + provided + + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + software.amazon.awssdk + s3tables + 2.29.26 + + + software.amazon.s3tables + s3-tables-catalog-for-iceberg + 0.1.3 + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.avro + avro + + + org.slf4j + slf4j-reload4j + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-flink + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws-bundle + ${iceberg.version} + + + org.apache.iceberg + iceberg-aws + ${iceberg.version} + + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + runtime + + + org.apache.iceberg + iceberg-flink-1.19 + 1.7.0 + compile + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + package + + 
shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + log4j:* + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + com.amazonaws.services.msf.StreamingJob + + + + + + + + + diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java new file mode 100644 index 0000000..d8e77c4 --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/StreamingJob.java @@ -0,0 +1,134 @@ +package com.amazonaws.services.msf; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.util.Preconditions; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import com.amazonaws.services.msf.avro.AvroSchemaUtils; +import com.amazonaws.services.msf.datagen.AvroGenericStockTradeGeneratorFunction; +import com.amazonaws.services.msf.iceberg.IcebergSinkBuilder; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class StreamingJob { + private static final Logger LOG = LoggerFactory.getLogger(StreamingJob.class); + private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json"; + private static Properties icebergProperties; + + private static boolean isLocal(StreamExecutionEnvironment env) { + return env instanceof LocalStreamEnvironment; + } + + /** + * Load application properties from Amazon Managed Service for Apache Flink runtime or from a local resource, when the environment is local + */ + private static Map loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { + if (isLocal(env)) { + LOG.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); + return KinesisAnalyticsRuntime.getApplicationProperties( + Objects.requireNonNull(StreamingJob.class.getClassLoader() + .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)).getPath()); + } else { + LOG.info("Loading application properties from Amazon Managed Service for Apache Flink"); + return KinesisAnalyticsRuntime.getApplicationProperties(); + } + } + + // Data Generator source generating random trades as AVRO GenericRecords + private static DataGeneratorSource createDataGenerator(Properties generatorProperties, Schema avroSchema) { + double recordsPerSecond = Double.parseDouble(generatorProperties.getProperty("records.per.sec", "10.0")); + Preconditions.checkArgument(recordsPerSecond > 0, "Generator records per sec 
must be > 0"); + + LOG.info("Data generator: {} record/sec", recordsPerSecond); + return new DataGeneratorSource<>( + new AvroGenericStockTradeGeneratorFunction(avroSchema), + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(recordsPerSecond), + new GenericRecordAvroTypeInfo(avroSchema) + ); + } + + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); + StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + // Local dev specific settings + if (isLocal(env)) { + // Checkpointing and parallelism are set by Amazon Managed Service for Apache Flink when running on AWS + env.enableCheckpointing(30000); + env.setParallelism(2); + } + + Map applicationProperties = loadApplicationProperties(env); + icebergProperties = applicationProperties.get("Iceberg"); + + Catalog s3 = createCatalog(tableEnv); + s3.createDatabase(icebergProperties.getProperty("catalog.db"), + new CatalogDatabaseImpl(Map.of(), + "Sample Database"), true); + + // Get AVRO Schema from the definition bundled with the application + // Note that the application must "knows" the AVRO schema upfront, i.e. the schema must be either embedded + // with the application or fetched at start time. + // If the schema of the records received by the source changes, all changes must be FORWARD compatible. + // This way, the application will be able to write the data with the new schema into the old schema, but schema + // changes are not propagated to the Iceberg table. + Schema avroSchema = AvroSchemaUtils.loadSchema(); + + // Create Generic Record TypeInfo from schema. + new GenericRecordAvroTypeInfo(avroSchema); + + // Data Generator Source. + // Simulates an external source that receives AVRO Generic Records + DataStream genericRecordDataStream = createDataStream(env, applicationProperties, avroSchema); + + // Flink Sink Builder + FlinkSink.Builder icebergSinkBuilder = IcebergSinkBuilder.createBuilder(icebergProperties, genericRecordDataStream, avroSchema); + // Sink to Iceberg Table + icebergSinkBuilder.append(); + + env.execute("Flink S3 Table Sink"); + } + + private static Catalog createCatalog(StreamTableEnvironment tableEnv) { + + Configuration conf = new Configuration(); + conf.setString("warehouse", icebergProperties.getProperty("table.bucket.arn")); + conf.setString("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog"); + conf.setString("type", "iceberg"); + conf.setString("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + + final String catalogName = "s3"; + CatalogDescriptor descriptor = CatalogDescriptor.of(catalogName, conf); + tableEnv.createCatalog(catalogName, descriptor); + return tableEnv.getCatalog(catalogName).get(); + } + + + private static DataStream createDataStream(StreamExecutionEnvironment env, Map applicationProperties, Schema avroSchema) { + Properties dataGeneratorProperties = applicationProperties.get("DataGen"); + return env.fromSource( + createDataGenerator(dataGeneratorProperties, avroSchema), + WatermarkStrategy.noWatermarks(), + "DataGen"); + } +} diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java new file mode 100644 index 0000000..de34c5e --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/avro/AvroSchemaUtils.java @@ -0,0 +1,22 @@ +package com.amazonaws.services.msf.avro; + 
+import com.amazonaws.services.msf.StreamingJob; +import org.apache.avro.Schema; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; + +public class AvroSchemaUtils implements Serializable { + + private static final String AVRO_SCHEMA_RESOURCE = "price.avsc"; + + /** + * Load the AVRO Schema from the resources folder + */ + public static Schema loadSchema() throws IOException { + // Get AVRO Schema from the definition bundled with the application + InputStream inputStream = StreamingJob.class.getClassLoader().getResourceAsStream(AVRO_SCHEMA_RESOURCE); + return new org.apache.avro.Schema.Parser().parse(inputStream); + } +} diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java new file mode 100644 index 0000000..5b1cf34 --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunction.java @@ -0,0 +1,46 @@ +package com.amazonaws.services.msf.datagen; + +import org.apache.flink.connector.datagen.source.GeneratorFunction; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.RandomUtils; + +import java.time.Instant; + +/** + * Function used by DataGen source to generate random records as AVRO GenericRecord. + *

+ * The generator assumes that the AVRO schema provided contains the generated fields + */ +public class AvroGenericStockTradeGeneratorFunction implements GeneratorFunction { + + private static final String[] SYMBOLS = {"AAPL", "AMZN", "MSFT", "INTC", "TBV"}; + + private Schema avroSchema; + + public AvroGenericStockTradeGeneratorFunction(Schema avroSchema) { + this.avroSchema = avroSchema; + } + + /** + * Generates a random trade + */ + @Override + public GenericRecord map(Long value) { + GenericData.Record record = new GenericData.Record(avroSchema); + + String symbol = SYMBOLS[RandomUtils.nextInt(0, SYMBOLS.length)]; + float price = RandomUtils.nextFloat(0, 10); + int volumes = RandomUtils.nextInt(0, 1000000); + String timestamp = Instant.now().toString(); + + record.put("symbol", symbol); + record.put("price", price); + record.put("volumes", volumes); + record.put("timestamp", timestamp); + + return record; + } +} diff --git a/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java new file mode 100644 index 0000000..91ccac0 --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/java/com/amazonaws/services/msf/iceberg/IcebergSinkBuilder.java @@ -0,0 +1,146 @@ +package com.amazonaws.services.msf.iceberg; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.generic.GenericRecord; +import org.apache.iceberg.*; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.*; +import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.iceberg.flink.util.FlinkCompatibilityUtil; + +import java.util.*; + +/** + * Wraps the code to initialize an Iceberg sink that uses S3 Tables Internal catalog + * */ +public class IcebergSinkBuilder { + private static final String DEFAULT_S3_CATALOG_DB = "default"; + private static final String DEFAULT_ICEBERG_TABLE_NAME = "prices_iceberg"; + private static final String DEFAULT_ICEBERG_SORT_ORDER_FIELD = "accountNr"; + private static final String DEFAULT_ICEBERG_PARTITION_FIELDS = "symbol"; + private static final String DEFAULT_ICEBERG_OPERATION = "upsert"; + private static final String DEFAULT_ICEBERG_UPSERT_FIELDS = "symbol"; + + /** + * If Iceberg Table has not been previously created, we will create it using the Partition Fields specified in the + * Properties, as well as add a Sort Field to improve query performance + */ + private static void createTable(Catalog catalog, TableIdentifier outputTable, org.apache.iceberg.Schema icebergSchema, PartitionSpec partitionSpec, String sortField) { + // If table has been previously created, we do not do any operation or modification + if (!catalog.tableExists(outputTable)) { + Table icebergTable = catalog.createTable(outputTable, icebergSchema, partitionSpec); + // Modifying newly created iceberg table to have a sort field + icebergTable.replaceSortOrder() + .asc(sortField, NullOrder.NULLS_LAST) + .commit(); + // The catalog.create table creates an Iceberg V1 table. If we want to perform upserts, we need to upgrade the table version to 2. 
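+            // Upserts rely on Iceberg equality delete files, which are only supported by format version 2,
+            // so the metadata of the newly created table is upgraded and committed here.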
+ TableOperations tableOperations = ((BaseTable) icebergTable).operations(); + TableMetadata appendTableMetadata = tableOperations.current(); + tableOperations.commit(appendTableMetadata, appendTableMetadata.upgradeToFormatVersion(2)); + } + } + + /** + * Generate the PartitionSpec, if when creating table, you want it to be partitioned. + * If you are doing Upserts in your Iceberg Table, your Equality Fields must be the same as the fields used for Partitioning. + */ + private static PartitionSpec getPartitionSpec(org.apache.iceberg.Schema icebergSchema, List partitionFieldsList) { + PartitionSpec.Builder partitionBuilder = PartitionSpec.builderFor(icebergSchema); + for (String s : partitionFieldsList) { + partitionBuilder = partitionBuilder.identity(s); + } + return partitionBuilder.build(); + } + + /** + * S3 Table Sink Builder - It is unique in that it leverages the S3 Table Catalog as opposed to an external catalog like Glue or Hive. + * catalog-impl = software.amazon.s3tables.iceberg.S3TablesCatalog + */ + public static FlinkSink.Builder createBuilder(Properties icebergProperties, DataStream dataStream, org.apache.avro.Schema avroSchema) { + // Retrieve configuration from application parameters + + /** + * This table bucket ARN will be used as the Table Catalog for Iceberg, this is unique compared to the standard Iceberg table. + */ + String s3TableBucketARN = Preconditions.checkNotNull(icebergProperties.getProperty("table.bucket.arn"), "Iceberg S3 table bucket ARN not defined"); + + String s3_table_db = icebergProperties.getProperty("catalog.db", DEFAULT_S3_CATALOG_DB); + String s3_table_name = icebergProperties.getProperty("catalog.table", DEFAULT_ICEBERG_TABLE_NAME); + + String partitionFields = icebergProperties.getProperty("partition.fields", DEFAULT_ICEBERG_PARTITION_FIELDS); + List partitionFieldList = Arrays.asList(partitionFields.split("\\s*,\\s*")); + + String sortField = icebergProperties.getProperty("sort.field", DEFAULT_ICEBERG_SORT_ORDER_FIELD); + + // Iceberg you can perform Appends, Upserts and Overwrites. 
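+        // "append" adds new data files on every checkpoint commit.
+        // "upsert" writes equality deletes based on the equality fields configured below; it requires an
+        //   Iceberg format v2 table (the table is upgraded to v2 in createTable above).
+        // "overwrite" replaces existing data in the partitions written by each commit.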
+ String icebergOperation = icebergProperties.getProperty("operation", DEFAULT_ICEBERG_OPERATION); + Preconditions.checkArgument(icebergOperation.equals("append") || icebergOperation.equals("upsert") || icebergOperation.equals("overwrite"), "Invalid Iceberg Operation"); + + // If operation is upsert, we need to specify the fields that will be used for equality in the upsert operation + // If the table is partitioned, we must include the partition fields + // This is a comma-separated list of fields + String upsertEqualityFields = icebergProperties.getProperty("upsert.equality.fields", DEFAULT_ICEBERG_UPSERT_FIELDS); + List equalityFieldsList = Arrays.asList(upsertEqualityFields.split("[, ]+")); + + // Convert Avro Schema to Iceberg Schema, this will be used for creating the table + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + // Avro Generic Record to Row Data Mapper + MapFunction avroGenericRecordToRowDataMapper = AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema); + + // Catalog properties for using S3 Tables + Map catalogProperties = new HashMap<>(); + catalogProperties.put("type", "iceberg"); + catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); + catalogProperties.put("warehouse", s3TableBucketARN); + catalogProperties.put("catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog"); + + // Load S3 Table Data Catalog + CatalogLoader icebergCatalogLoader = CatalogLoader.custom( + "flink", + catalogProperties, + new org.apache.hadoop.conf.Configuration(), + "software.amazon.s3tables.iceberg.S3TablesCatalog"); + // Table Object that represents the table in S3 Tables + TableIdentifier outputTable = TableIdentifier.of(s3_table_db, s3_table_name); + // Load created Iceberg Catalog to perform table operations + Catalog catalog = icebergCatalogLoader.loadCatalog(); + + // Based on how many fields we want to partition, we create the Partition Spec + PartitionSpec partitionSpec = getPartitionSpec(icebergSchema, partitionFieldList); + // We create the Iceberg Table, using the Iceberg Catalog, Table Identifier, Schema parsed in Iceberg Schema Format and the partition spec + createTable(catalog, outputTable, icebergSchema, partitionSpec, sortField); + // Once the table has been created in the job or before, we load it + TableLoader tableLoader = TableLoader.fromCatalog(icebergCatalogLoader, outputTable); + // Get RowType Schema from Iceberg Schema + RowType rowType = FlinkSchemaUtil.convert(icebergSchema); + + // Iceberg DataStream sink builder + FlinkSink.Builder flinkSinkBuilder = FlinkSink.builderFor( + dataStream, + avroGenericRecordToRowDataMapper, + FlinkCompatibilityUtil.toTypeInfo(rowType)) + .tableLoader(tableLoader); + + // Returns the builder for the selected operation + switch (icebergOperation) { + case "upsert": + // If operation is "upsert" we need to set up the equality fields + return flinkSinkBuilder + .equalityFieldColumns(equalityFieldsList) + .upsert(true); + case "overwrite": + return flinkSinkBuilder + .overwrite(true); + default: + return flinkSinkBuilder; + } + } +} diff --git a/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json new file mode 100644 index 0000000..fd94ccb --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/resources/flink-application-properties-dev.json @@ -0,0 +1,21 @@ +[ + { + "PropertyGroupId": "DataGen", + "PropertyMap": { + "records.per.sec": 100.0 + } + }, + { + 
"PropertyGroupId": "Iceberg", + "PropertyMap": { + + "table.bucket.arn": "<>", + "catalog.db": "test_from_flink", + "catalog.table": "test_table", + "partition.fields": "symbol", + "sort.field": "timestamp", + "operation": "upsert", + "upsert.equality.fields": "symbol" + } + } +] \ No newline at end of file diff --git a/java/Iceberg/S3TableSink/src/main/resources/log4j2.properties b/java/Iceberg/S3TableSink/src/main/resources/log4j2.properties new file mode 100644 index 0000000..3c0515a --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/resources/log4j2.properties @@ -0,0 +1,7 @@ +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +rootLogger.level = WARN +rootLogger.appenderRef.console.ref = ConsoleAppender \ No newline at end of file diff --git a/java/Iceberg/S3TableSink/src/main/resources/price.avsc b/java/Iceberg/S3TableSink/src/main/resources/price.avsc new file mode 100644 index 0000000..4603eb2 --- /dev/null +++ b/java/Iceberg/S3TableSink/src/main/resources/price.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "name": "Price", + "namespace": "com.amazonaws.services.msf.avro", + "fields": [ + { + "name": "timestamp", + "type": "string" + }, + { + "name": "symbol", + "type": "string" + }, + { + "name": "price", + "type": "float" + }, + { + "name": "volumes", + "type": "int" + } + ] +} \ No newline at end of file diff --git a/java/Iceberg/S3TableSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java b/java/Iceberg/S3TableSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java new file mode 100644 index 0000000..5593ca1 --- /dev/null +++ b/java/Iceberg/S3TableSink/src/test/java/com/amazonaws/services/msf/datagen/AvroGenericStockTradeGeneratorFunctionTest.java @@ -0,0 +1,26 @@ +package com.amazonaws.services.msf.datagen; + +import com.amazonaws.services.msf.avro.AvroSchemaUtils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +class AvroGenericStockTradeGeneratorFunctionTest { + + @Test + void generateRecord() throws Exception { + Schema avroSchema = AvroSchemaUtils.loadSchema(); + AvroGenericStockTradeGeneratorFunction generatorFunction = new AvroGenericStockTradeGeneratorFunction(avroSchema); + + GenericRecord record = generatorFunction.map(42L); + + + assertInstanceOf(String.class, record.get("timestamp")); + assertInstanceOf(String.class, record.get("symbol")); + assertInstanceOf(Float.class, record.get("price")); + assertInstanceOf(Integer.class, record.get("volumes")); + } + +} \ No newline at end of file diff --git a/java/pom.xml b/java/pom.xml index 29e281b..8c57051 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -21,7 +21,9 @@ CustomMetrics GettingStarted GettingStartedTable - IcebergDatastreamSink + Iceberg/IcebergDataStreamSink + Iceberg/IcebergDataStreamSource + Iceberg/S3TableSink KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders