Merge pull request #65 from erikvanoosten/dep-accessor-methods
Zio-kafka - Deprecate accessor methods
987Nabil authored Feb 9, 2025
2 parents ed2210d + db4b367 commit 8971a8f
Showing 8 changed files with 177 additions and 173 deletions.
37 changes: 23 additions & 14 deletions .github/workflows/ci.yml
@@ -4,33 +4,38 @@
 name: CI
 env:
   JDK_JAVA_OPTIONS: -XX:+PrintCommandLineFlags
-  JVM_OPTS: -XX:+PrintCommandLineFlags
 'on':
   workflow_dispatch: {}
   release:
     types:
       - published
   push: {}
-  pull_request: {}
-  create: {}
+  pull_request:
+    branches-ignore:
+      - gh-pages
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref == format('refs/heads/{0}', github.event.repository.default_branch) && github.run_id || github.ref }}
+  cancel-in-progress: true
 jobs:
   build:
     name: Build
     runs-on: ubuntu-latest
     continue-on-error: true
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.3.0
+        uses: actions/checkout@v4
         with:
           fetch-depth: '0'
       - name: Install libuv
         run: sudo apt-get update && sudo apt-get install -y libuv1-dev
       - name: Setup Scala
-        uses: actions/setup-java@v3.10.0
+        uses: actions/setup-java@v4
         with:
-          distribution: temurin
-          java-version: '8'
+          distribution: corretto
+          java-version: '17'
           check-latest: true
+      - name: Setup SBT
+        uses: sbt/setup-sbt@v1
       - name: Cache Dependencies
         uses: coursier/cache-action@v6
       - name: Check all code compiles
@@ -43,17 +48,19 @@ jobs:
     continue-on-error: false
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.3.0
+        uses: actions/checkout@v4
         with:
           fetch-depth: '0'
       - name: Install libuv
         run: sudo apt-get update && sudo apt-get install -y libuv1-dev
       - name: Setup Scala
-        uses: actions/setup-java@v3.10.0
+        uses: actions/setup-java@v4
         with:
-          distribution: temurin
-          java-version: '8'
+          distribution: corretto
+          java-version: '17'
           check-latest: true
+      - name: Setup SBT
+        uses: sbt/setup-sbt@v1
       - name: Cache Dependencies
         uses: coursier/cache-action@v6
       - name: Check if the site workflow is up to date
@@ -78,15 +85,17 @@
       - name: Install libuv
         run: sudo apt-get update && sudo apt-get install -y libuv1-dev
       - name: Setup Scala
-        uses: actions/setup-java@v3.10.0
+        uses: actions/setup-java@v4
         with:
-          distribution: temurin
+          distribution: corretto
           java-version: ${{ matrix.java }}
           check-latest: true
+      - name: Setup SBT
+        uses: sbt/setup-sbt@v1
       - name: Cache Dependencies
         uses: coursier/cache-action@v6
       - name: Git Checkout
-        uses: actions/checkout@v3.3.0
+        uses: actions/checkout@v4
         with:
           fetch-depth: '0'
       - name: Test
7 changes: 3 additions & 4 deletions build.sbt
@@ -9,15 +9,14 @@ inThisBuild(
     ciPostReleaseJobs := Seq.empty,
     ciCheckWebsiteBuildProcess := Seq.empty,
     scalaVersion := scala213.value,
-    ciTargetScalaVersions := makeTargetScalaMap(
+    ciTargetScalaVersions := targetScalaVersionsFor(
      `zio-quickstart-encode-decode-json`,
      `zio-quickstart-sql`,
      `zio-quickstart-prelude`,
      `zio-quickstart-restful-webservice`
     ).value,
-    ciDefaultTargetJavaVersions := Seq("17"),
-    semanticdbEnabled := true,
-    semanticdbVersion := scalafixSemanticdb.revision
+    ciTargetJavaVersions := Seq("17"),
+    semanticdbEnabled := true
   )
 )

12 changes: 6 additions & 6 deletions project/plugins.sbt
@@ -1,11 +1,11 @@
-val zioSbtVersion = "0.4.0-alpha.6+15-525bdf8e-SNAPSHOT"
+val zioSbtVersion = "0.4.0-alpha.30"
 
-//addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion)
-addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion)
-addSbtPlugin("dev.zio" % "zio-sbt-ci" % zioSbtVersion)
+addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion)
+addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion)
+addSbtPlugin("dev.zio" % "zio-sbt-ci" % zioSbtVersion)
 
-addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.16")
+addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.11.1")
 addSbtPlugin("io.spray" % "sbt-revolver" % "0.10.0")
-addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.11.1")
+addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.14.0")
 
 resolvers ++= Resolver.sonatypeOssRepos("public")
4 changes: 2 additions & 2 deletions zio-quickstart-kafka/build.sbt
@@ -1,6 +1,6 @@
 scalaVersion := "2.13.13"
 
 libraryDependencies ++= Seq(
-  "dev.zio" %% "zio-kafka" % "2.7.4",
-  "dev.zio" %% "zio-json" % "0.6.2"
+  "dev.zio" %% "zio-kafka" % "2.10.0",
+  "dev.zio" %% "zio-json" % "0.7.16"
 )
33 changes: 15 additions & 18 deletions zio-quickstart-kafka/docker-compose.yml
@@ -1,23 +1,20 @@
-version: '2'
 services:
-  zookeeper:
-    image: confluentinc/cp-zookeeper:latest
-    environment:
-      ZOOKEEPER_CLIENT_PORT: 2181
-      ZOOKEEPER_TICK_TIME: 2000
-    ports:
-      - 22181:2181
 
-  kafka:
-    image: confluentinc/cp-kafka:latest
-    depends_on:
-      - zookeeper
+  broker:
+    image: apache/kafka:3.9.0
+    container_name: broker
     ports:
-      - 29092:29092
+      - "9092:9092"
     environment:
-      KAFKA_BROKER_ID: 1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_NODE_ID: 1
+      KAFKA_PROCESS_ROLES: broker,controller
+      KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
+      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
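
The compose file above drops the ZooKeeper container and runs a single apache/kafka 3.9.0 broker in KRaft mode; note that clients now connect on localhost:9092 rather than the old localhost:29092. A quick connectivity check, sketched with zio-kafka's admin client; BrokerCheck is a hypothetical helper and the AdminClient calls are assumptions based on zio-kafka's admin API, not part of this PR:

import zio._
import zio.kafka.admin.{AdminClient, AdminClientSettings}

object BrokerCheck extends ZIOAppDefault {
  // Connects to the broker from the compose file above and lists its topics.
  def run: ZIO[Any, Throwable, Unit] =
    ZIO.scoped {
      for {
        client <- AdminClient.make(AdminClientSettings(List("localhost:9092")))
        topics <- client.listTopics()
        _      <- Console.printLine(s"Topics: ${topics.keySet.mkString(", ")}")
      } yield ()
    }
}
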
@@ -11,8 +11,10 @@ import zio.stream.ZStream
 import java.time.OffsetDateTime
 import java.util.UUID
 
+/** This is the data we will be sending to Kafka in JSON format. */
 case class Event(uuid: UUID, timestamp: OffsetDateTime, message: String)
 
+/** A zio-json encoder/decoder for [[Event]]. */
 object Event {
   implicit val encoder: JsonEncoder[Event] =
     DeriveJsonEncoder.gen[Event]
@@ -21,62 +23,68 @@ object Event {
     DeriveJsonDecoder.gen[Event]
 }
 
-object KafkaSerde {
-  val key: Serde[Any, Int] =
-    Serde.int
-
-  val value: Serde[Any, Event] =
-    Serde.string.inmapM[Any, Event](s =>
+/** A zio-kafka serializer/deserializer for [[Event]]. */
+object EventKafkaSerde {
+  val event: Serde[Any, Event] =
+    Serde.string.inmapZIO[Any, Event](s =>
       ZIO
         .fromEither(s.fromJson[Event])
         .mapError(e => new RuntimeException(e))
     )(r => ZIO.succeed(r.toJson))
 }
 
 object JsonStreamingKafkaApp extends ZIOAppDefault {
-  private val BOOSTRAP_SERVERS = List("localhost:29092")
+  private val BOOSTRAP_SERVERS = List("localhost:9092")
   private val KAFKA_TOPIC = "json-streaming-hello"
 
-  private val producer: ZLayer[Any, Throwable, Producer] =
-    ZLayer.scoped(
-      Producer.make(
-        ProducerSettings(BOOSTRAP_SERVERS)
-      )
-    )
-
-  private val consumer: ZLayer[Any, Throwable, Consumer] =
-    ZLayer.scoped(
-      Consumer.make(
-        ConsumerSettings(BOOSTRAP_SERVERS)
-          .withGroupId("streaming-kafka-app")
-      )
-    )
-
-  def run = {
-    val p: ZStream[Producer, Throwable, Nothing] =
-      ZStream
-        .repeatZIO(Random.nextUUID <*> Clock.currentDateTime)
-        .schedule(Schedule.spaced(1.second))
-        .map { case (uuid, time) =>
-          new ProducerRecord(
-            KAFKA_TOPIC,
-            time.getMinute,
-            Event(uuid, time, "Hello, World!")
-          )
-        }
-        .via(Producer.produceAll(KafkaSerde.key, KafkaSerde.value))
-        .drain
-
-    val c: ZStream[Consumer, Throwable, Nothing] =
-      Consumer
-        .plainStream(Subscription.topics(KAFKA_TOPIC), Serde.int, Serde.string)
-        .tap(r => Console.printLine(r.value))
-        .map(_.offset)
-        .aggregateAsync(Consumer.offsetBatches)
-        .mapZIO(_.commit)
-        .drain
-
-    (p merge c).runDrain.provide(producer, consumer)
+  def run: ZIO[Any, Throwable, Unit] = {
+    val p: ZIO[Any, Throwable, Unit] =
+      ZIO.scoped {
+        for {
+          producer <- Producer.make(ProducerSettings(BOOSTRAP_SERVERS))
+          _ <- ZStream
+                 .repeatZIO(Random.nextUUID <*> Clock.currentDateTime)
+                 .schedule(Schedule.spaced(1.second))
+                 .map { case (uuid, time) =>
+                   new ProducerRecord(
+                     KAFKA_TOPIC,
+                     time.getMinute,
+                     Event(uuid, time, "Hello, World!")
+                   )
+                 }
+                 .via(producer.produceAll(Serde.int, EventKafkaSerde.event))
+                 .runDrain
+        } yield ()
+      }
+
+    val c: ZIO[Any, Throwable, Unit] =
+      ZIO.scoped {
+        for {
+          consumer <- Consumer.make(
+                        ConsumerSettings(BOOSTRAP_SERVERS).withGroupId(
+                          "streaming-kafka-app"
+                        )
+                      )
+          _ <- consumer
+                 .plainStream(
+                   Subscription.topics(KAFKA_TOPIC),
+                   Serde.int,
+                   EventKafkaSerde.event
+                 )
+                 .tap { r =>
+                   val event: Event = r.value
+                   Console.printLine(
+                     s"Event ${event.uuid} was sent at ${event.timestamp} with message ${event.message}"
+                   )
+                 }
+                 .map(_.offset)
+                 .aggregateAsync(Consumer.offsetBatches)
+                 .mapZIO(_.commit)
+                 .runDrain
+        } yield ()
+      }
+
+    p <&> c
   }
 
 }
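
In the rewrite above, KafkaSerde becomes EventKafkaSerde and inmapM becomes inmapZIO, matching zio-kafka's current naming. The same pattern generalizes to any type with a zio-json codec; jsonSerde below is a hypothetical helper sketched from the code above, not part of this PR:

import zio._
import zio.json._
import zio.kafka.serde.Serde

// Wraps any zio-json codec into a zio-kafka Serde by going through String,
// mirroring the inmapZIO pattern used by EventKafkaSerde above.
def jsonSerde[A: JsonEncoder: JsonDecoder]: Serde[Any, A] =
  Serde.string.inmapZIO[Any, A](s =>
    ZIO
      .fromEither(s.fromJson[A])
      .mapError(e => new RuntimeException(e))
  )(a => ZIO.succeed(a.toJson))

Going through String keeps the wire format as human-readable JSON, at the cost of one extra encoding step per record.
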
@@ -1,6 +1,5 @@
 package dev.zio.quickstart
 
-import org.apache.kafka.clients.producer.RecordMetadata
 import zio._
 import zio.kafka.consumer._
 import zio.kafka.producer.{Producer, ProducerSettings}
@@ -10,53 +9,41 @@ import zio.kafka.serde._
  * without using ZIO Streams.
  */
 object SimpleKafkaApp extends ZIOAppDefault {
-  private val BOOSTRAP_SERVERS = List("localhost:29092")
+  private val BOOSTRAP_SERVERS = List("localhost:9092")
   private val KAFKA_TOPIC = "hello"
 
-  private def produce(
-      topic: String,
-      key: Long,
-      value: String
-  ): RIO[Any with Producer, RecordMetadata] =
-    Producer.produce[Any, Long, String](
-      topic = topic,
-      key = key,
-      value = value,
-      keySerializer = Serde.long,
-      valueSerializer = Serde.string
-    )
-
-  private def consumeAndPrintEvents(
-      groupId: String,
-      topic: String
-  ): RIO[Any, Unit] =
-    Consumer.consumeWith(
-      settings = ConsumerSettings(BOOSTRAP_SERVERS)
-        .withGroupId(groupId),
-      subscription = Subscription.topics(topic),
-      keyDeserializer = Serde.long,
-      valueDeserializer = Serde.string
-    )(record => Console.printLine((record.key(), record.value())).orDie)
-
-  private val producer: ZLayer[Any, Throwable, Producer] =
-    ZLayer.scoped(
-      Producer.make(
-        ProducerSettings(BOOSTRAP_SERVERS)
-      )
-    )
-
-  def run =
+  def run: ZIO[Scope, Throwable, Unit] = {
     for {
-      c <- consumeAndPrintEvents("simple-kafka-app", KAFKA_TOPIC).fork
-      p <-
-        Clock.currentDateTime
-          .flatMap { time =>
-            produce(KAFKA_TOPIC, time.getHour.toLong, s"$time -- Hello, World!")
-          }
-          .schedule(Schedule.spaced(1.second))
-          .provide(producer)
-          .fork
+      c <- Consumer
+             .consumeWith(
+               settings =
+                 ConsumerSettings(BOOSTRAP_SERVERS).withGroupId("simple-kafka-app"),
+               subscription = Subscription.topics(KAFKA_TOPIC),
+               keyDeserializer = Serde.long,
+               valueDeserializer = Serde.string
+             ) { record =>
+               Console
+                 .printLine(s"Consumed ${record.key()}, ${record.value()}")
+                 .orDie
+             }
+             .fork
+
+      producer <- Producer.make(ProducerSettings(BOOSTRAP_SERVERS))
+      p <- Clock.currentDateTime
+             .flatMap { time =>
+               producer.produce[Any, Long, String](
+                 topic = KAFKA_TOPIC,
+                 key = time.getHour.toLong,
+                 value = s"$time -- Hello, World!",
+                 keySerializer = Serde.long,
+                 valueSerializer = Serde.string
+               )
+             }
+             .schedule(Schedule.spaced(1.second))
+             .fork
+
       _ <- (c <*> p).join
     } yield ()
+  }
 
 }
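
The updated SimpleKafkaApp leans on the fact that ZIOAppDefault supplies a Scope to run, so scoped resources such as Producer.make can be acquired directly in the for-comprehension and are released when the application's scope closes; this is also why the old producer ZLayer and the provide(producer) call disappear. A minimal sketch of that pattern in isolation; ScopedResourceSketch and its topic/key/value are hypothetical, not part of this PR:

import zio._
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde.Serde

object ScopedResourceSketch extends ZIOAppDefault {
  // run may require Scope; ZIOAppDefault provides one and closes it on exit,
  // releasing the Producer acquired by Producer.make.
  def run: ZIO[Scope, Throwable, Unit] =
    for {
      producer <- Producer.make(ProducerSettings(List("localhost:9092")))
      _        <- producer.produce(
                    topic = "hello",
                    key = 0L,
                    value = "ping",
                    keySerializer = Serde.long,
                    valueSerializer = Serde.string
                  )
    } yield ()
}
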