Skip to content

Commit

Permalink
Codec refactorings (#1241)
Browse files · Browse the repository at this point in the history
  • Loading branch information
sirocchj authored Jan 23, 2024
1 parent fb1050e commit 8e6fdcd
Show file tree
Hide file tree
Showing 19 changed files with 488 additions and 304 deletions.
15 changes: 11 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ val V = new {
val postgresql = "42.7.1"
val `scala-collection-compat` = "2.11.0"
val slf4j = "2.0.11"
val sttp = "3.9.2"
val sttp = "4.0.0-M8"
val upickle = "3.1.4"
val vulcan = "1.10.1"
val `zio-interop` = "23.1.0.0"
val `zio-cache` = "0.2.3"
val `zio-json` = "0.6.2"
val `zio-kafka` = "2.7.2"
val `zio-nio` = "2.0.2"
Expand Down Expand Up @@ -61,9 +63,12 @@ lazy val D = new {
val `oci-java-sdk-objectstorage` = "com.oracle.oci.sdk" % "oci-java-sdk-objectstorage" % V.ocisdk
val postgresql = "org.postgresql" % "postgresql" % V.postgresql
val `scala-collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % V.`scala-collection-compat`
val `sttp-zio` = "com.softwaremill.sttp.client3" %% "zio" % V.sttp
val `sttp-upickle` = "com.softwaremill.sttp.client4" %% "upickle" % V.sttp
val `sttp-zio` = "com.softwaremill.sttp.client4" %% "zio" % V.sttp
val upickle = "com.lihaoyi" %% "upickle" % V.upickle
val vulcan = "com.github.fd4s" %% "vulcan" % V.vulcan
val `vulcan-generic` = "com.github.fd4s" %% "vulcan-generic" % V.vulcan
val `zio-cache` = "dev.zio" %% "zio-cache" % V.`zio-cache`
val `zio-interop-cats` = "dev.zio" %% "zio-interop-cats" % V.`zio-interop`
val `zio-json` = "dev.zio" %% "zio-json" % V.`zio-json`
val `zio-kafka` = "dev.zio" %% "zio-kafka" % V.`zio-kafka`
Expand Down Expand Up @@ -95,13 +100,15 @@ lazy val core = project
.settings(
name := "tamer-core",
libraryDependencies ++= Seq(
D.`cats-effect`,
D.`jackson-annotations`,
D.`jackson-core`,
D.`jackson-databind`,
D.`kafka-clients`,
D.`kafka-schema-registry-client`,
D.`log-effect-zio`,
D.`sttp-upickle`,
D.`sttp-zio`,
D.upickle,
D.`zio-cache`,
D.`zio-kafka`,
// optional dependencies
D.`circe-parser` % Optional,
Expand Down
137 changes: 103 additions & 34 deletions core/src/main/scala/tamer/Codec.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,66 @@
package tamer

import java.io.{InputStream, OutputStream}

import io.confluent.kafka.schemaregistry.ParsedSchema
import java.io.{InputStream, OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets.UTF_8

import scala.{specialized => sp}
import scala.annotation.{implicitNotFound, nowarn}

/** Minimal abstraction over a serialization schema, independent of the
  * concrete schema technology (currently only Avro, see `Schema.Avro`).
  */
sealed abstract class Schema {
  /** Renders this schema as a string (its canonical textual form). */
  def show: String
  /** Checks whether data written with `previous` can be read with this schema.
    *
    * @param previous the textual representation of the writer schema
    * @return an empty list when compatible, otherwise one human-readable
    *         description per detected incompatibility
    */
  def isCompatible(previous: String): List[String]
}
/** Companion holding the concrete [[Schema]] implementations. */
object Schema {

  /** Avro-backed [[Schema]], delegating to an `org.apache.avro.Schema`.
    *
    * @param underlying the wrapped Avro schema
    */
  final case class Avro(underlying: org.apache.avro.Schema) extends Schema {
    import org.apache.avro.SchemaCompatibility.Incompatibility
    import org.apache.avro.SchemaCompatibility.SchemaIncompatibilityType._
    import scala.jdk.CollectionConverters._
    import scala.util.control.NonFatal

    // Renders a single incompatibility as a compact human-readable summary.
    // The match covers every value of `SchemaIncompatibilityType`, so it is
    // exhaustive against the Avro version this was written for.
    private[this] final def asString(i: Incompatibility): String = {
      val errorDescription = i.getType() match {
        case FIXED_SIZE_MISMATCH =>
          s"The size of FIXED type field at path '${i.getLocation()}' in the reader schema (${i.getReaderFragment()}) does not match with the writer schema (${i.getWriterFragment()})"
        case TYPE_MISMATCH =>
          s"The type (path '${i.getLocation()}') of a field in the reader schema (${i.getReaderFragment()}) does not match with the writer schema (${i.getWriterFragment()})"
        case NAME_MISMATCH =>
          s"The name of the schema has changed (path '${i.getLocation()}')"
        case MISSING_ENUM_SYMBOLS =>
          s"The reader schema (${i.getReaderFragment()}) is missing enum symbols '${i.getMessage()}' at path '${i.getLocation()}' in the writer schema (${i.getWriterFragment()})"
        case MISSING_UNION_BRANCH =>
          s"The reader schema (${i.getReaderFragment()}) is missing a type inside a union field at path '${i.getLocation()}' in the writer schema (${i.getWriterFragment()})"
        case READER_FIELD_MISSING_DEFAULT_VALUE =>
          s"The field '${i.getMessage()}' at path '${i.getLocation()}' in the reader schema (${i.getReaderFragment()}) has no default value and is missing in the writer schema (${i.getWriterFragment()})"
      }
      s"{errorType:'${i.getType()}', description:'$errorDescription', additionalInfo:'${i.getMessage()}'}"
    }

    override final def show: String = underlying.toString()

    /** Checks reader/writer compatibility treating this schema as the reader
      * and `previous` as the writer.
      *
      * @return empty list when fully compatible; otherwise one message per
      *         incompatibility. Parsing or comparison failures are reported as
      *         a single-element list rather than thrown.
      */
    override final def isCompatible(previous: String): List[String] =
      try
        org.apache.avro.SchemaCompatibility
          .checkReaderWriterCompatibility(underlying, Avro.parse(previous))
          .getResult()
          .getIncompatibilities()
          .asScala
          .map(asString)
          .toList
      catch { case NonFatal(e) => s"Unexpected exception during compatibility check: ${e.getMessage()}" :: Nil }
  }
  private object Avro {
    // NOTE: a fresh Parser per call is deliberate. `org.apache.avro.Schema.Parser`
    // remembers every named type it parses, so a single shared instance throws
    // `SchemaParseException: Can't redefine: <name>` as soon as the same named
    // schema is parsed a second time (e.g. repeated `isCompatible` checks), and
    // it is not safe for concurrent use either.
    final def parse(s: String): org.apache.avro.Schema = new org.apache.avro.Schema.Parser().parse(s)
  }

  /** Lifts an Avro schema into a [[Schema.Avro]], forcing the by-name argument once. */
  final def fromAvro(s: =>org.apache.avro.Schema): Schema.Avro = Schema.Avro(s)
}

@implicitNotFound(
"\n" +
"Could not find or construct a \u001b[36mtamer.Codec\u001b[0m instance for type:\n" +
Expand Down Expand Up @@ -38,26 +93,29 @@ import scala.annotation.{implicitNotFound, nowarn}
" d. Jsoniter Scala: `import \u001b[36mtamer.Codec.optionalJsoniterScalaCodec[\u001b[32m${A}\u001b[0m\u001b[36m]\u001b[0m`\n" +
" e. ZIO Json: `import \u001b[36mtamer.Codec.optionalZioJsonCodec[\u001b[32m${A}\u001b[0m\u001b[36m]\u001b[0m`.\n"
)
sealed trait Codec[@specialized A] {
sealed trait Codec[@sp A] {
def decode(is: InputStream): A
def encode(value: A, os: OutputStream): Unit
def maybeSchema: Option[ParsedSchema]
def maybeSchema: Option[Schema]
}

// The series of tricks used to summon implicit instances using optional dependencies
// was proposed by Kai and Pavel Shirshov in https://blog.7mind.io/no-more-orphans.html
object Codec extends LowPriorityCodecs {
final def apply[A](implicit A: Codec[A]): Codec[A] = A

implicit final def optionalVulcanCodec[A, C[_]: VulcanCodec](implicit ca: C[A]): Codec[A] = new Codec[A] {
// Vulcan
implicit final def optionalVulcanCodec[A, C[_]: VulcanCodec](implicit ca: C[A]): Codec[A] = new AvroCodec[A] {
private[this] final val _vulcanCodec = ca.asInstanceOf[vulcan.Codec[A]]
assert(_vulcanCodec.schema.isRight)
private[this] final val _vulcanSchema = _vulcanCodec.schema.getOrElse(???)

override final val schema: Schema.Avro = Schema.fromAvro(_vulcanCodec.schema.fold(error => throw error.throwable, identity))

private[this] final val _genericDatumReader = new org.apache.avro.generic.GenericDatumReader[Any](schema.underlying)
private[this] final val _genericDatumWriter = new org.apache.avro.generic.GenericDatumWriter[Any](schema.underlying)

override final def decode(is: InputStream): A = {
val decoder = org.apache.avro.io.DecoderFactory.get.binaryDecoder(is, null)
val value = new org.apache.avro.generic.GenericDatumReader[Any](_vulcanSchema).read(null, decoder)
_vulcanCodec.decode(value, _vulcanSchema) match {
_vulcanCodec.decode(_genericDatumReader.read(null, decoder), schema.underlying) match {
case Left(error) => throw error.throwable
case Right(value) => value
}
Expand All @@ -66,62 +124,73 @@ object Codec extends LowPriorityCodecs {
case Left(error) => throw error.throwable
case Right(encoded) =>
val encoder = org.apache.avro.io.EncoderFactory.get.binaryEncoder(os, null)
new org.apache.avro.generic.GenericDatumWriter[Any](_vulcanSchema).write(encoded, encoder)
_genericDatumWriter.write(encoded, encoder)
encoder.flush()
}
override final val maybeSchema: Option[ParsedSchema] = Some(new io.confluent.kafka.schemaregistry.avro.AvroSchema(_vulcanSchema))
}

// Avro4s
implicit final def optionalAvro4sCodec[A, D[_]: Avro4sDecoder, E[_]: Avro4sEncoder, SF[_]: Avro4sSchemaFor](
implicit da: D[A],
ea: E[A],
sfa: SF[A]
): Codec[A] = new Codec[A] {
private[this] final val _avroSchema = sfa.asInstanceOf[com.sksamuel.avro4s.SchemaFor[A]].schema
): Codec[A] = new AvroCodec[A] {
override final val schema: Schema.Avro = Schema.fromAvro(sfa.asInstanceOf[com.sksamuel.avro4s.SchemaFor[A]].schema)

private[this] final val _avroDecoderBuilder = com.sksamuel.avro4s.AvroInputStream.binary(da.asInstanceOf[com.sksamuel.avro4s.Decoder[A]])
private[this] final val _avroEncoderBuilder =
OutputStreamEncoder.avro4sOutputStream(_avroSchema, ea.asInstanceOf[com.sksamuel.avro4s.Encoder[A]])
OutputStreamEncoder.avro4sOutputStream(schema.underlying, ea.asInstanceOf[com.sksamuel.avro4s.Encoder[A]])

override final def decode(is: InputStream): A = _avroDecoderBuilder.from(is).build(_avroSchema).iterator.next()
override final def decode(is: InputStream): A = _avroDecoderBuilder.from(is).build(schema.underlying).iterator.next()
override final def encode(value: A, os: OutputStream): Unit = {
val ser = _avroEncoderBuilder.to(os).build()
ser.write(value)
ser.close()
}
override final val maybeSchema: Option[ParsedSchema] = Some(new io.confluent.kafka.schemaregistry.avro.AvroSchema(_avroSchema))
}
}
private[tamer] sealed trait LowPriorityCodecs {
implicit final def optionalCirceCodec[A, D[_]: CirceDecoder, E[_]: CirceEncoder](implicit da: D[A], ea: E[A]): Codec[A] = new Codec[A] {
private[this] final val _circeDecoder = da.asInstanceOf[io.circe.Decoder[A]]
private[this] final val _circeEncoder = ea.asInstanceOf[io.circe.Encoder[A]]

override final def decode(is: InputStream): A = io.circe.jawn.decodeChannel(java.nio.channels.Channels.newChannel(is))(_circeDecoder) match {
case Left(error) => throw error
case Right(value) => value
private[tamer] sealed trait LowPriorityCodecs extends LowestPriorityCodecs {

// Circe
implicit final def optionalCirceCodec[A, D[_]: CirceDecoder, E[_]: CirceEncoder](implicit da: D[A], ea: E[A]): Codec[A] =
new SchemalessCodec[A] {
private[this] final val _circeDecoder = da.asInstanceOf[io.circe.Decoder[A]]
private[this] final val _circeEncoder = ea.asInstanceOf[io.circe.Encoder[A]]

override final def decode(is: InputStream): A = io.circe.jawn.decodeChannel(java.nio.channels.Channels.newChannel(is))(_circeDecoder) match {
case Left(error) => throw error
case Right(value) => value
}
override final def encode(value: A, os: OutputStream): Unit =
new OutputStreamWriter(os, UTF_8).append(_circeEncoder(value).noSpaces).flush()
}
override final def encode(value: A, os: OutputStream): Unit =
new java.io.OutputStreamWriter(os, java.nio.charset.StandardCharsets.UTF_8).append(_circeEncoder(value).noSpaces).flush()
override final val maybeSchema: Option[ParsedSchema] = None
}

implicit final def optionalJsoniterScalaCodec[@specialized A, C[_]: JsoniterScalaCodec](implicit ca: C[A]): Codec[A] = new Codec[A] {
// Jsoniter-Scala
implicit final def optionalJsoniterScalaCodec[@sp A, C[_]: JsoniterScalaCodec](implicit ca: C[A]): Codec[A] = new SchemalessCodec[A] {
private[this] final val _jsoniterCodec = ca.asInstanceOf[com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec[A]]

override final def decode(is: InputStream): A = com.github.plokhotnyuk.jsoniter_scala.core.readFromStream(is)(_jsoniterCodec)
override final def encode(value: A, os: OutputStream): Unit = com.github.plokhotnyuk.jsoniter_scala.core.writeToStream(value, os)(_jsoniterCodec)
override final val maybeSchema: Option[ParsedSchema] = None
}

implicit final def optionalZioJsonCodec[A, C[_]: ZioJsonCodec](implicit ca: C[A]): Codec[A] = new Codec[A] {
// ZIO-Json
implicit final def optionalZioJsonCodec[A, C[_]: ZioJsonCodec](implicit ca: C[A]): Codec[A] = new SchemalessCodec[A] {
private[this] final val _zioJsonCodec = ca.asInstanceOf[zio.json.JsonCodec[A]]

override final def decode(is: InputStream): A = zio.Unsafe.unsafe { implicit unsafe =>
zio.Runtime.default.unsafe.run(_zioJsonCodec.decoder.decodeJsonStreamInput(zio.stream.ZStream.fromInputStream(is))).getOrThrowFiberFailure()
}
override final def encode(value: A, os: OutputStream): Unit =
new java.io.OutputStreamWriter(os).append(_zioJsonCodec.encodeJson(value, None)).flush()
override final val maybeSchema: Option[ParsedSchema] = None
new OutputStreamWriter(os, UTF_8).append(_zioJsonCodec.encodeJson(value, None)).flush()
}
}
private[tamer] sealed trait LowestPriorityCodecs {
/** Base for codecs that carry an Avro schema: implementors provide `schema`
  * and `maybeSchema` is derived from it.
  */
private[tamer] sealed abstract class AvroCodec[@sp A] extends Codec[A] {
  /** The Avro schema describing the values this codec encodes/decodes. */
  def schema: Schema.Avro
  // Always defined for Avro-backed codecs.
  override final def maybeSchema: Option[Schema] = Some(schema)
}
/** Base for codecs with no associated schema (the JSON-based codecs in this
  * file): `maybeSchema` is fixed to `None`.
  */
private[tamer] sealed abstract class SchemalessCodec[@sp A] extends Codec[A] {
  override final val maybeSchema: Option[Schema] = None
}
}

Expand Down
Loading

0 comments on commit 8e6fdcd

Please sign in to comment.