Skip to content

Commit c330b84

Browse files
authored
Merge pull request #2406 from Kevinz857/feat-deserialize-control-v2
feat: Add option to control deserialization when watching events
2 parents 51f4db5 + d88d61b commit c330b84

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

kubernetes/base/watch/watch.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,14 +179,19 @@ def stream(self, func, *args, **kwargs):
179179
# We want to ensure we are returning within that timeout.
180180
disable_retries = ('timeout_seconds' in kwargs)
181181
retry_after_410 = False
182+
deserialize = kwargs.pop('deserialize', True)
182183
while True:
183184
resp = func(*args, **kwargs)
184185
try:
185186
for line in iter_resp_lines(resp):
186187
# unmarshal when we are receiving events from watch,
187188
# return raw string when we are streaming log
188189
if watch_arg == "watch":
189-
event = self.unmarshal_event(line, return_type)
190+
if deserialize:
191+
event = self.unmarshal_event(line, return_type)
192+
else:
193+
# Only do basic JSON parsing, no deserialize
194+
event = json.loads(line)
190195
if isinstance(event, dict) \
191196
and event['type'] == 'ERROR':
192197
obj = event['raw_object']

kubernetes/base/watch/watch_test.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,5 +576,44 @@ def test_pod_log_empty_lines(self):
576576
self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace)
577577
self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace)
578578

579+
if __name__ == '__main__':
580+
def test_watch_with_deserialize_param(self):
581+
"""test watch.stream() deserialize param"""
582+
# prepare test data
583+
test_json = '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}, "spec": {}, "status": {}}}'
584+
fake_resp = Mock()
585+
fake_resp.close = Mock()
586+
fake_resp.release_conn = Mock()
587+
fake_resp.stream = Mock(return_value=[test_json + '\n'])
588+
589+
fake_api = Mock()
590+
fake_api.get_namespaces = Mock(return_value=fake_resp)
591+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
592+
593+
# test case with deserialize=True
594+
w = Watch()
595+
for e in w.stream(fake_api.get_namespaces, deserialize=True):
596+
self.assertEqual("ADDED", e['type'])
597+
# Verify that the object is deserialized correctly
598+
self.assertTrue(hasattr(e['object'], 'metadata'))
599+
self.assertEqual("test1", e['object'].metadata.name)
600+
self.assertEqual("1", e['object'].metadata.resource_version)
601+
# Verify that the original object is saved
602+
self.assertEqual(json.loads(test_json)['object'], e['raw_object'])
603+
604+
# test case with deserialize=False
605+
w = Watch()
606+
for e in w.stream(fake_api.get_namespaces, deserialize=False):
607+
self.assertEqual("ADDED", e['type'])
608+
# The validation object remains in the original dictionary format
609+
self.assertIsInstance(e['object'], dict)
610+
self.assertEqual("test1", e['object']['metadata']['name'])
611+
self.assertEqual("1", e['object']['metadata']['resourceVersion'])
612+
613+
# verify the api is called twice
614+
fake_api.get_namespaces.assert_has_calls([
615+
call(_preload_content=False, watch=True),
616+
call(_preload_content=False, watch=True)
617+
])
579618
if __name__ == '__main__':
580619
unittest.main()

0 commit comments

Comments
 (0)