From cbb069bf04db63a7e8d77477f64f78af3aae56eb Mon Sep 17 00:00:00 2001 From: Rupayan Ghosh Date: Mon, 9 Jan 2023 11:29:02 +0000 Subject: [PATCH 1/4] emit multi-line logs with timestamps --- awslambdaric/bootstrap.py | 17 +++++++++-------- tests/test_bootstrap.py | 20 +++++++++++++++++--- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/awslambdaric/bootstrap.py b/awslambdaric/bootstrap.py index 3794e81..f304bea 100644 --- a/awslambdaric/bootstrap.py +++ b/awslambdaric/bootstrap.py @@ -312,19 +312,19 @@ class FramedTelemetryLogSink(object): framing protocol so message boundaries can be determined. Each frame can be visualized as follows:
     {@code
-    +----------------------+------------------------+-----------------------+
-    | Frame Type - 4 bytes | Length (len) - 4 bytes | Message - 'len' bytes |
-    +----------------------+------------------------+-----------------------+
+    +----------------------+------------------------+---------------------+-----------------------+
+    | Frame Type - 4 bytes | Length (len) - 4 bytes | Timestamp - 8 bytes | Message - 'len' bytes |
+    +----------------------+------------------------+---------------------+-----------------------+
     }
     
- The first 4 bytes indicate the type of the frame - log frames have a type defined as the hex value 0xa55a0001. The - second 4 bytes should indicate the message's length. The next 'len' bytes contain the message. The byte order is - big-endian. + The first 4 bytes indicate the type of the frame - log frames have a type defined as the hex value 0xa55a0003. The + second 4 bytes should indicate the message's length. The next 8 bytes should indicate the timestamp of the message. + The next 'len' bytes contain the message. The byte order is big-endian. """ def __init__(self, fd): self.fd = int(fd) - self.frame_type = 0xA55A0001.to_bytes(4, "big") + self.frame_type = 0xA55A0003.to_bytes(4, "big") def __enter__(self): self.file = os.fdopen(self.fd, "wb", 0) @@ -335,7 +335,8 @@ def __exit__(self, exc_type, exc_value, exc_tb): def log(self, msg): encoded_msg = msg.encode("utf8") - log_msg = self.frame_type + len(encoded_msg).to_bytes(4, "big") + encoded_msg + timestamp = int(time.time_ns() / 1000) # UNIX timestamp in microseconds + log_msg = self.frame_type + len(encoded_msg).to_bytes(4, "big") + timestamp.to_bytes(8, "big") + encoded_msg self.file.write(log_msg) def log_error(self, message_lines): diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py index 777625e..db680bf 100644 --- a/tests/test_bootstrap.py +++ b/tests/test_bootstrap.py @@ -7,6 +7,7 @@ import os import re import tempfile +import time import traceback import unittest from io import StringIO @@ -1090,20 +1091,26 @@ def test_create_framed_telemetry_log_sinks(self): def test_single_frame(self): with NamedTemporaryFile() as temp_file: message = "hello world\nsomething on a new line!\n" + before = int(time.time_ns() / 1000) with bootstrap.FramedTelemetryLogSink( os.open(temp_file.name, os.O_CREAT | os.O_RDWR) ) as ls: ls.log(message) + after = int(time.time_ns() / 1000) with open(temp_file.name, "rb") as f: content = f.read() frame_type = int.from_bytes(content[:4], "big") - self.assertEqual(frame_type, 0xA55A0001) + self.assertEqual(frame_type, 0xA55A0003) length = int.from_bytes(content[4:8], "big") self.assertEqual(length, len(message)) - actual_message = content[8:].decode() + timestamp = int.from_bytes(content[8:16], "big") + self.assertTrue(before <= timestamp) + self.assertTrue(timestamp <= after) + + actual_message = content[16:].decode() self.assertEqual(actual_message, message) def test_multiple_frame(self): @@ -1111,24 +1118,31 @@ def test_multiple_frame(self): first_message = "hello world\nsomething on a new line!" second_message = "hello again\nhere's another message\n" + before = int(time.time_ns() / 1000) with bootstrap.FramedTelemetryLogSink( os.open(temp_file.name, os.O_CREAT | os.O_RDWR) ) as ls: ls.log(first_message) ls.log(second_message) + after = int(time.time_ns() / 1000) with open(temp_file.name, "rb") as f: content = f.read() pos = 0 for message in [first_message, second_message]: frame_type = int.from_bytes(content[pos : pos + 4], "big") - self.assertEqual(frame_type, 0xA55A0001) + self.assertEqual(frame_type, 0xA55A0003) pos += 4 length = int.from_bytes(content[pos : pos + 4], "big") self.assertEqual(length, len(message)) pos += 4 + timestamp = int.from_bytes(content[pos : pos + 8], "big") + self.assertTrue(before <= timestamp) + self.assertTrue(timestamp <= after) + pos += 8 + actual_message = content[pos : pos + len(message)].decode() self.assertEqual(actual_message, message) pos += len(message) From 4b74229ee236b571d78cfca4e75e7e00cfb0e1ee Mon Sep 17 00:00:00 2001 From: Rupayan Ghosh Date: Wed, 18 Jan 2023 12:24:09 +0000 Subject: [PATCH 2/4] black formatting --- awslambdaric/bootstrap.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/awslambdaric/bootstrap.py b/awslambdaric/bootstrap.py index f304bea..e7b9e5a 100644 --- a/awslambdaric/bootstrap.py +++ b/awslambdaric/bootstrap.py @@ -335,8 +335,13 @@ def __exit__(self, exc_type, exc_value, exc_tb): def log(self, msg): encoded_msg = msg.encode("utf8") - timestamp = int(time.time_ns() / 1000) # UNIX timestamp in microseconds - log_msg = self.frame_type + len(encoded_msg).to_bytes(4, "big") + timestamp.to_bytes(8, "big") + encoded_msg + timestamp = int(time.time_ns() / 1000) # UNIX timestamp in microseconds + log_msg = ( + self.frame_type + + len(encoded_msg).to_bytes(4, "big") + + timestamp.to_bytes(8, "big") + + encoded_msg + ) self.file.write(log_msg) def log_error(self, message_lines): From dc221a6e2067bf582487523cb56e15f768a6ae02 Mon Sep 17 00:00:00 2001 From: Rupayan Ghosh Date: Wed, 18 Jan 2023 14:06:47 +0000 Subject: [PATCH 3/4] fix tests --- tests/test_bootstrap.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py index db680bf..8262a55 100644 --- a/tests/test_bootstrap.py +++ b/tests/test_bootstrap.py @@ -929,7 +929,7 @@ def test_log_error_framed_log_sink(self): content = f.read() frame_type = int.from_bytes(content[:4], "big") - self.assertEqual(frame_type, 0xA55A0001) + self.assertEqual(frame_type, 0xA55A0003) length = int.from_bytes(content[4:8], "big") self.assertEqual(length, len(expected_logged_error.encode("utf8"))) @@ -969,7 +969,7 @@ def test_log_error_indentation_framed_log_sink(self): content = f.read() frame_type = int.from_bytes(content[:4], "big") - self.assertEqual(frame_type, 0xA55A0001) + self.assertEqual(frame_type, 0xA55A0003) length = int.from_bytes(content[4:8], "big") self.assertEqual(length, len(expected_logged_error.encode("utf8"))) @@ -1006,7 +1006,7 @@ def test_log_error_empty_stacktrace_line_framed_log_sink(self): content = f.read() frame_type = int.from_bytes(content[:4], "big") - self.assertEqual(frame_type, 0xA55A0001) + self.assertEqual(frame_type, 0xA55A0003) length = int.from_bytes(content[4:8], "big") self.assertEqual(length, len(expected_logged_error)) @@ -1037,7 +1037,7 @@ def test_log_error_invokeId_line_framed_log_sink(self): content = f.read() frame_type = int.from_bytes(content[:4], "big") - self.assertEqual(frame_type, 0xA55A0001) + self.assertEqual(frame_type, 0xA55A0003) length = int.from_bytes(content[4:8], "big") self.assertEqual(length, len(expected_logged_error)) From 50fddc4b080200595bbee3b80c79c9309f6e3823 Mon Sep 17 00:00:00 2001 From: Rupayan Ghosh Date: Wed, 18 Jan 2023 15:24:02 +0000 Subject: [PATCH 4/4] read timestamp in tests --- tests/test_bootstrap.py | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/tests/test_bootstrap.py b/tests/test_bootstrap.py index 8262a55..edb0737 100644 --- a/tests/test_bootstrap.py +++ b/tests/test_bootstrap.py @@ -915,11 +915,13 @@ def test_log_error_standard_log_sink(self, mock_stdout): def test_log_error_framed_log_sink(self): with NamedTemporaryFile() as temp_file: + before = int(time.time_ns() / 1000) with bootstrap.FramedTelemetryLogSink( os.open(temp_file.name, os.O_CREAT | os.O_RDWR) ) as log_sink: err_to_log = bootstrap.make_error("Error message", "ErrorType", None) bootstrap.log_error(err_to_log, log_sink) + after = int(time.time_ns() / 1000) expected_logged_error = ( "[ERROR] ErrorType: Error message\nTraceback (most recent call last):" @@ -934,7 +936,11 @@ def test_log_error_framed_log_sink(self): length = int.from_bytes(content[4:8], "big") self.assertEqual(length, len(expected_logged_error.encode("utf8"))) - actual_message = content[8:].decode() + timestamp = int.from_bytes(content[8:16], "big") + self.assertTrue(before <= timestamp) + self.assertTrue(timestamp <= after) + + actual_message = content[16:].decode() self.assertEqual(actual_message, expected_logged_error) @patch("sys.stdout", new_callable=StringIO) @@ -952,6 +958,7 @@ def test_log_error_indentation_standard_log_sink(self, mock_stdout): def test_log_error_indentation_framed_log_sink(self): with NamedTemporaryFile() as temp_file: + before = int(time.time_ns() / 1000) with bootstrap.FramedTelemetryLogSink( os.open(temp_file.name, os.O_CREAT | os.O_RDWR) ) as log_sink: @@ -959,6 +966,7 @@ def test_log_error_indentation_framed_log_sink(self): "Error message", "ErrorType", [" line1 ", " line2 ", " "] ) bootstrap.log_error(err_to_log, log_sink) + after = int(time.time_ns() / 1000) expected_logged_error = ( "[ERROR] ErrorType: Error message\nTraceback (most recent call last):" @@ -974,7 +982,11 @@ def test_log_error_indentation_framed_log_sink(self): length = int.from_bytes(content[4:8], "big") self.assertEqual(length, len(expected_logged_error.encode("utf8"))) - actual_message = content[8:].decode() + timestamp = int.from_bytes(content[8:16], "big") + self.assertTrue(before <= timestamp) + self.assertTrue(timestamp <= after) + + actual_message = content[16:].decode() self.assertEqual(actual_message, expected_logged_error) @patch("sys.stdout", new_callable=StringIO) @@ -989,6 +1001,7 @@ def test_log_error_empty_stacktrace_line_standard_log_sink(self, mock_stdout): def test_log_error_empty_stacktrace_line_framed_log_sink(self): with NamedTemporaryFile() as temp_file: + before = int(time.time_ns() / 1000) with bootstrap.FramedTelemetryLogSink( os.open(temp_file.name, os.O_CREAT | os.O_RDWR) ) as log_sink: @@ -996,6 +1009,7 @@ def test_log_error_empty_stacktrace_line_framed_log_sink(self): "Error message", "ErrorType", ["line1", "", "line2"] ) bootstrap.log_error(err_to_log, log_sink) + after = int(time.time_ns() / 1000) expected_logged_error = ( "[ERROR] ErrorType: Error message\nTraceback " @@ -1011,12 +1025,17 @@ def test_log_error_empty_stacktrace_line_framed_log_sink(self): length = int.from_bytes(content[4:8], "big") self.assertEqual(length, len(expected_logged_error)) - actual_message = content[8:].decode() + timestamp = int.from_bytes(content[8:16], "big") + self.assertTrue(before <= timestamp) + self.assertTrue(timestamp <= after) + + actual_message = content[16:].decode() self.assertEqual(actual_message, expected_logged_error) # Just to ensure we are not logging the requestId from error response, just sending in the response def test_log_error_invokeId_line_framed_log_sink(self): with NamedTemporaryFile() as temp_file: + before = int(time.time_ns() / 1000) with bootstrap.FramedTelemetryLogSink( os.open(temp_file.name, os.O_CREAT | os.O_RDWR) ) as log_sink: @@ -1027,6 +1046,7 @@ def test_log_error_invokeId_line_framed_log_sink(self): "testrequestId", ) bootstrap.log_error(err_to_log, log_sink) + after = int(time.time_ns() / 1000) expected_logged_error = ( "[ERROR] ErrorType: Error message\nTraceback " @@ -1042,7 +1062,11 @@ def test_log_error_invokeId_line_framed_log_sink(self): length = int.from_bytes(content[4:8], "big") self.assertEqual(length, len(expected_logged_error)) - actual_message = content[8:].decode() + timestamp = int.from_bytes(content[8:16], "big") + self.assertTrue(before <= timestamp) + self.assertTrue(timestamp <= after) + + actual_message = content[16:].decode() self.assertEqual(actual_message, expected_logged_error)