Skip to content

Commit b5fb89e

Browse files
committed
Consuming Kinesis event with Lambda using Rust with batch item handling.
1 parent 8bffa42 commit b5fb89e

File tree

2 files changed

+85
-0
lines changed

2 files changed

+85
-0
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use aws_lambda_events::{
2+
event::kinesis::KinesisEvent,
3+
kinesis::KinesisEventRecord,
4+
streams::{KinesisBatchItemFailure, KinesisEventResponse},
5+
};
6+
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
7+
8+
static UNDEFINED: &str = "undefined";
9+
10+
async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
11+
let mut response = KinesisEventResponse {
12+
batch_item_failures: vec![],
13+
};
14+
15+
if event.payload.records.is_empty() {
16+
tracing::info!("No records found. Exiting.");
17+
return Ok(response);
18+
}
19+
20+
for record in &event.payload.records {
21+
tracing::info!(
22+
"EventId: {}",
23+
record.event_id.as_deref().unwrap_or(UNDEFINED)
24+
);
25+
26+
let record_processing_result = process_record(record);
27+
28+
if record_processing_result.is_err() {
29+
response.batch_item_failures.push(KinesisBatchItemFailure {
30+
item_identifier: record.kinesis.sequence_number.clone(),
31+
});
32+
/* Since we are working with streams, we can return the failed item immediately.
33+
Lambda will immediately begin to retry processing from this failed item onwards. */
34+
return Ok(response);
35+
}
36+
}
37+
38+
tracing::info!(
39+
"Successfully processed {} records",
40+
event.payload.records.len()
41+
);
42+
43+
Ok(response)
44+
}
45+
46+
fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
47+
let record_data = std::str::from_utf8(record.kinesis.data.as_slice());
48+
49+
if record_data.is_err() {
50+
let err = record_data.err().unwrap();
51+
tracing::error!("Error: {}", err);
52+
return Err(Error::from(err));
53+
}
54+
55+
let record_data = record_data.unwrap();
56+
57+
// do something interesting with the data
58+
tracing::info!("Data: {}", record_data);
59+
60+
Ok(())
61+
}
62+
63+
#[tokio::main]
64+
async fn main() -> Result<(), Error> {
65+
tracing_subscriber::fmt()
66+
.with_max_level(tracing::Level::INFO)
67+
// disable printing the name of the module in every log line.
68+
.with_target(false)
69+
// disabling time is handy because CloudWatch will add the ingestion time.
70+
.without_time()
71+
.init();
72+
73+
run(service_fn(function_handler)).await
74+
}

integration-kinesis-to-lambda-with-batch-item-handling/snippet-data.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@
7474
"language": "dotnet"
7575
}
7676
]
77+
},
78+
{
79+
"id": "Rust",
80+
"title": "Usage Example with Rust:",
81+
"description": "Consuming Kinesis event with Lambda using Rust with batch item handling.",
82+
"snippets": [
83+
{
84+
"snippetPath": "main.rs",
85+
"language": "rust"
86+
}
87+
]
7788
}
7889
]
7990
}

0 commit comments

Comments
 (0)