diff --git a/tests/test_client.py b/tests/test_client.py index ab0ab0a..8224be0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -27,6 +27,7 @@ import v3io.dataplane.output import v3io.dataplane.response import v3io.logger +from v3io.dataplane.kv_large_string import LARGE_STRING_MIN_SIZE class Test(unittest.TestCase): @@ -431,6 +432,7 @@ def _get_float_array(): "array_with_ints": _get_int_array(), "array_with_floats": _get_float_array(), "now": datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc), + "large_string": "a" * 10 * LARGE_STRING_MIN_SIZE, } } diff --git a/v3io/dataplane/kv_large_string.py b/v3io/dataplane/kv_large_string.py new file mode 100644 index 0000000..dac43c6 --- /dev/null +++ b/v3io/dataplane/kv_large_string.py @@ -0,0 +1,33 @@ +# Copyright 2024 Iguazio +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import zlib + +LARGE_STRING_MIN_SIZE = 60000 + +prefix = b'_v3io_large_string' + +def is_large_bstring(attribute_value): + return attribute_value[:len(prefix)] == prefix + +def large_bstring_to_string(attribute_value): + compressed_value=attribute_value[len(prefix):] + return zlib.decompress(compressed_value).decode('utf-8') + +def string_to_large_bstring(attribute_value): + bvalue = zlib.compress(attribute_value.encode('utf-8')) + return prefix + bvalue + + diff --git a/v3io/dataplane/output.py b/v3io/dataplane/output.py index 9b81942..9721036 100644 --- a/v3io/dataplane/output.py +++ b/v3io/dataplane/output.py @@ -18,6 +18,7 @@ import v3io.dataplane.kv_array import v3io.dataplane.kv_timestamp +from v3io.dataplane.kv_large_string import is_large_bstring, large_bstring_to_string class Output(object): @@ -33,12 +34,14 @@ def _decode_typed_attributes(self, typed_attributes): decoded_attribute = float(attribute_value) elif attribute_type == "B": decoded_attribute = base64.b64decode(attribute_value) - - # try to decode as an array - try: - decoded_attribute = v3io.dataplane.kv_array.decode(decoded_attribute) - except BaseException: - pass + if is_large_bstring(decoded_attribute): + decoded_attribute = large_bstring_to_string(decoded_attribute) + else: + # try to decode as an array + try: + decoded_attribute = v3io.dataplane.kv_array.decode(decoded_attribute) + except BaseException: + pass elif attribute_type == "S": if type(attribute_value) in [float, int]: diff --git a/v3io/dataplane/request.py b/v3io/dataplane/request.py index a74adc0..029e8d8 100644 --- a/v3io/dataplane/request.py +++ b/v3io/dataplane/request.py @@ -29,6 +29,10 @@ import v3io.common.helpers import v3io.dataplane.kv_array import v3io.dataplane.kv_timestamp +from v3io.dataplane.kv_large_string import ( + LARGE_STRING_MIN_SIZE, + string_to_large_bstring, +) # # Request @@ -420,11 +424,21 @@ def _dict_to_typed_attributes(d): type_value = None if isinstance(value, future.utils.text_type): - type_key = "S" - type_value = value + if len(value) > LARGE_STRING_MIN_SIZE: + type_key = "B" + type_value = string_to_large_bstring(value) + type_value = _to_base64(type_value) + else: + type_key = "S" + type_value = value elif isinstance(value, future.utils.string_types): - type_key = "S" type_value = str(value) + if len(type_value) > LARGE_STRING_MIN_SIZE: + type_key = "B" + type_value = string_to_large_bstring(type_value) + type_value = _to_base64(type_value) + else: + type_key = "S" elif attribute_type in [int, float]: type_key = "N" type_value = str(value)