Skip to content

Commit

Permalink
Now allows to fine control the test events order
Browse files Browse the repository at this point in the history
- previously, the events for each mock topic were submitted all at once during
  the test, topic per topic
- this commit ensure that MockStream records the order in which the test author
  invoked `input` in the test fixture makes sure they are submitted to the
  topology in that order
  • Loading branch information
sv3ndk committed Sep 15, 2017
1 parent 0fb39dc commit 3757c42
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 14 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

* Hamidreza Afzali
* Jendrik Poloczek

* Svend Vanderveken
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ It also allows you to have multiple input and output streams. If your topology u
mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)

## Event order and multiple emissions

The events provided to the mock stream will be submitted to the topology during the test in the order in which they appear in the fixture. You can also submit events multiple times to the same topics, at various moments in your scenario.

This can be handy to validate that your topology behaviour is or is not dependent on the order in which the events are received and processed.

In the example below, 2 events are first submitted to topic A, then 3 to topic B, then 1 more to topic A again.

val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
val secondInputForTopicA = Seq(("y", int(4)))

val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))

val builder = MockedStreams()
.topology(topologyTables)
.input(InputATopic, strings, ints, firstInputForTopicA)
.input(InputBTopic, strings, ints, firstInputForTopicB)
.input(InputATopic, strings, ints, secondInputForTopicA)

## State Store

When you define your state stores via .stores(stores: Seq[String]) since 1.2, you are able to verify the state store content via the .stateTable(name: String) method:
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ lazy val commonSettings = Seq(

val scalaTestVersion = "3.0.2"
val rocksDBVersion = "5.0.1"
val kafkaVersion = "0.11.0.0"
val kafkaVersion = "0.11.0.1"

lazy val kafka = Seq(
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
Expand Down
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.0.0
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
22 changes: 12 additions & 10 deletions src/main/scala/MockedStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,29 @@ object MockedStreams {

def apply() = Builder()

case class Input(seq: Seq[(Array[Byte], Array[Byte])])
case class Event(topic: String, key: Array[Byte], value: Array[Byte])

case class Builder(topology: Option[(KStreamBuilder => Unit)] = None,
configuration: Properties = new Properties(),
stateStores: Seq[String] = Seq(),
inputs: Map[String, Input] = Map()) {
inputs: List[Event] = List.empty) {


def config(configuration: Properties) = this.copy(configuration = configuration)

def topology(func: (KStreamBuilder => Unit)) = this.copy(topology = Some(func))

def stores(stores: Seq[String]) = this.copy(stateStores = stores)

def input[K, V](topic: String, key: Serde[K], value: Serde[V], seq: Seq[(K, V)]) = {
def input[K, V](topic: String, key: Serde[K], value: Serde[V], newInput: Seq[(K, V)]) = {
val keySer = key.serializer
val valSer = value.serializer
val in = seq.map { case (k, v) => (keySer.serialize(topic, k), valSer.serialize(topic, v)) }
this.copy(inputs = inputs + (topic -> Input(in)))

val updatedInputs = newInput.foldLeft(inputs) {
case (events, (k, v)) => Event(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) :: events
}

this.copy(inputs = updatedInputs)
}

def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = {
Expand Down Expand Up @@ -99,10 +104,9 @@ object MockedStreams {
}

private def produce(driver: Driver) = {
inputs.foreach { case (topic, input) =>
input.seq.foreach { case (key, value) =>
inputs.reverse.foreach{
case Event(topic, key, value) =>
driver.process(topic, key, value)
}
}
}

Expand All @@ -125,5 +129,3 @@ object MockedStreams {
class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.")

}


55 changes: 55 additions & 0 deletions src/test/scala/MockedStreamsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,25 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
builder.stateTable(StoreName) shouldEqual inputA.toMap
}

it should "assert correctly when joining events sent to 2 Ktables in a specific order" in {
import Fixtures.Multi._

val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
val secondInputForTopicA = Seq(("y", int(4)))

val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))

val builder = MockedStreams()
.topology(topologyTables)
.input(InputATopic, strings, ints, firstInputForTopicA)
.input(InputBTopic, strings, ints, firstInputForTopicB)
.input(InputATopic, strings, ints, secondInputForTopicA)

builder.output(OutputATopic, strings, ints, expectedOutput.size)
.shouldEqual(expectedOutput)
}

it should "assert correctly when processing windowed state output topology" in {
import Fixtures.Multi._

Expand Down Expand Up @@ -194,6 +213,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
val OutputATopic = "outputA"
val OutputBTopic = "outputB"
val StoreName = "store"
val Store2Name = "store2"

def topology1Output(builder: KStreamBuilder) = {
val streamA = builder.stream(strings, ints, InputATopic)
Expand All @@ -207,6 +227,18 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
.to(strings, ints, OutputATopic)
}

def topology1Outputbis(builder: KStreamBuilder) = {
val streamA = builder.stream(strings, ints, InputATopic)
val streamB = builder.stream(strings, ints, InputBTopic)

val table = streamB.groupByKey(strings, ints).aggregate(
new LastInitializer,
new LastAggregator, ints, StoreName)

streamA.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints)
.to(strings, ints, OutputATopic)
}

def topology1WindowOutput(builder: KStreamBuilder) = {
val streamA = builder.stream(strings, ints, InputCTopic)
streamA.groupByKey(strings, ints).count(
Expand All @@ -230,6 +262,29 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints)
.to(strings, ints, OutputBTopic)
}

def topologyTables(builder: KStreamBuilder) = {
val streamA = builder.stream(strings, ints, InputATopic)
val streamB = builder.stream(strings, ints, InputBTopic)

val tableA = streamA.groupByKey(strings, ints).aggregate(
new LastInitializer,
new LastAggregator,
ints,
StoreName)

val tableB = streamB.groupByKey(strings, ints).aggregate(
new LastInitializer,
new LastAggregator,
ints,
Store2Name)

val resultTable = tableA.join[Integer,Integer](tableB, new AddJoiner)

resultTable
.toStream
.to(strings, ints, OutputATopic)
}
}

}
Expand Down

0 comments on commit 3757c42

Please sign in to comment.