Skip to content

Commit

Permalink
Add support for batch deserialization (#65)
Browse files: browse the repository at this point in the history
  • Loading branch information
rokatyy authored Apr 21, 2024
1 parent 0dde8d9 commit ca8251b
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 25 deletions.
58 changes: 37 additions & 21 deletions nuclio_sdk/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ def _try_deserialize_json(body):
try:
return json.loads(body.decode("utf-8"))
except Exception as exc:

# newline to force flush
# NOTE: processor runs sdk with `-u` which means stderr is unbuffered which needs manual flushing
sys.stderr.write(
Expand Down Expand Up @@ -90,10 +89,19 @@ def _from_parsed_data_bytes(parsed_data, body):
topic=parsed_data.get(b"topic"),
)

@staticmethod
def decode_single_or_list_event(parsed_data, decode_single_event_function):
    """Decode either a single parsed event or a batch of parsed events.

    :param parsed_data: one parsed event payload, or a list/tuple of such
        payloads when the message carries a batch
    :param decode_single_event_function: callable taking one parsed payload
        and returning the decoded event
    :return: a list of decoded events (input order preserved) for a batch,
        otherwise the single decoded event
    """
    # Tuples are treated as batches as well as lists: the single-event
    # decoders index the payload by key ("body", "content_type"), so a tuple
    # can never be a valid single event. Some unpackers hand back tuples
    # instead of lists (e.g. msgpack configured with use_list=False).
    if isinstance(parsed_data, (list, tuple)):
        return [
            decode_single_event_function(single_event_data)
            for single_event_data in parsed_data
        ]
    return decode_single_event_function(parsed_data)


class _EventDeserializerMsgPack(_EventDeserializer):
def __init__(self, raw=False):

# return the concrete function that handled raw/decoded event messages
# pre-assign to avoid if/else during event processing
self._from_msgpack_handler = (
Expand All @@ -103,32 +111,40 @@ def __init__(self, raw=False):
def deserialize(self, event_message):
    """Deserialize an event message with the handler pre-assigned in __init__."""
    handler = self._from_msgpack_handler
    return handler(event_message)

@staticmethod
def _from_msgpack_raw(parsed_data):
event_body = parsed_data[b"body"]
if parsed_data[b"content_type"] == b"application/json":
event_body = _EventDeserializer._try_deserialize_json(event_body)
return _EventDeserializer._from_parsed_data_bytes(parsed_data, event_body)
def _from_msgpack_raw(self, parsed_data):
def _decode_single_event(single_event_data):
event_body = single_event_data[b"body"]
if single_event_data[b"content_type"] == b"application/json":
event_body = _EventDeserializer._try_deserialize_json(event_body)
return _EventDeserializer._from_parsed_data_bytes(
single_event_data, event_body
)

@staticmethod
def _from_msgpack_decoded(parsed_data):
event_body = parsed_data["body"]
if parsed_data["content_type"] == "application/json":
event_body = _EventDeserializer._try_deserialize_json(event_body)
return _EventDeserializer._from_parsed_data(parsed_data, event_body)
return self.decode_single_or_list_event(parsed_data, _decode_single_event)

def _from_msgpack_decoded(self, parsed_data):
    """Build event(s) from msgpack data whose keys and values are str.

    :param parsed_data: a single parsed event mapping, or a batch of them
    :return: whatever ``decode_single_or_list_event`` yields for the input
    """

    def _to_event(payload):
        # read the body first, then the content type, so a missing key
        # surfaces in the same order as before
        body = payload["body"]
        is_json = payload["content_type"] == "application/json"
        if is_json:
            # JSON bodies are handed to the shared JSON helper before
            # the event is built
            body = _EventDeserializer._try_deserialize_json(body)
        return _EventDeserializer._from_parsed_data(payload, body)

    return self.decode_single_or_list_event(parsed_data, _to_event)


class _EventDeserializerJSON(_EventDeserializer):
def deserialize(self, event_message):
parsed_data = json.loads(event_message)

# extract content type, needed to decode body
body = parsed_data["body"]
if parsed_data["content_type"] == "application/json" and not isinstance(
body, dict
):
body = _EventDeserializer._try_deserialize_json(body)
return _EventDeserializer._from_parsed_data(parsed_data, body)
def _deserialize_single_event(single_event):
body = single_event["body"]
if single_event["content_type"] == "application/json" and not isinstance(
body, dict
):
body = _EventDeserializer._try_deserialize_json(body)
return _EventDeserializer._from_parsed_data(single_event, body)

return self.decode_single_or_list_event(parsed_data, _deserialize_single_event)


class EventDeserializerKinds(enum.Enum):
Expand Down
66 changes: 62 additions & 4 deletions nuclio_sdk/test/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,52 @@ def test_event_to_json_bytes_body(self):
self.assertFalse(serialized_event.last_in_batch)
self.assertEqual(serialized_event.offset, 0)

def test_batch_to_json_bytes_body(self):
    """A batch of bytes-bodied events keeps per-event fields after deserialization.

    The expected bodies are the base64 text of the original byte strings.
    """
    trigger_kind = "http"
    trigger_name = "my-http-trigger"

    def _make_event(body):
        # every event in the batch differs only by its body
        return nuclio_sdk.Event(
            body=body,
            content_type="content-type",
            trigger=nuclio_sdk.TriggerInfo(kind=trigger_kind, name=trigger_name),
            method="GET",
        )

    deserialized_batch = self._deserialize_event(
        [_make_event(b"bytes-body-1"), _make_event(b"bytes-body-2")]
    )
    first = deserialized_batch[0]
    second = deserialized_batch[1]

    self.assertEqual(first.body, "Ynl0ZXMtYm9keS0x")
    self.assertEqual(second.body, "Ynl0ZXMtYm9keS0y")

    expected_trigger = {"kind": trigger_kind, "name": trigger_name}
    for deserialized_event in (first, second):
        self.assertEqual(deserialized_event.content_type, "content-type")
        self.assertEqual(deserialized_event.method, "GET")
        self.assertDictEqual(deserialized_event.trigger.__dict__, expected_trigger)
        self.assertFalse(deserialized_event.last_in_batch)
        self.assertEqual(deserialized_event.offset, 0)

def test_event_to_json_bytes_non_utf8able_body(self):
    """A body that is not valid UTF-8 comes back as its base64 text."""
    non_utf8_event = nuclio_sdk.Event(body=b"\x80abc")
    deserialized_event = self._deserialize_event(non_utf8_event)
    self.assertEqual(deserialized_event.body, "gGFiYw==")

def test_batch_to_json_bytes_non_utf8able_body(self):
    """Each non-UTF-8 body in a batch comes back as its own base64 text."""
    # (raw body, expected deserialized body) pairs, in batch order
    cases = [
        (b"\x80abc", "gGFiYw=="),
        (b"\x80abcd", "gGFiY2Q="),
    ]
    batch = [nuclio_sdk.Event(body=raw_body) for raw_body, _ in cases]
    deserialized_batch = self._deserialize_event(batch)
    for index, (_, expected_body) in enumerate(cases):
        self.assertEqual(deserialized_batch[index].body, expected_body)

def test_event_to_json_string_body(self):
request_body = "str-body"
topic = "my-topic"
Expand All @@ -64,16 +105,28 @@ def _deserialize_event(self, event):

class TestEventMsgPack(nuclio_sdk.test.TestCase, TestEvent):
def _deserialize_event(self, event):
event_json = {k: v for k, v in json.loads(event.to_json()).items()}
if isinstance(event, list):
event_json = [
{k: v for k, v in json.loads(item.to_json()).items()} for item in event
]
else:
event_json = {k: v for k, v in json.loads(event.to_json()).items()}
return nuclio_sdk.event.Event.deserialize(
event_json, nuclio_sdk.event.EventDeserializerKinds.msgpack
)


class TestEventMsgPackRaw(nuclio_sdk.test.TestCase, TestEvent):
def _deserialize_event(self, event):
event_json = {k: v for k, v in json.loads(event.to_json()).items()}
self._event_keys_to_byte_string(event_json)
if isinstance(event, list):
event_json = [
{k: v for k, v in json.loads(item.to_json()).items()} for item in event
]
for item in event_json:
self._event_keys_to_byte_string(item)
else:
event_json = {k: v for k, v in json.loads(event.to_json()).items()}
self._event_keys_to_byte_string(event_json)
return nuclio_sdk.event.Event.deserialize(
event_json, nuclio_sdk.event.EventDeserializerKinds.msgpack_raw
)
Expand All @@ -89,7 +142,12 @@ def _event_keys_to_byte_string(self, d):

class TestEventJson(nuclio_sdk.test.TestCase, TestEvent):
def _deserialize_event(self, event):
event_json = {k: v for k, v in json.loads(event.to_json()).items()}
if isinstance(event, list):
event_json = [
{k: v for k, v in json.loads(item.to_json()).items()} for item in event
]
else:
event_json = {k: v for k, v in json.loads(event.to_json()).items()}
return nuclio_sdk.event.Event.deserialize(
json.dumps(event_json), nuclio_sdk.event.EventDeserializerKinds.json
)
Expand Down

0 comments on commit ca8251b

Please sign in to comment.