diff --git a/kubernetes/base/watch/watch.py b/kubernetes/base/watch/watch.py index da81f9702..2ede8638c 100644 --- a/kubernetes/base/watch/watch.py +++ b/kubernetes/base/watch/watch.py @@ -78,6 +78,8 @@ def iter_resp_lines(resp): buffer = buffer[next_newline+1:] if line: yield line + else: + yield '' # Only print one empty line next_newline = buffer.find(b'\n') @@ -107,24 +109,29 @@ def get_watch_argument_name(self, func): return 'watch' def unmarshal_event(self, data, return_type): - js = json.loads(data) - js['raw_object'] = js['object'] - # BOOKMARK event is treated the same as ERROR for a quick fix of - # decoding exception - # TODO: make use of the resource_version in BOOKMARK event for more - # efficient WATCH - if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK': - obj = SimpleNamespace(data=json.dumps(js['raw_object'])) - js['object'] = self._api_client.deserialize(obj, return_type) - if hasattr(js['object'], 'metadata'): - self.resource_version = js['object'].metadata.resource_version - # For custom objects that we don't have model defined, json - # deserialization results in dictionary - elif (isinstance(js['object'], dict) and 'metadata' in js['object'] - and 'resourceVersion' in js['object']['metadata']): - self.resource_version = js['object']['metadata'][ - 'resourceVersion'] - return js + if not data or data.isspace(): + return None + try: + js = json.loads(data) + js['raw_object'] = js['object'] + # BOOKMARK event is treated the same as ERROR for a quick fix of + # decoding exception + # TODO: make use of the resource_version in BOOKMARK event for more + # efficient WATCH + if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK': + obj = SimpleNamespace(data=json.dumps(js['raw_object'])) + js['object'] = self._api_client.deserialize(obj, return_type) + if hasattr(js['object'], 'metadata'): + self.resource_version = js['object'].metadata.resource_version + # For custom objects that we don't have model defined, json + # deserialization results in dictionary + elif (isinstance(js['object'], dict) and 'metadata' in js['object'] + and 'resourceVersion' in js['object']['metadata']): + self.resource_version = js['object']['metadata'][ + 'resourceVersion'] + return js + except json.JSONDecodeError: + return None def stream(self, func, *args, **kwargs): """Watch an API resource and stream the result back via a generator. @@ -198,7 +205,10 @@ def stream(self, func, *args, **kwargs): retry_after_410 = False yield event else: - yield line + if line: + yield line # Normal non-empty line + else: + yield '' # Only yield one empty line if self._stop: break finally: diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index c5bc5c378..f3880de7c 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -14,12 +14,18 @@ import unittest +import os + +import time + from unittest.mock import Mock, call -from kubernetes import client +from kubernetes import client,config from .watch import Watch +from kubernetes.client import ApiException + class WatchTests(unittest.TestCase): def setUp(self): @@ -99,6 +105,9 @@ def test_watch_with_interspersed_newlines(self): # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is # the only way to do so. Without that, the stream will re-read the test data forever. for e in w.stream(fake_api.get_namespaces, timeout_seconds=1): + # Here added a statement for exception for empty lines. + if e is None: + continue count += 1 self.assertEqual("test%d" % count, e['object'].metadata.name) self.assertEqual(3, count) @@ -488,7 +497,84 @@ def test_watch_with_error_event_and_timeout_param(self): amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() - + + @classmethod + def setUpClass(cls): + cls.api = Mock() + cls.namespace = "default" + + def test_pod_log_empty_lines(self): + pod_name = "demo-bug" + + try: + self.api.create_namespaced_pod = Mock() + self.api.read_namespaced_pod = Mock() + self.api.delete_namespaced_pod = Mock() + self.api.read_namespaced_pod_log = Mock() + + #pod creating step + self.api.create_namespaced_pod.return_value = None + + #Checking pod status + mock_pod = Mock() + mock_pod.status.phase = "Running" + self.api.read_namespaced_pod.return_value = mock_pod + + # Printing at pod output + self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"]) + + # Wait for the pod to reach 'Running' + timeout = 60 + start_time = time.time() + while time.time() - start_time < timeout: + pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace) + if pod.status.phase == "Running": + break + time.sleep(2) + else: + self.fail("Pod did not reach 'Running' state within timeout") + + # Reading and streaming logs using Watch (mocked) + w = Watch() + log_output = [] + #Mock logs used for this test + w.stream = Mock(return_value=[ + "Hello from Docker", + "", + "", + "\n\n", + "Another log line", + "", + "\n", + "Final log" + ]) + for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True): + log_output.append(event) + print(event) + + # Print outputs + print(f"Captured logs: {log_output}") + # self.assertTrue(any("Hello from Docker" in line for line in log_output)) + # self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs") + expected_log = [ + "Hello from Docker", + "", + "", + "\n\n", + "Another log line", + "", + "\n", + "Final log" + ] + + self.assertEqual(log_output, expected_log, "Captured logs do not match expected logs") + + except ApiException as e: + self.fail(f"Kubernetes API exception: {e}") + finally: + #checking pod is calling for delete + self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace) + self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace) if __name__ == '__main__': unittest.main()