Skip to content

Commit f379bba

Browse files
committed
update watch resource_version on BOOKMARK event
1 parent 34ed13f commit f379bba

File tree

2 files changed

+43
-9
lines changed

2 files changed

+43
-9
lines changed

kubernetes/base/watch/watch.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,11 @@ def get_watch_argument_name(self, func):
9696
def unmarshal_event(self, data, return_type):
9797
js = json.loads(data)
9898
js['raw_object'] = js['object']
99-
# BOOKMARK event is treated the same as ERROR for a quick fix of
100-
# decoding exception
101-
# TODO: make use of the resource_version in BOOKMARK event for more
102-
# efficient WATCH
103-
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
99+
if return_type and js['type'] != 'ERROR':
100+
if js['type'] == 'BOOKMARK':
101+
# treat BOOKMARK as a custom object, so that resource_version
102+
# is updated
103+
return_type = object
104104
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
105105
js['object'] = self._api_client.deserialize(obj, return_type)
106106
if hasattr(js['object'], 'metadata'):

kubernetes/base/watch/watch_test.py

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,42 @@ def get_values(*args, **kwargs):
167167
# more strict test with worse error message
168168
self.assertEqual(fake_api.get_namespaces.mock_calls, calls)
169169

170+
def test_watch_resource_version_with_bookmark(self):
171+
"""
172+
test the resource_version get updated for bookmark events
173+
174+
"""
175+
fake_resp = Mock()
176+
fake_resp.close = Mock()
177+
fake_resp.release_conn = Mock()
178+
values = [
179+
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
180+
'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
181+
'{ "type": "BOOKMARK", "object": {"kind": "Pod", "apiVersion": "v1",'
182+
'"metadata": {"resourceVersion": "3"} }}\n',
183+
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
184+
'"resourceVersion": "4"}, "spec": {}, "status": {}}}\n']
185+
186+
def get_values(*args, **kwargs):
187+
return values
188+
189+
fake_resp.stream = Mock(
190+
side_effect=get_values)
191+
192+
fake_api = Mock()
193+
fake_api.get_namespaces = Mock(return_value=fake_resp)
194+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
195+
196+
w = Watch()
197+
iterations = 3
198+
199+
for c, e in enumerate(w.stream(fake_api.get_namespaces,
200+
resource_version="0")):
201+
self.assertEqual(w.resource_version,
202+
e['raw_object']['metadata']['resourceVersion'])
203+
if c == len(values) * iterations:
204+
w.stop()
205+
170206
def test_watch_stream_twice(self):
171207
w = Watch(float)
172208
for step in ['first', 'second']:
@@ -263,10 +299,8 @@ def test_unmarshal_with_bookmark(self):
263299
'"metadata":{},"spec":{"containers":null}}},"status":{}}}',
264300
'V1Job')
265301
self.assertEqual("BOOKMARK", event['type'])
266-
# Watch.resource_version is *not* updated, as BOOKMARK is treated the
267-
# same as ERROR for a quick fix of decoding exception,
268-
# resource_version in BOOKMARK is *not* used at all.
269-
self.assertEqual(None, w.resource_version)
302+
303+
self.assertEqual("1", w.resource_version)
270304

271305
def test_watch_with_exception(self):
272306
fake_resp = Mock()

0 commit comments

Comments
 (0)