Skip to content

Commit

Permalink
Merge pull request #405 from aiven/jjaakola-aiven-remove-ujson
Browse files Browse the repository at this point in the history
Remove ujson dependency
  • Loading branch information
tvainika authored May 17, 2022
2 parents 9e3cf20 + 1d14492 commit fa70f43
Show file tree
Hide file tree
Showing 28 changed files with 293 additions and 307 deletions.
1 change: 0 additions & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
[MASTER]
jobs=4
extension-pkg-allow-list=ujson

[MESSAGES CONTROL]
enable=
Expand Down
8 changes: 4 additions & 4 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
from pathlib import Path
from typing import Dict, IO, List, Optional, Union

import json
import logging
import os
import socket
import ssl
import ujson

Config = Dict[str, Union[None, str, int, bool, List[str], AccessLogger]]
LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -154,13 +154,13 @@ def validate_config(config: Config) -> None:


def write_config(config_path: Path, custom_values: Config) -> None:
config_path.write_text(ujson.dumps(custom_values))
config_path.write_text(json.dumps(custom_values))


def read_config(config_handler: IO) -> Config:
try:
config = ujson.load(config_handler)
except ValueError as ex:
config = json.load(config_handler)
except json.JSONDecodeError as ex:
raise InvalidConfiguration("Configuration is not a valid JSON") from ex

return set_config_defaults(config)
Expand Down
4 changes: 2 additions & 2 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import asyncio
import base64
import json
import time
import ujson

RECORD_KEYS = ["key", "value", "partition"]
PUBLISH_KEYS = {"records", "value_schema", "value_schema_id", "key_schema", "key_schema_id"}
Expand Down Expand Up @@ -544,7 +544,7 @@ async def serialize(
# not pretty
if ser_format == "json":
# TODO -> get encoding from headers
return ujson.dumps(obj).encode("utf8")
return json.dumps(obj).encode("utf8")
if ser_format == "binary":
return base64.b64decode(obj)
if ser_format in {"avro", "jsonschema", "protobuf"}:
Expand Down
4 changes: 2 additions & 2 deletions karapace/kafka_rest_apis/consumer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

import asyncio
import base64
import json
import logging
import time
import ujson
import uuid

KNOWN_FORMATS = {"json", "avro", "binary", "jsonschema", "protobuf"}
Expand Down Expand Up @@ -506,7 +506,7 @@ async def deserialize(self, bytes_: bytes, fmt: str):
if fmt in {"avro", "jsonschema", "protobuf"}:
return await self.deserializer.deserialize(bytes_)
if fmt == "json":
return ujson.loads(bytes_.decode("utf-8"))
return json.loads(bytes_.decode("utf-8"))
return base64.b64encode(bytes_).decode("utf-8")

def close(self):
Expand Down
12 changes: 6 additions & 6 deletions karapace/master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
from threading import Event, Thread
from typing import Optional, Tuple

import json
import logging
import time
import ujson

# SR group errors
NO_ERROR = 0
Expand All @@ -42,7 +42,7 @@ def protocol_type(self):
def get_identity(self, *, host, port, scheme, json_encode=True):
res = {"version": 1, "host": host, "port": port, "scheme": scheme, "master_eligibility": self.master_eligibility}
if json_encode:
return ujson.dumps(res)
return json.dumps(res)
return res

def group_protocols(self):
Expand All @@ -55,7 +55,7 @@ def _perform_assignment(self, leader_id, protocol, members):
urls = {}
fallback_urls = {}
for member_id, member_data in members:
member_identity = ujson.loads(member_data.decode("utf8"))
member_identity = json.loads(member_data.decode("utf8"))
if member_identity["master_eligibility"] is True:
urls[get_identity_url(member_identity["scheme"], member_identity["host"], member_identity["port"])] = (
member_id,
Expand All @@ -72,7 +72,7 @@ def _perform_assignment(self, leader_id, protocol, members):
# Protocol guarantees there is at least one member thus if urls is empty, fallback_urls cannot be
chosen_url = sorted(fallback_urls, reverse=self.election_strategy.lower() == "highest")[0]
schema_master_id, member_data = fallback_urls[chosen_url]
member_identity = ujson.loads(member_data.decode("utf8"))
member_identity = json.loads(member_data.decode("utf8"))
identity = self.get_identity(
host=member_identity["host"],
port=member_identity["port"],
Expand All @@ -83,7 +83,7 @@ def _perform_assignment(self, leader_id, protocol, members):

assignments = {}
for member_id, member_data in members:
assignments[member_id] = ujson.dumps({"master": schema_master_id, "master_identity": identity, "error": error})
assignments[member_id] = json.dumps({"master": schema_master_id, "master_identity": identity, "error": error})
return assignments

def _on_join_prepare(self, generation, member_id):
Expand All @@ -98,7 +98,7 @@ def _on_join_complete(self, generation, member_id, protocol, member_assignment_b
protocol,
member_assignment_bytes,
)
member_assignment = ujson.loads(member_assignment_bytes.decode("utf8"))
member_assignment = json.loads(member_assignment_bytes.decode("utf8"))
member_identity = member_assignment["master_identity"]

master_url = get_identity_url(
Expand Down
4 changes: 2 additions & 2 deletions karapace/protobuf/exception.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import ujson
import json


class ProtobufParserRuntimeException(Exception):
Expand Down Expand Up @@ -30,7 +30,7 @@ class SchemaParseException(ProtobufException):


def pretty_print_json(obj: str) -> str:
return ujson.dumps(ujson.loads(obj), indent=2)
return json.dumps(json.loads(obj), indent=2)


class ProtobufSchemaResolutionException(ProtobufException):
Expand Down
4 changes: 2 additions & 2 deletions karapace/rapu.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import asyncio
import cgi
import hashlib
import json
import logging
import re
import time
import ujson

SERVER_NAME = "Karapace/{}".format(__version__)
JSON_CONTENT_TYPE = "application/json"
Expand Down Expand Up @@ -281,7 +281,7 @@ async def _handle_request(
_, options = cgi.parse_header(rapu_request.get_header("Content-Type"))
charset = options.get("charset", "utf-8")
body_string = body.decode(charset)
rapu_request.json = ujson.loads(body_string)
rapu_request.json = json.loads(body_string)
except UnicodeDecodeError:
raise HTTPResponse( # pylint: disable=raise-missing-from
body=f"Request body is not valid {charset}", status=HTTPStatus.BAD_REQUEST
Expand Down
18 changes: 9 additions & 9 deletions karapace/schema_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
from typing import Dict, List, Optional, Tuple

import argparse
import json
import logging
import os
import sys
import time
import ujson

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -138,7 +138,7 @@ def close(self):
def request_backup(self):
values = self._export()

ser = ujson.dumps(values)
ser = json.dumps(values)
if self.backup_location:
with open(self.backup_location, mode="w", encoding="utf8") as fp:
fp.write(ser)
Expand All @@ -161,7 +161,7 @@ def restore_backup(self):
values = None
with open(self.backup_location, mode="r", encoding="utf8") as fp:
raw_msg = fp.read()
values = ujson.loads(raw_msg)
values = json.loads(raw_msg)

if not values:
return
Expand All @@ -184,7 +184,7 @@ def export_anonymized_avro_schemas(self):
# Check that the message has key `schema` and type is Avro schema.
# The Avro schemas may have `schemaType` key, if not present the schema is Avro.
if value[1] and "schema" in value[1] and value[1].get("schemaType", "AVRO") == "AVRO":
original_schema = ujson.loads(value[1].get("schema"))
original_schema = json.loads(value[1].get("schema"))
anonymized_schema = anonymize_avro.anonymize(original_schema)
if anonymized_schema:
if "subject" in value[0]:
Expand All @@ -193,7 +193,7 @@ def export_anonymized_avro_schemas(self):
value[1]["subject"] = anonymize_avro.anonymize_name(value[1]["subject"])
value[1]["schema"] = anonymized_schema
anonymized_schemas.append((value[0], value[1]))
ser = ujson.dumps(anonymized_schemas)
ser = json.dumps(anonymized_schemas)
if self.backup_location:
with open(self.backup_location, mode="w", encoding="utf8") as fp:
fp.write(ser)
Expand All @@ -220,15 +220,15 @@ def _export(self) -> List[Tuple[str, Dict[str, str]]]:
for message in messages:
key = message.key.decode("utf8")
try:
key = ujson.loads(key)
except ValueError:
key = json.loads(key)
except json.JSONDecodeError:
LOG.debug("Invalid JSON in message.key: %r, value: %r", message.key, message.value)
value = None
if message.value:
value = message.value.decode("utf8")
try:
value = ujson.loads(value)
except ValueError:
value = json.loads(value)
except json.JSONDecodeError:
LOG.debug("Invalid JSON in message.value: %r, key: %r", message.value, message.key)
values.append((key, value))

Expand Down
9 changes: 4 additions & 5 deletions karapace/schema_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from typing import Any, Dict, Union

import json
import ujson


def parse_avro_schema_definition(s: str) -> AvroSchema:
Expand Down Expand Up @@ -44,7 +43,7 @@ def parse_jsonschema_definition(schema_definition: str) -> Draft7Validator:
Raises:
SchemaError: If `schema_definition` is not a valid Draft7 schema.
"""
schema = ujson.loads(schema_definition)
schema = json.loads(schema_definition)
Draft7Validator.check_schema(schema)
return Draft7Validator(schema)

Expand Down Expand Up @@ -89,7 +88,7 @@ def __init__(self, schema_type: SchemaType, schema_str: str):
def to_dict(self) -> Dict[str, Any]:
if self.schema_type is SchemaType.PROTOBUF:
raise InvalidSchema("Protobuf do not support to_dict serialization")
return ujson.loads(self.schema_str)
return json.loads(self.schema_str)

def __str__(self) -> str:
if self.schema_type == SchemaType.PROTOBUF:
Expand Down Expand Up @@ -119,14 +118,14 @@ def parse(schema_type: SchemaType, schema_str: str) -> "ValidatedTypedSchema":
if schema_type is SchemaType.AVRO:
try:
parsed_schema = parse_avro_schema_definition(schema_str)
except (SchemaParseException, ValueError, TypeError) as e:
except (SchemaParseException, json.JSONDecodeError, TypeError) as e:
raise InvalidSchema from e

elif schema_type is SchemaType.JSONSCHEMA:
try:
parsed_schema = parse_jsonschema_definition(schema_str)
# TypeError - Raised when the user forgets to encode the schema as a string.
except (TypeError, ValueError, SchemaError, AssertionError) as e:
except (TypeError, json.JSONDecodeError, SchemaError, AssertionError) as e:
raise InvalidSchema from e

elif schema_type is SchemaType.PROTOBUF:
Expand Down
14 changes: 7 additions & 7 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from threading import Event, Lock, Thread
from typing import Any, Dict, Optional

import json
import logging
import ujson

Offset = int
Subject = str
Expand Down Expand Up @@ -260,16 +260,16 @@ def handle_messages(self) -> None:
for _, msgs in raw_msgs.items():
for msg in msgs:
try:
key = ujson.loads(msg.key.decode("utf8"))
except ValueError:
key = json.loads(msg.key.decode("utf8"))
except json.JSONDecodeError:
LOG.exception("Invalid JSON in msg.key")
continue

value = None
if msg.value:
try:
value = ujson.loads(msg.value.decode("utf8"))
except ValueError:
value = json.loads(msg.value.decode("utf8"))
except json.JSONDecodeError:
LOG.exception("Invalid JSON in msg.value")
continue

Expand Down Expand Up @@ -348,8 +348,8 @@ def _handle_msg_schema(self, key: dict, value: Optional[dict]) -> None:
# what is available in the topic.
if schema_type_parsed in [SchemaType.AVRO, SchemaType.JSONSCHEMA]:
try:
schema_str = ujson.dumps(ujson.loads(schema_str), sort_keys=True)
except ValueError:
schema_str = json.dumps(json.loads(schema_str), sort_keys=True)
except json.JSONDecodeError:
LOG.error("Schema is not invalid JSON")
return

Expand Down
3 changes: 2 additions & 1 deletion karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import aiohttp
import async_timeout
import asyncio
import json
import time


Expand Down Expand Up @@ -784,7 +785,7 @@ def write_new_schema_local(
new_schema = ValidatedTypedSchema.parse(schema_type=schema_type, schema_str=body["schema"])
except (InvalidSchema, InvalidSchemaType) as e:
self.log.warning("Invalid schema: %r", body["schema"], exc_info=True)
if isinstance(e.__cause__, (SchemaParseException, ValueError)):
if isinstance(e.__cause__, (SchemaParseException, json.JSONDecodeError)):
human_error = f"{e.__cause__.args[0]}" # pylint: disable=no-member
else:
human_error = "Provided schema is not valid"
Expand Down
4 changes: 2 additions & 2 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import avro
import avro.schema
import io
import json
import struct
import ujson

START_BYTE = 0x0
HEADER_FORMAT = ">bI"
Expand Down Expand Up @@ -247,7 +247,7 @@ def read_value(config: dict, schema: TypedSchema, bio: io.BytesIO):
reader = DatumReader(writers_schema=schema.schema)
return reader.read(BinaryDecoder(bio))
if schema.schema_type is SchemaType.JSONSCHEMA:
value = ujson.load(bio)
value = json.load(bio)
try:
schema.schema.validate(value)
except ValidationError as e:
Expand Down
4 changes: 2 additions & 2 deletions karapace/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
from types import MappingProxyType
from typing import NoReturn, overload, Union

import json
import kafka.client_async
import logging
import time
import ujson

NS_BLACKOUT_DURATION_SECONDS = 120
LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -71,7 +71,7 @@ def default_json_serialization( # pylint: disable=inconsistent-return-statement


def json_encode(obj, *, sort_keys: bool = True, binary=False):
res = ujson.dumps(
res = json.dumps(
obj,
sort_keys=sort_keys,
default=default_json_serialization,
Expand Down
4 changes: 0 additions & 4 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ warn_no_return = True
warn_unreachable = True
strict_equality = True

[mypy-ujson]
ignore_errors = True
ignore_missing_imports = True

[mypy-karapace.schema_registry_apis]
ignore_errors = True

Expand Down
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ requests==2.27.1
networkx==2.5
python-dateutil==2.8.2
protobuf==3.19.4
ujson==5.1.0
avro==1.11.0
# Patched dependencies
#
Expand Down
Loading

0 comments on commit fa70f43

Please sign in to comment.