From 82fca24dc8fd042c45ab38834c8048dcdbf33601 Mon Sep 17 00:00:00 2001 From: Svend Vanderveken Date: Sun, 24 Sep 2017 09:27:33 +0200 Subject: [PATCH] some readability improvements + bump sbt to 1.0.2 --- project/build.properties | 2 +- src/main/scala/MockedStreams.scala | 25 +++++++++++++------------ 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/project/build.properties b/project/build.properties index 94005e5..b7dd3cb 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.0.0 +sbt.version=1.0.2 diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 0a68c66..fcc0b94 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -30,13 +30,12 @@ object MockedStreams { def apply() = Builder() - case class Event(topic: String, key: Array[Byte], value: Array[Byte]) + case class Record(topic: String, key: Array[Byte], value: Array[Byte]) case class Builder(topology: Option[(KStreamBuilder => Unit)] = None, configuration: Properties = new Properties(), stateStores: Seq[String] = Seq(), - inputs: List[Event] = List.empty) { - + inputs: List[Record] = List.empty) { def config(configuration: Properties) = this.copy(configuration = configuration) @@ -44,20 +43,22 @@ object MockedStreams { def stores(stores: Seq[String]) = this.copy(stateStores = stores) - def input[K, V](topic: String, key: Serde[K], value: Serde[V], newInput: Seq[(K, V)]) = { + def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = { val keySer = key.serializer val valSer = value.serializer - val updatedInputs = newInput.foldLeft(inputs) { - case (events, (k, v)) => Event(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) :: events + val updatedRecords = newRecords.foldLeft(inputs) { + case (events, (k, v)) => + val newRecord = Record(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) + newRecord :: events } - this.copy(inputs = updatedInputs) + this.copy(inputs = updatedRecords) } def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { if (size <= 0) throw new ExpectedOutputIsEmpty - withProcessedDriver { driver => + withProcessedDriver { driver => (0 until size).flatMap { i => Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match { case Some(record) => Some((record.key, record.value)) @@ -67,10 +68,10 @@ object MockedStreams { } } - def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = + def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = output[K, V](topic, key, value, size).toMap - def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => + def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => val records = driver.getKeyValueStore(name).all() val list = records.asScala.toList.map { record => (record.key, record.value) } records.close() @@ -103,9 +104,9 @@ object MockedStreams { new Driver(new StreamsConfig(props), builder) } - private def produce(driver: Driver) = { + private def produce(driver: Driver): Unit = { inputs.reverse.foreach{ - case Event(topic, key, value) => + case Record(topic, key, value) => driver.process(topic, key, value) } }