Added Concat Fields SMT (#2)
akarashchuk authored Dec 4, 2023
1 parent a48f50b commit 0d5c34d
Showing 3 changed files with 297 additions and 0 deletions.
24 changes: 24 additions & 0 deletions README.md
@@ -29,6 +29,30 @@ transforms.encode.type=org.onliner.kafka.transforms.JsonSerialize$Value
transforms.encode.fields=comma,separated,list,of,fields
```

### `ConcatFields`

This transformation concatenates fields of the original record's data into a single string, joined by the configured delimiter.

The transformation expects the record key/value to be either a `STRUCT` or a `MAP`.

It exists in two variants:
- `org.onliner.kafka.transforms.ConcatFields$Key` - works on keys;
- `org.onliner.kafka.transforms.ConcatFields$Value` - works on values.

The transformation defines the following configurations:
- `fields` - List of fields to concatenate. Cannot be `null` or empty.
- `delimiter` - Delimiter used to join the fields. Cannot be `null` or empty.
- `output` - Name of the output field. Cannot be `null` or empty.

```properties
transforms=concat
transforms.concat.type=org.onliner.kafka.transforms.ConcatFields$Value
transforms.concat.fields=latitude,longitude
transforms.concat.delimiter=,
transforms.concat.output=location
```
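
For illustration, with the configuration above a schemaless record value like `{"id": 1, "latitude": 53.9, "longitude": -27.56667}` (the values used in the test suite below) comes out with an added `location` field:

```json
{"id": 1, "latitude": 53.9, "longitude": -27.56667, "location": "53.9,-27.56667"}
```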

## License

This project is licensed under the [MIT license](LICENSE).
173 changes: 173 additions & 0 deletions src/main/kotlin/org/onliner/kafka/transforms/ConcatFields.kt
@@ -0,0 +1,173 @@
package org.onliner.kafka.transforms

import org.apache.kafka.common.cache.LRUCache
import org.apache.kafka.common.cache.SynchronizedCache
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements
import org.apache.kafka.connect.transforms.util.SchemaUtil
import org.apache.kafka.connect.transforms.util.SimpleConfig

@Suppress("TooManyFunctions")
abstract class ConcatFields<R : ConnectRecord<R>?> : Transformation<R> {
companion object {
        const val OVERVIEW_DOC = "Concatenate fields into one output field with a delimiter"

val CONFIG_DEF: ConfigDef = ConfigDef()
.define(
"fields",
ConfigDef.Type.LIST,
ConfigDef.Importance.HIGH,
"List of fields to concat"
)
.define(
"delimiter",
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
"Delimiter for concat"
)
.define(
"output",
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
"Output field"
)

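        // Cache derived schemas so records sharing the same input schema reuse one copy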
private val cache = SynchronizedCache(LRUCache<Schema, Schema>(16))

private const val PURPOSE = "onliner-kafka-smt-concat-fields"
}

private lateinit var _fields: List<String>
private lateinit var _delimiter: String
private lateinit var _outputField: String

override fun configure(props: Map<String?, *>?) {
val config = SimpleConfig(CONFIG_DEF, props)

_fields = config.getList("fields")
_delimiter = config.getString("delimiter")
_outputField = config.getString("output")
}


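    // Pass null values through untouched; otherwise dispatch on whether the record carries a schema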
override fun apply(record: R): R = when {
operatingValue(record) == null -> {
record
}

operatingSchema(record) == null -> {
applySchemaless(record)
}

else -> {
applyWithSchema(record)
}
}

@Suppress("EmptyFunctionBlock")
override fun close() {
}

override fun config(): ConfigDef = CONFIG_DEF

protected abstract fun operatingSchema(record: R?): Schema?
protected abstract fun operatingValue(record: R?): Any?
protected abstract fun newRecord(record: R?, schema: Schema?, value: Any?): R

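    // Schemaless path: treat the value as a Map and add the joined string under the output field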
private fun applySchemaless(record: R): R {
val value = Requirements.requireMap(operatingValue(record), PURPOSE)
val output = mutableListOf<String>()

for (field in _fields) {
if (value.containsKey(field)) {
output.add(value[field].toString())
}
}

value[_outputField] = output.joinToString(separator = _delimiter)

return newRecord(record, null, value)
}

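    // Schema path: copy the Struct into a widened schema that includes the output field, then fill it in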
private fun applyWithSchema(record: R): R {
val oldValue = Requirements.requireStruct(operatingValue(record), PURPOSE)
val oldSchema = operatingSchema(record) ?: return record

val newSchema = copySchema(oldSchema)
val newValue = copyValue(oldSchema, newSchema, oldValue)
val output = mutableListOf<String>()

for (field in _fields) {
val part = newValue.get(field)

if (part !== null) {
output.add(part.toString())
}
}

newValue.put(_outputField, output.joinToString(separator = _delimiter))

return newRecord(record, newSchema, newValue)
}

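    // Copy the original schema and append the output field as a STRING field; results are LRU-cached per input schema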
private fun copySchema(schema: Schema): Schema {
val cached = cache.get(schema)

if (cached != null) {
return cached
}

val output = SchemaUtil.copySchemaBasics(schema)

schema.fields().forEach { field -> output.field(field.name(), field.schema()) }
output.field(_outputField, Schema.STRING_SCHEMA)

cache.put(schema, output)

return output
}

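    // Copy every existing field from the old Struct into a Struct built on the new schema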
private fun copyValue(oldSchema: Schema, newSchema: Schema, oldValue: Struct): Struct {
val newValue = Struct(newSchema)

oldSchema.fields().forEach { field -> newValue.put(field.name(), oldValue.get(field)) }

return newValue
}

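    // Variant that concatenates fields of the record key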
class Key<R : ConnectRecord<R>?> : ConcatFields<R>() {
override fun operatingSchema(record: R?): Schema? = record?.keySchema()

override fun operatingValue(record: R?): Any? = record?.key()

override fun newRecord(record: R?, schema: Schema?, value: Any?): R = record!!.newRecord(
record.topic(),
record.kafkaPartition(),
schema,
value,
record.valueSchema(),
record.value(),
record.timestamp()
)
}

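    // Variant that concatenates fields of the record value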
class Value<R : ConnectRecord<R>?> : ConcatFields<R>() {
override fun operatingSchema(record: R?): Schema? = record?.valueSchema()

override fun operatingValue(record: R?): Any? = record?.value()

override fun newRecord(record: R?, schema: Schema?, value: Any?): R = record!!.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
schema,
value,
record.timestamp()
)
}
}
100 changes: 100 additions & 0 deletions src/test/kotlin/org/onliner/kafka/transforms/ConcatFieldsTest.kt
@@ -0,0 +1,100 @@
package org.onliner.kafka.transforms

import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.source.SourceRecord
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions
import kotlin.test.Test

internal class ConcatFieldsTest {
private val xformKey: ConcatFields<SourceRecord> = ConcatFields.Key()
private val xformValue: ConcatFields<SourceRecord> = ConcatFields.Value()

@AfterEach
fun teardown() {
xformKey.close()
xformValue.close()
}

@Test
fun handlesNullValue() {
configure(xformValue)
val given = SourceRecord(
null,
null,
"topic",
0,
null,
null
)
val expected = null
val actual: Any? = xformValue.apply(given).value()
Assertions.assertEquals(expected, actual)
}

@Test
fun schemalessValueConcatFields() {
configure(xformValue)

val original = mapOf(
"id" to 1,
"latitude" to 53.9000000,
"longitude" to -27.5666700,
)

val record = SourceRecord(null, null, "test", 0, null, original)
val transformed = xformValue.apply(record).value() as Map<*, *>

Assertions.assertEquals(1, transformed["id"])
Assertions.assertEquals(53.9000000, transformed["latitude"])
Assertions.assertEquals(-27.5666700, transformed["longitude"])
Assertions.assertEquals("53.9,-27.56667", transformed["location"])
}

@Test
fun copyValueSchemaAndConvertFields() {
configure(xformValue)

val schema = SchemaBuilder
.struct()
.name("name")
.version(1)
.doc("doc")
.field("id", Schema.INT32_SCHEMA)
.field("latitude", Schema.FLOAT32_SCHEMA)
.field("longitude", Schema.FLOAT32_SCHEMA)
.build()

val value = Struct(schema)
.put("id", 1)
.put("latitude", 53.9000000.toFloat())
.put("longitude", (-27.5666700).toFloat())

val original = SourceRecord(null, null, "test", 0, schema, value)
val transformed: SourceRecord = xformValue.apply(original)

Assertions.assertEquals(schema.name(), transformed.valueSchema().name())
Assertions.assertEquals(schema.version(), transformed.valueSchema().version())
Assertions.assertEquals(schema.doc(), transformed.valueSchema().doc())

Assertions.assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema().field("location").schema())
Assertions.assertEquals("53.9,-27.56667", (transformed.value() as Struct).get("location"))

Assertions.assertEquals(Schema.INT32_SCHEMA, transformed.valueSchema().field("id").schema())
Assertions.assertEquals(1, (transformed.value() as Struct).getInt32("id"))

Assertions.assertEquals(Schema.FLOAT32_SCHEMA, transformed.valueSchema().field("latitude").schema())
Assertions.assertEquals(53.9000000.toFloat(), (transformed.value() as Struct).getFloat32("latitude"))

Assertions.assertEquals(Schema.FLOAT32_SCHEMA, transformed.valueSchema().field("longitude").schema())
Assertions.assertEquals((-27.5666700).toFloat(), (transformed.value() as Struct).getFloat32("longitude"))
}

private fun configure(transform: ConcatFields<SourceRecord>) {
val props = mapOf("fields" to listOf("latitude", "longitude"), "delimiter" to ",", "output" to "location")

transform.configure(props.toMap())
}
}
