Skip to content

Commit 9acd9d1

Browse files
authored
Merge branch 'main' into apigateway-websocket
2 parents 2da54a0 + de51f68 commit 9acd9d1

9 files changed

+229
-12
lines changed

events/README_S3_Object_Lambda.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
)
2222

2323
func handler(ctx context.Context, event events.S3ObjectLambdaEvent) error {
24-
url := event.GetObjectContext.InputS3Url
24+
url := event.GetObjectContext.InputS3URL
2525
resp, err := http.Get(url)
2626
if err != nil {
2727
return err

events/s3_batch_job.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,28 @@ type S3BatchJobTask struct {
2323
S3BucketARN string `json:"s3BucketArn"`
2424
}
2525

26+
// S3BatchJobEventV2 encapsulates the detail of a s3 batch job
27+
type S3BatchJobEventV2 struct {
28+
InvocationSchemaVersion string `json:"invocationSchemaVersion"`
29+
InvocationID string `json:"invocationId"`
30+
Job S3BatchJobV2 `json:"job"`
31+
Tasks []S3BatchJobTaskV2 `json:"tasks"`
32+
}
33+
34+
// S3BatchJobV2 whichs have the job id
35+
type S3BatchJobV2 struct {
36+
ID string `json:"id"`
37+
UserArguments map[string]string `json:"userArguments"`
38+
}
39+
40+
// S3BatchJobTaskV2 represents one task in the s3 batch job and have all task details
41+
type S3BatchJobTaskV2 struct {
42+
TaskID string `json:"taskId"`
43+
S3Key string `json:"s3Key"`
44+
S3VersionID string `json:"s3VersionId"`
45+
S3Bucket string `json:"s3Bucket"`
46+
}
47+
2648
// S3BatchJobResponse is the response of a iven s3 batch job with the results
2749
type S3BatchJobResponse struct {
2850
InvocationSchemaVersion string `json:"invocationSchemaVersion"`

events/s3_batch_job_test.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
func TestS3BatchJobEventMarshaling(t *testing.T) {
1414

1515
// 1. read JSON from file
16-
inputJSON := test.ReadJSONFromFile(t, "./testdata/s3-batch-job-event-request.json")
16+
inputJSON := test.ReadJSONFromFile(t, "./testdata/s3-batch-job-event-request-1.0.json")
1717

1818
// 2. de-serialize into Go object
1919
var inputEvent S3BatchJobEvent
@@ -31,6 +31,26 @@ func TestS3BatchJobEventMarshaling(t *testing.T) {
3131
assert.JSONEq(t, string(inputJSON), string(outputJSON))
3232
}
3333

34+
func TestS3BatchJobEventV2Marshaling(t *testing.T) {
35+
// 1. read JSON from file
36+
inputJSON := test.ReadJSONFromFile(t, "./testdata/s3-batch-job-event-request-2.0.json")
37+
38+
// 2. de-serialize into Go object
39+
var inputEvent S3BatchJobEventV2
40+
if err := json.Unmarshal(inputJSON, &inputEvent); err != nil {
41+
t.Errorf("could not unmarshal event. details: %v", err)
42+
}
43+
44+
// 3. serialize to JSON
45+
outputJSON, err := json.Marshal(inputEvent)
46+
if err != nil {
47+
t.Errorf("could not marshal event. details: %v", err)
48+
}
49+
50+
// 4. check result
51+
assert.JSONEq(t, string(inputJSON), string(outputJSON))
52+
}
53+
3454
func TestS3BatchJobResponseMarshaling(t *testing.T) {
3555

3656
// 1. read JSON from file

events/testdata/s3-batch-job-event-request.json renamed to events/testdata/s3-batch-job-event-request-1.0.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@
1212
"s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket"
1313
}
1414
]
15-
}
15+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"invocationSchemaVersion": "2.0",
3+
"invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
4+
"job": {
5+
"id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce",
6+
"userArguments": {
7+
"k1": "v1",
8+
"k2": "v2"
9+
}
10+
},
11+
"tasks": [
12+
{
13+
"taskId": "dGFza2lkZ29lc2hlcmUK",
14+
"s3Key": "customerImage1.jpg",
15+
"s3VersionId": "jbo9_jhdPEyB4RrmOxWS0kU0EoNrU_oI",
16+
"s3Bucket": "awsexamplebucket"
17+
}
18+
]
19+
}

lambda/invoke_loop.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,13 @@ func handleInvoke(invoke *invoke, handler *handlerOptions) error {
104104
func reportFailure(invoke *invoke, invokeErr *messages.InvokeResponse_Error) error {
105105
errorPayload := safeMarshal(invokeErr)
106106
log.Printf("%s", errorPayload)
107-
if err := invoke.failure(bytes.NewReader(errorPayload), contentTypeJSON); err != nil {
107+
108+
causeForXRay, err := json.Marshal(makeXRayError(invokeErr))
109+
if err != nil {
110+
return fmt.Errorf("unexpected error occured when serializing the function error cause for X-Ray: %v", err)
111+
}
112+
113+
if err := invoke.failure(bytes.NewReader(errorPayload), contentTypeJSON, causeForXRay); err != nil {
108114
return fmt.Errorf("unexpected error occurred when sending the function error to the API: %v", err)
109115
}
110116
return nil
@@ -166,3 +172,41 @@ func safeMarshal(v interface{}) []byte {
166172
}
167173
return payload
168174
}
175+
176+
type xrayException struct {
177+
Type string `json:"type"`
178+
Message string `json:"message"`
179+
Stack []*messages.InvokeResponse_Error_StackFrame `json:"stack"`
180+
}
181+
182+
type xrayError struct {
183+
WorkingDirectory string `json:"working_directory"`
184+
Exceptions []xrayException `json:"exceptions"`
185+
Paths []string `json:"paths"`
186+
}
187+
188+
func makeXRayError(invokeResponseError *messages.InvokeResponse_Error) *xrayError {
189+
paths := make([]string, 0, len(invokeResponseError.StackTrace))
190+
visitedPaths := make(map[string]struct{}, len(invokeResponseError.StackTrace))
191+
for _, frame := range invokeResponseError.StackTrace {
192+
if _, exists := visitedPaths[frame.Path]; !exists {
193+
visitedPaths[frame.Path] = struct{}{}
194+
paths = append(paths, frame.Path)
195+
}
196+
}
197+
198+
cwd, _ := os.Getwd()
199+
exceptions := []xrayException{{
200+
Type: invokeResponseError.Type,
201+
Message: invokeResponseError.Message,
202+
Stack: invokeResponseError.StackTrace,
203+
}}
204+
if exceptions[0].Stack == nil {
205+
exceptions[0].Stack = []*messages.InvokeResponse_Error_StackFrame{}
206+
}
207+
return &xrayError{
208+
WorkingDirectory: cwd,
209+
Paths: paths,
210+
Exceptions: exceptions,
211+
}
212+
}

lambda/invoke_loop_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,110 @@ func TestCustomErrorMarshaling(t *testing.T) {
9090
}
9191
}
9292

93+
func TestXRayCausePlumbing(t *testing.T) {
94+
errors := []error{
95+
errors.New("barf"),
96+
messages.InvokeResponse_Error{
97+
Type: "yoloError",
98+
Message: "hello yolo",
99+
StackTrace: []*messages.InvokeResponse_Error_StackFrame{
100+
{Label: "yolo", Path: "yolo", Line: 2},
101+
{Label: "hi", Path: "hello/hello", Line: 12},
102+
},
103+
},
104+
messages.InvokeResponse_Error{
105+
Type: "yoloError",
106+
Message: "hello yolo",
107+
StackTrace: []*messages.InvokeResponse_Error_StackFrame{
108+
{Label: "hi", Path: "hello/hello", Line: 12},
109+
{Label: "hihi", Path: "hello/hello", Line: 13},
110+
{Label: "yolo", Path: "yolo", Line: 2},
111+
{Label: "hi", Path: "hello/hello", Line: 14},
112+
},
113+
},
114+
messages.InvokeResponse_Error{
115+
Type: "yoloError",
116+
Message: "hello yolo",
117+
StackTrace: []*messages.InvokeResponse_Error_StackFrame{},
118+
},
119+
messages.InvokeResponse_Error{
120+
Type: "yoloError",
121+
Message: "hello yolo",
122+
},
123+
}
124+
wd, _ := os.Getwd()
125+
expected := []string{
126+
`{
127+
"working_directory":"` + wd + `",
128+
"paths": [],
129+
"exceptions": [{
130+
"type": "errorString",
131+
"message": "barf",
132+
"stack": []
133+
}]
134+
}`,
135+
`{
136+
"working_directory":"` + wd + `",
137+
"paths": ["yolo", "hello/hello"],
138+
"exceptions": [{
139+
"type": "yoloError",
140+
"message": "hello yolo",
141+
"stack": [
142+
{"label": "yolo", "path": "yolo", "line": 2},
143+
{"label": "hi", "path": "hello/hello", "line": 12}
144+
]
145+
}]
146+
}`,
147+
`{
148+
"working_directory":"` + wd + `",
149+
"paths": ["hello/hello", "yolo"],
150+
"exceptions": [{
151+
"type": "yoloError",
152+
"message": "hello yolo",
153+
"stack": [
154+
{"label": "hi", "path": "hello/hello", "line": 12},
155+
{"label": "hihi", "path": "hello/hello", "line": 13},
156+
{"label": "yolo", "path": "yolo", "line": 2},
157+
{"label": "hi", "path": "hello/hello", "line": 14}
158+
]
159+
}]
160+
}`,
161+
`{
162+
"working_directory":"` + wd + `",
163+
"paths": [],
164+
"exceptions": [{
165+
"type": "yoloError",
166+
"message": "hello yolo",
167+
"stack": []
168+
}]
169+
}`,
170+
`{
171+
"working_directory":"` + wd + `",
172+
"paths": [],
173+
"exceptions": [{
174+
"type": "yoloError",
175+
"message": "hello yolo",
176+
"stack": []
177+
}]
178+
}`,
179+
}
180+
require.Equal(t, len(errors), len(expected))
181+
ts, record := runtimeAPIServer(``, len(errors))
182+
defer ts.Close()
183+
n := 0
184+
handler := NewHandler(func() error {
185+
defer func() { n++ }()
186+
return errors[n]
187+
})
188+
endpoint := strings.Split(ts.URL, "://")[1]
189+
expectedError := fmt.Sprintf("failed to GET http://%s/2018-06-01/runtime/invocation/next: got unexpected status code: 410", endpoint)
190+
assert.EqualError(t, startRuntimeAPILoop(endpoint, handler), expectedError)
191+
for i := range errors {
192+
assert.JSONEq(t, expected[i], string(record.xrayCauses[i]))
193+
}
194+
195+
}
196+
93197
func TestRuntimeAPIContextPlumbing(t *testing.T) {
94198
handler := NewHandler(func(ctx context.Context) (interface{}, error) {
95199
lc, _ := lambdacontext.FromContext(ctx)
@@ -271,6 +375,7 @@ type requestRecord struct {
271375
nPosts int
272376
responses [][]byte
273377
contentTypes []string
378+
xrayCauses []string
274379
}
275380

276381
type eventMetadata struct {
@@ -336,6 +441,7 @@ func runtimeAPIServer(eventPayload string, failAfter int, overrides ...eventMeta
336441
w.WriteHeader(http.StatusAccepted)
337442
record.responses = append(record.responses, response.Bytes())
338443
record.contentTypes = append(record.contentTypes, r.Header.Get("Content-Type"))
444+
record.xrayCauses = append(record.xrayCauses, r.Header.Get(headerXRayErrorCause))
339445
default:
340446
w.WriteHeader(http.StatusBadRequest)
341447
}

lambda/runtime_api_client.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ const (
2222
headerCognitoIdentity = "Lambda-Runtime-Cognito-Identity"
2323
headerClientContext = "Lambda-Runtime-Client-Context"
2424
headerInvokedFunctionARN = "Lambda-Runtime-Invoked-Function-Arn"
25+
headerXRayErrorCause = "Lambda-Runtime-Function-Xray-Error-Cause"
2526
trailerLambdaErrorType = "Lambda-Runtime-Function-Error-Type"
2627
trailerLambdaErrorBody = "Lambda-Runtime-Function-Error-Body"
2728
contentTypeJSON = "application/json"
2829
contentTypeBytes = "application/octet-stream"
2930
apiVersion = "2018-06-01"
31+
xrayErrorCauseMaxSize = 1024 * 1024
3032
)
3133

3234
type runtimeAPIClient struct {
@@ -57,17 +59,17 @@ type invoke struct {
5759
// - An invoke is not complete until next() is called again!
5860
func (i *invoke) success(body io.Reader, contentType string) error {
5961
url := i.client.baseURL + i.id + "/response"
60-
return i.client.post(url, body, contentType)
62+
return i.client.post(url, body, contentType, nil)
6163
}
6264

6365
// failure sends the payload to the Runtime API. This marks the function's invoke as a failure.
6466
// Notes:
6567
// - The execution of the function process continues, and is billed, until next() is called again!
6668
// - A Lambda Function continues to be re-used for future invokes even after a failure.
6769
// If the error is fatal (panic, unrecoverable state), exit the process immediately after calling failure()
68-
func (i *invoke) failure(body io.Reader, contentType string) error {
70+
func (i *invoke) failure(body io.Reader, contentType string, causeForXRay []byte) error {
6971
url := i.client.baseURL + i.id + "/error"
70-
return i.client.post(url, body, contentType)
72+
return i.client.post(url, body, contentType, causeForXRay)
7173
}
7274

7375
// next connects to the Runtime API and waits for a new invoke Request to be available.
@@ -108,7 +110,7 @@ func (c *runtimeAPIClient) next() (*invoke, error) {
108110
}, nil
109111
}
110112

111-
func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string) error {
113+
func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string, xrayErrorCause []byte) error {
112114
b := newErrorCapturingReader(body)
113115
req, err := http.NewRequest(http.MethodPost, url, b)
114116
if err != nil {
@@ -118,6 +120,10 @@ func (c *runtimeAPIClient) post(url string, body io.Reader, contentType string)
118120
req.Header.Set("User-Agent", c.userAgent)
119121
req.Header.Set("Content-Type", contentType)
120122

123+
if xrayErrorCause != nil && len(xrayErrorCause) < xrayErrorCauseMaxSize {
124+
req.Header.Set(headerXRayErrorCause, string(xrayErrorCause))
125+
}
126+
121127
resp, err := c.httpClient.Do(req)
122128
if err != nil {
123129
return fmt.Errorf("failed to POST to %s: %v", url, err)

lambda/runtime_api_client_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func TestClientDoneAndError(t *testing.T) {
9292
assert.NoError(t, err)
9393
})
9494
t.Run(fmt.Sprintf("happy Error with payload[%d]", i), func(t *testing.T) {
95-
err := invoke.failure(bytes.NewReader(payload), contentTypeJSON)
95+
err := invoke.failure(bytes.NewReader(payload), contentTypeJSON, nil)
9696
assert.NoError(t, err)
9797
})
9898
}
@@ -105,7 +105,7 @@ func TestInvalidRequestsForMalformedEndpoint(t *testing.T) {
105105
require.Error(t, err)
106106
err = (&invoke{client: newRuntimeAPIClient("🚨")}).success(nil, "")
107107
require.Error(t, err)
108-
err = (&invoke{client: newRuntimeAPIClient("🚨")}).failure(nil, "")
108+
err = (&invoke{client: newRuntimeAPIClient("🚨")}).failure(nil, "", nil)
109109
require.Error(t, err)
110110
}
111111

@@ -145,7 +145,7 @@ func TestStatusCodes(t *testing.T) {
145145
require.NoError(t, err)
146146
})
147147
t.Run("failure should not error", func(t *testing.T) {
148-
err := invoke.failure(nil, "")
148+
err := invoke.failure(nil, "", nil)
149149
require.NoError(t, err)
150150
})
151151
} else {
@@ -158,7 +158,7 @@ func TestStatusCodes(t *testing.T) {
158158
}
159159
})
160160
t.Run("failure should error", func(t *testing.T) {
161-
err := invoke.failure(nil, "")
161+
err := invoke.failure(nil, "", nil)
162162
require.Error(t, err)
163163
if i != 301 && i != 302 && i != 303 {
164164
assert.Contains(t, err.Error(), "unexpected status code")

0 commit comments

Comments
 (0)