From 16ffec46ca2a09edef5cd8909b71d57165cfa54d Mon Sep 17 00:00:00 2001 From: Pete Date: Tue, 7 Jan 2025 11:35:28 +0000 Subject: [PATCH 1/2] Add support for allowWatchBookmarks to the dynamic client --- kubernetes/base/dynamic/client.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kubernetes/base/dynamic/client.py b/kubernetes/base/dynamic/client.py index 352e11a809..64163d7b5c 100644 --- a/kubernetes/base/dynamic/client.py +++ b/kubernetes/base/dynamic/client.py @@ -163,7 +163,7 @@ def server_side_apply(self, resource, body=None, name=None, namespace=None, forc return self.request('patch', path, body=body, force_conflicts=force_conflicts, **kwargs) - def watch(self, resource, namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None, watcher=None): + def watch(self, resource, namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None, watcher=None, allow_watch_bookmarks=None): """ Stream events for a resource from the Kubernetes API @@ -176,6 +176,7 @@ def watch(self, resource, namespace=None, name=None, label_selector=None, field_ a resource_version greater than this value will be returned :param timeout: The amount of time in seconds to wait before terminating the stream :param watcher: The Watcher object that will be used to stream the resource + :param allow_watch_bookmarks: Ask the API server to send BOOKMARK events :return: Event object with these keys: 'type': The type of event such as "ADDED", "DELETED", etc. @@ -206,7 +207,8 @@ def watch(self, resource, namespace=None, name=None, label_selector=None, field_ label_selector=label_selector, resource_version=resource_version, serialize=False, - timeout_seconds=timeout + timeout_seconds=timeout, + allow_watch_bookmarks=allow_watch_bookmarks, ): event['object'] = ResourceInstance(resource, event['object']) yield event @@ -248,6 +250,8 @@ def request(self, method, path, body=None, **params): query_params.append(('fieldManager', params['field_manager'])) if params.get('force_conflicts') is not None: query_params.append(('force', params['force_conflicts'])) + if params.get('allow_watch_bookmarks') is not None: + query_params.append(('allowWatchBookmarks', params['allow_watch_bookmarks'])) header_params = params.get('header_params', {}) form_params = [] From 0945c8b4b1b8185c773c5a4f9448972852e6627d Mon Sep 17 00:00:00 2001 From: Pete Date: Thu, 24 Apr 2025 19:37:45 +0100 Subject: [PATCH 2/2] Add an example of watch recovery using resource_version and bookmarks --- examples/watch/watch_recovery.py | 78 ++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 examples/watch/watch_recovery.py diff --git a/examples/watch/watch_recovery.py b/examples/watch/watch_recovery.py new file mode 100644 index 0000000000..f07caf5096 --- /dev/null +++ b/examples/watch/watch_recovery.py @@ -0,0 +1,78 @@ +# Copyright 2025 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Uses watch to print a stream of Pod events from the default namespace. +The allow_watch_bookmarks flag is set to True, so the API server can send +BOOKMARK events. + +If the connection to the API server is lost, the script will reconnect and +resume watching from the most recently received resource version. + +For more information, see: +- https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes +- https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-watch +""" + +import urllib3 + +from kubernetes import config +from kubernetes.client import api_client +from kubernetes.client.exceptions import ApiException +from kubernetes.dynamic.client import DynamicClient + +NAMESPACE = "default" + + +def main(): + # Configs can be set in Configuration class directly or using helper + # utility. If no argument provided, the config will be loaded from + # default location. + config.load_kube_config() + client = DynamicClient(api_client.ApiClient()) + api = client.resources.get(api_version="v1", kind="Pod") + + # Setting resource_version=None means the server will send synthetic + # ADDED events for all resources that exist when the watch starts. + resource_version = None + while True: + try: + for event in api.watch( + namespace=NAMESPACE, + resource_version=resource_version, + allow_watch_bookmarks=True, + ): + # Remember the last resourceVersion we saw, so we can resume + # watching from there if the connection is lost. + resource_version = event['object'].metadata.resourceVersion + + print("Event: %s %s %s" % ( + resource_version, + event['type'], + event['object'].metadata.name, + )) + + except ApiException as err: + if err.status == 410: + print("ERROR: The requested resource version is no longer available.") + resource_version = None + else: + raise + + except urllib3.exceptions.ProtocolError: + print("Lost connection to the k8s API server. Reconnecting...") + + +if __name__ == "__main__": + main()