diff --git a/README.md b/README.md
index a5683e9..fd34268 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,30 @@ transforms.encode.type=org.onliner.kafka.transforms.JsonSerialize$Value
 transforms.encode.fields=comma,separated,list,of,fields
 ```
 
+### `JsonDeserialize`
+
+This transformation deserializes JSON strings in the specified fields of the original record's data into structures.
+
+The transformation:
+- expects the record value/key to be either a `STRUCT` or a `MAP`;
+- expects the specified fields, when present, to hold `JSON` strings;
+- expects the `JSON` strings not to contain arrays;
+
+Exists in two variants:
+- `org.onliner.kafka.transforms.JsonDeserialize$Key` - works on keys;
+- `org.onliner.kafka.transforms.JsonDeserialize$Value` - works on values.
+
+The transformation defines the following configurations:
+- `fields` - List of fields to deserialize. Cannot be `null` or empty.
+
+Here's an example of this transformation configuration:
+
+```properties
+transforms=decode
+transforms.decode.type=org.onliner.kafka.transforms.JsonDeserialize$Value
+transforms.decode.fields=comma,separated,list,of,fields
+```
+
 ### `ConcatFields`
 
 This transformation concat fields of the original record's data to single string with delimiter.
diff --git a/src/main/kotlin/org/onliner/kafka/transforms/JsonDeserialize.kt b/src/main/kotlin/org/onliner/kafka/transforms/JsonDeserialize.kt
new file mode 100644
index 0000000..05f2288
--- /dev/null
+++ b/src/main/kotlin/org/onliner/kafka/transforms/JsonDeserialize.kt
@@ -0,0 +1,258 @@
+package org.onliner.kafka.transforms
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.kafka.common.cache.LRUCache
+import org.apache.kafka.common.cache.SynchronizedCache
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.connect.connector.ConnectRecord
+import org.apache.kafka.connect.data.Schema
+import org.apache.kafka.connect.data.SchemaBuilder
+import org.apache.kafka.connect.data.Struct
+import org.apache.kafka.connect.errors.DataException
+import org.apache.kafka.connect.transforms.Transformation
+import org.apache.kafka.connect.transforms.util.Requirements
+import org.apache.kafka.connect.transforms.util.SchemaUtil
+import org.apache.kafka.connect.transforms.util.SimpleConfig
+
+/**
+ * Single message transform that parses JSON strings held in the configured fields of a
+ * record key or value.
+ *
+ * Schemaless (map) records get the parsed [JsonNode] in place of the string; records with a
+ * schema get a new schema and [Struct] derived from the parsed JSON. JSON arrays are rejected
+ * with a [DataException].
+ */
+@Suppress("TooManyFunctions")
+abstract class JsonDeserialize<R : ConnectRecord<R>?> : Transformation<R> {
+    companion object {
+        const val OVERVIEW_DOC = "Deserialize specified fields to JSON structure"
+
+        val CONFIG_DEF: ConfigDef = ConfigDef()
+            .define(
+                "fields",
+                ConfigDef.Type.LIST,
+                ConfigDef.Importance.HIGH,
+                "List of fields to deserialize"
+            )
+
+        // Keyed by the source schema AND the schemas derived from the parsed JSON: the output
+        // schema depends on both, so the source schema alone would hand the first record's
+        // JSON shape to every later record.
+        private val cache = SynchronizedCache(LRUCache<Pair<Schema, Map<String, Schema?>>, Schema>(16))
+        private val mapper = ObjectMapper()
+
+        private const val PURPOSE = "onliner-kafka-smt-json-decode"
+    }
+
+    private lateinit var _fields: List<String>
+
+    /** Reads the `fields` list from the transformation configuration. */
+    override fun configure(props: Map<String, *>?) {
+        val config = SimpleConfig(CONFIG_DEF, props)
+
+        _fields = config.getList("fields")
+    }
+
+    /**
+     * Passes the record through when the operated key/value is null; otherwise converts it as
+     * a map (no schema) or as a [Struct] (with schema).
+     */
+    override fun apply(record: R): R = when {
+        operatingValue(record) == null -> record
+        operatingSchema(record) == null -> applySchemaless(record)
+        else -> applyWithSchema(record)
+    }
+
+    @Suppress("EmptyFunctionBlock")
+    override fun close() {
+    }
+
+    override fun config(): ConfigDef = CONFIG_DEF
+
+    protected abstract fun operatingSchema(record: R?): Schema?
+    protected abstract fun operatingValue(record: R?): Any?
+    protected abstract fun newRecord(record: R?, schema: Schema?, value: Any?): R
+
+    /** Replaces each configured string entry of a map key/value with its parsed JSON tree. */
+    private fun applySchemaless(record: R): R {
+        val value = Requirements.requireMap(operatingValue(record), PURPOSE)
+        // Convert a copy: the incoming map belongs to the caller and may be read-only.
+        val updated = LinkedHashMap<String, Any?>(value)
+
+        for (field in _fields) {
+            if (updated.containsKey(field)) {
+                updated[field] = convert(updated[field])
+            }
+        }
+
+        return newRecord(record, null, updated)
+    }
+
+    /**
+     * Parses each configured string field of a [Struct] key/value and rebuilds the schema and
+     * struct so those fields carry the parsed structure. Other fields are copied unchanged.
+     */
+    private fun applyWithSchema(record: R): R {
+        val value = Requirements.requireStruct(operatingValue(record), PURPOSE)
+        val schema = operatingSchema(record) ?: return record
+        val converted = hashMapOf<String, JsonNode>()
+
+        for (field in schema.fields()) {
+            val name = field.name()
+
+            if (name !in _fields) {
+                continue
+            }
+
+            // A null string has nothing to parse; the field keeps its original schema and value.
+            val json = value.getString(name) ?: continue
+
+            converted[name] = mapper.readTree(json)
+        }
+
+        val outputSchema = copySchema(schema, converted)
+        val outputValue = Struct(outputSchema)
+
+        for (field in outputSchema.fields()) {
+            val name = field.name()
+            val node = converted[name]
+
+            if (node != null) {
+                outputValue.put(name, asConnectValue(node, field.schema()))
+            } else {
+                outputValue.put(name, value.get(name))
+            }
+        }
+
+        return newRecord(record, outputSchema, outputValue)
+    }
+
+    /** Parses [value] as JSON when it is a string; any other value is returned as is. */
+    private fun convert(value: Any?): Any? = if (value is String) mapper.readTree(value) else value
+
+    /**
+     * Returns the output schema: [original] with every converted field's schema replaced by
+     * the one derived from its parsed JSON. Results are cached per schema/JSON-shape pair.
+     */
+    private fun copySchema(original: Schema, converted: HashMap<String, JsonNode>): Schema {
+        val derived: Map<String, Schema?> = converted.mapValues { (_, node) -> asConnectSchema(node) }
+        val key = Pair(original, derived)
+
+        cache.get(key)?.let { return it }
+
+        val builder = SchemaUtil.copySchemaBasics(original)
+
+        for (field in original.fields()) {
+            val name = field.name()
+            val fieldSchema = if (converted.containsKey(name)) {
+                // A converted field whose JSON yields no schema (e.g. JSON null) is dropped.
+                derived[name] ?: continue
+            } else {
+                field.schema()
+            }
+
+            builder.field(name, fieldSchema)
+        }
+
+        val output = builder.build()
+
+        cache.put(key, output)
+
+        return output
+    }
+
+    /**
+     * Converts a parsed JSON node to the Connect value matching [schema].
+     *
+     * @throws DataException for schema types this transform does not produce, or when a
+     * struct field has no counterpart in the JSON object.
+     */
+    private fun asConnectValue(value: JsonNode, schema: Schema): Any? = when (schema.type()) {
+        Schema.Type.BOOLEAN -> value.booleanValue()
+        // FLOAT32 is a Java Float and FLOAT64 a Java Double; Struct.put rejects a mismatch.
+        Schema.Type.FLOAT32 -> value.floatValue()
+        Schema.Type.FLOAT64 -> value.doubleValue()
+        Schema.Type.INT16 -> value.shortValue()
+        Schema.Type.INT32 -> value.intValue()
+        Schema.Type.INT64 -> value.longValue()
+        Schema.Type.STRING -> value.textValue()
+        Schema.Type.BYTES -> value.binaryValue()
+        Schema.Type.STRUCT -> {
+            val struct = Struct(schema)
+
+            for (field in schema.fields()) {
+                val child = value.get(field.name())
+                    ?: throw DataException("JSON object has no value for field '${field.name()}'")
+
+                struct.put(field, asConnectValue(child, field.schema()))
+            }
+
+            struct
+        }
+        else -> throw DataException("Couldn't translate unsupported schema type $schema.")
+    }
+
+    /**
+     * Derives a Connect schema from a parsed JSON node. Returns `null` for nodes with no
+     * mapping here (such as JSON null); object fields without a schema are left out.
+     *
+     * @throws DataException when the node is a JSON array.
+     */
+    @Suppress("ComplexMethod")
+    private fun asConnectSchema(value: JsonNode): Schema? = when {
+        value.isBoolean -> Schema.BOOLEAN_SCHEMA
+        value.isShort -> Schema.INT16_SCHEMA
+        value.isInt -> Schema.INT32_SCHEMA
+        value.isLong -> Schema.INT64_SCHEMA
+        value.isFloat -> Schema.FLOAT32_SCHEMA
+        value.isDouble -> Schema.FLOAT64_SCHEMA
+        value.isBinary -> Schema.BYTES_SCHEMA
+        value.isTextual -> Schema.STRING_SCHEMA
+        value.isObject -> {
+            val builder = SchemaBuilder.struct()
+
+            for ((k, v) in value.fields()) {
+                val kSchema = asConnectSchema(v) ?: continue
+
+                builder.field(k, kSchema)
+            }
+
+            builder.build()
+        }
+        value.isArray -> throw DataException("JSON arrays unsupported")
+        else -> null
+    }
+
+    class Key<R : ConnectRecord<R>?> : JsonDeserialize<R>() {
+        override fun operatingSchema(record: R?): Schema? = record?.keySchema()
+
+        override fun operatingValue(record: R?): Any? = record?.key()
+
+        override fun newRecord(record: R?, schema: Schema?, value: Any?): R = record!!.newRecord(
+            record.topic(),
+            record.kafkaPartition(),
+            schema,
+            value,
+            record.valueSchema(),
+            record.value(),
+            record.timestamp()
+        )
+    }
+
+    class Value<R : ConnectRecord<R>?> : JsonDeserialize<R>() {
+        override fun operatingSchema(record: R?): Schema? = record?.valueSchema()
+
+        override fun operatingValue(record: R?): Any? = record?.value()
+
+        override fun newRecord(record: R?, schema: Schema?, value: Any?): R = record!!.newRecord(
+            record.topic(),
+            record.kafkaPartition(),
+            record.keySchema(),
+            record.key(),
+            schema,
+            value,
+            record.timestamp()
+        )
+    }
+}
diff --git a/src/test/kotlin/org/onliner/kafka/transforms/JsonDeserializeTest.kt b/src/test/kotlin/org/onliner/kafka/transforms/JsonDeserializeTest.kt
new file mode 100644
index 0000000..2733864
--- /dev/null
+++ b/src/test/kotlin/org/onliner/kafka/transforms/JsonDeserializeTest.kt
@@ -0,0 +1,241 @@
+package org.onliner.kafka.transforms
+
+import com.fasterxml.jackson.databind.node.ObjectNode
+import org.apache.kafka.connect.data.Schema
+import org.apache.kafka.connect.data.SchemaBuilder
+import org.apache.kafka.connect.data.Struct
+import org.apache.kafka.connect.errors.DataException
+import org.apache.kafka.connect.source.SourceRecord
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.*
+import java.util.*
+import kotlin.test.Test
+import kotlin.test.assertNull
+
+internal class JsonDeserializeTest {
+    private val xformKey: JsonDeserialize<SourceRecord> = JsonDeserialize.Key()
+    private val xformValue: JsonDeserialize<SourceRecord> = JsonDeserialize.Value()
+
+    @AfterEach
+    fun teardown() {
+        xformKey.close()
+        xformValue.close()
+    }
+
+    @Test
+    fun handlesNullValue() {
+        configure(xformValue)
+        val given = SourceRecord(
+            null,
+            null,
+            "topic",
+            0,
+            null,
+            null
+        )
+        val expected = null
+        val actual: Any? = xformValue.apply(given).value()
+        assertEquals(expected, actual)
+    }
+
+    @Test
+    fun handlesNullKey() {
+        configure(xformKey)
+        val given = SourceRecord(
+            null,
+            null,
+            "topic",
+            0,
+            null,
+            null,
+            null,
+            null
+        )
+        val expected = null
+        val actual: Any? = xformKey.apply(given).key()
+        assertEquals(expected, actual)
+    }
+
+    @Test
+    fun copyValueSchemaAndConvertFields() {
+        configure(xformValue, "payload")
+
+        val schema = SchemaBuilder
+            .struct()
+            .name("name")
+            .version(1)
+            .doc("doc")
+            .field("payload", Schema.STRING_SCHEMA)
+            .field("string", Schema.STRING_SCHEMA)
+            .build()
+
+        val value = Struct(schema)
+            .put("payload", "{\"foo\":\"bar\",\"baz\":false}")
+            .put("string", "string")
+
+        val original = SourceRecord(null, null, "test", 0, schema, value)
+        val transformed: SourceRecord = xformValue.apply(original)
+        val outputSchema = transformed.valueSchema()
+        val payloadStruct = (transformed.value() as Struct).getStruct("payload")
+        val payloadSchema = outputSchema.field("payload").schema()
+
+        assertEquals(schema.name(), outputSchema.name())
+        assertEquals(schema.version(), outputSchema.version())
+        assertEquals(schema.doc(), outputSchema.doc())
+
+        assertEquals(payloadSchema.field("foo").schema(), Schema.STRING_SCHEMA)
+        assertEquals("bar", payloadStruct.getString("foo"))
+        assertEquals(payloadSchema.field("baz").schema(), Schema.BOOLEAN_SCHEMA)
+        assertEquals(false, payloadStruct.getBoolean("baz"))
+
+        assertEquals(Schema.STRING_SCHEMA, outputSchema.field("string").schema())
+        assertEquals("string", (transformed.value() as Struct).getString("string"))
+    }
+
+    @Test
+    fun derivesPayloadSchemaPerRecord() {
+        configure(xformValue, "payload")
+
+        val schema = SchemaBuilder
+            .struct()
+            .field("payload", Schema.STRING_SCHEMA)
+            .build()
+
+        // Same source schema, different JSON shapes: each record gets its own payload schema.
+        val first = payloadOf(schema, "{\"price\":1.5}")
+        val second = payloadOf(schema, "{\"count\":3}")
+
+        assertEquals(Schema.FLOAT64_SCHEMA, first.schema().field("price").schema())
+        assertEquals(1.5, first.getFloat64("price"))
+        assertEquals(Schema.INT32_SCHEMA, second.schema().field("count").schema())
+        assertEquals(3, second.getInt32("count"))
+    }
+
+    @Test
+    fun schemalessValueConvertField() {
+        configure(xformValue, "payload")
+        val original = mapOf(
+            "int32" to 42,
+            "payload" to "{\"foo\":\"bar\",\"baz\":false}"
+        )
+
+        val record = SourceRecord(null, null, "test", 0, null, original)
+        val transformed = xformValue.apply(record).value() as Map<*, *>
+
+        assertEquals(42, transformed["int32"])
+
+        assertInstanceOf(ObjectNode::class.java, transformed["payload"])
+
+        val payload = transformed["payload"] as ObjectNode
+
+        assertEquals("bar", payload.get("foo").textValue())
+        assertEquals(false, payload.get("baz").booleanValue())
+    }
+
+    @Test
+    fun schemalessValueConvertNullField() {
+        configure(xformValue, "payload")
+        val original = mapOf(
+            "int32" to 42,
+            "payload" to null
+        )
+
+        val record = SourceRecord(null, null, "test", 0, null, original)
+        val transformed = xformValue.apply(record).value() as Map<*, *>
+
+        assertEquals(42, transformed["int32"])
+        assertNull(transformed["payload"])
+    }
+
+    @Test
+    fun schemalessValueLeavesInputUntouched() {
+        configure(xformValue, "payload")
+        // A single-pair mapOf is a read-only singleton map on the JVM.
+        val original = mapOf("payload" to "{\"foo\":\"bar\"}")
+
+        val record = SourceRecord(null, null, "test", 0, null, original)
+        val transformed = xformValue.apply(record).value() as Map<*, *>
+
+        assertInstanceOf(ObjectNode::class.java, transformed["payload"])
+        assertEquals("{\"foo\":\"bar\"}", original["payload"])
+    }
+
+    @Test
+    fun passUnknownSchemaFields() {
+        configure(xformValue, "unknown")
+        val schema = SchemaBuilder
+            .struct()
+            .name("name")
+            .version(1)
+            .doc("doc")
+            .field("int32", Schema.INT32_SCHEMA)
+            .build()
+
+        val expected = Struct(schema).put("int32", 42)
+        val original = SourceRecord(null, null, "test", 0, schema, expected)
+        val transformed: SourceRecord = xformValue.apply(original)
+
+        assertEquals(schema.name(), transformed.valueSchema().name())
+        assertEquals(schema.version(), transformed.valueSchema().version())
+        assertEquals(schema.doc(), transformed.valueSchema().doc())
+        assertEquals(Schema.INT32_SCHEMA, transformed.valueSchema().field("int32").schema())
+        assertEquals(42, (transformed.value() as Struct).getInt32("int32"))
+    }
+
+    @Test
+    fun topLevelStructRequired() {
+        configure(xformValue)
+        assertThrows(DataException::class.java) {
+            xformValue.apply(
+                SourceRecord(
+                    null, null,
+                    "topic", 0, Schema.INT32_SCHEMA, 42
+                )
+            )
+        }
+    }
+
+    @Test
+    fun topLevelMapRequired() {
+        configure(xformValue)
+        assertThrows(DataException::class.java) {
+            xformValue.apply(
+                SourceRecord(
+                    null, null,
+                    "topic", 0, null, 42
+                )
+            )
+        }
+    }
+
+    @Test
+    fun testOptionalStruct() {
+        configure(xformValue)
+        val builder = SchemaBuilder.struct().optional()
+        builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA)
+        val schema = builder.build()
+        val transformed: SourceRecord = xformValue.apply(
+            SourceRecord(
+                null, null,
+                "topic", 0,
+                schema, null
+            )
+        )
+        assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type())
+        assertNull(transformed.value())
+    }
+
+    private fun payloadOf(schema: Schema, json: String): Struct {
+        val record = SourceRecord(null, null, "test", 0, schema, Struct(schema).put("payload", json))
+
+        return (xformValue.apply(record).value() as Struct).getStruct("payload")
+    }
+
+    private fun configure(transform: JsonDeserialize<SourceRecord>, fields: String = "") {
+        val props: MutableMap<String, String> = HashMap()
+
+        props["fields"] = fields
+
+        transform.configure(props.toMap())
+    }
+}