From ca8251bd5ef4ecd9de1cdeeaf18fcb00102d617a Mon Sep 17 00:00:00 2001 From: Katerina Molchanova <35141662+rokatyy@users.noreply.github.com> Date: Sun, 21 Apr 2024 10:32:08 +0100 Subject: [PATCH] Add support for batch deserialization (#65) --- nuclio_sdk/event.py | 58 +++++++++++++++++++----------- nuclio_sdk/test/test_event.py | 66 ++++++++++++++++++++++++++++++++--- 2 files changed, 99 insertions(+), 25 deletions(-) diff --git a/nuclio_sdk/event.py b/nuclio_sdk/event.py index 9e405f1..6d10c03 100644 --- a/nuclio_sdk/event.py +++ b/nuclio_sdk/event.py @@ -30,7 +30,6 @@ def _try_deserialize_json(body): try: return json.loads(body.decode("utf-8")) except Exception as exc: - # newline to force flush # NOTE: processor runs sdk with `-u` which means stderr is unbuffered which needs manual flushing sys.stderr.write( @@ -90,10 +89,19 @@ def _from_parsed_data_bytes(parsed_data, body): topic=parsed_data.get(b"topic"), ) + @staticmethod + def decode_single_or_list_event(parsed_data, decode_single_event_function): + if isinstance(parsed_data, list): + return [ + decode_single_event_function(single_event_data) + for single_event_data in parsed_data + ] + else: + return decode_single_event_function(parsed_data) + class _EventDeserializerMsgPack(_EventDeserializer): def __init__(self, raw=False): - # return the concrete function that handled raw/decoded event messages # pre-assign to avoid if/else during event processing self._from_msgpack_handler = ( @@ -103,32 +111,40 @@ def __init__(self, raw=False): def deserialize(self, event_message): return self._from_msgpack_handler(event_message) - @staticmethod - def _from_msgpack_raw(parsed_data): - event_body = parsed_data[b"body"] - if parsed_data[b"content_type"] == b"application/json": - event_body = _EventDeserializer._try_deserialize_json(event_body) - return _EventDeserializer._from_parsed_data_bytes(parsed_data, event_body) + def _from_msgpack_raw(self, parsed_data): + def _decode_single_event(single_event_data): + event_body = single_event_data[b"body"] + if single_event_data[b"content_type"] == b"application/json": + event_body = _EventDeserializer._try_deserialize_json(event_body) + return _EventDeserializer._from_parsed_data_bytes( + single_event_data, event_body + ) - @staticmethod - def _from_msgpack_decoded(parsed_data): - event_body = parsed_data["body"] - if parsed_data["content_type"] == "application/json": - event_body = _EventDeserializer._try_deserialize_json(event_body) - return _EventDeserializer._from_parsed_data(parsed_data, event_body) + return self.decode_single_or_list_event(parsed_data, _decode_single_event) + + def _from_msgpack_decoded(self, parsed_data): + def _decode_single_event(single_event_data): + event_body = single_event_data["body"] + if single_event_data["content_type"] == "application/json": + event_body = _EventDeserializer._try_deserialize_json(event_body) + return _EventDeserializer._from_parsed_data(single_event_data, event_body) + + return self.decode_single_or_list_event(parsed_data, _decode_single_event) class _EventDeserializerJSON(_EventDeserializer): def deserialize(self, event_message): parsed_data = json.loads(event_message) - # extract content type, needed to decode body - body = parsed_data["body"] - if parsed_data["content_type"] == "application/json" and not isinstance( - body, dict - ): - body = _EventDeserializer._try_deserialize_json(body) - return _EventDeserializer._from_parsed_data(parsed_data, body) + def _deserialize_single_event(single_event): + body = single_event["body"] + if single_event["content_type"] == "application/json" and not isinstance( + body, dict + ): + body = _EventDeserializer._try_deserialize_json(body) + return _EventDeserializer._from_parsed_data(single_event, body) + + return self.decode_single_or_list_event(parsed_data, _deserialize_single_event) class EventDeserializerKinds(enum.Enum): diff --git a/nuclio_sdk/test/test_event.py b/nuclio_sdk/test/test_event.py index 44381e0..01401e7 100644 --- a/nuclio_sdk/test/test_event.py +++ b/nuclio_sdk/test/test_event.py @@ -38,11 +38,52 @@ def test_event_to_json_bytes_body(self): self.assertFalse(serialized_event.last_in_batch) self.assertEqual(serialized_event.offset, 0) + def test_batch_to_json_bytes_body(self): + trigger_name = "my-http-trigger" + trigger_kind = "http" + event_batch = [ + nuclio_sdk.Event( + body=b"bytes-body-1", + content_type="content-type", + trigger=nuclio_sdk.TriggerInfo(kind=trigger_kind, name=trigger_name), + method="GET", + ), + nuclio_sdk.Event( + body=b"bytes-body-2", + content_type="content-type", + trigger=nuclio_sdk.TriggerInfo(kind=trigger_kind, name=trigger_name), + method="GET", + ), + ] + serialized_event_batch = self._deserialize_event(event_batch) + item1 = serialized_event_batch[0] + item2 = serialized_event_batch[1] + self.assertEqual(item1.body, "Ynl0ZXMtYm9keS0x") + self.assertEqual(item2.body, "Ynl0ZXMtYm9keS0y") + for item in [item1, item2]: + self.assertEqual(item.content_type, "content-type") + self.assertEqual(item.method, "GET") + self.assertDictEqual( + item.trigger.__dict__, + {"kind": trigger_kind, "name": trigger_name}, + ) + self.assertFalse(item.last_in_batch) + self.assertEqual(item.offset, 0) + def test_event_to_json_bytes_non_utf8able_body(self): event = nuclio_sdk.Event(body=b"\x80abc") serialized_event = self._deserialize_event(event) self.assertEqual(serialized_event.body, "gGFiYw==") + def test_batch_to_json_bytes_non_utf8able_body(self): + event_batch = [ + nuclio_sdk.Event(body=b"\x80abc"), + nuclio_sdk.Event(body=b"\x80abcd"), + ] + serialized_event_batch = self._deserialize_event(event_batch) + self.assertEqual(serialized_event_batch[0].body, "gGFiYw==") + self.assertEqual(serialized_event_batch[1].body, "gGFiY2Q=") + def test_event_to_json_string_body(self): request_body = "str-body" topic = "my-topic" @@ -64,7 +105,12 @@ def _deserialize_event(self, event): class TestEventMsgPack(nuclio_sdk.test.TestCase, TestEvent): def _deserialize_event(self, event): - event_json = {k: v for k, v in json.loads(event.to_json()).items()} + if isinstance(event, list): + event_json = [ + {k: v for k, v in json.loads(item.to_json()).items()} for item in event + ] + else: + event_json = {k: v for k, v in json.loads(event.to_json()).items()} return nuclio_sdk.event.Event.deserialize( event_json, nuclio_sdk.event.EventDeserializerKinds.msgpack ) @@ -72,8 +118,15 @@ def _deserialize_event(self, event): class TestEventMsgPackRaw(nuclio_sdk.test.TestCase, TestEvent): def _deserialize_event(self, event): - event_json = {k: v for k, v in json.loads(event.to_json()).items()} - self._event_keys_to_byte_string(event_json) + if isinstance(event, list): + event_json = [ + {k: v for k, v in json.loads(item.to_json()).items()} for item in event + ] + for item in event_json: + self._event_keys_to_byte_string(item) + else: + event_json = {k: v for k, v in json.loads(event.to_json()).items()} + self._event_keys_to_byte_string(event_json) return nuclio_sdk.event.Event.deserialize( event_json, nuclio_sdk.event.EventDeserializerKinds.msgpack_raw ) @@ -89,7 +142,12 @@ def _event_keys_to_byte_string(self, d): class TestEventJson(nuclio_sdk.test.TestCase, TestEvent): def _deserialize_event(self, event): - event_json = {k: v for k, v in json.loads(event.to_json()).items()} + if isinstance(event, list): + event_json = [ + {k: v for k, v in json.loads(item.to_json()).items()} for item in event + ] + else: + event_json = {k: v for k, v in json.loads(event.to_json()).items()} return nuclio_sdk.event.Event.deserialize( json.dumps(event_json), nuclio_sdk.event.EventDeserializerKinds.json )