Codec refactorings #1241

Merged 49 commits on Jan 23, 2024

Changes from all commits (49 commits):
aeedd9b - wip in moving to zio2, compilation issues in core tests and frontends… (sirocchj, Mar 7, 2023)
02b87bc - merged w/ master (Jul 20, 2023)
e988dc8 - merged w/ master (Jul 27, 2023)
a01e9a9 - zio2 (Aug 5, 2023)
b0e8c9a - merged w/ master (Aug 5, 2023)
e6bb3ba - fmt (Aug 5, 2023)
cbd4e3e - fixing 2.12 issues (Aug 5, 2023)
151d151 - merged w/ master & addressed additional warnings w/ newer scala (Aug 8, 2023)
b164e80 - merged w/ master (Aug 9, 2023)
489671a - added vulcan to the implicit not found error (Aug 9, 2023)
83a3f9f - refactoring (Aug 9, 2023)
47837bd - fmt (Aug 9, 2023)
ccba4de - merged w/ master + updated deps (Sep 1, 2023)
e1801d3 - remove exclusion (Sep 1, 2023)
7d52217 - Merge branch 'zio2' into codec-improvements (Sep 1, 2023)
7c82163 - Merge branch 'master' into codec-improvements (Sep 5, 2023)
7d3efdc - Merge branch 'master' into codec-improvements (sirocchj, Sep 10, 2023)
cbc2afb - Merge branch 'master' into codec-improvements (sirocchj, Sep 18, 2023)
df7b2a1 - add SchemaResolver (sirocchj, Sep 14, 2023)
9d9f91a - not working yet (sirocchj, Sep 22, 2023)
ea375b7 - still not working but now MkSerdes is implicitly available in Setup i… (sirocchj, Sep 22, 2023)
f3327c2 - refactoring complete (sirocchj, Sep 23, 2023)
60b312b - Merge branch 'master' into codec-improvements (sirocchj, Sep 23, 2023)
3d48596 - cleanup (sirocchj, Sep 23, 2023)
07911dd - removed scalafix (sirocchj, Sep 23, 2023)
dbe6af8 - Merge branch 'master' into codec-improvements (sirocchj, Sep 24, 2023)
a78f50b - Merge branch 'master' into codec-improvements (sirocchj, Oct 6, 2023)
e7064ab - Merge branch 'master' into codec-improvements (sirocchj, Oct 8, 2023)
5f3c934 - Merge branch 'master' into codec-improvements (sirocchj, Oct 18, 2023)
8dd4b7c - Merge branch 'master' into codec-improvements (sirocchj, Oct 20, 2023)
163f3b1 - Merge branch 'master' into codec-improvements (sirocchj, Oct 27, 2023)
db16f01 - Merge branch 'master' into codec-improvements (sirocchj, Oct 28, 2023)
3461455 - Merge branch 'master' into codec-improvements (sirocchj, Oct 30, 2023)
f007161 - Merge branch 'master' into codec-improvements (sirocchj, Nov 6, 2023)
51d6b05 - merged w/ master (sirocchj, Nov 10, 2023)
ce7d946 - Merge branch 'master' into codec-improvements (sirocchj, Nov 15, 2023)
9955808 - Merge branch 'master' into codec-improvements (sirocchj, Nov 17, 2023)
18078a0 - Merge branch 'master' into codec-improvements (sirocchj, Nov 19, 2023)
279a967 - Merge branch 'master' into codec-improvements (sirocchj, Nov 21, 2023)
907ddaf - adding sttp registry, removing confluent (sirocchj, Nov 21, 2023)
a2a53d3 - simplification (sirocchj, Nov 22, 2023)
684573c - Merge branch 'master' into codec-improvements (sirocchj, Nov 23, 2023)
c1c2672 - Merge branch 'master' into codec-improvements (sirocchj, Jan 8, 2024)
82e781e - Merge branch 'master' into codec-improvements (sirocchj, Jan 15, 2024)
58023b1 - Merge branch 'master' into codec-improvements (sirocchj, Jan 15, 2024)
c0df420 - Merge branch 'master' into codec-improvements (sirocchj, Jan 16, 2024)
6134672 - Merge branch 'master' into codec-improvements (sirocchj, Jan 20, 2024)
82f1bcd - more changes (sirocchj, Jan 22, 2024)
2584901 - Merge branch 'master' into codec-improvements (sirocchj, Jan 22, 2024)
build.sbt (15 changes: 11 additions & 4 deletions)

@@ -17,9 +17,11 @@ val V = new {
val postgresql = "42.7.1"
val `scala-collection-compat` = "2.11.0"
val slf4j = "2.0.11"
val sttp = "3.9.2"
val sttp = "4.0.0-M8"
val upickle = "3.1.4"
val vulcan = "1.10.1"
val `zio-interop` = "23.1.0.0"
val `zio-cache` = "0.2.3"
val `zio-json` = "0.6.2"
val `zio-kafka` = "2.7.2"
val `zio-nio` = "2.0.2"
@@ -61,9 +63,12 @@ lazy val D = new {
val `oci-java-sdk-objectstorage` = "com.oracle.oci.sdk" % "oci-java-sdk-objectstorage" % V.ocisdk
val postgresql = "org.postgresql" % "postgresql" % V.postgresql
val `scala-collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % V.`scala-collection-compat`
val `sttp-zio` = "com.softwaremill.sttp.client3" %% "zio" % V.sttp
val `sttp-upickle` = "com.softwaremill.sttp.client4" %% "upickle" % V.sttp
val `sttp-zio` = "com.softwaremill.sttp.client4" %% "zio" % V.sttp
val upickle = "com.lihaoyi" %% "upickle" % V.upickle
val vulcan = "com.github.fd4s" %% "vulcan" % V.vulcan
val `vulcan-generic` = "com.github.fd4s" %% "vulcan-generic" % V.vulcan
val `zio-cache` = "dev.zio" %% "zio-cache" % V.`zio-cache`
val `zio-interop-cats` = "dev.zio" %% "zio-interop-cats" % V.`zio-interop`
val `zio-json` = "dev.zio" %% "zio-json" % V.`zio-json`
val `zio-kafka` = "dev.zio" %% "zio-kafka" % V.`zio-kafka`
@@ -95,13 +100,15 @@ lazy val core = project
.settings(
name := "tamer-core",
libraryDependencies ++= Seq(
D.`cats-effect`,
D.`jackson-annotations`,
D.`jackson-core`,
D.`jackson-databind`,
D.`kafka-clients`,
D.`kafka-schema-registry-client`,
D.`log-effect-zio`,
D.`sttp-upickle`,
D.`sttp-zio`,
D.upickle,
D.`zio-cache`,
D.`zio-kafka`,
// optional dependencies
D.`circe-parser` % Optional,
core/src/main/scala/tamer/Codec.scala (137 changes: 103 additions & 34 deletions)
@@ -1,11 +1,66 @@
package tamer

import java.io.{InputStream, OutputStream}

import io.confluent.kafka.schemaregistry.ParsedSchema
import java.io.{InputStream, OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets.UTF_8

import scala.{specialized => sp}
import scala.annotation.{implicitNotFound, nowarn}

sealed abstract class Schema {
def show: String
def isCompatible(previous: String): List[String]
}
object Schema {
final case class Avro(underlying: org.apache.avro.Schema) extends Schema {
import org.apache.avro.SchemaCompatibility.Incompatibility
import org.apache.avro.SchemaCompatibility.SchemaIncompatibilityType._
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

private[this] final def asString(i: Incompatibility): String = {
val errorDescription = i.getType() match {
case FIXED_SIZE_MISMATCH =>
s"The size of FIXED type field at path '${i.getLocation()}' in the reader schema (${i
.getReaderFragment()}) does not match with the writer schema (${i.getWriterFragment()})"
case TYPE_MISMATCH =>
s"The type (path '${i.getLocation()}') of a field in the reader schema (${i
.getReaderFragment()}) does not match with the writer schema (${i.getWriterFragment()})"
case NAME_MISMATCH =>
s"The name of the schema has changed (path '${i.getLocation()}')"
case MISSING_ENUM_SYMBOLS =>
s"The reader schema (${i.getReaderFragment()}) is missing enum symbols '${i.getMessage()}' at path '${i
.getLocation()}' in the writer schema (${i.getWriterFragment()})"
case MISSING_UNION_BRANCH =>
s"The reader schema (${i.getReaderFragment()}) is missing a type inside a union field at path '${i
.getLocation()}' in the writer schema (${i.getWriterFragment()})"
case READER_FIELD_MISSING_DEFAULT_VALUE =>
s"The field '${i.getMessage()}' at path '${i.getLocation()}' in the reader schema (${i
.getReaderFragment()}) has no default value and is missing in the writer schema (${i.getWriterFragment()})"
}
s"{errorType:'${i.getType()}', description:'$errorDescription', additionalInfo:'${i.getMessage()}'}"
}

override final def show: String = underlying.toString()
override final def isCompatible(previous: String): List[String] =
try
org.apache.avro.SchemaCompatibility
.checkReaderWriterCompatibility(underlying, Avro.parse(previous))
.getResult()
.getIncompatibilities()
.asScala
.map(asString)
.toList
catch { case NonFatal(e) => s"Unexpected exception during compatibility check: ${e.getMessage()}" :: Nil }
}
private object Avro {
private[this] final val parser = new org.apache.avro.Schema.Parser()

final def parse(s: String): org.apache.avro.Schema = parser.parse(s)
}

final def fromAvro(s: =>org.apache.avro.Schema): Schema.Avro = Schema.Avro(s)
}

@implicitNotFound(
"\n" +
"Could not find or construct a \u001b[36mtamer.Codec\u001b[0m instance for type:\n" +
@@ -38,26 +93,29 @@ import scala.annotation.{implicitNotFound, nowarn}
" d. Jsoniter Scala: `import \u001b[36mtamer.Codec.optionalJsoniterScalaCodec[\u001b[32m${A}\u001b[0m\u001b[36m]\u001b[0m`\n" +
" e. ZIO Json: `import \u001b[36mtamer.Codec.optionalZioJsonCodec[\u001b[32m${A}\u001b[0m\u001b[36m]\u001b[0m`.\n"
)
sealed trait Codec[@specialized A] {
sealed trait Codec[@sp A] {
def decode(is: InputStream): A
def encode(value: A, os: OutputStream): Unit
def maybeSchema: Option[ParsedSchema]
def maybeSchema: Option[Schema]
}

// The series of tricks used to summon implicit instances using optional dependencies
// was proposed by Kai and Pavel Shirshov in https://blog.7mind.io/no-more-orphans.html
object Codec extends LowPriorityCodecs {
final def apply[A](implicit A: Codec[A]): Codec[A] = A

implicit final def optionalVulcanCodec[A, C[_]: VulcanCodec](implicit ca: C[A]): Codec[A] = new Codec[A] {
// Vulcan
implicit final def optionalVulcanCodec[A, C[_]: VulcanCodec](implicit ca: C[A]): Codec[A] = new AvroCodec[A] {
private[this] final val _vulcanCodec = ca.asInstanceOf[vulcan.Codec[A]]
assert(_vulcanCodec.schema.isRight)
private[this] final val _vulcanSchema = _vulcanCodec.schema.getOrElse(???)

override final val schema: Schema.Avro = Schema.fromAvro(_vulcanCodec.schema.fold(error => throw error.throwable, identity))

private[this] final val _genericDatumReader = new org.apache.avro.generic.GenericDatumReader[Any](schema.underlying)
private[this] final val _genericDatumWriter = new org.apache.avro.generic.GenericDatumWriter[Any](schema.underlying)

override final def decode(is: InputStream): A = {
val decoder = org.apache.avro.io.DecoderFactory.get.binaryDecoder(is, null)
val value = new org.apache.avro.generic.GenericDatumReader[Any](_vulcanSchema).read(null, decoder)
_vulcanCodec.decode(value, _vulcanSchema) match {
_vulcanCodec.decode(_genericDatumReader.read(null, decoder), schema.underlying) match {
case Left(error) => throw error.throwable
case Right(value) => value
}
@@ -66,62 +124,73 @@ object Codec extends LowPriorityCodecs {
case Left(error) => throw error.throwable
case Right(encoded) =>
val encoder = org.apache.avro.io.EncoderFactory.get.binaryEncoder(os, null)
new org.apache.avro.generic.GenericDatumWriter[Any](_vulcanSchema).write(encoded, encoder)
_genericDatumWriter.write(encoded, encoder)
encoder.flush()
}
override final val maybeSchema: Option[ParsedSchema] = Some(new io.confluent.kafka.schemaregistry.avro.AvroSchema(_vulcanSchema))
}

// Avro4s
implicit final def optionalAvro4sCodec[A, D[_]: Avro4sDecoder, E[_]: Avro4sEncoder, SF[_]: Avro4sSchemaFor](
implicit da: D[A],
ea: E[A],
sfa: SF[A]
): Codec[A] = new Codec[A] {
private[this] final val _avroSchema = sfa.asInstanceOf[com.sksamuel.avro4s.SchemaFor[A]].schema
): Codec[A] = new AvroCodec[A] {
override final val schema: Schema.Avro = Schema.fromAvro(sfa.asInstanceOf[com.sksamuel.avro4s.SchemaFor[A]].schema)

private[this] final val _avroDecoderBuilder = com.sksamuel.avro4s.AvroInputStream.binary(da.asInstanceOf[com.sksamuel.avro4s.Decoder[A]])
private[this] final val _avroEncoderBuilder =
OutputStreamEncoder.avro4sOutputStream(_avroSchema, ea.asInstanceOf[com.sksamuel.avro4s.Encoder[A]])
OutputStreamEncoder.avro4sOutputStream(schema.underlying, ea.asInstanceOf[com.sksamuel.avro4s.Encoder[A]])

override final def decode(is: InputStream): A = _avroDecoderBuilder.from(is).build(_avroSchema).iterator.next()
override final def decode(is: InputStream): A = _avroDecoderBuilder.from(is).build(schema.underlying).iterator.next()
override final def encode(value: A, os: OutputStream): Unit = {
val ser = _avroEncoderBuilder.to(os).build()
ser.write(value)
ser.close()
}
override final val maybeSchema: Option[ParsedSchema] = Some(new io.confluent.kafka.schemaregistry.avro.AvroSchema(_avroSchema))
}
}
private[tamer] sealed trait LowPriorityCodecs {
implicit final def optionalCirceCodec[A, D[_]: CirceDecoder, E[_]: CirceEncoder](implicit da: D[A], ea: E[A]): Codec[A] = new Codec[A] {
private[this] final val _circeDecoder = da.asInstanceOf[io.circe.Decoder[A]]
private[this] final val _circeEncoder = ea.asInstanceOf[io.circe.Encoder[A]]

override final def decode(is: InputStream): A = io.circe.jawn.decodeChannel(java.nio.channels.Channels.newChannel(is))(_circeDecoder) match {
case Left(error) => throw error
case Right(value) => value
private[tamer] sealed trait LowPriorityCodecs extends LowestPriorityCodecs {

// Circe
implicit final def optionalCirceCodec[A, D[_]: CirceDecoder, E[_]: CirceEncoder](implicit da: D[A], ea: E[A]): Codec[A] =
new SchemalessCodec[A] {
private[this] final val _circeDecoder = da.asInstanceOf[io.circe.Decoder[A]]
private[this] final val _circeEncoder = ea.asInstanceOf[io.circe.Encoder[A]]

override final def decode(is: InputStream): A = io.circe.jawn.decodeChannel(java.nio.channels.Channels.newChannel(is))(_circeDecoder) match {
case Left(error) => throw error
case Right(value) => value
}
override final def encode(value: A, os: OutputStream): Unit =
new OutputStreamWriter(os, UTF_8).append(_circeEncoder(value).noSpaces).flush()
}
override final def encode(value: A, os: OutputStream): Unit =
new java.io.OutputStreamWriter(os, java.nio.charset.StandardCharsets.UTF_8).append(_circeEncoder(value).noSpaces).flush()
override final val maybeSchema: Option[ParsedSchema] = None
}

implicit final def optionalJsoniterScalaCodec[@specialized A, C[_]: JsoniterScalaCodec](implicit ca: C[A]): Codec[A] = new Codec[A] {
// Jsoniter-Scala
implicit final def optionalJsoniterScalaCodec[@sp A, C[_]: JsoniterScalaCodec](implicit ca: C[A]): Codec[A] = new SchemalessCodec[A] {
private[this] final val _jsoniterCodec = ca.asInstanceOf[com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec[A]]

override final def decode(is: InputStream): A = com.github.plokhotnyuk.jsoniter_scala.core.readFromStream(is)(_jsoniterCodec)
override final def encode(value: A, os: OutputStream): Unit = com.github.plokhotnyuk.jsoniter_scala.core.writeToStream(value, os)(_jsoniterCodec)
override final val maybeSchema: Option[ParsedSchema] = None
}

implicit final def optionalZioJsonCodec[A, C[_]: ZioJsonCodec](implicit ca: C[A]): Codec[A] = new Codec[A] {
// ZIO-Json
implicit final def optionalZioJsonCodec[A, C[_]: ZioJsonCodec](implicit ca: C[A]): Codec[A] = new SchemalessCodec[A] {
private[this] final val _zioJsonCodec = ca.asInstanceOf[zio.json.JsonCodec[A]]

override final def decode(is: InputStream): A = zio.Unsafe.unsafe { implicit unsafe =>
zio.Runtime.default.unsafe.run(_zioJsonCodec.decoder.decodeJsonStreamInput(zio.stream.ZStream.fromInputStream(is))).getOrThrowFiberFailure()
}
override final def encode(value: A, os: OutputStream): Unit =
new java.io.OutputStreamWriter(os).append(_zioJsonCodec.encodeJson(value, None)).flush()
override final val maybeSchema: Option[ParsedSchema] = None
new OutputStreamWriter(os, UTF_8).append(_zioJsonCodec.encodeJson(value, None)).flush()
}
}
private[tamer] sealed trait LowestPriorityCodecs {
private[tamer] sealed abstract class AvroCodec[@sp A] extends Codec[A] {
def schema: Schema.Avro
override final def maybeSchema: Option[Schema] = Some(schema)
}
private[tamer] sealed abstract class SchemalessCodec[@sp A] extends Codec[A] {
override final val maybeSchema: Option[Schema] = None
}
}
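The comment on `object Codec` above credits Kai and Pavel Shirshov's "no more orphans" technique (blog.7mind.io/no-more-orphans.html) for summoning type class instances from optional dependencies. The following is a minimal, self-contained sketch of that pattern; `fakejson`, `FakeJsonEncoder`, and the local `Codec` trait are illustrative stand-ins defined inside the snippet, not tamer's actual API, and since everything here lives in one file the "optional" library is always present, so only the resolution mechanics are shown:

```scala
// Sketch of the "no more orphans" pattern. All names are illustrative.

// Stand-in for an *optional* third-party library (think circe or vulcan).
object fakejson {
  trait Encoder[A] { def encode(a: A): String }
}

// Evidence class: its sole implicit instance mentions fakejson.Encoder only
// in its type, so implicit resolution of FakeJsonEncoder[E] succeeds exactly
// when the optional library is on the user's classpath.
final class FakeJsonEncoder[E[_]] private ()
object FakeJsonEncoder {
  implicit val evidence: FakeJsonEncoder[fakejson.Encoder] = new FakeJsonEncoder
}

trait Codec[A] { def encode(a: A): String }
object Codec {
  def apply[A](implicit A: Codec[A]): Codec[A] = A

  // E[_] stays abstract here, so this definition compiles without a hard
  // dependency on fakejson; the cast is safe because the only evidence
  // instance pins E to fakejson.Encoder.
  implicit def fromFakeJson[A, E[_]: FakeJsonEncoder](implicit ea: E[A]): Codec[A] =
    new Codec[A] {
      private[this] val enc = ea.asInstanceOf[fakejson.Encoder[A]]
      override def encode(a: A): String = enc.encode(a)
    }
}

object Demo extends App {
  implicit val intEncoder: fakejson.Encoder[Int] = (i: Int) => i.toString
  println(Codec[Int].encode(42)) // prints "42", derived via the evidence
}
```

In tamer itself the same shape appears as `VulcanCodec`, `CirceDecoder`, `CirceEncoder`, and friends, with the corresponding libraries declared `% Optional` in build.sbt (as visible in the diff above) so the derivations only kick in when users add those dependencies themselves.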
