Skip to content

Commit

Permalink
Allow to pass built topology to MockedStreams
Browse files Browse the repository at this point in the history
  • Loading branch information
wojda committed Apr 12, 2018
1 parent d699340 commit 51e7e06
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
24 changes: 14 additions & 10 deletions src/main/scala/MockedStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.madewithtea.mockedstreams
import java.util.{Properties, UUID}

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, Topology}
import org.apache.kafka.streams.state.ReadOnlyWindowStore
import org.apache.kafka.test.{ProcessorTopologyTestDriver => Driver}

Expand All @@ -31,14 +31,23 @@ object MockedStreams {

case class Record(topic: String, key: Array[Byte], value: Array[Byte])

case class Builder(topology: Option[(StreamsBuilder => Unit)] = None,
case class Builder(topology: Option[() => Topology] = None,
configuration: Properties = new Properties(),
stateStores: Seq[String] = Seq(),
inputs: List[Record] = List.empty) {

def config(configuration: Properties) = this.copy(configuration = configuration)

def topology(func: (StreamsBuilder => Unit)) = this.copy(topology = Some(func))
def topology(func: (StreamsBuilder => Unit)) = {
val buildTopology = () => {
val builder = new StreamsBuilder()
func(builder)
builder.build()
}
this.copy(topology = Some(buildTopology))
}

def withTopology(t: () => Topology) = this.copy(topology = Some(t))

def stores(stores: Seq[String]) = this.copy(stateStores = stores)

Expand Down Expand Up @@ -93,14 +102,9 @@ object MockedStreams {
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.putAll(configuration)

val builder = new StreamsBuilder()

topology match {
case Some(t) => t(builder)
case _ => throw new NoTopologySpecified
}
val t = topology.getOrElse(throw new NoTopologySpecified)

new Driver(new StreamsConfig(props), builder.build())
new Driver(new StreamsConfig(props), t())
}

private def produce(driver: Driver): Unit = {
Expand Down
19 changes: 18 additions & 1 deletion src/test/scala/MockedStreamsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream._
import org.apache.kafka.streams.processor.TimestampExtractor
import org.apache.kafka.streams.{Consumed, StreamsBuilder, KeyValue, StreamsConfig}
import org.apache.kafka.streams._
import org.scalatest.{FlatSpec, Matchers}

class MockedStreamsSpec extends FlatSpec with Matchers {
Expand Down Expand Up @@ -156,6 +156,23 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
.shouldEqual(expectedCy.toMap)
}

it should "accept already built topology" in {
import Fixtures.Uppercase._

def getTopology() = {
val builder = new StreamsBuilder()
topology(builder)
builder.build()
}

val output = MockedStreams()
.withTopology(getTopology)
.input(InputTopic, strings, strings, input)
.output(OutputTopic, strings, strings, expected.size)

output shouldEqual expected
}

class LastInitializer extends Initializer[Integer] {
override def apply() = 0
}
Expand Down

0 comments on commit 51e7e06

Please sign in to comment.