Skip to content

Commit d88d61b

Browse files
committed
feat: add a switch to control whether events are deserialized when watching
This PR adds an option to disable automatic deserialization in Watch.stream(). By allowing clients to opt out of automatic deserialization when only basic JSON parsing is needed, we can significantly reduce processing time and improve event-processing throughput. This is particularly important in scenarios with high event volumes or resource constraints. Key changes: - Added a 'deserialize' parameter to the Watch.stream() method (defaults to True for backward compatibility) - When deserialize=False, events are only JSON-parsed, without model conversion - The original behavior is maintained when deserialize=True - Added test cases to verify both behaviors
1 parent 7c22fc1 commit d88d61b

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

kubernetes/base/watch/watch.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,14 +179,19 @@ def stream(self, func, *args, **kwargs):
179179
# We want to ensure we are returning within that timeout.
180180
disable_retries = ('timeout_seconds' in kwargs)
181181
retry_after_410 = False
182+
deserialize = kwargs.pop('deserialize', True)
182183
while True:
183184
resp = func(*args, **kwargs)
184185
try:
185186
for line in iter_resp_lines(resp):
186187
# unmarshal when we are receiving events from watch,
187188
# return raw string when we are streaming log
188189
if watch_arg == "watch":
189-
event = self.unmarshal_event(line, return_type)
190+
if deserialize:
191+
event = self.unmarshal_event(line, return_type)
192+
else:
193+
# Only do basic JSON parsing, no deserialize
194+
event = json.loads(line)
190195
if isinstance(event, dict) \
191196
and event['type'] == 'ERROR':
192197
obj = event['raw_object']

kubernetes/base/watch/watch_test.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,5 +576,44 @@ def test_pod_log_empty_lines(self):
576576
self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace)
577577
self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace)
578578

579+
if __name__ == '__main__':
580+
def test_watch_with_deserialize_param(self):
581+
"""test watch.stream() deserialize param"""
582+
# prepare test data
583+
test_json = '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}, "spec": {}, "status": {}}}'
584+
fake_resp = Mock()
585+
fake_resp.close = Mock()
586+
fake_resp.release_conn = Mock()
587+
fake_resp.stream = Mock(return_value=[test_json + '\n'])
588+
589+
fake_api = Mock()
590+
fake_api.get_namespaces = Mock(return_value=fake_resp)
591+
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
592+
593+
# test case with deserialize=True
594+
w = Watch()
595+
for e in w.stream(fake_api.get_namespaces, deserialize=True):
596+
self.assertEqual("ADDED", e['type'])
597+
# Verify that the object is deserialized correctly
598+
self.assertTrue(hasattr(e['object'], 'metadata'))
599+
self.assertEqual("test1", e['object'].metadata.name)
600+
self.assertEqual("1", e['object'].metadata.resource_version)
601+
# Verify that the original object is saved
602+
self.assertEqual(json.loads(test_json)['object'], e['raw_object'])
603+
604+
# test case with deserialize=False
605+
w = Watch()
606+
for e in w.stream(fake_api.get_namespaces, deserialize=False):
607+
self.assertEqual("ADDED", e['type'])
608+
# The validation object remains in the original dictionary format
609+
self.assertIsInstance(e['object'], dict)
610+
self.assertEqual("test1", e['object']['metadata']['name'])
611+
self.assertEqual("1", e['object']['metadata']['resourceVersion'])
612+
613+
# verify the api is called twice
614+
fake_api.get_namespaces.assert_has_calls([
615+
call(_preload_content=False, watch=True),
616+
call(_preload_content=False, watch=True)
617+
])
579618
if __name__ == '__main__':
580619
unittest.main()

0 commit comments

Comments
 (0)