File tree Expand file tree Collapse file tree 1 file changed +29
-0
lines changed
integration-kinesis-to-lambda-with-batch-item-handling Expand file tree Collapse file tree 1 file changed +29
-0
lines changed Original file line number Diff line number Diff line change
1
+ require 'aws-sdk'
2
+
3
+ def lambda_handler ( event :, context :)
4
+ batch_item_failures = [ ]
5
+
6
+ event [ 'Records' ] . each do |record |
7
+ begin
8
+ puts "Processed Kinesis Event - EventID: #{ record [ 'eventID' ] } "
9
+ record_data = get_record_data_async ( record [ 'kinesis' ] )
10
+ puts "Record Data: #{ record_data } "
11
+ # TODO: Do interesting work based on the new data
12
+ rescue StandardError => err
13
+ puts "An error occurred #{ err } "
14
+ # Since we are working with streams, we can return the failed item immediately.
15
+ # Lambda will immediately begin to retry processing from this failed item onwards.
16
+ return { batchItemFailures : [ { itemIdentifier : record [ 'kinesis' ] [ 'sequenceNumber' ] } ] }
17
+ end
18
+ end
19
+
20
+ puts "Successfully processed #{ event [ 'Records' ] . length } records."
21
+ { batchItemFailures : batch_item_failures }
22
+ end
23
+
24
+ def get_record_data_async ( payload )
25
+ data = Base64 . decode64 ( payload [ 'data' ] ) . force_encoding ( 'utf-8' )
26
+ # Placeholder for actual async work
27
+ sleep ( 1 )
28
+ data
29
+ end
You can’t perform that action at this time.
0 commit comments