diff --git a/README.md b/README.md
index 906af3e..a5683e9 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,30 @@
 transforms.encode.type=org.onliner.kafka.transforms.JsonSerialize$Value
 transforms.encode.fields=comma,separated,list,of,fields
 ```
+### `ConcatFields`
+
+This transformation concatenates fields of the original record into a single string, joined with a delimiter.
+
+The transformation:
+- expects the record key/value to be either a `STRUCT` or a `MAP`;
+
+Exists in two variants:
+- `org.onliner.kafka.transforms.ConcatFields$Key` - works on keys;
+- `org.onliner.kafka.transforms.ConcatFields$Value` - works on values.
+
+The transformation defines the following configurations:
+- `fields` - List of fields to concatenate. Cannot be `null` or empty.
+- `delimiter` - Delimiter used to join the fields. Cannot be `null` or empty.
+- `output` - Name of the output field. Cannot be `null` or empty.
+
+```properties
+transforms=concat
+transforms.concat.type=org.onliner.kafka.transforms.ConcatFields$Value
+transforms.concat.fields=latitude,longitude
+transforms.concat.delimiter=,
+transforms.concat.output=location
+```
+
 ## License
 
 This project is licensed under the [MIT license](LICENSE).
diff --git a/src/main/kotlin/org/onliner/kafka/transforms/ConcatFields.kt b/src/main/kotlin/org/onliner/kafka/transforms/ConcatFields.kt
new file mode 100644
index 0000000..77d9205
--- /dev/null
+++ b/src/main/kotlin/org/onliner/kafka/transforms/ConcatFields.kt
@@ -0,0 +1,173 @@
+package org.onliner.kafka.transforms
+
+import org.apache.kafka.common.cache.LRUCache
+import org.apache.kafka.common.cache.SynchronizedCache
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.connect.connector.ConnectRecord
+import org.apache.kafka.connect.data.Schema
+import org.apache.kafka.connect.data.Struct
+import org.apache.kafka.connect.transforms.Transformation
+import org.apache.kafka.connect.transforms.util.Requirements
+import org.apache.kafka.connect.transforms.util.SchemaUtil
+import org.apache.kafka.connect.transforms.util.SimpleConfig
+
+@Suppress("TooManyFunctions")
+abstract class ConcatFields<R : ConnectRecord<R>?> : Transformation<R> {
+    companion object {
+        const val OVERVIEW_DOC = "Concatenates fields into one output field with a delimiter"
+
+        val CONFIG_DEF: ConfigDef = ConfigDef()
+            .define(
+                "fields",
+                ConfigDef.Type.LIST,
+                ConfigDef.Importance.HIGH,
+                "List of fields to concatenate"
+            )
+            .define(
+                "delimiter",
+                ConfigDef.Type.STRING,
+                ConfigDef.Importance.HIGH,
+                "Delimiter used to join the fields"
+            )
+            .define(
+                "output",
+                ConfigDef.Type.STRING,
+                ConfigDef.Importance.HIGH,
+                "Output field"
+            )
+
+        private val cache = SynchronizedCache(LRUCache<Schema, Schema>(16))
+
+        private const val PURPOSE = "onliner-kafka-smt-concat-fields"
+    }
+
+    private lateinit var _fields: List<String>
+    private lateinit var _delimiter: String
+    private lateinit var _outputField: String
+
+    override fun configure(props: Map<String, *>?) {
+        val config = SimpleConfig(CONFIG_DEF, props)
+
+        _fields = config.getList("fields")
+        _delimiter = config.getString("delimiter")
+        _outputField = config.getString("output")
+    }
+
+    override fun apply(record: R): R = when {
+        operatingValue(record) == null -> {
+            record
+        }
+
+        operatingSchema(record) == null -> {
+            applySchemaless(record)
+        }
+
+        else -> {
+            applyWithSchema(record)
+        }
+    }
+
+    @Suppress("EmptyFunctionBlock")
+    override fun close() {
+    }
+
+    override fun config(): ConfigDef = CONFIG_DEF
+
+    protected abstract fun operatingSchema(record: R?): Schema?
+    protected abstract fun operatingValue(record: R?): Any?
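+    // Implemented by the Key and Value subclasses below to rebuild the record
+    // with the transformed key or value swapped in.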
+    protected abstract fun newRecord(record: R?, schema: Schema?, value: Any?): R
+
+    private fun applySchemaless(record: R): R {
+        val value = Requirements.requireMap(operatingValue(record), PURPOSE)
+        val output = mutableListOf<String>()
+
+        for (field in _fields) {
+            if (value.containsKey(field)) {
+                output.add(value[field].toString())
+            }
+        }
+
+        value[_outputField] = output.joinToString(separator = _delimiter)
+
+        return newRecord(record, null, value)
+    }
+
+    private fun applyWithSchema(record: R): R {
+        val oldValue = Requirements.requireStruct(operatingValue(record), PURPOSE)
+        val oldSchema = operatingSchema(record) ?: return record
+
+        val newSchema = copySchema(oldSchema)
+        val newValue = copyValue(oldSchema, newSchema, oldValue)
+        val output = mutableListOf<String>()
+
+        for (field in _fields) {
+            val part = newValue.get(field)
+
+            if (part !== null) {
+                output.add(part.toString())
+            }
+        }
+
+        newValue.put(_outputField, output.joinToString(separator = _delimiter))
+
+        return newRecord(record, newSchema, newValue)
+    }
+
+    private fun copySchema(schema: Schema): Schema {
+        val cached = cache.get(schema)
+
+        if (cached != null) {
+            return cached
+        }
+
+        val output = SchemaUtil.copySchemaBasics(schema)
+
+        schema.fields().forEach { field -> output.field(field.name(), field.schema()) }
+        output.field(_outputField, Schema.STRING_SCHEMA)
+
+        cache.put(schema, output)
+
+        return output
+    }
+
+    private fun copyValue(oldSchema: Schema, newSchema: Schema, oldValue: Struct): Struct {
+        val newValue = Struct(newSchema)
+
+        oldSchema.fields().forEach { field -> newValue.put(field.name(), oldValue.get(field)) }
+
+        return newValue
+    }
+
+    class Key<R : ConnectRecord<R>?> : ConcatFields<R>() {
+        override fun operatingSchema(record: R?): Schema? = record?.keySchema()
+
+        override fun operatingValue(record: R?): Any? = record?.key()
+
+        override fun newRecord(record: R?, schema: Schema?, value: Any?): R = record!!.newRecord(
+            record.topic(),
+            record.kafkaPartition(),
+            schema,
+            value,
+            record.valueSchema(),
+            record.value(),
+            record.timestamp()
+        )
+    }
+
+    class Value<R : ConnectRecord<R>?> : ConcatFields<R>() {
+        override fun operatingSchema(record: R?): Schema? = record?.valueSchema()
+
+        override fun operatingValue(record: R?): Any? = record?.value()
+
+        override fun newRecord(record: R?, schema: Schema?, value: Any?): R = record!!.newRecord(
+            record.topic(),
+            record.kafkaPartition(),
+            record.keySchema(),
+            record.key(),
+            schema,
+            value,
+            record.timestamp()
+        )
+    }
+}
diff --git a/src/test/kotlin/org/onliner/kafka/transforms/ConcatFieldsTest.kt b/src/test/kotlin/org/onliner/kafka/transforms/ConcatFieldsTest.kt
new file mode 100644
index 0000000..0dcd0f4
--- /dev/null
+++ b/src/test/kotlin/org/onliner/kafka/transforms/ConcatFieldsTest.kt
@@ -0,0 +1,100 @@
+package org.onliner.kafka.transforms
+
+import org.apache.kafka.connect.data.Schema
+import org.apache.kafka.connect.data.SchemaBuilder
+import org.apache.kafka.connect.data.Struct
+import org.apache.kafka.connect.source.SourceRecord
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions
+import kotlin.test.Test
+
+internal class ConcatFieldsTest {
+    private val xformKey: ConcatFields<SourceRecord> = ConcatFields.Key()
+    private val xformValue: ConcatFields<SourceRecord> = ConcatFields.Value()
+
+    @AfterEach
+    fun teardown() {
+        xformKey.close()
+        xformValue.close()
+    }
+
+    @Test
+    fun handlesNullValue() {
+        configure(xformValue)
+        val given = SourceRecord(
+            null,
+            null,
+            "topic",
+            0,
+            null,
+            null
+        )
+        val expected = null
+        val actual: Any? = xformValue.apply(given).value()
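+        // apply() short-circuits on a null operating value, so the record passes through unchanged.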
+        Assertions.assertEquals(expected, actual)
+    }
+
+    @Test
+    fun schemalessValueConcatFields() {
+        configure(xformValue)
+
+        val original = mapOf(
+            "id" to 1,
+            "latitude" to 53.9000000,
+            "longitude" to -27.5666700,
+        )
+
+        val record = SourceRecord(null, null, "test", 0, null, original)
+        val transformed = xformValue.apply(record).value() as Map<*, *>
+
+        Assertions.assertEquals(1, transformed["id"])
+        Assertions.assertEquals(53.9000000, transformed["latitude"])
+        Assertions.assertEquals(-27.5666700, transformed["longitude"])
+        Assertions.assertEquals("53.9,-27.56667", transformed["location"])
+    }
+
+    @Test
+    fun copyValueSchemaAndConvertFields() {
+        configure(xformValue)
+
+        val schema = SchemaBuilder
+            .struct()
+            .name("name")
+            .version(1)
+            .doc("doc")
+            .field("id", Schema.INT32_SCHEMA)
+            .field("latitude", Schema.FLOAT32_SCHEMA)
+            .field("longitude", Schema.FLOAT32_SCHEMA)
+            .build()
+
+        val value = Struct(schema)
+            .put("id", 1)
+            .put("latitude", 53.9000000.toFloat())
+            .put("longitude", (-27.5666700).toFloat())
+
+        val original = SourceRecord(null, null, "test", 0, schema, value)
+        val transformed: SourceRecord = xformValue.apply(original)
+
+        Assertions.assertEquals(schema.name(), transformed.valueSchema().name())
+        Assertions.assertEquals(schema.version(), transformed.valueSchema().version())
+        Assertions.assertEquals(schema.doc(), transformed.valueSchema().doc())
+
+        Assertions.assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema().field("location").schema())
+        Assertions.assertEquals("53.9,-27.56667", (transformed.value() as Struct).get("location"))
+
+        Assertions.assertEquals(Schema.INT32_SCHEMA, transformed.valueSchema().field("id").schema())
+        Assertions.assertEquals(1, (transformed.value() as Struct).getInt32("id"))
+
+        Assertions.assertEquals(Schema.FLOAT32_SCHEMA, transformed.valueSchema().field("latitude").schema())
+        Assertions.assertEquals(53.9000000.toFloat(), (transformed.value() as Struct).getFloat32("latitude"))
+
+        Assertions.assertEquals(Schema.FLOAT32_SCHEMA, transformed.valueSchema().field("longitude").schema())
+        Assertions.assertEquals((-27.5666700).toFloat(), (transformed.value() as Struct).getFloat32("longitude"))
+    }
+
+    private fun configure(transform: ConcatFields<SourceRecord>) {
+        val props = mapOf("fields" to listOf("latitude", "longitude"), "delimiter" to ",", "output" to "location")
+
+        transform.configure(props.toMap())
+    }
+}
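
For reference, a minimal standalone sketch (not part of this diff) of what the schemaless path does, mirroring the README configuration above. The `main` driver and its literal values are illustrative only:

```kotlin
import org.apache.kafka.connect.source.SourceRecord
import org.onliner.kafka.transforms.ConcatFields

fun main() {
    // Same settings as the README example; LIST-typed configs also accept a
    // comma-separated string, as they would arrive from connector properties.
    val transform = ConcatFields.Value<SourceRecord>()
    transform.configure(
        mapOf(
            "fields" to "latitude,longitude",
            "delimiter" to ",",
            "output" to "location",
        )
    )

    // No value schema, so applySchemaless() handles the record.
    val value = mapOf("id" to 1, "latitude" to 53.9, "longitude" to -27.56667)
    val record = SourceRecord(null, null, "test", 0, null, value)

    val transformed = transform.apply(record).value() as Map<*, *>
    println(transformed["location"]) // -> 53.9,-27.56667

    transform.close()
}
```

The output entry is written into the same map under the `output` field name, which is exactly what the schemaless test above asserts.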