JSON deserialize transformer added #3

Merged (2 commits) on Jan 26, 2024
24 changes: 24 additions & 0 deletions README.md
@@ -29,6 +29,30 @@ transforms.encode.type=org.onliner.kafka.transforms.JsonSerialize$Value
transforms.encode.fields=comma,separated,list,of,fields
```

### `JsonDeserialize`

This transformation deserializes JSON strings in the original record's data into structured values.

The transformation:
- expects the record key/value to contain the specified fields;
- expects each of those fields to hold a `JSON` string;
- expects the `JSON` strings not to contain arrays;

Exists in two variants:
- `org.onliner.kafka.transforms.JsonDeserialize$Key` - works on keys;
- `org.onliner.kafka.transforms.JsonDeserialize$Value` - works on values.

The transformation defines the following configurations:
- `fields` - List of fields to deserialize. Cannot be `null` or empty.

Here's an example of this transformation's configuration:

```properties
transforms=decode
transforms.decode.type=org.onliner.kafka.transforms.JsonDeserialize$Value
transforms.decode.fields=comma,separated,list,of,fields
```
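
To see what the transform does end to end, here's a minimal sketch of the schemaless path. The topic name, field names, and the `main` harness are illustrative; `SourceRecord` is the standard Kafka Connect record class:

```kotlin
import org.apache.kafka.connect.source.SourceRecord
import org.onliner.kafka.transforms.JsonDeserialize

fun main() {
    val transform = JsonDeserialize.Value<SourceRecord>()
    transform.configure(mapOf("fields" to "coordinates"))

    // Schemaless value whose "coordinates" field holds a JSON string
    val value = mutableMapOf<String, Any?>(
        "id" to 1,
        "coordinates" to """{"lat": 53.9, "lon": 27.56}"""
    )

    val record = SourceRecord(null, null, "test-topic", null, value)
    val transformed = transform.apply(record)

    // "coordinates" is now a parsed JSON object instead of a string
    println(transformed.value())
}
```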

### `ConcatFields`

This transformation concatenates fields of the original record's data into a single string with a delimiter.
223 changes: 223 additions & 0 deletions src/main/kotlin/org/onliner/kafka/transforms/JsonDeserialize.kt
@@ -0,0 +1,223 @@
package org.onliner.kafka.transforms

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.cache.LRUCache
import org.apache.kafka.common.cache.SynchronizedCache
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.errors.DataException
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements
import org.apache.kafka.connect.transforms.util.SchemaUtil
import org.apache.kafka.connect.transforms.util.SimpleConfig

@Suppress("TooManyFunctions")
abstract class JsonDeserialize<R : ConnectRecord<R>?> : Transformation<R> {
companion object {
const val OVERVIEW_DOC = "Deserialize specified fields to JSON structure"

val CONFIG_DEF: ConfigDef = ConfigDef()
.define(
"fields",
ConfigDef.Type.LIST,
ConfigDef.Importance.HIGH,
"List of fields to deserialize"
)

private val cache = SynchronizedCache(LRUCache<Schema, Schema>(16))
private val mapper = ObjectMapper()

private const val PURPOSE = "onliner-kafka-smt-json-decode"
}

private lateinit var _fields: List<String>

override fun configure(props: Map<String?, *>?) {
val config = SimpleConfig(CONFIG_DEF, props)

_fields = config.getList("fields")
}

override fun apply(record: R): R = when {
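// A null operating value passes through untouched; otherwise dispatch on schema presence.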
operatingValue(record) == null -> {
record
}

operatingSchema(record) == null -> {
applySchemaless(record)
}

else -> {
applyWithSchema(record)
}
}

@Suppress("EmptyFunctionBlock")
override fun close() {
}

override fun config(): ConfigDef = CONFIG_DEF

protected abstract fun operatingSchema(record: R?): Schema?
protected abstract fun operatingValue(record: R?): Any?
protected abstract fun newRecord(record: R?, schema: Schema?, value: Any?): R

private fun applySchemaless(record: R): R {
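// Schemaless path: the value is a plain Map, so configured fields are parsed in place.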
val value = Requirements.requireMap(operatingValue(record), PURPOSE)

for (field in _fields) {
if (!value.containsKey(field)) {
continue
}

value[field] = convert(value[field])
}

return newRecord(record, null, value)
}

private fun applyWithSchema(record: R): R {
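// Schema path: parse each configured string field, then rebuild the record
// with a schema derived from the parsed JSON.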
val value = Requirements.requireStruct(operatingValue(record), PURPOSE)
val schema = operatingSchema(record) ?: return record
val converted = hashMapOf<String, JsonNode>()

for (field in schema.fields()) {
if (_fields.contains(field.name())) {
converted[field.name()] = mapper.readTree(value.getString(field.name()))
}
}

val outputSchema = copySchema(schema, converted)
val outputValue = Struct(outputSchema)

for (field in outputSchema.fields()) {
val name = field.name()

if (converted.containsKey(name)) {
outputValue.put(name, asConnectValue(converted[name]!!, field.schema()))
} else {
outputValue.put(name, value.get(name))
}
}

return newRecord(record, outputSchema, outputValue)
}

private fun convert(value: Any?): Any? {
if (value is String) {
return mapper.readTree(value)
}

return value
}

private fun copySchema(original: Schema, converted: HashMap<String, JsonNode>): Schema {
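// Deriving a schema from parsed JSON is relatively costly, so results are cached per input schema.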
val cached = cache.get(original)

if (cached != null) {
return cached
}

val output = SchemaUtil.copySchemaBasics(original)

for (field in original.fields()) {
var schema = field.schema()

if (converted.containsKey(field.name())) {
schema = asConnectSchema(converted[field.name()]!!) ?: continue
}

output.field(field.name(), schema)
}

cache.put(original, output)

return output
}

// Convert a parsed JsonNode into a Connect value matching the derived schema.
private fun asConnectValue(value: JsonNode, schema: Schema): Any? {
return when (schema.type()) {
Schema.Type.BOOLEAN -> value.booleanValue()
Schema.Type.FLOAT32 -> value.floatValue()
Schema.Type.FLOAT64 -> value.doubleValue()
Schema.Type.INT16 -> value.shortValue()
Schema.Type.INT32 -> value.intValue()
Schema.Type.INT64 -> value.longValue()
Schema.Type.STRING -> value.textValue()
Schema.Type.BYTES -> value.binaryValue()
Schema.Type.STRUCT -> {
val struct = Struct(schema)

for (field in schema.fields()) {
struct.put(field, asConnectValue(value.get(field.name()), field.schema()))
}

struct
}
else -> throw DataException("Couldn't translate unsupported schema type ${schema.type()}.")
}
}

@Suppress("ComplexMethod")
private fun asConnectSchema(value: JsonNode): Schema? {
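// Map Jackson node types onto Connect schema types; arrays are rejected and unknown types are skipped.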
return when {
value.isBoolean -> Schema.BOOLEAN_SCHEMA
value.isShort -> Schema.INT16_SCHEMA
value.isInt -> Schema.INT32_SCHEMA
value.isLong -> Schema.INT64_SCHEMA
value.isFloat -> Schema.FLOAT32_SCHEMA
value.isDouble -> Schema.FLOAT64_SCHEMA
value.isBinary -> Schema.BYTES_SCHEMA
value.isTextual -> Schema.STRING_SCHEMA
value.isObject -> {
val builder = SchemaBuilder.struct()

for ((k, v) in value.fields()) {
val kSchema = asConnectSchema(v) ?: continue

builder.field(k, kSchema)
}

builder.build()
}
value.isArray -> throw DataException("JSON arrays unsupported")
else -> null
}
}

class Key<R : ConnectRecord<R>?> : JsonDeserialize<R>() {
override fun operatingSchema(record: R?): Schema? = record?.keySchema()

override fun operatingValue(record: R?): Any? = record?.key()

override fun newRecord(record: R?, schema: Schema?, value: Any?): R = record!!.newRecord(
record.topic(),
record.kafkaPartition(),
schema,
value,
record.valueSchema(),
record.value(),
record.timestamp()
)
}

class Value<R : ConnectRecord<R>?> : JsonDeserialize<R>() {
override fun operatingSchema(record: R?): Schema? = record?.valueSchema()

override fun operatingValue(record: R?): Any? = record?.value()

override fun newRecord(record: R?, schema: Schema?, value: Any?): R = record!!.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
schema,
value,
record.timestamp()
)
}
}