From e59b81b0bb124a5978c3a21934af709469876666 Mon Sep 17 00:00:00 2001 From: Siarhei Kazhura Date: Tue, 12 Dec 2023 12:29:39 -0800 Subject: [PATCH 1/4] Sample Amazon SQS function code using Rust without batch item handling. --- integration-sqs-to-lambda/main.rs | 27 +++++++++++++++++++++ integration-sqs-to-lambda/snippet-data.json | 11 +++++++++ 2 files changed, 38 insertions(+) create mode 100644 integration-sqs-to-lambda/main.rs diff --git a/integration-sqs-to-lambda/main.rs b/integration-sqs-to-lambda/main.rs new file mode 100644 index 0000000..f43d0c3 --- /dev/null +++ b/integration-sqs-to-lambda/main.rs @@ -0,0 +1,27 @@ +use aws_lambda_events::event::sqs::SqsEvent; +use lambda_runtime::{run, service_fn, Error, LambdaEvent}; + +static UNDEFINED: &str = "undefined"; + +async fn function_handler(event: LambdaEvent) -> Result<(), Error> { + + event.payload.records.iter().for_each(|record| { + // process the record + tracing::info!("Message body: {}", record.body.as_deref().unwrap_or(UNDEFINED)) + }); + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await +} \ No newline at end of file diff --git a/integration-sqs-to-lambda/snippet-data.json b/integration-sqs-to-lambda/snippet-data.json index e56583e..4adf024 100644 --- a/integration-sqs-to-lambda/snippet-data.json +++ b/integration-sqs-to-lambda/snippet-data.json @@ -96,6 +96,17 @@ "language": "go" } ] + }, + { + "id": "Rust", + "title": "Usage Example with Rust:", + "description": "Sample Amazon SQS function code using Rust without batch item handling.", + "snippets": [ + { + "snippetPath": "main.rs", + "language": "rust" + } + ] } ] } From 8bffa42debdf90a746a16d2bdbb00f4db16fc6e6 Mon Sep 17 00:00:00 2001 From: Siarhei Kazhura Date: Tue, 12 Dec 2023 13:45:55 -0800 Subject: [PATCH 2/4] Consuming Kinesis event with Lambda using Rust without batch item handling. --- integration-kinesis-to-lambda/main.rs | 49 +++++++++++++++++++ .../snippet-data.json | 11 +++++ 2 files changed, 60 insertions(+) create mode 100644 integration-kinesis-to-lambda/main.rs diff --git a/integration-kinesis-to-lambda/main.rs b/integration-kinesis-to-lambda/main.rs new file mode 100644 index 0000000..7312641 --- /dev/null +++ b/integration-kinesis-to-lambda/main.rs @@ -0,0 +1,49 @@ +use aws_lambda_events::event::kinesis::KinesisEvent; +use lambda_runtime::{run, service_fn, Error, LambdaEvent}; + +static UNDEFINED: &str = "undefined"; + +async fn function_handler(event: LambdaEvent) -> Result<(), Error> { + + if event.payload.records.is_empty() { + tracing::info!("No records found. Exiting."); + return Ok(()); + } + + event.payload.records.iter().for_each(|record| { + + tracing::info!("EventId: {}", record.event_id.as_deref().unwrap_or(UNDEFINED)); + + let record_data = std::str::from_utf8(&record.kinesis.data); + + match record_data { + Ok(data) => { + // log the record data + tracing::info!("Data: {}", data); + } + Err(e) => { + tracing::error!("Error: {}", e); + } + } + }); + + tracing::info!( + "Successfully processed {} records", + event.payload.records.len() + ); + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await +} \ No newline at end of file diff --git a/integration-kinesis-to-lambda/snippet-data.json b/integration-kinesis-to-lambda/snippet-data.json index cff00d9..4666449 100644 --- a/integration-kinesis-to-lambda/snippet-data.json +++ b/integration-kinesis-to-lambda/snippet-data.json @@ -74,6 +74,17 @@ "language": "go" } ] + }, + { + "id": "Rust", + "title": "Usage Example with Rust:", + "description": "Consuming Kinesis event with Lambda using Rust without batch item handling.", + "snippets": [ + { + "snippetPath": "main.rs", + "language": "rust" + } + ] } ] } From b5fb89ed38cd92a6f244e7574fc621916b71c950 Mon Sep 17 00:00:00 2001 From: Siarhei Kazhura Date: Tue, 12 Dec 2023 16:25:26 -0800 Subject: [PATCH 3/4] Consuming Kinesis event with Lambda using Rust with batch item handling. --- .../main.rs | 74 +++++++++++++++++++ .../snippet-data.json | 11 +++ 2 files changed, 85 insertions(+) create mode 100644 integration-kinesis-to-lambda-with-batch-item-handling/main.rs diff --git a/integration-kinesis-to-lambda-with-batch-item-handling/main.rs b/integration-kinesis-to-lambda-with-batch-item-handling/main.rs new file mode 100644 index 0000000..b13bbbe --- /dev/null +++ b/integration-kinesis-to-lambda-with-batch-item-handling/main.rs @@ -0,0 +1,74 @@ +use aws_lambda_events::{ + event::kinesis::KinesisEvent, + kinesis::KinesisEventRecord, + streams::{KinesisBatchItemFailure, KinesisEventResponse}, +}; +use lambda_runtime::{run, service_fn, Error, LambdaEvent}; + +static UNDEFINED: &str = "undefined"; + +async fn function_handler(event: LambdaEvent) -> Result { + let mut response = KinesisEventResponse { + batch_item_failures: vec![], + }; + + if event.payload.records.is_empty() { + tracing::info!("No records found. Exiting."); + return Ok(response); + } + + for record in &event.payload.records { + tracing::info!( + "EventId: {}", + record.event_id.as_deref().unwrap_or(UNDEFINED) + ); + + let record_processing_result = process_record(record); + + if record_processing_result.is_err() { + response.batch_item_failures.push(KinesisBatchItemFailure { + item_identifier: record.kinesis.sequence_number.clone(), + }); + /* Since we are working with streams, we can return the failed item immediately. + Lambda will immediately begin to retry processing from this failed item onwards. */ + return Ok(response); + } + } + + tracing::info!( + "Successfully processed {} records", + event.payload.records.len() + ); + + Ok(response) +} + +fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { + let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); + + if record_data.is_err() { + let err = record_data.err().unwrap(); + tracing::error!("Error: {}", err); + return Err(Error::from(err)); + } + + let record_data = record_data.unwrap(); + + // do something interesting with the data + tracing::info!("Data: {}", record_data); + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await +} diff --git a/integration-kinesis-to-lambda-with-batch-item-handling/snippet-data.json b/integration-kinesis-to-lambda-with-batch-item-handling/snippet-data.json index 91a76ab..1e68eac 100644 --- a/integration-kinesis-to-lambda-with-batch-item-handling/snippet-data.json +++ b/integration-kinesis-to-lambda-with-batch-item-handling/snippet-data.json @@ -74,6 +74,17 @@ "language": "dotnet" } ] + }, + { + "id": "Rust", + "title": "Usage Example with Rust:", + "description": "Consuming Kinesis event with Lambda using Rust with batch item handling.", + "snippets": [ + { + "snippetPath": "main.rs", + "language": "rust" + } + ] } ] } From 8b44853ccc342fba7c0dc4053c627dd99574a70b Mon Sep 17 00:00:00 2001 From: Siarhei Kazhura Date: Mon, 8 Jan 2024 15:04:39 -0800 Subject: [PATCH 4/4] code review related fixes --- .../main.rs | 11 ++++------- integration-kinesis-to-lambda/main.rs | 6 +----- integration-sqs-to-lambda/main.rs | 5 +---- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/integration-kinesis-to-lambda-with-batch-item-handling/main.rs b/integration-kinesis-to-lambda-with-batch-item-handling/main.rs index b13bbbe..085c05b 100644 --- a/integration-kinesis-to-lambda-with-batch-item-handling/main.rs +++ b/integration-kinesis-to-lambda-with-batch-item-handling/main.rs @@ -5,8 +5,6 @@ use aws_lambda_events::{ }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; -static UNDEFINED: &str = "undefined"; - async fn function_handler(event: LambdaEvent) -> Result { let mut response = KinesisEventResponse { batch_item_failures: vec![], @@ -20,7 +18,7 @@ async fn function_handler(event: LambdaEvent) -> Result) -> Result Result<(), Error> { let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); - if record_data.is_err() { - let err = record_data.err().unwrap(); + if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } - let record_data = record_data.unwrap(); + let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); @@ -71,4 +68,4 @@ async fn main() -> Result<(), Error> { .init(); run(service_fn(function_handler)).await -} +} \ No newline at end of file diff --git a/integration-kinesis-to-lambda/main.rs b/integration-kinesis-to-lambda/main.rs index 7312641..58abb9b 100644 --- a/integration-kinesis-to-lambda/main.rs +++ b/integration-kinesis-to-lambda/main.rs @@ -1,18 +1,14 @@ use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; -static UNDEFINED: &str = "undefined"; - async fn function_handler(event: LambdaEvent) -> Result<(), Error> { - if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { - - tracing::info!("EventId: {}", record.event_id.as_deref().unwrap_or(UNDEFINED)); + tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&record.kinesis.data); diff --git a/integration-sqs-to-lambda/main.rs b/integration-sqs-to-lambda/main.rs index f43d0c3..534f442 100644 --- a/integration-sqs-to-lambda/main.rs +++ b/integration-sqs-to-lambda/main.rs @@ -1,13 +1,10 @@ use aws_lambda_events::event::sqs::SqsEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; -static UNDEFINED: &str = "undefined"; - async fn function_handler(event: LambdaEvent) -> Result<(), Error> { - event.payload.records.iter().for_each(|record| { // process the record - tracing::info!("Message body: {}", record.body.as_deref().unwrap_or(UNDEFINED)) + tracing::info!("Message body: {}", record.body.as_deref().unwrap_or_default()) }); Ok(())