From a050abce83e30c7fa567af21dc3bf1b2438d93cd Mon Sep 17 00:00:00 2001
From: Buddhike de Silva
Date: Thu, 3 Apr 2025 16:34:49 +1100
Subject: [PATCH 1/8] Sample to show how to process different types of Avro subjects in a single topic
---
 java/AvroOneTopicManySubjects/.gitignore | 22 ++
 java/AvroOneTopicManySubjects/README.md | 52 +++
 .../docker-compose.yml | 43 +++
 java/AvroOneTopicManySubjects/pom.xml | 250 +++++++++++++
 .../amazonaws/services/msf/StreamingJob.java | 352 ++++++++++++++++++
 .../src/main/resources/avro/air-quality.avdl | 9 +
 .../main/resources/avro/room-temperature.avdl | 9 +
 .../flink-application-properties-dev.json | 10 +
 .../src/main/resources/log4j2.properties | 7 +
 .../RoomAverageTemperatureCalculatorTest.java | 86 +++++
 10 files changed, 840 insertions(+)
 create mode 100644 java/AvroOneTopicManySubjects/.gitignore
 create mode 100644 java/AvroOneTopicManySubjects/README.md
 create mode 100644 java/AvroOneTopicManySubjects/docker-compose.yml
 create mode 100644 java/AvroOneTopicManySubjects/pom.xml
 create mode 100644 java/AvroOneTopicManySubjects/src/main/java/com/amazonaws/services/msf/StreamingJob.java
 create mode 100644 java/AvroOneTopicManySubjects/src/main/resources/avro/air-quality.avdl
 create mode 100644 java/AvroOneTopicManySubjects/src/main/resources/avro/room-temperature.avdl
 create mode 100644 java/AvroOneTopicManySubjects/src/main/resources/flink-application-properties-dev.json
 create mode 100644 java/AvroOneTopicManySubjects/src/main/resources/log4j2.properties
 create mode 100644 java/AvroOneTopicManySubjects/src/test/java/com/amazonaws/services/msf/domain/RoomAverageTemperatureCalculatorTest.java

diff --git a/java/AvroOneTopicManySubjects/.gitignore b/java/AvroOneTopicManySubjects/.gitignore
new file mode 100644
index 0000000..4d04ac7
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/.gitignore
@@ -0,0 +1,22 @@
+**/.idea/
+.bsp
+**/target
+**/.DS_Store
+**/dependency-reduced-pom.xml
+**/*.iml
+**/*.jar
+**/*.log
+.env
+.venv
+env/
+venv/
+.java-version
+/pyflink/
+*.p12
+*.key
+*.pem
+*.crt
+*.csr
+*.der
+*.jks
+
diff --git a/java/AvroOneTopicManySubjects/README.md b/java/AvroOneTopicManySubjects/README.md
new file mode 100644
index 0000000..249c361
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/README.md
@@ -0,0 +1,52 @@
## Avro One Topic Many Subjects

* Flink version: 1.20
* Flink API: DataStream API
* Language: Java (11)

This example demonstrates how to serialize/deserialize Avro messages in Kafka when one topic stores multiple subject types.

See [this article](https://martin.kleppmann.com/2018/01/18/event-types-in-kafka-topic.html) for more information about why it's sometimes essential to store multiple subjects in the same topic.

This sample is based on a hypothetical use case where two sensors fitted in a room emit telemetry data to a single Kafka topic. We use Confluent Schema Registry to govern the schema of telemetry messages serialized using Avro.

Schema definitions for the messages are in `.avdl` files located in [./src/main/resources/avro](./src/main/resources/avro).

This example uses Avro-generated classes (more details [below](#using-avro-generated-classes)).

A `KafkaSource` produces a stream of Avro data objects (`SpecificRecord`), fetching the writer's schema of each message from Confluent Schema Registry. The Avro Kafka message values must have been serialized using the Confluent Schema Registry serializer.

A `KafkaSink` writes Avro data objects as Kafka message values, and a `String`, encoded as UTF-8 bytes, as Kafka message keys.
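Because one topic carries several subject types, the source cannot be bound to a single reader schema. Instead, the deserializer resolves the writer's schema of each message from the registry, using the schema ID embedded in the message. Below is a minimal sketch of such a deserialization schema built on Confluent's `KafkaAvroDeserializer`; it illustrates the technique but is not the sample's exact code, and the class name `MultiSubjectAvroDeserializationSchema` is hypothetical:

```java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: decodes every message into its Avro-generated class,
// whatever subject the message's schema was registered under.
public class MultiSubjectAvroDeserializationSchema implements KafkaRecordDeserializationSchema<SpecificRecord> {
    private final String schemaRegistryUrl;
    private transient KafkaAvroDeserializer deserializer;

    public MultiSubjectAvroDeserializationSchema(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        Map<String, Object> config = new HashMap<>();
        config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        // Return instances of the generated classes instead of GenericRecord
        config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        deserializer = new KafkaAvroDeserializer();
        deserializer.configure(config, false); // false = configure as a value deserializer
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<SpecificRecord> out) {
        // The writer's schema is resolved from the schema ID embedded in the message
        out.collect((SpecificRecord) deserializer.deserialize(record.topic(), record.value()));
    }

    @Override
    public TypeInformation<SpecificRecord> getProducedType() {
        return TypeInformation.of(SpecificRecord.class);
    }
}
```

On the producer side, the Confluent serializer must be configured with `RecordNameStrategy` as the value subject naming strategy, so that each record type is registered under its own subject instead of the default `<topic>-value` subject.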
## Flink compatibility

**Note:** This project is compatible with Flink 1.15+ and Amazon Managed Service for Apache Flink.

### Flink API compatibility
This example uses the newer `KafkaSource` and `KafkaSink` (as opposed to `FlinkKafkaConsumer` and `FlinkKafkaProducer`, which were deprecated with Flink 1.15).

### Avro-generated classes

This project uses classes generated at build time as data objects.

As a best practice, only the Avro schema definitions (IDL `.avdl` files in this case) are included in the project source code.

The Avro Maven plugin generates the Java classes (source code) at build time, during the [`generate-sources`](https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html) phase.

The generated classes are written into the `./target/generated-sources/avro` directory and should **not** be committed with the project source.

This way, the only dependency is on the schema definition file(s). If any change is required, the schema file is modified, and the Avro classes are re-generated automatically during the build.

Code generation is supported by all common IDEs, such as IntelliJ. If your IDE does not see the Avro classes (`AirQuality` and `RoomTemperature`) when you import the project for the first time, run `mvn generate-sources` manually once to force code generation from the IDE.

### Using Avro-generated classes (SpecificRecord) in Apache Flink

Using Avro-generated classes (`SpecificRecord`) within the flow of the Flink application (between operators) or in the Flink state has an additional benefit: Flink will [natively and efficiently serialize and deserialize](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos) these objects, without falling back to Kryo.

### Running required services locally
Kafka and Confluent Schema Registry configuration is in [./docker-compose.yml](./docker-compose.yml). Start these services by running `docker compose up`.

### Running in IntelliJ
To start the Flink job in IntelliJ:
1. Edit the Run/Debug configuration.
2. Enable the option **"Add dependencies with 'provided' scope to the classpath"**.
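### Processing all or a subset of the subjects

Because the source emits `SpecificRecord` objects, each subject type can be routed to its own strongly-typed stream and processed independently. The following is a minimal sketch, not the sample's exact code: it assumes `sensorEvents` is the `DataStream<SpecificRecord>` coming from the Kafka source, and uses the `RoomTemperature` and `AirQuality` classes generated from the `.avdl` definitions.

```java
// Route each subject type to its own strongly-typed stream.
DataStream<RoomTemperature> roomTemperatures = sensorEvents
        .filter(record -> record instanceof RoomTemperature)
        .map(record -> (RoomTemperature) record)
        // Pin the produced type so Flink uses the efficient serializer for the
        // generated class instead of falling back to Kryo for the lambda's output
        .returns(RoomTemperature.class);

DataStream<AirQuality> airQualityReadings = sensorEvents
        .filter(record -> record instanceof AirQuality)
        .map(record -> (AirQuality) record)
        .returns(AirQuality.class);
```

A job that is only interested in one subject type can keep a single branch; records of the other types are simply filtered out.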
\ No newline at end of file diff --git a/java/AvroOneTopicManySubjects/docker-compose.yml b/java/AvroOneTopicManySubjects/docker-compose.yml new file mode 100644 index 0000000..3b8b3ea --- /dev/null +++ b/java/AvroOneTopicManySubjects/docker-compose.yml @@ -0,0 +1,43 @@ +services: + zookeeper: + image: confluentinc/cp-zookeeper:latest + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 22181:2181 + + kafka: + image: confluentinc/cp-kafka:latest + depends_on: + - zookeeper + ports: + - 29092:29092 + - 11001:11001 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_JMX_PORT: 11001 + KAFKA_JMX_HOSTNAME: kafka + JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=11001" + links: + - zookeeper + + scr: + image: confluentinc/cp-schema-registry + container_name: scr + depends_on: + - kafka + links: + - kafka + ports: + - 8085:8085 + environment: + CUB_CLASSPATH: '/usr/share/java/confluent-security/schema-registry/*:/usr/share/java/schema-registry/*:/usr/share/java/schema-registry-plugins/*:/usr/share/java/cp-base-new/*' + SCHEMA_REGISTRY_HOST_NAME: schemaregistry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092 + SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085" \ No newline at end of file diff --git a/java/AvroOneTopicManySubjects/pom.xml b/java/AvroOneTopicManySubjects/pom.xml new file mode 100644 index 0000000..b85ba0f --- /dev/null +++ b/java/AvroOneTopicManySubjects/pom.xml @@ -0,0 +1,250 @@ + + + 4.0.0 + + com.amazonaws + avro-one-topic-many-subjects + 1.0-SNAPSHOT + + + UTF-8 + ${project.basedir}/target + ${project.name} + 11 + ${target.java.version} + com.amazonaws.services.msf.StreamingJob + + 2.12 + 1.10.2 + 3.3.0-1.20 + + 1.20.0 + 2.1.0 + 1.2.0 + + 1.7.32 + 2.17.2 + 5.9.1 + + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + org.apache.flink + flink-runtime-web + ${flink.version} + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + org.apache.flink + flink-connector-base + ${flink.version} + + + org.apache.flink + flink-connector-kafka + ${kafka.clients.version} + + + com.amazonaws + aws-kinesisanalytics-flink + ${kda.connectors.version} + + + + + org.apache.flink + flink-avro + ${flink.version} + + + org.apache.avro + avro + ${avro.version} + + + org.apache.flink + flink-avro-confluent-registry + ${flink.version} + + + org.apache.flink + flink-table-api-java + ${flink.version} + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + io.confluent + kafka-avro-serializer + 7.9.0 + + + io.confluent + kafka-schema-registry-client + 7.9.0 + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test 
+ + + org.junit.jupiter + junit-jupiter-engine + ${junit.version} + test + + + + + ${buildDirectory} + ${jar.finalName} + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + true + + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + idl-protocol + + + ${project.basedir}/src/main/resources/avro + ${project.basedir}/src/test/resources/avro + private + String + true + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + ${main.class} + + + + + + + + + + + \ No newline at end of file diff --git a/java/AvroOneTopicManySubjects/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/AvroOneTopicManySubjects/src/main/java/com/amazonaws/services/msf/StreamingJob.java new file mode 100644 index 0000000..1976e72 --- /dev/null +++ b/java/AvroOneTopicManySubjects/src/main/java/com/amazonaws/services/msf/StreamingJob.java @@ -0,0 +1,352 @@ +package com.amazonaws.services.msf; + +import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; +import com.amazonaws.services.msf.avro.AirQuality; +import com.amazonaws.services.msf.avro.RoomTemperature; +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; +import io.confluent.kafka.serializers.subject.RecordNameStrategy; +import org.apache.avro.specific.SpecificRecord; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.TopicExistsException; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.ExecutionException; + +/** + * Sample Flink application that can run in Amazon Managed Service for Apache Flink + *

+ * It simulates a temperature and air quality sensor by writing randomly generated sample records to a Kafka topic.
+ * It then consumes those records and processes them according to their type to illustrate how users
+ * can consume and process either all or a subset of the subjects stored in the same topic.
+ */
+public class StreamingJob {
+    private static final int NUM_ROOMS = 10;
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingJob.class);
+    // Name of the local JSON resource with the application properties in the same format as they are received from the MSF runtime
+    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+
+    // Name of the configuration group containing the application properties
+    private static final String APPLICATION_CONFIG_GROUP = "FlinkApplicationProperties";
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        if (isLocal(env)) {
+            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
+            env.enableCheckpointing(60000);
+        }
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+
+        Properties applicationProperties = loadApplicationProperties(env).get(APPLICATION_CONFIG_GROUP);
+        String bootstrapServers = Preconditions.checkNotNull(applicationProperties.getProperty("bootstrap.servers"), "bootstrap.servers not defined");
+        String sourceTopic = Preconditions.checkNotNull(applicationProperties.getProperty("source.topic"), "source.topic not defined");
+        String sourceConsumerGroupId = applicationProperties.getProperty("source.consumer.group.id", "avro-one-topic-many-subjects");
+        String schemaRegistryUrl = applicationProperties.getProperty("schema.registry.url", "http://localhost:8085");
+
+        ensureTopicExist(bootstrapServers, sourceTopic, 3, (short) 1);
+
+        Map<String, Object> schemaRegistryConfig = new HashMap<>();
+        setupRoomTemperatureGenerator(bootstrapServers, sourceTopic, schemaRegistryUrl, schemaRegistryConfig, env);
+        setupAirQualityGenerator(bootstrapServers, sourceTopic, schemaRegistryUrl, schemaRegistryConfig, env);
+
+        KafkaSource

+ * It demonstrates how to process records in a Kafka topic with varying schemas when
+ * they are produced using the RecordName subject name strategy in Confluent Schema Registry.
+ */
+public class StreamingJob {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingJob.class);
+    // Name of the local JSON resource with the application properties in the same format as they are received from the MSF runtime
+    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-consumer-dev.json";
+
+    // Name of the configuration group containing the application properties
+    private static final String APPLICATION_CONFIG_GROUP = "FlinkApplicationProperties";
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        if (isLocal(env)) {
+            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new org.apache.flink.configuration.Configuration());
+            env.enableCheckpointing(60000);
+        }
+
+        Configuration config = loadConfiguration(env);
+
+        ensureTopicExist(config);
+
+        KafkaSource

- * It simulates a temperature and air quality sensor by writing randomly generated sample records to a Kafka topic.
- * It then consumes those records and processes them according to their type to illustrate how users
- * can consume and process either all or a subset of the subjects stored in the same topic.
- */
-public class StreamingJob {
-    private static final int NUM_ROOMS = 10;
-    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingJob.class);
-    // Name of the local JSON resource with the application properties in the same format as they are received from the MSF runtime
-    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
-
-    // Name of the configuration group containing the application properties
-    private static final String APPLICATION_CONFIG_GROUP = "FlinkApplicationProperties";
-
-    public static void main(String[] args) throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        if (isLocal(env)) {
-            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
-            env.enableCheckpointing(60000);
-        }
-
-        Properties applicationProperties = loadApplicationProperties(env).get(APPLICATION_CONFIG_GROUP);
-        String bootstrapServers = Preconditions.checkNotNull(applicationProperties.getProperty("bootstrap.servers"), "bootstrap.servers not defined");
-        String sourceTopic = Preconditions.checkNotNull(applicationProperties.getProperty("source.topic"), "source.topic not defined");
-        String sourceConsumerGroupId = applicationProperties.getProperty("source.consumer.group.id", "avro-one-topic-many-subjects");
-        String schemaRegistryUrl = applicationProperties.getProperty("schema.registry.url", "http://localhost:8085");
-
-        ensureTopicExist(bootstrapServers, sourceTopic, 3, (short) 1);
-
-        Map<String, Object> schemaRegistryConfig = new HashMap<>();
-        setupRoomTemperatureGenerator(bootstrapServers, sourceTopic, schemaRegistryUrl, schemaRegistryConfig, env);
-        setupAirQualityGenerator(bootstrapServers, sourceTopic, schemaRegistryUrl, schemaRegistryConfig, env);
-
-        KafkaSource