Skip to content

Commit

Permalink
Mread upgrade to akka 2.6.x (#314)
Browse files Browse the repository at this point in the history
* upgrade sbt to 1.3.13

* add type Format[skuber.Protocol.Value] to line 405

* Upgrade Bouncy Castle to 1.66 and Play JSON to 2.9.0 due to security concerns

* Upgrade Akka (2.6.8) & Akka HTTP (10.1.12)

* inital refactoring - mostly removing implicit materializers

* finish refactoring - mostly removing implicit materializers

* drop support for Scala 2.11

* drop reference to Scala 2.11 for current release

* drop reference to Scala 2.11 for v2

* change import for MockitoSugar (org.scalatestplus.mockito.MockitoSugar)

* Use collect instead of filter

Since Akka 2.6.2 `filter` will prefetch one element. This will not change overall output of a stream but changes timing of when demand is propagated. In `WatchSourceSpec` it will lead to an extra request being made even if there's no demand yet from the consumer. In general, the test should be hardened not to fail if these details change but for now this simple change will prevent the prefetching because `collect` does not (yet?) prefetch elements.

Great work, @seglo, pinpointing the problem to the filter change in akka/akka#28467.

* upgrade Mockito (v3.4.4) & Akka HTTP to (10.1.12)

* bump commons-io 2.7

Co-authored-by: Johannes Rudolph <[email protected]>
  • Loading branch information
michael-read and jrudolph authored Sep 6, 2020
1 parent 919922e commit 9f9a06f
Show file tree
Hide file tree
Showing 31 changed files with 38 additions and 87 deletions.
2 changes: 0 additions & 2 deletions Quickstart.sc
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@
import $ivy.`io.skuber::skuber:2.0.10`, skuber._, skuber.json.format._

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import api.Configuration
import scala.concurrent.Future
import scala.util.{Success, Failure}
import skuber.apps.v1.Deployment

// Some standard Akka implicits that are required by the skuber v2 client API
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val dispatcher = system.dispatcher

val cfg: Configuration = api.Configuration.parseKubeconfigFile().get
Expand Down
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ This example lists pods in `kube-system` namespace:
import skuber._
import skuber.json.format._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import scala.util.{Success, Failure}

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val dispatcher = system.dispatcher

val k8s = k8sInit
Expand Down Expand Up @@ -116,7 +114,7 @@ To get minikube follow the instructions [here](https://github.com/kubernetes/min
## Release
You can use the latest release (for Scala 2.11, 2.12 or 2.13) by adding to your build:
You can use the latest release (for 2.12 or 2.13) by adding to your build:
```sbt
libraryDependencies += "io.skuber" %% "skuber" % "2.5.0"
Expand All @@ -128,7 +126,7 @@ Meanwhile users of skuber v1 can continue to use the final v1.x release, which i
libraryDependencies += "io.skuber" % "skuber_2.11" % "1.7.1"
```
NOTE: Skuber 2 supports Scala 2.13 since v2.4.0 - support for Scala 2.11 is now deprecated and will be removed in a future release.
NOTE: Skuber 2 supports Scala 2.13 since v2.4.0 - support for Scala 2.11 has now been removed.
## Migrating to release v2
Expand Down
16 changes: 8 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@

resolvers += "Typesafe Releases" at "https://repo.typesafe.com/typesafe/releases/"

val akkaVersion = "2.5.29"
val akkaVersion = "2.6.8"

val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.14.3"
val specs2 = "org.specs2" %% "specs2-core" % "4.8.3"
val scalaTest = "org.scalatest" %% "scalatest" % "3.0.8"
val mockito = "org.mockito" % "mockito-core" % "2.21.0"
val mockito = "org.mockito" % "mockito-core" % "3.4.4"
val akkaStreamTestKit = "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion

val snakeYaml = "org.yaml" % "snakeyaml" % "1.25"
val commonsIO = "commons-io" % "commons-io" % "2.6"
val commonsIO = "commons-io" % "commons-io" % "2.7"
val commonsCodec = "commons-codec" % "commons-codec" % "1.14"
val bouncyCastle = "org.bouncycastle" % "bcpkix-jdk15on" % "1.64"
val bouncyCastle = "org.bouncycastle" % "bcpkix-jdk15on" % "1.66"

// the client API request/response handing uses Akka Http
val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.11"
val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.12"
val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
val akka = "com.typesafe.akka" %% "akka-actor" % akkaVersion

Expand All @@ -24,7 +24,7 @@ val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion
val logback = "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime

// the Json formatters are based on Play Json
val playJson = "com.typesafe.play" %% "play-json" % "2.7.4"
val playJson = "com.typesafe.play" %% "play-json" % "2.9.0"

// Need Java 8 or later as the java.time package is used to represent K8S timestamps
scalacOptions += "-target:jvm-1.8"
Expand Down Expand Up @@ -52,8 +52,8 @@ developers in ThisBuild := List(Developer(id="doriordan", name="David ORiordan",

lazy val commonSettings = Seq(
organization := "io.skuber",
crossScalaVersions := Seq("2.11.12", "2.12.10", "2.13.1"),
scalaVersion := "2.13.1",
crossScalaVersions := Seq("2.12.10", "2.13.3"),
scalaVersion := "2.12.10",
publishTo := sonatypePublishToBundle.value,
pomIncludeRepository := { _ => false },
Test / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat
Expand Down
2 changes: 0 additions & 2 deletions client/src/it/scala/skuber/K8SFixture.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package skuber

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.scalatest.{FutureOutcome, fixture}
import skuber.api.client._
import com.typesafe.config.ConfigFactory
Expand All @@ -12,7 +11,6 @@ trait K8SFixture extends fixture.AsyncFlatSpec {
override type FixtureParam = K8SRequestContext

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val dispatcher = system.dispatcher

val config = ConfigFactory.load()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object PodExecImpl {
maybeStdout: Option[Sink[String, _]] = None,
maybeStderr: Option[Sink[String, _]] = None,
tty: Boolean = false,
maybeClose: Option[Promise[Unit]] = None)(implicit sys: ActorSystem, mat: Materializer, lc : LoggingContext): Future[Unit] =
maybeClose: Option[Promise[Unit]] = None)(implicit sys: ActorSystem, lc : LoggingContext): Future[Unit] =
{
implicit val executor: ExecutionContext = sys.dispatcher

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class KubernetesClientImpl private[client] (
val podLogSettings: ConnectionPoolSettings,
val sslContext: Option[SSLContext], // provides the Akka client with the SSL details needed for https connections to the API server
override val logConfig: LoggingConfig,
val closeHook: Option[() => Unit])(implicit val actorSystem: ActorSystem, val materializer: Materializer, val executionContext: ExecutionContext)
val closeHook: Option[() => Unit])(implicit val actorSystem: ActorSystem, val executionContext: ExecutionContext)
extends KubernetesClient
{
val log = Logging.getLogger(actorSystem, "skuber.api")
Expand Down Expand Up @@ -700,7 +700,7 @@ class KubernetesClientImpl private[client] (
object KubernetesClientImpl {

def apply(k8sContext: Context, logConfig: LoggingConfig, closeHook: Option[() => Unit], appConfig: Config)
(implicit actorSystem: ActorSystem, materializer: Materializer): KubernetesClientImpl =
(implicit actorSystem: ActorSystem): KubernetesClientImpl =
{
appConfig.checkValid(ConfigFactory.defaultReference(), "skuber")

Expand Down
12 changes: 6 additions & 6 deletions client/src/main/scala/skuber/api/client/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -190,29 +190,29 @@ package object client {

type RequestContext = KubernetesClient // for backwards compatibility (with pre 2.1 clients)

def init()(implicit actorSystem: ActorSystem, materializer: Materializer): KubernetesClient = {
def init()(implicit actorSystem: ActorSystem): KubernetesClient = {
init(defaultK8sConfig, defaultAppConfig)
}

def init(config: Configuration)(implicit actorSystem: ActorSystem, materializer: Materializer): KubernetesClient = {
def init(config: Configuration)(implicit actorSystem: ActorSystem): KubernetesClient = {
init(config.currentContext, LoggingConfig(), None, defaultAppConfig)
}

def init(appConfig: Config)(implicit actorSystem: ActorSystem, materializer: Materializer): KubernetesClient = {
def init(appConfig: Config)(implicit actorSystem: ActorSystem): KubernetesClient = {
init(defaultK8sConfig.currentContext, LoggingConfig(), None, appConfig)
}

def init(config: Configuration, appConfig: Config)(implicit actorSystem: ActorSystem, materializer: Materializer): KubernetesClient = {
def init(config: Configuration, appConfig: Config)(implicit actorSystem: ActorSystem): KubernetesClient = {
init(config.currentContext, LoggingConfig(), None, appConfig)
}

def init(k8sContext: Context, logConfig: LoggingConfig, closeHook: Option[() => Unit] = None)
(implicit actorSystem: ActorSystem, materializer: Materializer): KubernetesClient = {
(implicit actorSystem: ActorSystem): KubernetesClient = {
init(k8sContext, logConfig, closeHook, defaultAppConfig)
}

def init(k8sContext: Context, logConfig: LoggingConfig, closeHook: Option[() => Unit], appConfig: Config)
(implicit actorSystem: ActorSystem, materializer: Materializer): KubernetesClient = {
(implicit actorSystem: ActorSystem): KubernetesClient = {
KubernetesClientImpl(k8sContext, logConfig, closeHook, appConfig)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ private[api] object LongPollingPool {
def apply[T](schema: String, host: String, port: Int,
poolIdleTimeout: Duration,
httpsConnectionContext: Option[HttpsConnectionContext],
clientConnectionSettings: ClientConnectionSettings)(implicit mat: Materializer, system: ActorSystem): Pool[T] = {
clientConnectionSettings: ClientConnectionSettings)(implicit system: ActorSystem): Pool[T] = {
schema match {
case "http" =>
Http().newHostConnectionPool[T](
Expand Down
14 changes: 5 additions & 9 deletions client/src/main/scala/skuber/api/watch/WatchSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Source}
import akka.stream.{Materializer, SourceShape}
import akka.stream.SourceShape
import play.api.libs.json.Format
import skuber.api.client._
import skuber.api.client.impl.KubernetesClientImpl
import skuber.{K8SRequestContext, ObjectResource, ResourceDefinition, ListOptions}
import skuber.{ObjectResource, ResourceDefinition, ListOptions}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
Expand All @@ -32,7 +32,6 @@ private[api] object WatchSource {
name: Option[String],
options: ListOptions,
bufSize: Int)(implicit sys: ActorSystem,
fm: Materializer,
format: Format[O],
rd: ResourceDefinition[O],
lc: LoggingContext): Source[WatchEvent[O], NotUsed] = {
Expand Down Expand Up @@ -77,21 +76,18 @@ private[api] object WatchSource {
case (Success(HttpResponse(sc, _, entity, _)), _) =>
client.logWarn(s"Error watching resource. Received a status of ${sc.intValue()}")
entity.discardBytes()
throw new K8SException(Status(message = Some("Error watching resource"), code = Some(sc.intValue())))
throw new K8SException(Status(message = Some("Error watching resource 1"), code = Some(sc.intValue())))
case (Failure(f), _) =>
client.logError("Error watching resource.", f)
throw new K8SException(Status(message = Some("Error watching resource"), details = Some(f.getMessage)))
throw new K8SException(Status(message = Some("Error watching resource 2"), details = Some(f.getMessage)))
}

val outboundFlow: Flow[StreamElement[O], WatchEvent[O], NotUsed] =
Flow[StreamElement[O]]
.filter(_.isInstanceOf[Result[O]])
.map{
.collect {
case Result(_, event) => event
case _ => throw new K8SException(Status(message = Some("Error processing watch events.")))
}


val feedbackFlow: Flow[StreamElement[O], (HttpRequest, Start[O]), NotUsed] =
Flow[StreamElement[O]].scan(StreamContext(None, Waiting)){(cxt, next) =>
next match {
Expand Down
2 changes: 1 addition & 1 deletion client/src/main/scala/skuber/json/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ package object format {
(JsPath \ "requests").formatMaybeEmptyMap[Resource.Quantity]
)(Resource.Requirements.apply _, unlift(Resource.Requirements.unapply))

implicit val protocolFmt = Format(enumReads(Protocol, Protocol.TCP), enumWrites)
implicit val protocolFmt: Format[skuber.Protocol.Value] = Format(enumReads(Protocol, Protocol.TCP), enumWrites)

implicit val formatCntrProt: Format[Container.Port] = (
(JsPath \ "containerPort").format[Int] and
Expand Down
8 changes: 4 additions & 4 deletions client/src/main/scala/skuber/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -284,28 +284,28 @@ package object skuber {
/**
* Initialise Skuber using default Kubernetes and application configuration.
*/
def k8sInit(implicit actorSystem: ActorSystem, materializer: Materializer): KubernetesClient = {
def k8sInit(implicit actorSystem: ActorSystem): KubernetesClient = {
skuber.api.client.init
}

/**
* Initialise Skuber using the specified Kubernetes configuration and default application configuration.
*/
def k8sInit(config: skuber.api.Configuration)(implicit actorSystem: ActorSystem, materializer: Materializer): KubernetesClient = {
def k8sInit(config: skuber.api.Configuration)(implicit actorSystem: ActorSystem): KubernetesClient = {
skuber.api.client.init(config)
}

/**
* Initialise Skuber using default Kubernetes configuration and the specified application configuration.
*/
def k8sInit(appConfig: Config)(implicit actorSystem: ActorSystem, materializer: Materializer): KubernetesClient = {
def k8sInit(appConfig: Config)(implicit actorSystem: ActorSystem): KubernetesClient = {
skuber.api.client.init(appConfig)
}

/**
* Initialise Skuber using the specified Kubernetes and application configuration.
*/
def k8sInit(config: skuber.api.Configuration, appConfig: Config)(implicit actorSystem: ActorSystem, materializer: Materializer)
def k8sInit(config: skuber.api.Configuration, appConfig: Config)(implicit actorSystem: ActorSystem)
: KubernetesClient =
{
skuber.api.client.init(config, appConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import skuber.json.format._
import skuber.ReplicationController
import org.specs2.mutable.Specification
import akka.util.ByteString
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.actor.ActorSystem
import skuber.api.watch.BytesToWatchEventSource
Expand All @@ -21,7 +20,6 @@ class BytesToWatchEventSourceSpec extends Specification {

implicit val system: ActorSystem = ActorSystem("test")
implicit val ec: ExecutionContext = system.dispatcher
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val loggingContext: LoggingContext = new LoggingContext { override def output:String="test" }

"A single chunk containing a single Watch event can be read correctly" >> {
Expand Down
2 changes: 0 additions & 2 deletions client/src/test/scala/skuber/api/ConfigurationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import java.nio.file.Paths
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneId}

import akka.stream.ActorMaterializer
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

Expand Down Expand Up @@ -102,7 +101,6 @@ users:
"""

implicit val system=ActorSystem("test")
implicit val materializer = ActorMaterializer()
implicit val loggingContext: LoggingContext = new LoggingContext { override def output:String="test" }

"An example kubeconfig file can be parsed correctly" >> {
Expand Down
2 changes: 0 additions & 2 deletions client/src/test/scala/skuber/api/LongPollingPoolSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
Expand All @@ -25,7 +24,6 @@ import scala.util.Success

class LongPollingPoolSpec extends Specification with ScalaFutures {
implicit val system: ActorSystem = ActorSystem("watch-source-spec")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher
implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = Span(3, Seconds), interval = Span(5, Millis))

Expand Down
5 changes: 2 additions & 3 deletions client/src/test/scala/skuber/api/WatchSourceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import akka.http.scaladsl.model._
import akka.stream.scaladsl.Framing.FramingException
import akka.stream.scaladsl.{Flow, Keep, TcpIdleTimeoutException}
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.KillSwitches
import com.fasterxml.jackson.core.JsonParseException
import org.mockito.Mockito.{times, verify, when}
import org.scalatest.mockito.MockitoSugar
import org.scalatestplus.mockito.MockitoSugar
import org.specs2.mutable.Specification
import skuber.api.client.impl.KubernetesClientImpl
import skuber.api.watch.WatchSource.Start
Expand All @@ -26,7 +26,6 @@ import scala.util.Try

class WatchSourceSpec extends Specification with MockitoSugar {
implicit val system: ActorSystem = ActorSystem("watch-source-spec")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val loggingContext: LoggingContext = new LoggingContext {
override def output: String = "test"
}
Expand Down
2 changes: 0 additions & 2 deletions docs/Examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ import skuber.json.format._

// Some standard Akka implicits that are required by the skuber v2 client API
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val dispatcher = system.dispatcher
```

Expand Down
4 changes: 0 additions & 4 deletions docs/GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,7 @@ import skuber._
import skuber.json.format._

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val dispatcher = system.dispatcher

val k8s = k8sInit
Expand Down Expand Up @@ -259,12 +257,10 @@ import skuber.json.format._
import skuber.apps.v1.Deployment

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

object WatchExamples {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val dispatcher = system.dispatcher
val k8s = k8sInit

Expand Down
4 changes: 0 additions & 4 deletions docs/MIGRATION_1-to-2.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@ Because skuber now uses [Akka Http](https://doc.akka.io/docs/akka-http/current/s
```
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val k8s = skuber.k8sInit
Expand Down
Loading

0 comments on commit 9f9a06f

Please sign in to comment.