Skip to content

Commit

Permalink
Tests for schema copy
Browse files Browse the repository at this point in the history
  • Loading branch information
akarashchuk committed Nov 16, 2023
1 parent c67279f commit 1ebd56e
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 11 deletions.
30 changes: 21 additions & 9 deletions src/main/kotlin/org/onliner/kafka/transforms/ConcatFields.kt
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package org.onliner.kafka.transforms

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.cache.LRUCache
import org.apache.kafka.common.cache.SynchronizedCache
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.ConnectRecord
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.transforms.Transformation
import org.apache.kafka.connect.transforms.util.Requirements
import org.apache.kafka.connect.transforms.util.SchemaUtil
import org.apache.kafka.connect.transforms.util.SimpleConfig

@Suppress("TooManyFunctions")
abstract class ConcatFields<R : ConnectRecord<R>?> : Transformation<R> {
companion object {
const val OVERVIEW_DOC = "Concat fields in one specified field with delimeter"
Expand Down Expand Up @@ -93,21 +94,24 @@ abstract class ConcatFields<R : ConnectRecord<R>?> : Transformation<R> {
}

private fun applyWithSchema(record: R): R {
val value = Requirements.requireStruct(operatingValue(record), PURPOSE)
val schema = operatingSchema(record) ?: return record
val oldValue = Requirements.requireStruct(operatingValue(record), PURPOSE)
val oldSchema = operatingSchema(record) ?: return record

val outputSchema = copySchema(schema)
val newSchema = copySchema(oldSchema)
val newValue = copyValue(oldSchema, newSchema, oldValue)
val output = mutableListOf<String>()

for (field in schema.fields()) {
if (_fields.contains(field.name())) {
output.add(value.getStruct(field.name()).toString())
for (field in _fields) {
val part = newValue.get(field)

if (part !== null) {
output.add(part.toString())
}
}

value.put(_outputField, output.joinToString(separator = _delimiter))
newValue.put(_outputField, output.joinToString(separator = _delimiter))

return newRecord(record, outputSchema, value)
return newRecord(record, newSchema, newValue)
}

private fun copySchema(schema: Schema): Schema {
Expand All @@ -127,6 +131,14 @@ abstract class ConcatFields<R : ConnectRecord<R>?> : Transformation<R> {
return output
}

private fun copyValue(oldSchema: Schema, newSchema: Schema, oldValue: Struct): Struct {
val newValue = Struct(newSchema)

oldSchema.fields().forEach { field -> newValue.put(field.name(), oldValue.get(field)) }

return newValue
}

class Key<R : ConnectRecord<R>?> : ConcatFields<R>() {
override fun operatingSchema(record: R?): Schema? = record?.keySchema()

Expand Down
46 changes: 44 additions & 2 deletions src/test/kotlin/org/onliner/kafka/transforms/ConcatFieldsTest.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.onliner.kafka.transforms

import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
import org.apache.kafka.connect.data.Struct
import org.apache.kafka.connect.source.SourceRecord
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions
Expand All @@ -22,14 +25,53 @@ internal class ConcatFieldsTest {
val original = mapOf(
"id" to 1,
"latitude" to 53.9000000,
"longitude" to 27.5666700,
"longitude" to -27.5666700,
)

val record = SourceRecord(null, null, "test", 0, null, original)
val transformed = xformValue.apply(record).value() as Map<*, *>

Assertions.assertEquals(1, transformed["id"])
Assertions.assertEquals("53.9,27.56667", transformed["location"])
Assertions.assertEquals("53.9,-27.56667", transformed["location"])
}

@Test
fun copyValueSchemaAndConvertFields() {
configure(xformValue, listOf("latitude", "longitude"), ",", "location")

val schema = SchemaBuilder
.struct()
.name("name")
.version(1)
.doc("doc")
.field("id", Schema.INT32_SCHEMA)
.field("latitude", Schema.FLOAT32_SCHEMA)
.field("longitude", Schema.FLOAT32_SCHEMA)
.build()

val value = Struct(schema)
.put("id", 1)
.put("latitude", 53.9000000.toFloat())
.put("longitude", (-27.5666700).toFloat())

val original = SourceRecord(null, null, "test", 0, schema, value)
val transformed: SourceRecord = xformValue.apply(original)

Assertions.assertEquals(schema.name(), transformed.valueSchema().name())
Assertions.assertEquals(schema.version(), transformed.valueSchema().version())
Assertions.assertEquals(schema.doc(), transformed.valueSchema().doc())

Assertions.assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema().field("location").schema())
Assertions.assertEquals("53.9,-27.56667", (transformed.value() as Struct).get("location"))

Assertions.assertEquals(Schema.INT32_SCHEMA, transformed.valueSchema().field("id").schema())
Assertions.assertEquals(1, (transformed.value() as Struct).getInt32("id"))

Assertions.assertEquals(Schema.FLOAT32_SCHEMA, transformed.valueSchema().field("latitude").schema())
Assertions.assertEquals(53.9000000.toFloat(), (transformed.value() as Struct).getFloat32("latitude"))

Assertions.assertEquals(Schema.FLOAT32_SCHEMA, transformed.valueSchema().field("longitude").schema())
Assertions.assertEquals((-27.5666700).toFloat(), (transformed.value() as Struct).getFloat32("longitude"))
}

private fun configure(
Expand Down

0 comments on commit 1ebd56e

Please sign in to comment.