From 1ebd56ed55231a4aa00ee293c2bb93aa6fd1f0f1 Mon Sep 17 00:00:00 2001 From: Alexander Karashchuk Date: Thu, 16 Nov 2023 10:45:18 +0300 Subject: [PATCH] Tests for schema copy --- .../onliner/kafka/transforms/ConcatFields.kt | 30 ++++++++---- .../kafka/transforms/ConcatFieldsTest.kt | 46 ++++++++++++++++++- 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/src/main/kotlin/org/onliner/kafka/transforms/ConcatFields.kt b/src/main/kotlin/org/onliner/kafka/transforms/ConcatFields.kt index ad39a90..77d9205 100644 --- a/src/main/kotlin/org/onliner/kafka/transforms/ConcatFields.kt +++ b/src/main/kotlin/org/onliner/kafka/transforms/ConcatFields.kt @@ -1,16 +1,17 @@ package org.onliner.kafka.transforms -import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.common.cache.LRUCache import org.apache.kafka.common.cache.SynchronizedCache import org.apache.kafka.common.config.ConfigDef import org.apache.kafka.connect.connector.ConnectRecord import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.Struct import org.apache.kafka.connect.transforms.Transformation import org.apache.kafka.connect.transforms.util.Requirements import org.apache.kafka.connect.transforms.util.SchemaUtil import org.apache.kafka.connect.transforms.util.SimpleConfig +@Suppress("TooManyFunctions") abstract class ConcatFields?> : Transformation { companion object { const val OVERVIEW_DOC = "Concat fields in one specified field with delimeter" @@ -93,21 +94,24 @@ abstract class ConcatFields?> : Transformation { } private fun applyWithSchema(record: R): R { - val value = Requirements.requireStruct(operatingValue(record), PURPOSE) - val schema = operatingSchema(record) ?: return record + val oldValue = Requirements.requireStruct(operatingValue(record), PURPOSE) + val oldSchema = operatingSchema(record) ?: return record - val outputSchema = copySchema(schema) + val newSchema = copySchema(oldSchema) + val newValue = copyValue(oldSchema, newSchema, oldValue) val output = mutableListOf() - for (field in schema.fields()) { - if (_fields.contains(field.name())) { - output.add(value.getStruct(field.name()).toString()) + for (field in _fields) { + val part = newValue.get(field) + + if (part !== null) { + output.add(part.toString()) } } - value.put(_outputField, output.joinToString(separator = _delimiter)) + newValue.put(_outputField, output.joinToString(separator = _delimiter)) - return newRecord(record, outputSchema, value) + return newRecord(record, newSchema, newValue) } private fun copySchema(schema: Schema): Schema { @@ -127,6 +131,14 @@ abstract class ConcatFields?> : Transformation { return output } + private fun copyValue(oldSchema: Schema, newSchema: Schema, oldValue: Struct): Struct { + val newValue = Struct(newSchema) + + oldSchema.fields().forEach { field -> newValue.put(field.name(), oldValue.get(field)) } + + return newValue + } + class Key?> : ConcatFields() { override fun operatingSchema(record: R?): Schema? = record?.keySchema() diff --git a/src/test/kotlin/org/onliner/kafka/transforms/ConcatFieldsTest.kt b/src/test/kotlin/org/onliner/kafka/transforms/ConcatFieldsTest.kt index 99953b0..2b1d1ed 100644 --- a/src/test/kotlin/org/onliner/kafka/transforms/ConcatFieldsTest.kt +++ b/src/test/kotlin/org/onliner/kafka/transforms/ConcatFieldsTest.kt @@ -1,5 +1,8 @@ package org.onliner.kafka.transforms +import org.apache.kafka.connect.data.Schema +import org.apache.kafka.connect.data.SchemaBuilder +import org.apache.kafka.connect.data.Struct import org.apache.kafka.connect.source.SourceRecord import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Assertions @@ -22,14 +25,53 @@ internal class ConcatFieldsTest { val original = mapOf( "id" to 1, "latitude" to 53.9000000, - "longitude" to 27.5666700, + "longitude" to -27.5666700, ) val record = SourceRecord(null, null, "test", 0, null, original) val transformed = xformValue.apply(record).value() as Map<*, *> Assertions.assertEquals(1, transformed["id"]) - Assertions.assertEquals("53.9,27.56667", transformed["location"]) + Assertions.assertEquals("53.9,-27.56667", transformed["location"]) + } + + @Test + fun copyValueSchemaAndConvertFields() { + configure(xformValue, listOf("latitude", "longitude"), ",", "location") + + val schema = SchemaBuilder + .struct() + .name("name") + .version(1) + .doc("doc") + .field("id", Schema.INT32_SCHEMA) + .field("latitude", Schema.FLOAT32_SCHEMA) + .field("longitude", Schema.FLOAT32_SCHEMA) + .build() + + val value = Struct(schema) + .put("id", 1) + .put("latitude", 53.9000000.toFloat()) + .put("longitude", (-27.5666700).toFloat()) + + val original = SourceRecord(null, null, "test", 0, schema, value) + val transformed: SourceRecord = xformValue.apply(original) + + Assertions.assertEquals(schema.name(), transformed.valueSchema().name()) + Assertions.assertEquals(schema.version(), transformed.valueSchema().version()) + Assertions.assertEquals(schema.doc(), transformed.valueSchema().doc()) + + Assertions.assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema().field("location").schema()) + Assertions.assertEquals("53.9,-27.56667", (transformed.value() as Struct).get("location")) + + Assertions.assertEquals(Schema.INT32_SCHEMA, transformed.valueSchema().field("id").schema()) + Assertions.assertEquals(1, (transformed.value() as Struct).getInt32("id")) + + Assertions.assertEquals(Schema.FLOAT32_SCHEMA, transformed.valueSchema().field("latitude").schema()) + Assertions.assertEquals(53.9000000.toFloat(), (transformed.value() as Struct).getFloat32("latitude")) + + Assertions.assertEquals(Schema.FLOAT32_SCHEMA, transformed.valueSchema().field("longitude").schema()) + Assertions.assertEquals((-27.5666700).toFloat(), (transformed.value() as Struct).getFloat32("longitude")) } private fun configure(