diff --git a/integration-kinesis-to-lambda-with-batch-item-handling/main.rs b/integration-kinesis-to-lambda-with-batch-item-handling/main.rs new file mode 100644 index 0000000..085c05b --- /dev/null +++ b/integration-kinesis-to-lambda-with-batch-item-handling/main.rs @@ -0,0 +1,71 @@ +use aws_lambda_events::{ + event::kinesis::KinesisEvent, + kinesis::KinesisEventRecord, + streams::{KinesisBatchItemFailure, KinesisEventResponse}, +}; +use lambda_runtime::{run, service_fn, Error, LambdaEvent}; + +async fn function_handler(event: LambdaEvent) -> Result { + let mut response = KinesisEventResponse { + batch_item_failures: vec![], + }; + + if event.payload.records.is_empty() { + tracing::info!("No records found. Exiting."); + return Ok(response); + } + + for record in &event.payload.records { + tracing::info!( + "EventId: {}", + record.event_id.as_deref().unwrap_or_default() + ); + + let record_processing_result = process_record(record); + + if record_processing_result.is_err() { + response.batch_item_failures.push(KinesisBatchItemFailure { + item_identifier: record.kinesis.sequence_number.clone(), + }); + /* Since we are working with streams, we can return the failed item immediately. + Lambda will immediately begin to retry processing from this failed item onwards. */ + return Ok(response); + } + } + + tracing::info!( + "Successfully processed {} records", + event.payload.records.len() + ); + + Ok(response) +} + +fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { + let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); + + if let Some(err) = record_data.err() { + tracing::error!("Error: {}", err); + return Err(Error::from(err)); + } + + let record_data = record_data.unwrap_or_default(); + + // do something interesting with the data + tracing::info!("Data: {}", record_data); + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await +} \ No newline at end of file diff --git a/integration-kinesis-to-lambda-with-batch-item-handling/snippet-data.json b/integration-kinesis-to-lambda-with-batch-item-handling/snippet-data.json index 91a76ab..1e68eac 100644 --- a/integration-kinesis-to-lambda-with-batch-item-handling/snippet-data.json +++ b/integration-kinesis-to-lambda-with-batch-item-handling/snippet-data.json @@ -74,6 +74,17 @@ "language": "dotnet" } ] + }, + { + "id": "Rust", + "title": "Usage Example with Rust:", + "description": "Consuming Kinesis event with Lambda using Rust with batch item handling.", + "snippets": [ + { + "snippetPath": "main.rs", + "language": "rust" + } + ] } ] } diff --git a/integration-kinesis-to-lambda/main.rs b/integration-kinesis-to-lambda/main.rs new file mode 100644 index 0000000..58abb9b --- /dev/null +++ b/integration-kinesis-to-lambda/main.rs @@ -0,0 +1,45 @@ +use aws_lambda_events::event::kinesis::KinesisEvent; +use lambda_runtime::{run, service_fn, Error, LambdaEvent}; + +async fn function_handler(event: LambdaEvent) -> Result<(), Error> { + if event.payload.records.is_empty() { + tracing::info!("No records found. Exiting."); + return Ok(()); + } + + event.payload.records.iter().for_each(|record| { + tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); + + let record_data = std::str::from_utf8(&record.kinesis.data); + + match record_data { + Ok(data) => { + // log the record data + tracing::info!("Data: {}", data); + } + Err(e) => { + tracing::error!("Error: {}", e); + } + } + }); + + tracing::info!( + "Successfully processed {} records", + event.payload.records.len() + ); + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await +} \ No newline at end of file diff --git a/integration-kinesis-to-lambda/snippet-data.json b/integration-kinesis-to-lambda/snippet-data.json index cf24999..ee904c2 100644 --- a/integration-kinesis-to-lambda/snippet-data.json +++ b/integration-kinesis-to-lambda/snippet-data.json @@ -75,6 +75,17 @@ } ] }, + { + "id": "Rust", + "title": "Usage Example with Rust:", + "description": "Consuming Kinesis event with Lambda using Rust without batch item handling.", + "snippets": [ + { + "snippetPath": "main.rs", + "language": "rust" + } + ] + }, { "id": "Python", "title": "Usage Example with Python", diff --git a/integration-sqs-to-lambda/main.rs b/integration-sqs-to-lambda/main.rs new file mode 100644 index 0000000..534f442 --- /dev/null +++ b/integration-sqs-to-lambda/main.rs @@ -0,0 +1,24 @@ +use aws_lambda_events::event::sqs::SqsEvent; +use lambda_runtime::{run, service_fn, Error, LambdaEvent}; + +async fn function_handler(event: LambdaEvent) -> Result<(), Error> { + event.payload.records.iter().for_each(|record| { + // process the record + tracing::info!("Message body: {}", record.body.as_deref().unwrap_or_default()) + }); + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await +} \ No newline at end of file diff --git a/integration-sqs-to-lambda/snippet-data.json b/integration-sqs-to-lambda/snippet-data.json index e56583e..4adf024 100644 --- a/integration-sqs-to-lambda/snippet-data.json +++ b/integration-sqs-to-lambda/snippet-data.json @@ -96,6 +96,17 @@ "language": "go" } ] + }, + { + "id": "Rust", + "title": "Usage Example with Rust:", + "description": "Sample Amazon SQS function code using Rust without batch item handling.", + "snippets": [ + { + "snippetPath": "main.rs", + "language": "rust" + } + ] } ] }