From 3d569b1c8541b113e60f51bbb5de99e52b6c5bee Mon Sep 17 00:00:00 2001 From: TomerShor <90552140+TomerShor@users.noreply.github.com> Date: Thu, 11 Aug 2022 15:28:46 +0300 Subject: [PATCH] Add QualifiedOffset object to store shard data (#38) * Introduce explicit ack methods * Add QualifiedOffset object to store shard data * cr * docstring fix * minor --- nuclio_sdk/__init__.py | 1 + nuclio_sdk/platform.py | 10 +++++----- nuclio_sdk/qualified_offset.py | 31 +++++++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 5 deletions(-) create mode 100644 nuclio_sdk/qualified_offset.py diff --git a/nuclio_sdk/__init__.py b/nuclio_sdk/__init__.py index 197630b..858dd56 100644 --- a/nuclio_sdk/__init__.py +++ b/nuclio_sdk/__init__.py @@ -18,3 +18,4 @@ from nuclio_sdk.platform import Platform from nuclio_sdk.response import Response from nuclio_sdk.exceptions import ExceptionWithResponse +from nuclio_sdk.qualified_offset import QualifiedOffset diff --git a/nuclio_sdk/platform.py b/nuclio_sdk/platform.py index 0d58a49..ec0f398 100644 --- a/nuclio_sdk/platform.py +++ b/nuclio_sdk/platform.py @@ -36,14 +36,14 @@ def __init__( self._control_callback = on_control_callback - async def explicit_ack(self, event): + async def explicit_ack(self, qualified_offset): """ - Notifying the processor to ack a stream message + Notifying the processor to ack on a qualified offset - :param event - :type event + :param qualified_offset: the qualified offset to ack + :type qualified_offset: QualifiedOffset """ - message = event.compile_explicit_ack_message() + message = qualified_offset.compile_explicit_ack_message() if self._control_callback: await self._control_callback(message) else: diff --git a/nuclio_sdk/qualified_offset.py b/nuclio_sdk/qualified_offset.py new file mode 100644 index 0000000..052ea60 --- /dev/null +++ b/nuclio_sdk/qualified_offset.py @@ -0,0 +1,31 @@ +# Copyright 2017 The Nuclio Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class QualifiedOffset(object): + def __init__(self, topic, partition, offset): + self.topic = topic + self.partition = partition + self.offset = offset + + @staticmethod + def from_event(event): + return QualifiedOffset(event.topic, event.partition, event.offset) + + def compile_explicit_ack_message(self): + return { + "topic": self.topic, + "partition": self.partition, + "offset": self.offset, + }