diff --git a/java/AvroOneTopicManySubjects/.run/all.run.xml b/java/AvroOneTopicManySubjects/.run/all.run.xml
new file mode 100644 index 0000000..a64e42d
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/.run/all.run.xml
@@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file
diff --git a/java/AvroOneTopicManySubjects/.run/flink-consumer.run.xml b/java/AvroOneTopicManySubjects/.run/flink-consumer.run.xml
new file mode 100644 index 0000000..ab31e9c
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/.run/flink-consumer.run.xml
@@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file
diff --git a/java/AvroOneTopicManySubjects/.run/producer-app.run.xml b/java/AvroOneTopicManySubjects/.run/producer-app.run.xml
new file mode 100644 index 0000000..1c0bf74
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/.run/producer-app.run.xml
@@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file
diff --git a/java/AvroOneTopicManySubjects/README.md b/java/AvroOneTopicManySubjects/README.md
new file mode 100644 index 0000000..ca19a3e
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/README.md
@@ -0,0 +1,57 @@
+## AVRO One Topic Many Subjects
+
+* Flink version: 1.20
+* Flink API: DataStream API
+* Language: Java (11)
+* Schema Registry: Confluent Schema Registry
+
+This example demonstrates how to serialize/deserialize Avro messages in Kafka when one topic stores multiple message types, leveraging Confluent Schema Registry for schema management and evolution.
+
+See [this article](https://martin.kleppmann.com/2018/01/18/event-types-in-kafka-topic.html) for more information about why it is sometimes essential to store multiple event types in the same topic.
+
+This sample is based on a hypothetical use case where two sensors fitted in a room emit telemetry data to a single Kafka topic.
+We use Confluent Schema Registry to govern the schemas of the telemetry messages serialized using Avro.
+
+Schema definitions for the messages are in `.avdl` files located in [./producer-app/src/main/resources/avro](./producer-app/src/main/resources/avro).
+
+This example uses Avro-generated classes (more details [below](#using-avro-generated-classes)).
+
+**producer-app** writes random samples of room temperature and air quality records to the `sensor-data` topic.
+**flink-consumer** reads the `sensor-data` topic and processes the records using the `DataStream` API, printing room temperature and air quality readings that exceed a given threshold.
+
+
+## Flink compatibility
+
+**Note:** This project is compatible with Flink 1.20+ and Amazon Managed Service for Apache Flink.
+
+### Flink API compatibility
+This example uses the newer `KafkaSource` (as opposed to `FlinkKafkaConsumer`, which was deprecated in Flink 1.15).
+
+### Avro-generated classes
+
+This project uses classes generated at build time as data objects.
+
+As a best practice, only the Avro schema definitions (IDL `.avdl` files in this case) are included in the project source code.
+
+The Avro Maven plugin generates the Java classes (source code) at build time, during the [`generate-sources`](https://maven.apache.org/guides/introduction/introduction-to-the-lifecycle.html) phase.
+
+The generated classes are written into the `./target/generated-sources/avro` directory and should **not** be committed with the project source.
+
+This way, the only dependency is on the schema definition file(s). If any change is required, the schema file is modified, and the Avro classes are re-generated automatically during the build.
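+
+As an illustration (not part of this project's source), the generated classes behave like regular POJOs with builders and setters. A minimal sketch, assuming the default `avro-maven-plugin` output for `room-temperature.avdl`:
+
+```java
+// Sketch only: RoomTemperature is generated at build time from room-temperature.avdl.
+// With setter generation enabled the fields have plain set/get methods, and the
+// timestamp_ms logical type maps to java.time.Instant.
+RoomTemperature sample = RoomTemperature.newBuilder()
+        .setSensorId(1)
+        .setRoom("Room-1")
+        .setTemperature(21.5f)
+        .setTimestamp(java.time.Instant.now())
+        .build();
+```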
+
+Code generation is supported by all common IDEs, such as IntelliJ. If your IDE does not see the Avro classes (`AirQuality` and `RoomTemperature`) when you import the project for the first time, you can manually run `mvn generate-sources` once, from the IDE or the command line, to force source code generation.
+
+### Using Avro-generated classes (SpecificRecord) in Apache Flink
+
+Using Avro-generated classes (`SpecificRecord`) within the flow of the Flink application (between operators) or in the Flink state has an additional benefit. Flink will [natively and efficiently serialize and deserialize](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos) these objects, without the risk of falling back to Kryo.
+
+### Running required services locally
+The Kafka and Confluent Schema Registry configuration is in [./docker-compose.yml](./docker-compose.yml). Start these services by running `docker compose up`.
+
+### Running in IntelliJ
+To start the Flink job in IntelliJ:
+1. Edit the Run/Debug configuration (use the compound configuration to run both applications at the same time).
+2. Enable the option **"Add dependencies with 'provided' scope to the classpath"**.
+
+See this [page](../running-examples-locally.md) for more information. \ No newline at end of file
diff --git a/java/AvroOneTopicManySubjects/docker-compose.yml b/java/AvroOneTopicManySubjects/docker-compose.yml
new file mode 100644 index 0000000..3b8b3ea
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/docker-compose.yml
@@ -0,0 +1,43 @@
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:latest
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - 22181:2181
+
+  kafka:
+    image: confluentinc/cp-kafka:latest
+    depends_on:
+      - zookeeper
+    ports:
+      - 29092:29092
+      - 11001:11001
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_JMX_PORT: 11001
+      KAFKA_JMX_HOSTNAME: kafka
+      JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka -Dcom.sun.management.jmxremote.rmi.port=11001"
+    links:
+      - zookeeper
+
+  scr:
+    image: confluentinc/cp-schema-registry
+    container_name: scr
+    depends_on:
+      - kafka
+    links:
+      - kafka
+    ports:
+      - 8085:8085
+    environment:
+      CUB_CLASSPATH: '/usr/share/java/confluent-security/schema-registry/*:/usr/share/java/schema-registry/*:/usr/share/java/schema-registry-plugins/*:/usr/share/java/cp-base-new/*'
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
+      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085" \ No newline at end of file
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/pom.xml b/java/AvroOneTopicManySubjects/flink-consumer/pom.xml
new file mode 100644 index 0000000..0b57790
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/pom.xml
@@ -0,0 +1,252 @@ + + + 4.0.0 + + com.amazonaws + flink-consumer + 1.0-SNAPSHOT + + + UTF-8 + ${project.basedir}/target + ${project.name} + 11 + ${target.java.version} + com.amazonaws.services.msf.StreamingJob + + 2.12 + 1.10.2 + 3.3.0-1.20 + + 1.20.0 + 2.1.0 + 1.2.0 + + 1.7.32 + 2.17.2 + 5.9.1 + + + + + +
confluent + https://packages.confluent.io/maven/ + + + + + + + + org.apache.flink + flink-java + ${flink.version} + provided + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + org.apache.flink + flink-runtime-web + ${flink.version} + + + + + com.amazonaws + aws-kinesisanalytics-runtime + ${kda.runtime.version} + provided + + + + + org.apache.flink + flink-connector-base + ${flink.version} + + + org.apache.flink + flink-connector-kafka + ${kafka.clients.version} + + + com.amazonaws + aws-kinesisanalytics-flink + ${kda.connectors.version} + + + + + org.apache.flink + flink-avro + ${flink.version} + + + org.apache.avro + avro + ${avro.version} + + + org.apache.flink + flink-avro-confluent-registry + ${flink.version} + + + org.apache.flink + flink-table-api-java + ${flink.version} + + + org.apache.flink + flink-connector-files + ${flink.version} + provided + + + io.confluent + kafka-avro-serializer + 7.9.0 + + + io.confluent + kafka-schema-registry-client + 7.9.0 + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + org.junit.jupiter + junit-jupiter-engine + ${junit.version} + test + + + + + ${buildDirectory} + ${jar.finalName} + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + true + + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + protocol + idl-protocol + + + + + ${project.basedir}/src/main/resources/avro + ${project.basedir}/src/test/resources/avro + private + String + true + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + ${main.class} + + + + + + + + + + + \ No newline at end of file diff --git a/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Configuration.java b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Configuration.java new file mode 100644 index 0000000..848dc66 --- /dev/null +++ b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Configuration.java @@ -0,0 +1,36 @@ +package com.amazonaws.services.msf; + +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +public class Configuration { + private final String bootstrapServers; + private final String topic; + private final String consumerGroupId; + private final String schemaRegistryUrl; + + public Configuration(Properties properties) { + this.bootstrapServers = Preconditions.checkNotNull(properties.getProperty("bootstrap.servers"), "bootstrap.servers not defined"); + this.topic = Preconditions.checkNotNull(properties.getProperty("source.topic"), "source.topic not defined"); + this.consumerGroupId = properties.getProperty("source.consumer.group.id", "flink-consumer"); + this.schemaRegistryUrl = properties.getProperty("schema.registry.url", "http://localhost:8085"); + } + + + public String getBootstrapServers() { + return bootstrapServers; + } + + public String getTopic() { + return topic; + } + + public String 
getConsumerGroupId() {
+        return consumerGroupId;
+    }
+
+    public String getSchemaRegistryUrl() {
+        return schemaRegistryUrl;
+    }
+}
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Option.java b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Option.java
new file mode 100644 index 0000000..0072e0f
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/Option.java
@@ -0,0 +1,26 @@
+package com.amazonaws.services.msf;
+
+/**
+ * Option is a generic container type that holds a deserialized record.
+ * It can be useful to introduce fields into this type to handle partitioning
+ * strategies and event time extraction. However, for those scenarios to work,
+ * all subjects should share a standard set of fields.
+ */
+class Option {
+    private Object value;
+
+    public Option() {
+    }
+
+    public Option(Object value) {
+        this.value = value;
+    }
+
+    public Object getValue() {
+        return this.value;
+    }
+
+    public void setValue(Object value) {
+        this.value = value;
+    }
+}
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/OptionDeserializationSchema.java b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/OptionDeserializationSchema.java
new file mode 100644 index 0000000..5884660
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/OptionDeserializationSchema.java
@@ -0,0 +1,55 @@
+package com.amazonaws.services.msf;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import io.confluent.kafka.serializers.subject.RecordNameStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+// Custom deserialization schema for handling multiple Avro record types in a single topic
+class OptionDeserializationSchema implements KafkaRecordDeserializationSchema<Option> {
+
+    private final String schemaRegistryUrl;
+    private transient KafkaAvroDeserializer deserializer;
+
+    public OptionDeserializationSchema(String schemaRegistryUrl) {
+        this.schemaRegistryUrl = schemaRegistryUrl;
+    }
+
+    @Override
+    public void open(DeserializationSchema.InitializationContext context) throws Exception {
+        SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
+
+        // With the following configuration, the Avro deserializer figures out the target type
+        // based on the schema subject name
+        // (which is the name of the generated class for the corresponding Avro record type).
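+        // For example, the subject "com.amazonaws.services.msf.avro.RoomTemperature" maps to
+        // the generated RoomTemperature class; with SPECIFIC_AVRO_READER enabled, the
+        // deserializer returns instances of that class instead of GenericRecord.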
+        Map<String, Object> config = new HashMap<>();
+        config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
+        config.put(KafkaAvroDeserializerConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());
+        config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
+
+        // Create and configure the deserializer
+        deserializer = new KafkaAvroDeserializer(schemaRegistryClient, config);
+        deserializer.configure(config, false);
+    }
+
+    @Override
+    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Option> out) throws IOException {
+        Object deserialized = deserializer.deserialize(record.topic(), record.value());
+        out.collect(new Option(deserialized));
+    }
+
+    @Override
+    public TypeInformation<Option> getProducedType() {
+        return TypeInformation.of(Option.class);
+    }
+}
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/StreamingJob.java b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/StreamingJob.java
new file mode 100644 index 0000000..35d3cb5
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/src/main/java/com/amazonaws/services/msf/StreamingJob.java
@@ -0,0 +1,130 @@
+package com.amazonaws.services.msf;
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import com.amazonaws.services.msf.avro.AirQuality;
+import com.amazonaws.services.msf.avro.RoomTemperature;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.kafka.clients.admin.*;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Sample Flink application that can run in Amazon Managed Service for Apache Flink.
+ *
+ * It demonstrates how to process records in a Kafka topic with varying schemas when
+ * they are produced using the RecordName subject name strategy in Confluent Schema Registry.
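+ * <p>
+ * With RecordNameStrategy, the subject name is the record's fully-qualified name
+ * (e.g. com.amazonaws.services.msf.avro.AirQuality), so a single topic can carry
+ * both AirQuality and RoomTemperature events side by side.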
+ */
+public class StreamingJob {
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingJob.class);
+    // Name of the local JSON resource with the application properties, in the same format as they are received from the MSF runtime
+    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-consumer-dev.json";
+
+    // Name of the configuration group containing the application properties
+    private static final String APPLICATION_CONFIG_GROUP = "FlinkApplicationProperties";
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        if (isLocal(env)) {
+            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new org.apache.flink.configuration.Configuration());
+            env.enableCheckpointing(60000);
+        }
+
+        Configuration config = loadConfiguration(env);
+
+        ensureTopicExists(config);
+
+        KafkaSource<Option> source = kafkaSource(config);
+        DataStream<Option> options = env.fromSource(source, WatermarkStrategy.noWatermarks(), "source");
+
+        DataStream<RoomTemperature> roomTemperature = options
+                .filter((FilterFunction<Option>) option -> option.getValue() instanceof RoomTemperature)
+                .map((MapFunction<Option, RoomTemperature>) option -> (RoomTemperature) option.getValue())
+                .keyBy((KeySelector<RoomTemperature, Integer>) RoomTemperature::getSensorId)
+                .filter((FilterFunction<RoomTemperature>) temperatureSample -> temperatureSample.getTemperature() > 30.0);
+
+        DataStream<AirQuality> airQuality = options
+                .filter((FilterFunction<Option>) option -> option.getValue() instanceof AirQuality)
+                .map((MapFunction<Option, AirQuality>) option -> (AirQuality) option.getValue())
+                .keyBy((KeySelector<AirQuality, Integer>) AirQuality::getSensorId)
+                .filter((FilterFunction<AirQuality>) airQualitySample -> airQualitySample.getQualityIndex() > 5);
+
+        roomTemperature.print();
+        airQuality.print();
+
+        env.execute("flink-consumer");
+    }
+
+    private static boolean isLocal(StreamExecutionEnvironment env) {
+        return env instanceof LocalStreamEnvironment;
+    }
+
+    /**
+     * Load the application configuration from the service runtime, or from a local resource when the environment is local.
+     */
+    private static Configuration loadConfiguration(StreamExecutionEnvironment env) throws IOException {
+        Map<String, Properties> propertyMap;
+
+        if (isLocal(env)) {
+            LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+            propertyMap = KinesisAnalyticsRuntime.getApplicationProperties(StreamingJob.class.getClassLoader().getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
+        } else {
+            LOGGER.info("Loading application properties from the runtime");
+            propertyMap = KinesisAnalyticsRuntime.getApplicationProperties();
+        }
+
+        Properties properties = propertyMap.get(APPLICATION_CONFIG_GROUP);
+
+        return new Configuration(properties);
+    }
+
+    /**
+     * KafkaSource for any Avro-generated class (SpecificRecord) registered with Confluent Schema Registry.
+     *
+     * @param config Application configuration
+     *
+     * @return a KafkaSource instance
+     */
+    private static KafkaSource<Option> kafkaSource(Configuration config) {
+        OptionDeserializationSchema deserializationSchema = new OptionDeserializationSchema(config.getSchemaRegistryUrl());
+        return KafkaSource.<Option>builder()
+                .setBootstrapServers(config.getBootstrapServers())
+                .setTopics(config.getTopic())
+                .setGroupId(config.getConsumerGroupId())
+                .setDeserializer(deserializationSchema)
+                .build();
+    }
+
+    private static void ensureTopicExists(Configuration config) throws Exception {
+        Properties adminClientConfig = new Properties();
+
+        adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
+
+        try (AdminClient adminClient = AdminClient.create(adminClientConfig)) {
+            // Create the topic if it does not exist
+            NewTopic newTopic = new NewTopic(config.getTopic(), 1, (short)1);
+            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+            LOGGER.info("Topic '{}' created successfully", config.getTopic());
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof TopicExistsException) {
+                // This is fine because we want the topic to exist
+                return;
+            }
+            throw e;
+        }
+    }
+}
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/avro/air-quality.avdl b/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/avro/air-quality.avdl
new file mode 100644 index 0000000..26889e5
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/avro/air-quality.avdl
@@ -0,0 +1,9 @@
+@namespace("com.amazonaws.services.msf.avro")
+protocol In {
+    record AirQuality {
+        int sensorId;
+        string room;
+        float qualityIndex;
+        timestamp_ms timestamp;
+    }
+} \ No newline at end of file
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/avro/room-temperature.avdl b/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/avro/room-temperature.avdl
new file mode 100644 index 0000000..b32b660
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/avro/room-temperature.avdl
@@ -0,0 +1,9 @@
+@namespace("com.amazonaws.services.msf.avro")
+protocol Out {
+    record RoomTemperature {
+        int sensorId;
+        string room;
+        float temperature;
+        timestamp_ms timestamp;
+    }
+} \ No newline at end of file
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/flink-consumer-dev.json b/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/flink-consumer-dev.json
new file mode 100644 index 0000000..f8f0d35
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/flink-consumer-dev.json
@@ -0,0 +1,10 @@
+[
+  {
+    "PropertyGroupId": "FlinkApplicationProperties",
+    "PropertyMap": {
+      "bootstrap.servers": "localhost:29092",
+      "source.topic": "sensor-data",
+      "source.consumer.group.id": "flink-consumer"
+    }
+  }
+]
diff --git a/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/log4j2.properties b/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/log4j2.properties
new file mode 100644 index 0000000..c7d36aa
--- /dev/null
+++ b/java/AvroOneTopicManySubjects/flink-consumer/src/main/resources/log4j2.properties
@@ -0,0 +1,7 @@
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender \ No newline at end of file
diff --git 
a/java/AvroOneTopicManySubjects/pom.xml b/java/AvroOneTopicManySubjects/pom.xml new file mode 100644 index 0000000..d4a2380 --- /dev/null +++ b/java/AvroOneTopicManySubjects/pom.xml @@ -0,0 +1,16 @@ + + + 4.0.0 + + com.amazonaws + avro-one-topic-many-subjects-sample + 1.0 + pom + + producer-app + flink-consumer + + + diff --git a/java/AvroOneTopicManySubjects/producer-app/pom.xml b/java/AvroOneTopicManySubjects/producer-app/pom.xml new file mode 100644 index 0000000..207adcd --- /dev/null +++ b/java/AvroOneTopicManySubjects/producer-app/pom.xml @@ -0,0 +1,185 @@ + + + 4.0.0 + + com.amazonaws + producer-app + 1.0-SNAPSHOT + + + UTF-8 + ${project.basedir}/target + ${project.name} + 11 + ${target.java.version} + com.amazonaws.services.msf.Sensors + + 1.10.2 + 7.9.0 + 3.9.0 + 1.7.32 + 2.17.2 + 5.9.1 + 1.9.0 + + + + + + confluent + https://packages.confluent.io/maven/ + + + + + + org.apache.kafka + kafka-clients + ${kafka.version} + + + org.apache.avro + avro + ${avro.version} + + + io.confluent + kafka-avro-serializer + ${confluent.version} + + + io.confluent + kafka-schema-registry-client + ${confluent.version} + + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + org.junit.jupiter + junit-jupiter-engine + ${junit.version} + test + + + commons-cli + commons-cli + ${commons.version} + + + + + ${buildDirectory} + ${jar.finalName} + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${target.java.version} + ${target.java.version} + true + + + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + protocol + idl-protocol + + + + + ${project.basedir}/src/main/resources/avro + ${project.basedir}/src/test/resources/avro + private + String + true + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + package + + shade + + + + + org.apache.flink:force-shading + com.google.code.findbugs:jsr305 + org.slf4j:* + org.apache.logging.log4j:* + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + ${main.class} + + + + + + + + + + + \ No newline at end of file diff --git a/java/AvroOneTopicManySubjects/producer-app/src/main/java/com/amazonaws/services/msf/Sensors.java b/java/AvroOneTopicManySubjects/producer-app/src/main/java/com/amazonaws/services/msf/Sensors.java new file mode 100644 index 0000000..5b9d9b5 --- /dev/null +++ b/java/AvroOneTopicManySubjects/producer-app/src/main/java/com/amazonaws/services/msf/Sensors.java @@ -0,0 +1,159 @@ +package com.amazonaws.services.msf; + +import com.amazonaws.services.msf.avro.AirQuality; +import com.amazonaws.services.msf.avro.RoomTemperature; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import io.confluent.kafka.serializers.subject.RecordNameStrategy; +import org.apache.commons.cli.*; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Sample application to simulate sensors writing air quality and room temperatures to a Kafka topic. 
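+ * <p>
+ * Values are serialized with KafkaAvroSerializer using RecordNameStrategy, so each
+ * record class is registered under its own subject in Schema Registry while both
+ * types share a single topic.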
+ */
+public class Sensors {
+    private static final Logger LOG = LoggerFactory.getLogger(Sensors.class);
+    private static final String DEFAULT_BOOTSTRAP_SERVER = "localhost:29092";
+    private static final String DEFAULT_TOPIC_NAME = "sensor-data";
+    private static final Long DEFAULT_SLEEP_TIME_BETWEEN_RECORDS = 1000L;
+    private static final String DEFAULT_SCHEMA_REGISTRY_URL = "http://localhost:8085/";
+
+    private final String bootstrapServer;
+    private final String topicName;
+    private final String schemaRegistry;
+    private final long sleepTimeBetweenRecordsMillis;
+
+    public static void main(String[] args) throws Exception {
+        Options options = new Options()
+                .addOption(Option.builder()
+                        .longOpt("bootstrapServer")
+                        .hasArg()
+                        .desc("Bootstrap server (default: " + DEFAULT_BOOTSTRAP_SERVER + ")")
+                        .build())
+                .addOption(Option.builder()
+                        .longOpt("topicName")
+                        .hasArg()
+                        .desc("Topic name (default: " + DEFAULT_TOPIC_NAME + ")")
+                        .build())
+                .addOption(Option.builder()
+                        .longOpt("schemaRegistry")
+                        .hasArg()
+                        .desc("Confluent Schema Registry URL (default: " + DEFAULT_SCHEMA_REGISTRY_URL + ")")
+                        .build())
+                .addOption(Option.builder()
+                        .longOpt("sleep")
+                        .hasArg()
+                        .desc("Sleep between records, in milliseconds (default: " + DEFAULT_SLEEP_TIME_BETWEEN_RECORDS + ")")
+                        .build());
+
+        CommandLineParser parser = new DefaultParser();
+        HelpFormatter formatter = new HelpFormatter();
+
+        try {
+            CommandLine cmd = parser.parse(options, args);
+            String bootstrapServer = cmd.getOptionValue("bootstrapServer", DEFAULT_BOOTSTRAP_SERVER);
+            String topicName = cmd.getOptionValue("topicName", DEFAULT_TOPIC_NAME);
+            String schemaRegistry = cmd.getOptionValue("schemaRegistry", DEFAULT_SCHEMA_REGISTRY_URL);
+            long sleepTimeBetweenRecordsMillis = Long.parseLong(cmd.getOptionValue("sleep", String.valueOf(DEFAULT_SLEEP_TIME_BETWEEN_RECORDS)));
+
+            LOG.info("BootstrapServer: {}, TopicName: {}", bootstrapServer, topicName);
+            LOG.info("SchemaRegistry: {}", schemaRegistry);
+            LOG.info("SleepTimeBetweenRecords: {} ms", sleepTimeBetweenRecordsMillis);
+
+            Sensors sensors = new Sensors(bootstrapServer, topicName, schemaRegistry, sleepTimeBetweenRecordsMillis);
+            sensors.produce();
+        } catch (ParseException e) {
+            System.err.println("Error parsing command line arguments: " + e.getMessage());
+            formatter.printHelp("Sensors", options);
+            System.exit(1);
+        } catch (NumberFormatException e) {
+            System.err.println("Error: sleep parameter must be a valid integer number of milliseconds");
+            formatter.printHelp("Sensors", options);
+            System.exit(1);
+        }
+    }
+
+    public Sensors(String bootstrapServer, String topicName, String schemaRegistry, long sleepTimeBetweenRecordsMillis) {
+        if (bootstrapServer == null || bootstrapServer.trim().isEmpty()) {
+            throw new IllegalArgumentException("bootstrapServer cannot be null or empty");
+        }
+        if (topicName == null || topicName.trim().isEmpty()) {
+            throw new IllegalArgumentException("topicName cannot be null or empty");
+        }
+        if (schemaRegistry == null || schemaRegistry.trim().isEmpty()) {
+            throw new IllegalArgumentException("schemaRegistry cannot be null or empty");
+        }
+        if (sleepTimeBetweenRecordsMillis < 0) {
+            throw new IllegalArgumentException("sleepTimeBetweenRecordsMillis must be non-negative");
+        }
+
+        this.bootstrapServer = bootstrapServer;
+        this.topicName = topicName;
+        this.schemaRegistry = schemaRegistry;
+        this.sleepTimeBetweenRecordsMillis = sleepTimeBetweenRecordsMillis;
+    }
+
+    private void produce() throws Exception {
+        Properties properties = new Properties();
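+        // Producer configuration: RecordNameStrategy derives each subject from the record's
+        // fully-qualified class name instead of the topic name, which is what allows two
+        // different Avro types to share the sensor-data topic.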
properties.put("bootstrap.servers", bootstrapServer); + properties.put("schema.registry.url", schemaRegistry); + properties.put("key.serializer", StringSerializer.class.getName()); + properties.put("value.serializer", KafkaAvroSerializer.class.getName()); + properties.put("value.subject.name.strategy", RecordNameStrategy.class.getName()); + + AtomicBoolean running = new AtomicBoolean(true); + + try (KafkaProducer producer = new KafkaProducer<>(properties)) { + Random random = new Random(); + + while (running.get()) { + // Alternate between AirQuality and RoomTemperature records + boolean isAirQuality = random.nextBoolean(); + + String roomId = "Room-" + random.nextInt(10); + Instant timestamp = Instant.now(); + + // Create record using generated POJOs + Object record; + if (isAirQuality) { + AirQuality airQuality = new AirQuality(); + airQuality.setRoom(roomId); + airQuality.setQualityIndex(400 + random.nextFloat() * 600); // CO2 between 400-1000 ppm + airQuality.setTimestamp(timestamp); + record = airQuality; + } else { + RoomTemperature roomTemp = new RoomTemperature(); + roomTemp.setRoom(roomId); + roomTemp.setTemperature(18 + random.nextFloat() * 22); // Temperature between 18-40°C + roomTemp.setTimestamp(timestamp); + record = roomTemp; + } + + // Create ProducerRecord with the topic name and record + ProducerRecord producerRecord = + new ProducerRecord<>(topicName, record); + + // Send the record asynchronously + producer.send(producerRecord, (metadata, exception) -> { + if (exception != null) { + LOG.error("Error sending record: ", exception); + running.set(false); + } + }); + + Thread.sleep(sleepTimeBetweenRecordsMillis); + } + } catch (InterruptedException e) { + LOG.warn("Producer interrupted", e); + throw e; + } + } + +} diff --git a/java/AvroOneTopicManySubjects/producer-app/src/main/resources/avro/air-quality.avdl b/java/AvroOneTopicManySubjects/producer-app/src/main/resources/avro/air-quality.avdl new file mode 100644 index 0000000..26889e5 --- /dev/null +++ b/java/AvroOneTopicManySubjects/producer-app/src/main/resources/avro/air-quality.avdl @@ -0,0 +1,9 @@ +@namespace("com.amazonaws.services.msf.avro") +protocol In { + record AirQuality { + int sensorId; + string room; + float qualityIndex; + timestamp_ms timestamp; + } +} \ No newline at end of file diff --git a/java/AvroOneTopicManySubjects/producer-app/src/main/resources/avro/room-temperature.avdl b/java/AvroOneTopicManySubjects/producer-app/src/main/resources/avro/room-temperature.avdl new file mode 100644 index 0000000..b32b660 --- /dev/null +++ b/java/AvroOneTopicManySubjects/producer-app/src/main/resources/avro/room-temperature.avdl @@ -0,0 +1,9 @@ +@namespace("com.amazonaws.services.msf.avro") +protocol Out { + record RoomTemperature { + int sensorId; + string room; + float temperature; + timestamp_ms timestamp; + } +} \ No newline at end of file diff --git a/java/AvroOneTopicManySubjects/producer-app/src/main/resources/log4j2.properties b/java/AvroOneTopicManySubjects/producer-app/src/main/resources/log4j2.properties new file mode 100644 index 0000000..c7d36aa --- /dev/null +++ b/java/AvroOneTopicManySubjects/producer-app/src/main/resources/log4j2.properties @@ -0,0 +1,7 @@ +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender \ No newline at end of file