Skip to content

Commit 8bffa42

Browse files
committed
Consuming Kinesis event with Lambda using Rust without batch item handling.
1 parent e59b81b commit 8bffa42

File tree

2 files changed

+60
-0
lines changed

2 files changed

+60
-0
lines changed

integration-kinesis-to-lambda/main.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use aws_lambda_events::event::kinesis::KinesisEvent;
2+
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
3+
4+
static UNDEFINED: &str = "undefined";
5+
6+
async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
7+
8+
if event.payload.records.is_empty() {
9+
tracing::info!("No records found. Exiting.");
10+
return Ok(());
11+
}
12+
13+
event.payload.records.iter().for_each(|record| {
14+
15+
tracing::info!("EventId: {}", record.event_id.as_deref().unwrap_or(UNDEFINED));
16+
17+
let record_data = std::str::from_utf8(&record.kinesis.data);
18+
19+
match record_data {
20+
Ok(data) => {
21+
// log the record data
22+
tracing::info!("Data: {}", data);
23+
}
24+
Err(e) => {
25+
tracing::error!("Error: {}", e);
26+
}
27+
}
28+
});
29+
30+
tracing::info!(
31+
"Successfully processed {} records",
32+
event.payload.records.len()
33+
);
34+
35+
Ok(())
36+
}
37+
38+
#[tokio::main]
39+
async fn main() -> Result<(), Error> {
40+
tracing_subscriber::fmt()
41+
.with_max_level(tracing::Level::INFO)
42+
// disable printing the name of the module in every log line.
43+
.with_target(false)
44+
// disabling time is handy because CloudWatch will add the ingestion time.
45+
.without_time()
46+
.init();
47+
48+
run(service_fn(function_handler)).await
49+
}

integration-kinesis-to-lambda/snippet-data.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@
7474
"language": "go"
7575
}
7676
]
77+
},
78+
{
79+
"id": "Rust",
80+
"title": "Usage Example with Rust:",
81+
"description": "Consuming Kinesis event with Lambda using Rust without batch item handling.",
82+
"snippets": [
83+
{
84+
"snippetPath": "main.rs",
85+
"language": "rust"
86+
}
87+
]
7788
}
7889
]
7990
}

0 commit comments

Comments
 (0)