From d1adc8a54479d847aa704771a2f72ad247b4bd5a Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Sat, 8 Mar 2025 20:45:32 +0530 Subject: [PATCH 1/9] Changes for issue 2358 Changes made in wacth.py to print Empty newlines that are skipped when watching pod logs. --- kubernetes/base/watch/watch.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index da81f9702..43c6ba2c4 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -71,6 +71,7 @@ def iter_resp_lines(resp): # Split by newline (safe for utf-8 because multi-byte sequences cannot contain the newline byte) next_newline = buffer.find(b'\n') + last_was_empty = False # Set empty-line flag while next_newline != -1: # Convert bytes to a valid utf-8 string, replacing any invalid utf-8 with the '�' character line = buffer[:next_newline].decode( @@ -78,6 +79,11 @@ def iter_resp_lines(resp): buffer = buffer[next_newline+1:] if line: yield line + last_was_empty = False # Reset empty-line flag + else: + if not last_was_empty: + yield '\n' # Only print one empty line + last_was_empty = True # Mark that we handled an empty line next_newline = buffer.find(b'\n') @@ -175,6 +181,7 @@ def stream(self, func, *args, **kwargs): while True: resp = func(*args, **kwargs) try: + last_was_empty = False # Set empty line false for line in iter_resp_lines(resp): # unmarshal when we are receiving events from watch, # return raw string when we are streaming log @@ -198,7 +205,12 @@ def stream(self, func, *args, **kwargs): retry_after_410 = False yield event else: - yield line + if line: + yield line # Normal non-empty line + last_was_empty = False + elif not last_was_empty: + yield '/n' # Only yield one empty line + last_was_empty = True if self._stop: break finally: From b2f975537c8673995666b946e6b8fd8dac109b37 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Thu, 13 Mar 2025 13:57:29 +0530 Subject: [PATCH 2/9] Create test_pod_logs.py This file is to test whether empty line are printed when watch function in the kubernetes python client is used. --- kubernetes/test/test_pod_logs.py | 33 ++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 kubernetes/test/test_pod_logs.py diff --git a/kubernetes/test/test_pod_logs.py b/kubernetes/test/test_pod_logs.py new file mode 100644 index 000000000..0beaa0606 --- /dev/null +++ b/kubernetes/test/test_pod_logs.py @@ -0,0 +1,33 @@ +from kubernetes import client, config, watch + +pod_name = "demo-bug" + + +config.load_kube_config() + + +api = client.CoreV1Api() +namespace = config.list_kube_config_contexts()[1]["context"]["namespace"] + +pod_manifest = { + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": pod_name, + }, + "spec": { + "containers": [{"image": "hello-world", "name": pod_name}], + }, +} +api.create_namespaced_pod(body=pod_manifest, namespace=namespace) + +input("\n\nSubmit when running\n\n") + +w = watch.Watch() +for e in w.stream( + api.read_namespaced_pod_log, + name=pod_name, + namespace=namespace, + follow=True, +): + print(e) From 3cee537e633f71bec43f25fbc09f54c9be3549e6 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Tue, 18 Mar 2025 13:46:41 +0530 Subject: [PATCH 3/9] Update watch_test.py Added a unit test name test_pod_log_empty_lines to check if watch is printing empty lines. And made changes in test_watch_with_interspersed_newlines as the watch is also printing empty line. Added a condition to check for empty lines in ogs to avoid the errors. --- kubernetes/base/watch/watch_test.py | 86 ++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index c5bc5c378..fa079363c 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -14,12 +14,18 @@ import unittest +import os + +import time + from unittest.mock import Mock, call -from kubernetes import client +from kubernetes import client,config from .watch import Watch +from kubernetes.client import ApiException + class WatchTests(unittest.TestCase): def setUp(self): @@ -99,6 +105,9 @@ def test_watch_with_interspersed_newlines(self): # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is # the only way to do so. Without that, the stream will re-read the test data forever. for e in w.stream(fake_api.get_namespaces, timeout_seconds=1): + # Here added a statement for exception for empty lines. + if e is None: + continue count += 1 self.assertEqual("test%d" % count, e['object'].metadata.name) self.assertEqual(3, count) @@ -488,7 +497,82 @@ def test_watch_with_error_event_and_timeout_param(self): amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + + @classmethod + def setUpClass(cls): + cls.api = Mock() + cls.namespace = "default" + + def test_pod_log_empty_lines(self): + pod_name = "demo-bug" + # Manifest with busybax to keep pod engaged for sometiem + pod_manifest = { + "apiVersion": "v1", + "kind": "Pod", + "metadata": {"name": pod_name}, + "spec": { + "containers": [{ + "image": "busybox", + "name": "my-container", + "command": ["sh", "-c", "while true; do echo Hello from Docker ; sleep 10; done"] + }] + }, + } + try: + self.api.create_namespaced_pod = Mock() + self.api.read_namespaced_pod = Mock() + self.api.delete_namespaced_pod = Mock() + self.api.read_namespaced_pod_log = Mock() + + #pod creating step + self.api.create_namespaced_pod.return_value = None + + #Checking pod status + mock_pod = Mock() + mock_pod.status.phase = "Running" + self.api.read_namespaced_pod.return_value = mock_pod + + # Printing at pod output + self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"]) + + # Wait for the pod to reach 'Running' + timeout = 60 + start_time = time.time() + while time.time() - start_time < timeout: + pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace) + if pod.status.phase == "Running": + break + time.sleep(2) + else: + self.fail("Pod did not reach 'Running' state within timeout") + + # Reading and streaming logs using Watch (mocked) + w = Watch() + log_output = [] + #Mock logs used for this test + w.stream = Mock(return_value=[ + "Hello from Docker", + "\n", # Empty line + "Another log line", + "\n", # Another empty line + "Final log" + ]) + for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True): + log_output.append(event) + print(event) + + # Print outputs + print(f"Captured logs: {log_output}") + # self.assertTrue(any("Hello from Docker" in line for line in log_output)) + self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs") + + except ApiException as e: + self.fail(f"Kubernetes API exception: {e}") + finally: + #checking pod is calling for delete + self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace) + self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace) if __name__ == '__main__': unittest.main() From 14a2554c1853026b6f31d7e32d415ff7f1eda0dc Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Tue, 18 Mar 2025 13:48:38 +0530 Subject: [PATCH 4/9] Delete kubernetes/test/test_pod_logs.py Deleting file with tests added in test/test_pod_los.py --- kubernetes/test/test_pod_logs.py | 33 -------------------------------- 1 file changed, 33 deletions(-) delete mode 100644 kubernetes/test/test_pod_logs.py diff --git a/kubernetes/test/test_pod_logs.py b/kubernetes/test/test_pod_logs.py deleted file mode 100644 index 0beaa0606..000000000 --- a/kubernetes/test/test_pod_logs.py +++ /dev/null @@ -1,33 +0,0 @@ -from kubernetes import client, config, watch - -pod_name = "demo-bug" - - -config.load_kube_config() - - -api = client.CoreV1Api() -namespace = config.list_kube_config_contexts()[1]["context"]["namespace"] - -pod_manifest = { - "apiVersion": "v1", - "kind": "Pod", - "metadata": { - "name": pod_name, - }, - "spec": { - "containers": [{"image": "hello-world", "name": pod_name}], - }, -} -api.create_namespaced_pod(body=pod_manifest, namespace=namespace) - -input("\n\nSubmit when running\n\n") - -w = watch.Watch() -for e in w.stream( - api.read_namespaced_pod_log, - name=pod_name, - namespace=namespace, - follow=True, -): - print(e) From f0a73c8824902626c23815aec442b5aa60fa72b6 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Wed, 19 Mar 2025 14:15:44 +0530 Subject: [PATCH 5/9] Update watch.py Changes made in unmarshal_event for not having issues with empty lines. --- kubernetes/base/watch/watch.py | 45 +++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 43c6ba2c4..22c8e9084 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -82,7 +82,7 @@ def iter_resp_lines(resp): last_was_empty = False # Reset empty-line flag else: if not last_was_empty: - yield '\n' # Only print one empty line + yield '' # Only print one empty line last_was_empty = True # Mark that we handled an empty line next_newline = buffer.find(b'\n') @@ -113,24 +113,29 @@ def get_watch_argument_name(self, func): return 'watch' def unmarshal_event(self, data, return_type): - js = json.loads(data) - js['raw_object'] = js['object'] - # BOOKMARK event is treated the same as ERROR for a quick fix of - # decoding exception - # TODO: make use of the resource_version in BOOKMARK event for more - # efficient WATCH - if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK': - obj = SimpleNamespace(data=json.dumps(js['raw_object'])) - js['object'] = self._api_client.deserialize(obj, return_type) - if hasattr(js['object'], 'metadata'): - self.resource_version = js['object'].metadata.resource_version - # For custom objects that we don't have model defined, json - # deserialization results in dictionary - elif (isinstance(js['object'], dict) and 'metadata' in js['object'] - and 'resourceVersion' in js['object']['metadata']): - self.resource_version = js['object']['metadata'][ - 'resourceVersion'] - return js + if not data or data.isspace(): + return None + try: + js = json.loads(data) + js['raw_object'] = js['object'] + # BOOKMARK event is treated the same as ERROR for a quick fix of + # decoding exception + # TODO: make use of the resource_version in BOOKMARK event for more + # efficient WATCH + if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK': + obj = SimpleNamespace(data=json.dumps(js['raw_object'])) + js['object'] = self._api_client.deserialize(obj, return_type) + if hasattr(js['object'], 'metadata'): + self.resource_version = js['object'].metadata.resource_version + # For custom objects that we don't have model defined, json + # deserialization results in dictionary + elif (isinstance(js['object'], dict) and 'metadata' in js['object'] + and 'resourceVersion' in js['object']['metadata']): + self.resource_version = js['object']['metadata'][ + 'resourceVersion'] + return js + except json.JSONDecodeError: + return None def stream(self, func, *args, **kwargs): """Watch an API resource and stream the result back via a generator. @@ -209,7 +214,7 @@ def stream(self, func, *args, **kwargs): yield line # Normal non-empty line last_was_empty = False elif not last_was_empty: - yield '/n' # Only yield one empty line + yield '' # Only yield one empty line last_was_empty = True if self._stop: break From 1268769cc0720ca957fff596828f7b4ebb885565 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Thu, 20 Mar 2025 00:27:16 +0530 Subject: [PATCH 6/9] Update watch_test.py Removed pod_manifest from watch_test.py. --- kubernetes/base/watch/watch_test.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index fa079363c..822da3ea6 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -505,20 +505,7 @@ def setUpClass(cls): def test_pod_log_empty_lines(self): pod_name = "demo-bug" - # Manifest with busybax to keep pod engaged for sometiem - pod_manifest = { - "apiVersion": "v1", - "kind": "Pod", - "metadata": {"name": pod_name}, - "spec": { - "containers": [{ - "image": "busybox", - "name": "my-container", - "command": ["sh", "-c", "while true; do echo Hello from Docker ; sleep 10; done"] - }] - }, - } - + try: self.api.create_namespaced_pod = Mock() self.api.read_namespaced_pod = Mock() From 1e093d04aa5de2746b2c9e6bd3b7248c1491f100 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Thu, 20 Mar 2025 01:44:56 +0530 Subject: [PATCH 7/9] Update watch_test.py Changes made to check whether entire log is printed or not. --- kubernetes/base/watch/watch_test.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index 822da3ea6..64378e931 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -540,9 +540,9 @@ def test_pod_log_empty_lines(self): #Mock logs used for this test w.stream = Mock(return_value=[ "Hello from Docker", - "\n", # Empty line + "", # Empty line "Another log line", - "\n", # Another empty line + "", # Another empty line "Final log" ]) for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True): @@ -552,7 +552,16 @@ def test_pod_log_empty_lines(self): # Print outputs print(f"Captured logs: {log_output}") # self.assertTrue(any("Hello from Docker" in line for line in log_output)) - self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs") + # self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs") + expected_log = [ + "Hello from Docker", + "", + "Another log line", + "", + "Final log" + ] + + self.assertEqual(log_output, expected_log, "Captured logs do not match expected logs") except ApiException as e: self.fail(f"Kubernetes API exception: {e}") From f4d0842d0e30a3ec7023ffef032a651373bfe852 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Thu, 20 Mar 2025 11:21:20 +0530 Subject: [PATCH 8/9] Update watch.py Changes made in watch.py to print multiple empty line if necessary. --- kubernetes/base/watch/watch.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index 22c8e9084..2ede8638c 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -71,7 +71,6 @@ def iter_resp_lines(resp): # Split by newline (safe for utf-8 because multi-byte sequences cannot contain the newline byte) next_newline = buffer.find(b'\n') - last_was_empty = False # Set empty-line flag while next_newline != -1: # Convert bytes to a valid utf-8 string, replacing any invalid utf-8 with the '�' character line = buffer[:next_newline].decode( @@ -79,11 +78,8 @@ def iter_resp_lines(resp): buffer = buffer[next_newline+1:] if line: yield line - last_was_empty = False # Reset empty-line flag else: - if not last_was_empty: - yield '' # Only print one empty line - last_was_empty = True # Mark that we handled an empty line + yield '' # Only print one empty line next_newline = buffer.find(b'\n') @@ -186,7 +182,6 @@ def stream(self, func, *args, **kwargs): while True: resp = func(*args, **kwargs) try: - last_was_empty = False # Set empty line false for line in iter_resp_lines(resp): # unmarshal when we are receiving events from watch, # return raw string when we are streaming log @@ -212,10 +207,8 @@ def stream(self, func, *args, **kwargs): else: if line: yield line # Normal non-empty line - last_was_empty = False - elif not last_was_empty: + else: yield '' # Only yield one empty line - last_was_empty = True if self._stop: break finally: From d451d2fc7ae8a51dfa9c39ee48f05f3903d4c218 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Thu, 20 Mar 2025 11:23:23 +0530 Subject: [PATCH 9/9] Update watch_test.py As per request added few empty lines to test case in watch_test.py --- kubernetes/base/watch/watch_test.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index 64378e931..f3880de7c 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -540,9 +540,12 @@ def test_pod_log_empty_lines(self): #Mock logs used for this test w.stream = Mock(return_value=[ "Hello from Docker", - "", # Empty line + "", + "", + "\n\n", "Another log line", - "", # Another empty line + "", + "\n", "Final log" ]) for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True): @@ -556,8 +559,11 @@ def test_pod_log_empty_lines(self): expected_log = [ "Hello from Docker", "", + "", + "\n\n", "Another log line", "", + "\n", "Final log" ]