From ce74dc196f6d217f23ca8cc22175f3210fc60506 Mon Sep 17 00:00:00 2001 From: Nicolas F Rouquette Date: Sat, 20 Jul 2019 17:18:58 -0700 Subject: [PATCH 1/6] Refactoring to support reusing a skuber-created pool across multiple job executions. --- build.sbt | 12 +- .../skuber/api/client/KubernetesClient.scala | 70 +++++- .../client/impl/KubernetesClientImpl.scala | 79 ++++-- .../scala/skuber/api/client/package.scala | 10 +- .../skuber/api/watch/LongPollingPool.scala | 7 +- .../scala/skuber/api/watch/WatchSource.scala | 157 ++++++------ .../scala/skuber/api/WatchSourceSpec.scala | 5 +- docs/Examples.md | 183 +++++++++++++- .../scala/skuber/examples/job/PiJobs.scala | 238 ++++++++++++++++++ 9 files changed, 645 insertions(+), 116 deletions(-) create mode 100644 examples/src/main/scala/skuber/examples/job/PiJobs.scala diff --git a/build.sbt b/build.sbt index cdc2ad3a..57e3f1b7 100644 --- a/build.sbt +++ b/build.sbt @@ -1,11 +1,13 @@ resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases/" +val akkaVersion = "2.5.23" + val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.14.0" val specs2 = "org.specs2" %% "specs2-core" % "4.3.2" val scalaTest = "org.scalatest" %% "scalatest" % "3.0.5" val mockito = "org.mockito" % "mockito-core" % "2.21.0" -val akkaStreamTestKit = "com.typesafe.akka" %% "akka-stream-testkit" % "2.5.14" +val akkaStreamTestKit = "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion val snakeYaml = "org.yaml" % "snakeyaml" % "1.21" val commonsIO = "commons-io" % "commons-io" % "2.6" @@ -14,11 +16,11 @@ val bouncyCastle = "org.bouncycastle" % "bcpkix-jdk15on" % "1.60" // the client API request/response handing uses Akka Http val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.3" -val akkaStream = "com.typesafe.akka" %% "akka-stream" % "2.5.14" -val akka = "com.typesafe.akka" %% "akka-actor" % "2.5.14" +val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion +val akka = "com.typesafe.akka" %% "akka-actor" % akkaVersion // Skuber uses akka logging, so the examples config uses the akka slf4j logger with logback backend -val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.5.14" +val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion val logback = "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime // the Json formatters are based on Play Json @@ -29,7 +31,7 @@ scalacOptions += "-target:jvm-1.8" scalacOptions in Test ++= Seq("-Yrangepos") -version in ThisBuild := "2.2.0" +version in ThisBuild := "2.3.0" sonatypeProfileName := "io.skuber" diff --git a/client/src/main/scala/skuber/api/client/KubernetesClient.scala b/client/src/main/scala/skuber/api/client/KubernetesClient.scala index 92d7708e..5240526c 100644 --- a/client/src/main/scala/skuber/api/client/KubernetesClient.scala +++ b/client/src/main/scala/skuber/api/client/KubernetesClient.scala @@ -1,11 +1,15 @@ package skuber.api.client import akka.stream.scaladsl.{Sink, Source} +import akka.http.scaladsl.Http import akka.util.ByteString -import play.api.libs.json.{Writes,Format} +import play.api.libs.json.{Format, Writes} import skuber.{DeleteOptions, HasStatusSubresource, LabelSelector, ListOptions, ListResource, ObjectResource, Pod, ResourceDefinition, Scale} import skuber.api.patch.Patch +import skuber.api.watch.WatchSource +import skuber.batch.Job +import scala.concurrent.duration.{Duration, FiniteDuration} import scala.concurrent.{Future, Promise} /** @@ -88,6 +92,15 @@ trait KubernetesClient { */ def deleteWithOptions[O <: ObjectResource](name: String, options: 
DeleteOptions)(implicit rd: ResourceDefinition[O], lc: LoggingContext): Future[Unit] + /** + * Monitor a resource existence until no longer available + * @param name the name of the resource to monitor its existence + * @param monitorRepeatDelay delay for repeating the monitoring as long as the resource is available by name + * @tparam O the specific object resource type e.g. Pod, Deployment + * @return A future that will be set to success when Kubernetes confirm the resource is no longer available by name, otherwise failure + */ + def monitorResourceUntilUnavailable[O <: ObjectResource](name: String, monitorRepeatDelay: FiniteDuration)(implicit fmt: Format[O], rd: ResourceDefinition[O]): Future[Unit] + /** * Delete all resources of specified type in current namespace * @tparam L list resource type of resources to delete e.g. PodList, DeploymentList @@ -216,10 +229,12 @@ trait KubernetesClient { * Watch a specific object resource continuously. This returns a source that will continue to produce * events on any updates to the object even if the server times out, by transparently restarting the watch as needed. * @param obj the object resource to watch + * @param pool reuse a skuber pool for querying the server if any or create a new one * @tparam O the type of the resource e.g Pod - * @return A future containing an Akka streams Source of WatchEvents that will be emitted + * @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized + * value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any. */ - def watchContinuously[O <: ObjectResource](obj: O)(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] + def watchContinuously[O <: ObjectResource](obj: O, pool: Option[Pool[WatchSource.Start[O]]])(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] /** * Watch a specific object resource continuously. This returns a source that will continue to produce @@ -232,11 +247,13 @@ trait KubernetesClient { * applicable type (e.g. PodList, DeploymentList) and then supplies that to this method to receive any future updates. If no resource version is specified, * a single ADDED event will be produced for an already existing object followed by events for any future changes. * @param bufSize optional buffer size for received object updates, normally the default is more than enough + * @param pool reuse a skuber pool for querying the server if any or create a new one * @tparam O the type of the resource - * @return A future containing an Akka streams Source of WatchEvents that will be emitted + * @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized + * value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any. 
*/ - def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] + def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] /** * Watch all object resources of a specified type continuously. This returns a source that will continue to produce @@ -248,24 +265,29 @@ trait KubernetesClient { * applicable type (e.g. PodList, DeploymentList) and then supplies that to this method to receive any future updates. If no resource version is specified, * a single ADDED event will be produced for an already existing object followed by events for any future changes. * @param bufSize optional buffer size for received object updates, normally the default is more than enough + * @param pool reuse a skuber pool for querying the server if any or create a new one * @tparam O the type pf the resource - * @return A future containing an Akka streams Source of WatchEvents that will be emitted + * @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized + * value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any. */ - def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] + def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] /** * Watch all object resources of a specified type continuously, passing the specified options to the API server with the watch request. * This returns a source that will continue to produce events even if the server times out, by transparently restarting the watch as needed. + * * @param options a set of list options to pass to the server. See https://godoc.org/k8s.io/apimachinery/pkg/apis/meta/v1#ListOptions * for the meaning of the options. Note that the `watch` flag in the options will be ignored / overridden by the client, which * ensures a watch is always requested on the server. * @param bufsize optional buffer size for received object updates, normally the default is more than enough + * @param pool reuse a skuber pool for querying the server if any or create a new one * @tparam O the resource type to watch - * @return A future containing an Akka streams Source of WatchEvents that will be emitted + * @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized + * value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any. 
*/ - def watchWithOptions[O <: ObjectResource](options: ListOptions, bufsize: Int = 10000)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] + def watchWithOptions[O <: ObjectResource](options: ListOptions, bufsize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] /** * Get the scale subresource of the named object resource @@ -350,6 +372,30 @@ trait KubernetesClient { tty: Boolean = false, maybeClose: Option[Promise[Unit]] = None)(implicit lc: LoggingContext): Future[Unit] + /** + * Execute a job, monitoring the progress of its pod until completion and monitor its deletion until complete + * @param job the Kubernetes job to execute + * @param labelSelector the label selector for monitoring the job's pod status + * @param podProgress the predicate for monitoring the pod status while satisfied before deleting the job + * @param podCompletion a callback invoked at the completion of the job's pod (successful or not), after which the job will be deleted + * @param watchContinuouslyRequestTimeout the delay for continuously monitoring the pod progress + * @param deletionMonitorRepeatDelay the delay for continuously monitoring the job deletion + * @param pool a skuber pool to reuse, if any, or to create otherwise + * @return A future consisting of a triple of the following: + * - the skuber pool suitable for subsequently executing other jobs on the same server + * - the akka host connection pool that can be shutdown when no further jobs need to be executed on the same server + * - the last pod status received when the pod progress predicate became unsatisfied + */ + def executeJobAndWaitUntilDeleted( + job: Job, + labelSelector: LabelSelector, + podProgress: WatchEvent[Pod] => Boolean, + podCompletion: WatchEvent[Pod] => Future[Unit], + watchContinuouslyRequestTimeout: Duration, + deletionMonitorRepeatDelay: FiniteDuration, + pool: Option[Pool[WatchSource.Start[Pod]]])(implicit jfmt: Format[Job], pfmt: Format[Pod], jrd: ResourceDefinition[Job], prd: ResourceDefinition[Pod]): + Future[(Pool[WatchSource.Start[Pod]], Option[Http.HostConnectionPool], WatchEvent[Pod])] + /** * Return list of API versions supported by the server * @param lc diff --git a/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala b/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala index 3bdbd3a6..76915ed4 100644 --- a/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala +++ b/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala @@ -7,12 +7,13 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} import akka.http.scaladsl.unmarshalling.Unmarshal import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext} +import akka.pattern.after import akka.stream.Materializer -import akka.stream.scaladsl.{Sink, Source} +import akka.stream.scaladsl.{Keep, Sink, Source} import akka.util.ByteString import com.typesafe.config.{Config, ConfigFactory} import javax.net.ssl.SSLContext -import play.api.libs.json.{Format, Writes, Reads} +import play.api.libs.json.{Format, Reads, Writes} import skuber._ import skuber.api.client.exec.PodExecImpl import skuber.api.client.{K8SException => _, _} @@ -22,6 +23,7 @@ import skuber.json.PlayJsonSupportForAkkaHttp._ import 
skuber.json.format.apiobj.statusReads import skuber.json.format.{apiVersionsFormat, deleteOptionsFmt, namespaceListFmt} import skuber.api.patch._ +import skuber.batch.Job import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future, Promise} @@ -403,6 +405,17 @@ class KubernetesClientImpl private[client] ( } yield () } + override def monitorResourceUntilUnavailable[O <: ObjectResource](name: String, monitorRepeatDelay: FiniteDuration)( + implicit fmt: Format[O], rd: ResourceDefinition[O]): Future[Unit] = + getOption[O](name).flatMap { + case None => + Future.successful(()) + case Some(_) => + after(monitorRepeatDelay, actorSystem.scheduler)( + monitorResourceUntilUnavailable[O](name, monitorRepeatDelay) + ) + } + override def deleteAll[L <: ListResource[_]]()( implicit fmt: Format[L], rd: ResourceDefinition[L], lc: LoggingContext): Future[L] = { @@ -481,30 +494,30 @@ class KubernetesClientImpl private[client] ( Watch.eventsOnKind[O](this, sinceResourceVersion, bufSize) } - override def watchContinuously[O <: ObjectResource](obj: O)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] = + override def watchContinuously[O <: ObjectResource](obj: O, pool: Option[Pool[WatchSource.Start[O]]])( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = { - watchContinuously(obj.name) + watchContinuously(obj.name, pool = pool) } - override def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] = + override def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = { val options=ListOptions(resourceVersion = sinceResourceVersion, timeoutSeconds = Some(watchContinuouslyRequestTimeout.toSeconds) ) - WatchSource(this, buildLongPollingPool(), Some(name), options, bufSize) + WatchSource(this, pool.getOrElse(buildLongPollingPool()), Some(name), options, bufSize) } - override def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] = + override def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = { val options=ListOptions(resourceVersion = sinceResourceVersion, timeoutSeconds = Some(watchContinuouslyRequestTimeout.toSeconds)) - WatchSource(this, buildLongPollingPool(), None, options, bufSize) + WatchSource(this, pool.getOrElse(buildLongPollingPool()), None, options, bufSize) } - override def watchWithOptions[O <: skuber.ObjectResource](options: ListOptions, bufsize: Int = 10000)( - implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] = + override def watchWithOptions[O <: skuber.ObjectResource](options: ListOptions, bufsize: Int 
= 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)( + implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = { - WatchSource(this, buildLongPollingPool(), None, options, bufsize) + WatchSource(this, pool.getOrElse(buildLongPollingPool()), None, options, bufsize) } private def buildLongPollingPool[O <: ObjectResource]() = { @@ -622,6 +635,44 @@ class KubernetesClientImpl private[client] ( PodExecImpl.exec(this, podName, command, maybeContainerName, maybeStdin, maybeStdout, maybeStderr, tty, maybeClose) } + override def executeJobAndWaitUntilDeleted( + job: Job, + labelSelector: LabelSelector, + podProgress: WatchEvent[Pod] => Boolean, + podCompletion: WatchEvent[Pod] => Future[Unit], + watchContinuouslyRequestTimeout: Duration, + deletionMonitorRepeatDelay: FiniteDuration, + pool: Option[Pool[WatchSource.Start[Pod]]])(implicit jfmt: Format[Job], pfmt: Format[Pod], jrd: ResourceDefinition[Job], prd: ResourceDefinition[Pod]) + : Future[(Pool[WatchSource.Start[Pod]], Option[Http.HostConnectionPool], WatchEvent[Pod])] = + for { + j <- create(job) + (p, hcp, lastPodEvent) <- { + watchWithOptions[Pod]( + options = ListOptions( + labelSelector = Some(labelSelector), + timeoutSeconds = Some(watchContinuouslyRequestTimeout.toSeconds) + ), + pool = pool + ) + .takeWhile(podProgress, inclusive = true) + .toMat(Sink.last)(Keep.both) + .run() match { + case ((pool: Pool[WatchSource.Start[Pod]], + hostConnectionPool: Option[Http.HostConnectionPool]), + f: Future[WatchEvent[Pod]]) => + f.map { ev => + (pool, hostConnectionPool, ev) + } + } + } + _ <- podCompletion(lastPodEvent) + _ <- deleteWithOptions[Job]( + name = j.metadata.name, + options = + DeleteOptions(propagationPolicy = Some(DeletePropagation.Foreground))) + _ <- monitorResourceUntilUnavailable[Job](j.metadata.name, deletionMonitorRepeatDelay) + } yield (p, hcp, lastPodEvent) + override def close: Unit = { isClosed = true diff --git a/client/src/main/scala/skuber/api/client/package.scala b/client/src/main/scala/skuber/api/client/package.scala index 9069ed9d..d6268971 100644 --- a/client/src/main/scala/skuber/api/client/package.scala +++ b/client/src/main/scala/skuber/api/client/package.scala @@ -3,8 +3,8 @@ package skuber.api import java.time.Instant import java.util.UUID -import akka.NotUsed import akka.actor.ActorSystem +import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.stream.Materializer import akka.stream.scaladsl.Flow @@ -23,7 +23,13 @@ import skuber.api.client.impl.KubernetesClientImpl */ package object client { - type Pool[T] = Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] + /** + * The materialized value is an optional host connection pool. + * For testing, allows mocking without creating a host connection pool. + * For development and production, provides access to the host connection pool created (if none was provided). + * @tparam T The type of elements flowing in and out. 
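+   *
+   * Illustrative only (mirroring the `mockPool` helpers in `WatchSourceSpec` later in this patch): a stub
+   * pool with no underlying Akka connection pool can be built from a plain `Flow`, with `handle` standing
+   * in for any `HttpRequest => HttpResponse` function:
+   * {{{
+   * Flow[(HttpRequest, T)]
+   *   .map { case (req, t) => (Try(handle(req)), t) }
+   *   .mapMaterializedValue(_ => Option.empty[Http.HostConnectionPool])
+   * }}}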
+ */ + type Pool[T] = Flow[(HttpRequest, T), (Try[HttpResponse], T), Option[Http.HostConnectionPool]] final val sysProps = new SystemProperties diff --git a/client/src/main/scala/skuber/api/watch/LongPollingPool.scala b/client/src/main/scala/skuber/api/watch/LongPollingPool.scala index 13a1ec2b..c534db74 100644 --- a/client/src/main/scala/skuber/api/watch/LongPollingPool.scala +++ b/client/src/main/scala/skuber/api/watch/LongPollingPool.scala @@ -1,6 +1,5 @@ package skuber.api.watch -import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} import akka.http.scaladsl.{Http, HttpsConnectionContext} @@ -19,19 +18,19 @@ private[api] object LongPollingPool { Http().newHostConnectionPool[T]( host, port, buildHostConnectionPool(poolIdleTimeout, clientConnectionSettings, system) - ).mapMaterializedValue(_ => NotUsed) + ).mapMaterializedValue(Some(_)) case "https" => Http().newHostConnectionPoolHttps[T]( host, port, httpsConnectionContext.getOrElse(Http().defaultClientHttpsContext), buildHostConnectionPool(poolIdleTimeout, clientConnectionSettings, system) - ).mapMaterializedValue(_ => NotUsed) + ).mapMaterializedValue(Some(_)) case unsupported => throw new IllegalArgumentException(s"Schema $unsupported is not supported") } } - private def buildHostConnectionPool[T](poolIdleTimeout: Duration, clientConnectionSettings: ClientConnectionSettings, system: ActorSystem) = { + def buildHostConnectionPool[T](poolIdleTimeout: Duration, clientConnectionSettings: ClientConnectionSettings, system: ActorSystem) = { ConnectionPoolSettings(system) .withMaxConnections(1) // Limit number the of open connections to one .withPipeliningLimit(1) // Limit pipelining of requests to one diff --git a/client/src/main/scala/skuber/api/watch/WatchSource.scala b/client/src/main/scala/skuber/api/watch/WatchSource.scala index 6b26f4c5..d84b60a0 100644 --- a/client/src/main/scala/skuber/api/watch/WatchSource.scala +++ b/client/src/main/scala/skuber/api/watch/WatchSource.scala @@ -2,32 +2,35 @@ package skuber.api.watch import akka.NotUsed import akka.actor.ActorSystem +import akka.http.scaladsl.Http import akka.http.scaladsl.model._ -import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Source} +import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, Merge, Source} import akka.stream.{Materializer, SourceShape} import play.api.libs.json.Format import skuber.api.client._ import skuber.api.client.impl.KubernetesClientImpl -import skuber.{K8SRequestContext, ObjectResource, ResourceDefinition, ListOptions} +import skuber.{ListOptions, ObjectResource, ResourceDefinition} import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ -import scala.util.{Failure, Success} - -private[api] object WatchSource { - sealed trait StreamElement[O <: ObjectResource] {} - case class End[O <: ObjectResource]() extends StreamElement[O] +import scala.util.{Failure, Success, Try} + +/** + * @author David O'Riordan + */ +object WatchSource { + private[api] sealed trait StreamElement[O <: ObjectResource] {} + private[api] case class End[O <: ObjectResource]() extends StreamElement[O] case class Start[O <: ObjectResource](resourceVersion: Option[String]) extends StreamElement[O] - case class Result[O <: ObjectResource](resourceVersion: String, value: WatchEvent[O]) extends StreamElement[O] + private[api] case class Result[O <: ObjectResource](resourceVersion: String, value: WatchEvent[O]) extends StreamElement[O] - sealed trait StreamState {} - case 
object Waiting extends StreamState - case object Processing extends StreamState - case object Finished extends StreamState + private[api] sealed trait StreamState {} + private[api] case object Waiting extends StreamState + private[api] case object Processing extends StreamState + private[api] case object Finished extends StreamState - case class StreamContext(currentResourceVersion: Option[String], state: StreamState) + private[api] case class StreamContext(currentResourceVersion: Option[String], state: StreamState) - def apply[O <: ObjectResource](client: KubernetesClientImpl, + private[api] def apply[O <: ObjectResource](client: KubernetesClientImpl, pool: Pool[Start[O]], name: Option[String], options: ListOptions, @@ -35,76 +38,78 @@ private[api] object WatchSource { fm: Materializer, format: Format[O], rd: ResourceDefinition[O], - lc: LoggingContext): Source[WatchEvent[O], NotUsed] = { - Source.fromGraph(GraphDSL.create() { implicit b => - import GraphDSL.Implicits._ - - implicit val dispatcher: ExecutionContext = sys.dispatcher - - def createWatchRequest(since: Option[String]) = - { - val nameFieldSelector=name.map(objName => s"metadata.name=$objName") - val watchOptions=options.copy( - resourceVersion = since, - watch = Some(true), - fieldSelector = nameFieldSelector.orElse(options.fieldSelector) - ) - client.buildRequest( - HttpMethods.GET, rd, None, query = Some(Uri.Query(watchOptions.asMap)) - ) + lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = { + + implicit val dispatcher: ExecutionContext = sys.dispatcher + + val singleEnd = Source.single(End[O]()) + + def singleStart(s:StreamElement[O]) = Source.single(s) + + val httpFlow: Flow[(HttpRequest, Start[O]), StreamElement[O], Option[Http.HostConnectionPool]] = + Flow[(HttpRequest, Start[O])].map { request => // log request + client.logInfo(client.logConfig.logRequestBasic, s"about to send HTTP request: ${request._1.method.value} ${request._1.uri.toString}") + request + }.viaMat[(Try[HttpResponse], Start[O]), Option[Http.HostConnectionPool], Option[Http.HostConnectionPool]](pool)(Keep.right).flatMapConcat { + case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), se) => + client.logInfo(client.logConfig.logResponseBasic, s"received response with HTTP status 200") + singleStart(se).concat( + BytesToWatchEventSource[O](entity.dataBytes, bufSize).map { event => + Result[O](event._object.resourceVersion, event) + } + ).concat(singleEnd) + case (Success(HttpResponse(sc, _, entity, _)), _) => + client.logWarn(s"Error watching resource. 
Received a status of ${sc.intValue()}") + entity.discardBytes() + throw new K8SException(Status(message = Some("Error watching resource"), code = Some(sc.intValue()))) + case (Failure(f), _) => + client.logError("Error watching resource.", f) + throw new K8SException(Status(message = Some("Error watching resource"), details = Some(f.getMessage))) } - val singleEnd = Source.single(End[O]()) + val httpFlowMat: Flow[(HttpRequest, Start[O]), StreamElement[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] = + httpFlow.mapMaterializedValue { pool -> _ } - def singleStart(s:StreamElement[O]) = Source.single(s) - - val initSource = Source.single( - (createWatchRequest(options.resourceVersion), Start[O](options.resourceVersion)) + def createWatchRequest(since: Option[String]) = + { + val nameFieldSelector=name.map(objName => s"metadata.name=$objName") + val watchOptions=options.copy( + resourceVersion = since, + watch = Some(true), + fieldSelector = nameFieldSelector.orElse(options.fieldSelector) ) - - val httpFlow: Flow[(HttpRequest, Start[O]), StreamElement[O], NotUsed] = - Flow[(HttpRequest, Start[O])].map { request => // log request - client.logInfo(client.logConfig.logRequestBasic, s"about to send HTTP request: ${request._1.method.value} ${request._1.uri.toString}") - request - }.via(pool).flatMapConcat { - case (Success(HttpResponse(StatusCodes.OK, _, entity, _)), se) => - client.logInfo(client.logConfig.logResponseBasic, s"received response with HTTP status 200") - singleStart(se).concat( - BytesToWatchEventSource[O](entity.dataBytes, bufSize).map { event => - Result[O](event._object.resourceVersion, event) - } - ).concat(singleEnd) - case (Success(HttpResponse(sc, _, entity, _)), _) => - client.logWarn(s"Error watching resource. Received a status of ${sc.intValue()}") - entity.discardBytes() - throw new K8SException(Status(message = Some("Error watching resource"), code = Some(sc.intValue()))) - case (Failure(f), _) => - client.logError("Error watching resource.", f) - throw new K8SException(Status(message = Some("Error watching resource"), details = Some(f.getMessage))) + client.buildRequest( + HttpMethods.GET, rd, None, query = Some(Uri.Query(watchOptions.asMap)) + ) + } + + val initSource = Source.single( + (createWatchRequest(options.resourceVersion), Start[O](options.resourceVersion)) + ) + + val outboundFlow: Flow[StreamElement[O], WatchEvent[O], NotUsed] = + Flow[StreamElement[O]] + .filter(_.isInstanceOf[Result[O]]) + .map{ + case Result(_, event) => event + case _ => throw new K8SException(Status(message = Some("Error processing watch events."))) } - val outboundFlow: Flow[StreamElement[O], WatchEvent[O], NotUsed] = - Flow[StreamElement[O]] - .filter(_.isInstanceOf[Result[O]]) - .map{ - case Result(_, event) => event - case _ => throw new K8SException(Status(message = Some("Error processing watch events."))) - } - - - val feedbackFlow: Flow[StreamElement[O], (HttpRequest, Start[O]), NotUsed] = - Flow[StreamElement[O]].scan(StreamContext(None, Waiting)){(cxt, next) => - next match { - case Start(rv) => StreamContext(rv, Processing) - case Result(rv, _) => StreamContext(Some(rv), Processing) - case End() => cxt.copy(state = Finished) - } - }.filter(_.state == Finished).map { acc => - (createWatchRequest(acc.currentResourceVersion), Start[O](acc.currentResourceVersion)) + val feedbackFlow: Flow[StreamElement[O], (HttpRequest, Start[O]), NotUsed] = + Flow[StreamElement[O]].scan(StreamContext(None, Waiting)){(cxt, next) => + next match { + case Start(rv) => 
StreamContext(rv, Processing) + case Result(rv, _) => StreamContext(Some(rv), Processing) + case End() => cxt.copy(state = Finished) } + }.filter(_.state == Finished).map { acc => + (createWatchRequest(acc.currentResourceVersion), Start[O](acc.currentResourceVersion)) + } + + Source.fromGraph(GraphDSL.create(httpFlowMat) { implicit b => http => + import GraphDSL.Implicits._ val init = b.add(initSource) - val http = b.add(httpFlow) val merge = b.add(Merge[(HttpRequest, Start[O])](2)) val broadcast = b.add(Broadcast[StreamElement[O]](2, eagerCancel = true)) val outbound = b.add(outboundFlow) diff --git a/client/src/test/scala/skuber/api/WatchSourceSpec.scala b/client/src/test/scala/skuber/api/WatchSourceSpec.scala index 64e29eba..876034ff 100644 --- a/client/src/test/scala/skuber/api/WatchSourceSpec.scala +++ b/client/src/test/scala/skuber/api/WatchSourceSpec.scala @@ -4,6 +4,7 @@ import java.net.ConnectException import java.time.{ZoneId, ZonedDateTime} import akka.actor.ActorSystem +import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.stream.scaladsl.Framing.FramingException import akka.stream.scaladsl.{Flow, Keep, TcpIdleTimeoutException} @@ -517,13 +518,13 @@ class WatchSourceSpec extends Specification with MockitoSugar { def mockPool[O <: ObjectResource](requestResponses: Map[HttpRequest, HttpResponse]): Pool[Start[O]] = { Flow[(HttpRequest, Start[O])].map { x => (Try(requestResponses(x._1)), x._2) - } + }.mapMaterializedValue(_ => Option.empty[Http.HostConnectionPool]) } def mockPool[O <: ObjectResource](error: Throwable): Pool[Start[O]] = { Flow[(HttpRequest, Start[O])].map { x => (Try(throw error), x._2) - } + }.mapMaterializedValue(_ => Option.empty[Http.HostConnectionPool]) } def retrieveWatchJson(path: String): String = { diff --git a/docs/Examples.md b/docs/Examples.md index 25804e51..83a1bd48 100644 --- a/docs/Examples.md +++ b/docs/Examples.md @@ -72,7 +72,6 @@ val podFuture = k8s.create(pod) // handle future as you see fit ``` - ## Create deployment This example creates a nginx service (accessed via port 30001 on each Kubernetes cluster node) that is backed by a deployment of five nginx replicas. @@ -117,6 +116,188 @@ createOnK8s.onComplete { } ``` +## Execute a job and monitor its execution until completed (successfully or not) and monitor its deletion + +First, define a suitable progress predicate for monitoring the execution of a Kubernetes pod. +For example: + +```scala + def podProgress( + ev: WatchEvent[Pod] + ): Boolean = { + + def containerStatusProgress(acc: Boolean, x: Container.Status): Boolean = { + x.state.fold[Boolean](acc) { + case Container.Waiting(None) => acc + case Container.Waiting(Some(reason)) => + !(reason.startsWith("Err") || reason.endsWith("BackOff")) + case Container.Running(_) => acc + case _: Container.Terminated => false + } + } + + def podStatusProgress( + s: Pod.Status + ): Boolean = { + val ok1 = s.initContainerStatuses + .foldLeft[Boolean](true)(containerStatusProgress) + val ok2 = s.containerStatuses + .foldLeft[Boolean](ok1)(containerStatusProgress) + val ok3 = s.conditions.foldLeft[Boolean](ok2) { + case (acc, _: Pod.Condition) => + acc + } + ok3 + } + + ev._type != EventType.DELETED && + ev._type != EventType.ERROR && + ev._object.status.fold[Boolean](true)(podStatusProgress) + } +``` + +Next, define a suitable completion callback for handling a completed. 
+For example: + +```scala + def podCompletion(k8s: KubernetesClient)(lastPodEvent: WatchEvent[Pod])( + implicit ec: ExecutionContext, + mat: ActorMaterializer): Future[Unit] = { + + def printLogFlow(cntrName: String): Sink[ByteString, NotUsed] = + Flow[ByteString] + .via( + Framing.delimiter(ByteString("\n"), + maximumFrameLength = 10000, + allowTruncation = true)) + .map(_.utf8String) + .to(Sink.foreach(text => println(s"[$cntrName logs] $text"))) + + def showContainerStateIfSuccessful(cs: Container.Status, + podName: String, + message: String): Future[Unit] = + cs.state.fold[Future[Unit]](Future.successful(())) { + case s: Container.Terminated if s.exitCode == 0 => + for { + logSource <- k8s.getPodLogSource( + name = podName, + queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) + _ = logSource.runWith(printLogFlow(message)) + } yield () + case s => + println(s"[$message] No logs because of unsuccessful status: $s") + Future.successful(()) + } + + lastPodEvent._object.status match { + case None => + Future.successful(()) + case Some(s) => + val podName = lastPodEvent._object.name + for { + _ <- s.initContainerStatuses + .foldLeft[Future[Unit]](Future.successful(())) { + case (_, cs) => + showContainerStateIfSuccessful( + cs, + podName, + s"init/$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + } + _ <- s.containerStatuses + .foldLeft[Future[Unit]](Future.successful(())) { + case (_, cs) => + showContainerStateIfSuccessful( + cs, + podName, + s"$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + } + } yield () + } + } +``` + +Next, define some suitable delays for monitoring: + +```scala +val watchContinuouslyRequestTimeout: Duration = ... +val deletionMonitorRepeatDelay: FiniteDuration = ... +``` + +Next, define a list of jobs to execute. +This example generates a sequence of jobs, some that cannot be executed. + +```scala + val jobs = Seq.tabulate[Job](n = 10) { n => + if (n % 3 == 0) { + // simulate a job failure + val piContainer = Container(name = "pi", + image = "nowhere/does-not-exist:latest", + command = List("/bin/bash"), + args = List("-c", "env")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job("pi").withTemplate(piTemplateSpec) + } else { + val piContainer = Container( + name = "pi", + image = "perl", + command = + List("perl", "-Mbignum=bpi", "-wle", s"print bpi(${n * 10})")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job("pi").withTemplate(piTemplateSpec) + } + } +``` + +Finally, exeute them. Note that skuber will create a host connection pool +for the first job and that this pool will be reused for subsequent jobs +and will be shutdown when no more jobs need to be executed on the same server. + +```scala + val (firstJob, otherJobs) = (jobs.head, jobs.tail) + + val f: Future[Unit] = for { + + // First run: create a pool. + (pool, hcp, podEvent) <- k8s.executeJobAndWaitUntilDeleted( + firstJob, + labelSelector, + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + None) + + // Subsequent runs: reuse the same pool. 
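+    // Here `pool` is the skuber Pool materialized by the first execution above; passing it as
+    // Some(pool) makes every remaining job reuse the same single host connection instead of
+    // opening a new one, while `hcp` keeps the underlying Akka Http.HostConnectionPool (if one
+    // was created) so it can be shut down exactly once after the last job.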
+ _ <- Source + .fromIterator(() => otherJobs.toIterator) + .mapAsync(parallelism = 1) { job: Job => + k8s.executeJobAndWaitUntilDeleted(job, + labelSelector, + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + Some(pool)) + } + .runForeach(_ => ()) + + // Shutdown the pool, if any. + _ <- hcp.fold(Future.successful(()))(_.shutdown().map(_ => ())) + + } yield () +``` + + ## Safely shutdown the client ```scala diff --git a/examples/src/main/scala/skuber/examples/job/PiJobs.scala b/examples/src/main/scala/skuber/examples/job/PiJobs.scala new file mode 100644 index 00000000..4db4a23b --- /dev/null +++ b/examples/src/main/scala/skuber/examples/job/PiJobs.scala @@ -0,0 +1,238 @@ +package skuber.examples.job + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString +import com.typesafe.config.{Config, ConfigFactory} +import skuber.api.client.{ + EventType, + KubernetesClient, + WatchEvent, + defaultK8sConfig +} +import skuber.batch.Job +import skuber.json.batch.format._ +import skuber.json.format._ +import skuber.{ + Container, + LabelSelector, + ObjectMeta, + Pod, + RestartPolicy, + k8sInit +} + +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} + +/** + * Demonstrates two things: + * 1) watching continuously pod events until any container status or pod status indicates a non-progress condition. + * 2) making sure that the host connection pool used for watching is shutdown to avoid the + */ +object PiJobs { + + def podProgress( + ev: WatchEvent[Pod] + ): Boolean = { + + def containerStatusProgress(acc: Boolean, x: Container.Status): Boolean = { + x.state.fold[Boolean](acc) { + case Container.Waiting(None) => acc + case Container.Waiting(Some(reason)) => + !(reason.startsWith("Err") || reason.endsWith("BackOff")) + case Container.Running(_) => acc + case _: Container.Terminated => false + } + } + + def podStatusProgress( + s: Pod.Status + ): Boolean = { + val ok1 = s.initContainerStatuses + .foldLeft[Boolean](true)(containerStatusProgress) + val ok2 = s.containerStatuses + .foldLeft[Boolean](ok1)(containerStatusProgress) + val ok3 = s.conditions.foldLeft[Boolean](ok2) { + case (acc, _: Pod.Condition) => + acc + } + ok3 + } + + ev._type != EventType.DELETED && + ev._type != EventType.ERROR && + ev._object.status.fold[Boolean](true)(podStatusProgress) + } + + def durationFomConfig(config: Config)(configKey: String): Option[Duration] = + Some(Duration.fromNanos(config.getDuration(configKey).toNanos)) + + def getSkuberConfig[T](config: Config, + key: String, + fromConfig: String => Option[T], + default: T): T = { + val skuberConfigKey = s"skuber.$key" + if (config.getIsNull(skuberConfigKey)) { + default + } else { + fromConfig(skuberConfigKey) match { + case None => default + case Some(t) => t + } + } + } + + def podCompletion(k8s: KubernetesClient)(lastPodEvent: WatchEvent[Pod])( + implicit ec: ExecutionContext, + mat: ActorMaterializer): Future[Unit] = { + + def printLogFlow(cntrName: String): Sink[ByteString, NotUsed] = + Flow[ByteString] + .via( + Framing.delimiter(ByteString("\n"), + maximumFrameLength = 10000, + allowTruncation = true)) + .map(_.utf8String) + .to(Sink.foreach(text => println(s"[$cntrName logs] $text"))) + + def showContainerStateIfSuccessful(cs: Container.Status, + podName: String, + message: String): Future[Unit] = + cs.state.fold[Future[Unit]](Future.successful(())) { + case s: 
Container.Terminated if s.exitCode == 0 => + for { + logSource <- k8s.getPodLogSource( + name = podName, + queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) + _ = logSource.runWith(printLogFlow(message)) + } yield () + case s => + println(s"[$message] No logs because of unsuccessful status: $s") + Future.successful(()) + } + + lastPodEvent._object.status match { + case None => + Future.successful(()) + case Some(s) => + val podName = lastPodEvent._object.name + for { + _ <- s.initContainerStatuses + .foldLeft[Future[Unit]](Future.successful(())) { + case (_, cs) => + showContainerStateIfSuccessful( + cs, + podName, + s"init/$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + } + _ <- s.containerStatuses + .foldLeft[Future[Unit]](Future.successful(())) { + case (_, cs) => + showContainerStateIfSuccessful( + cs, + podName, + s"$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + } + } yield () + } + } + + def main( + args: Array[String] + ): Unit = { + + implicit val as: ActorSystem = ActorSystem("PiJobs") + implicit val ec: ExecutionContext = as.dispatcher + implicit val mat: ActorMaterializer = ActorMaterializer() + val sconfig: skuber.api.Configuration = defaultK8sConfig + val aconfig: Config = ConfigFactory.load() + implicit val k8s: KubernetesClient = + k8sInit(config = sconfig, appConfig = aconfig) + + val watchContinuouslyRequestTimeout: Duration = getSkuberConfig( + aconfig, + "watch-continuously.request-timeout", + durationFomConfig(aconfig), + 30.seconds) + + val deletionMonitorRepeatDelay: FiniteDuration = 1.second + + def metadata(n: Int) = + ObjectMeta(name = "pi", + labels = Map("job-kind" -> "piTest", "iteration" -> s"$n")) + val labelSelector = LabelSelector( + LabelSelector.IsEqualRequirement("job-kind", "piTest")) + + val jobs = Seq.tabulate[Job](n = 10) { n => + if (n % 3 == 0) { + // simulate a job failure + val piContainer = Container(name = "pi", + image = "nowhere/does-not-exist:latest", + command = List("/bin/bash"), + args = List("-c", "env")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job("pi").withTemplate(piTemplateSpec) + } else { + val piContainer = Container( + name = "pi", + image = "perl", + command = + List("perl", "-Mbignum=bpi", "-wle", s"print bpi(${n * 10})")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job("pi").withTemplate(piTemplateSpec) + } + } + val (firstJob, otherJobs) = (jobs.head, jobs.tail) + + val f: Future[Unit] = for { + + // First run: create a pool. + (pool, hcp, podEvent) <- k8s.executeJobAndWaitUntilDeleted( + firstJob, + labelSelector, + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + None) + + // Subsequent runs: reuse the same pool. + _ <- Source + .fromIterator(() => otherJobs.toIterator) + .mapAsync(parallelism = 1) { job: Job => + k8s.executeJobAndWaitUntilDeleted(job, + labelSelector, + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + Some(pool)) + } + .runForeach(_ => ()) + + // Shutdown the pool, if any. + _ <- hcp.fold(Future.successful(()))(_.shutdown().map(_ => ())) + + } yield () + + // Wait until done and shutdown k8s & akka. 
+ Await.result(f.flatMap { _ => + k8s.close + as.terminate().map(_ => ()) + }, Duration.Inf) + + } +} From b46e2b1dca2ed56ffe109b1b7ff946ddbbaa2001 Mon Sep 17 00:00:00 2001 From: Nicolas F Rouquette Date: Sat, 20 Jul 2019 17:36:14 -0700 Subject: [PATCH 2/6] Travis CI -- use similar config as akka-http project. --- .travis.yml | 4 +--- project/build.properties | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 614d248c..7a9cf688 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,7 @@ sudo: false language: scala scala: -- 2.11.8 -jdk: -- oraclejdk8 +- 2.12.8 cache: directories: - '$HOME/.ivy2/cache' diff --git a/project/build.properties b/project/build.properties index 7b6213bd..c0bab049 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.0.1 +sbt.version=1.2.8 From 1c87d8d3206b0683233d73cb984a26acf677a05b Mon Sep 17 00:00:00 2001 From: Nicolas F Rouquette Date: Sun, 21 Jul 2019 13:20:24 -0700 Subject: [PATCH 3/6] Add explicit argument for maximumObjectLength indirectly used in watchWithOptions as bufsize --- .../src/main/scala/skuber/api/client/KubernetesClient.scala | 4 +++- .../scala/skuber/api/client/impl/KubernetesClientImpl.scala | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/client/src/main/scala/skuber/api/client/KubernetesClient.scala b/client/src/main/scala/skuber/api/client/KubernetesClient.scala index 5240526c..351585ba 100644 --- a/client/src/main/scala/skuber/api/client/KubernetesClient.scala +++ b/client/src/main/scala/skuber/api/client/KubernetesClient.scala @@ -381,6 +381,7 @@ trait KubernetesClient { * @param watchContinuouslyRequestTimeout the delay for continuously monitoring the pod progress * @param deletionMonitorRepeatDelay the delay for continuously monitoring the job deletion * @param pool a skuber pool to reuse, if any, or to create otherwise + * @param bufSize optional buffer size for received object updates, normally the default is more than enough * @return A future consisting of a triple of the following: * - the skuber pool suitable for subsequently executing other jobs on the same server * - the akka host connection pool that can be shutdown when no further jobs need to be executed on the same server @@ -393,7 +394,8 @@ trait KubernetesClient { podCompletion: WatchEvent[Pod] => Future[Unit], watchContinuouslyRequestTimeout: Duration, deletionMonitorRepeatDelay: FiniteDuration, - pool: Option[Pool[WatchSource.Start[Pod]]])(implicit jfmt: Format[Job], pfmt: Format[Pod], jrd: ResourceDefinition[Job], prd: ResourceDefinition[Pod]): + pool: Option[Pool[WatchSource.Start[Pod]]], + bufSize: Int = 10000)(implicit jfmt: Format[Job], pfmt: Format[Pod], jrd: ResourceDefinition[Job], prd: ResourceDefinition[Pod]): Future[(Pool[WatchSource.Start[Pod]], Option[Http.HostConnectionPool], WatchEvent[Pod])] /** diff --git a/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala b/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala index 76915ed4..22bcc7f4 100644 --- a/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala +++ b/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala @@ -642,7 +642,8 @@ class KubernetesClientImpl private[client] ( podCompletion: WatchEvent[Pod] => Future[Unit], watchContinuouslyRequestTimeout: Duration, deletionMonitorRepeatDelay: FiniteDuration, - pool: Option[Pool[WatchSource.Start[Pod]]])(implicit jfmt: Format[Job], pfmt: Format[Pod], jrd: 
ResourceDefinition[Job], prd: ResourceDefinition[Pod]) + pool: Option[Pool[WatchSource.Start[Pod]]], + bufSize: Int = 10000)(implicit jfmt: Format[Job], pfmt: Format[Pod], jrd: ResourceDefinition[Job], prd: ResourceDefinition[Pod]) : Future[(Pool[WatchSource.Start[Pod]], Option[Http.HostConnectionPool], WatchEvent[Pod])] = for { j <- create(job) @@ -652,6 +653,7 @@ class KubernetesClientImpl private[client] ( labelSelector = Some(labelSelector), timeoutSeconds = Some(watchContinuouslyRequestTimeout.toSeconds) ), + bufsize = bufSize, pool = pool ) .takeWhile(podProgress, inclusive = true) From 1baf541fa53c2ccd2a06f462c5aa0e86150ff959 Mon Sep 17 00:00:00 2001 From: Nicolas F Rouquette Date: Sun, 21 Jul 2019 14:46:17 -0700 Subject: [PATCH 4/6] Updated example doc to explain sequential vs. parallel execution. --- docs/Examples.md | 173 +++++++++---- .../skuber/examples/job/PiJobsParallel.scala | 228 ++++++++++++++++++ .../{PiJobs.scala => PiJobsSequential.scala} | 11 +- 3 files changed, 358 insertions(+), 54 deletions(-) create mode 100644 examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala rename examples/src/main/scala/skuber/examples/job/{PiJobs.scala => PiJobsSequential.scala} (95%) diff --git a/docs/Examples.md b/docs/Examples.md index 83a1bd48..cbdfe5bd 100644 --- a/docs/Examples.md +++ b/docs/Examples.md @@ -223,12 +223,102 @@ val watchContinuouslyRequestTimeout: Duration = ... val deletionMonitorRepeatDelay: FiniteDuration = ... ``` -Next, define a list of jobs to execute. -This example generates a sequence of jobs, some that cannot be executed. - -```scala - val jobs = Seq.tabulate[Job](n = 10) { n => - if (n % 3 == 0) { +There are different strategies to execute jobs. + +- Sequentially + + Define a list of jobs to execute. + This example generates a sequence of jobs, some that cannot be executed. + + ```scala + val jobs = Seq.tabulate[Job](n = 10) { n => + if (n % 3 == 0) { + // simulate a job failure + val piContainer = Container(name = "pi", + image = "nowhere/does-not-exist:latest", + command = List("/bin/bash"), + args = List("-c", "env")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job("pi").withTemplate(piTemplateSpec) + } else { + val piContainer = Container( + name = "pi", + image = "perl", + command = + List("perl", "-Mbignum=bpi", "-wle", s"print bpi(${n * 10})")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job("pi").withTemplate(piTemplateSpec) + } + } + ``` + + Execute the first job without a host connection pool + and reuse the pool obtained for executing subsequent jobs. + Finally, shutdown the connection pool. + + ```scala + val (firstJob, otherJobs) = (jobs.head, jobs.tail) + + val f: Future[Unit] = for { + + // First run: create a pool. + (pool, hcp, podEvent) <- k8s.executeJobAndWaitUntilDeleted( + firstJob, + labelSelector, + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + None) + + // Subsequent runs: reuse the same pool. 
+ _ <- Source + .fromIterator(() => otherJobs.toIterator) + .mapAsync(parallelism = 1) { job: Job => + k8s.executeJobAndWaitUntilDeleted(job, + labelSelector, + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + Some(pool)) + } + .runForeach(_ => ()) + + // Shutdown the pool, if any. + _ <- hcp.fold(Future.successful(()))(_.shutdown().map(_ => ())) + + } yield () + ``` + + For a working example, see: [PiJobsSequential.scala](examples/job/PiJobsSequential.scala) + +- In parallel + + Define a list of job execution futures, + taking care of shutting down the pool after completion. + + ```scala + + def metadata(n: Int) = + ObjectMeta(name = s"pi-$n", + labels = Map("job-kind" -> s"piTest$n", "iteration" -> s"$n")) + def labelSelector(n: Int) = + LabelSelector(LabelSelector.IsEqualRequirement("job-kind", s"piTest$n")) + + val jobs = Seq.tabulate[Future[Unit]](n = 10) { n => + val jname = s"pi-$n" + val job: Job = if (n % 3 == 0) { // simulate a job failure val piContainer = Container(name = "pi", image = "nowhere/does-not-exist:latest", @@ -240,7 +330,7 @@ This example generates a sequence of jobs, some that cannot be executed. .withRestartPolicy(RestartPolicy.Never) val piTemplateSpec = Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) - Job("pi").withTemplate(piTemplateSpec) + Job(jname).withTemplate(piTemplateSpec) } else { val piContainer = Container( name = "pi", @@ -253,51 +343,36 @@ This example generates a sequence of jobs, some that cannot be executed. .withRestartPolicy(RestartPolicy.Never) val piTemplateSpec = Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) - Job("pi").withTemplate(piTemplateSpec) + Job(jname).withTemplate(piTemplateSpec) } - } -``` - -Finally, exeute them. Note that skuber will create a host connection pool -for the first job and that this pool will be reused for subsequent jobs -and will be shutdown when no more jobs need to be executed on the same server. - -```scala - val (firstJob, otherJobs) = (jobs.head, jobs.tail) - - val f: Future[Unit] = for { - - // First run: create a pool. - (pool, hcp, podEvent) <- k8s.executeJobAndWaitUntilDeleted( - firstJob, - labelSelector, - podProgress, - podCompletion(k8s), - watchContinuouslyRequestTimeout, - deletionMonitorRepeatDelay, - None) - - // Subsequent runs: reuse the same pool. - _ <- Source - .fromIterator(() => otherJobs.toIterator) - .mapAsync(parallelism = 1) { job: Job => - k8s.executeJobAndWaitUntilDeleted(job, - labelSelector, - podProgress, - podCompletion(k8s), - watchContinuouslyRequestTimeout, - deletionMonitorRepeatDelay, - Some(pool)) - } - .runForeach(_ => ()) - - // Shutdown the pool, if any. - _ <- hcp.fold(Future.successful(()))(_.shutdown().map(_ => ())) - - } yield () -``` + for { + // Execute the job with a unique pool + (_, hcp, _) <- k8s.executeJobAndWaitUntilDeleted( + job, + labelSelector(n), + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + None) + + // Shutdown the pool, if any. + _ <- hcp.fold(Future.successful(()))(_.shutdown().map(_ => ())) + } yield () + } + ``` + + Execute the job futures in parallel. 
+ + ```scala + val f: Future[Unit] = + Future.foldLeft[Unit, Unit](jobs)(())((_: Unit, _: Unit) => ()) + ``` + + For a working example, see: [PiJobsParallel.scala](examples/job/PiJobsParallel.scala) + ## Safely shutdown the client ```scala diff --git a/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala b/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala new file mode 100644 index 00000000..27a59f4d --- /dev/null +++ b/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala @@ -0,0 +1,228 @@ +package skuber.examples.job + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString +import com.typesafe.config.{Config, ConfigFactory} +import skuber.api.client.{ + EventType, + KubernetesClient, + WatchEvent, + defaultK8sConfig +} +import skuber.batch.Job +import skuber.json.batch.format._ +import skuber.json.format._ +import skuber.{ + Container, + LabelSelector, + ObjectMeta, + Pod, + RestartPolicy, + k8sInit +} + +import scala.concurrent.duration._ +import scala.collection.immutable._ +import scala.concurrent.{Await, ExecutionContext, Future} + +/** + * Demonstrates two things: + * 1) executing jobs in parallel, each with an independent pool + * 2) watching continuously pod events until any container status or pod status indicates a non-progress condition. + * 3) making sure that the host connection pool used for watching is shutdown + */ +object PiJobsParallel { + + def podProgress( + ev: WatchEvent[Pod] + ): Boolean = { + + def containerStatusProgress(acc: Boolean, x: Container.Status): Boolean = { + x.state.fold[Boolean](acc) { + case Container.Waiting(None) => acc + case Container.Waiting(Some(reason)) => + !(reason.startsWith("Err") || reason.endsWith("BackOff")) + case Container.Running(_) => acc + case _: Container.Terminated => false + } + } + + def podStatusProgress( + s: Pod.Status + ): Boolean = { + val ok1 = s.initContainerStatuses + .foldLeft[Boolean](true)(containerStatusProgress) + val ok2 = s.containerStatuses + .foldLeft[Boolean](ok1)(containerStatusProgress) + val ok3 = s.conditions.foldLeft[Boolean](ok2) { + case (acc, _: Pod.Condition) => + acc + } + ok3 + } + + ev._type != EventType.DELETED && + ev._type != EventType.ERROR && + ev._object.status.fold[Boolean](true)(podStatusProgress) + } + + def durationFomConfig(config: Config)(configKey: String): Option[Duration] = + Some(Duration.fromNanos(config.getDuration(configKey).toNanos)) + + def getSkuberConfig[T](config: Config, + key: String, + fromConfig: String => Option[T], + default: T): T = { + val skuberConfigKey = s"skuber.$key" + if (config.getIsNull(skuberConfigKey)) { + default + } else { + fromConfig(skuberConfigKey) match { + case None => default + case Some(t) => t + } + } + } + + def podCompletion(k8s: KubernetesClient)(lastPodEvent: WatchEvent[Pod])( + implicit ec: ExecutionContext, + mat: ActorMaterializer): Future[Unit] = { + + def printLogFlow(cntrName: String): Sink[ByteString, NotUsed] = + Flow[ByteString] + .via( + Framing.delimiter(ByteString("\n"), + maximumFrameLength = 10000, + allowTruncation = true)) + .map(_.utf8String) + .to(Sink.foreach(text => println(s"[$cntrName logs] $text"))) + + def showContainerStateIfSuccessful(cs: Container.Status, + podName: String, + message: String): Future[Unit] = + cs.state.fold[Future[Unit]](Future.successful(())) { + case s: Container.Terminated if s.exitCode == 0 => + for { + logSource <- k8s.getPodLogSource( + name = podName, + 
queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) + _ = logSource.runWith(printLogFlow(message)) + } yield () + case s => + println(s"[$message] No logs because of unsuccessful status: $s") + Future.successful(()) + } + + lastPodEvent._object.status match { + case None => + Future.successful(()) + case Some(s) => + val podName = lastPodEvent._object.name + for { + _ <- s.initContainerStatuses + .foldLeft[Future[Unit]](Future.successful(())) { + case (_, cs) => + showContainerStateIfSuccessful( + cs, + podName, + s"init/$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + } + _ <- s.containerStatuses + .foldLeft[Future[Unit]](Future.successful(())) { + case (_, cs) => + showContainerStateIfSuccessful( + cs, + podName, + s"$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + } + } yield () + } + } + + def main( + args: Array[String] + ): Unit = { + + implicit val as: ActorSystem = ActorSystem("PiJobsSequential") + implicit val ec: ExecutionContext = as.dispatcher + implicit val mat: ActorMaterializer = ActorMaterializer() + val sconfig: skuber.api.Configuration = defaultK8sConfig + val aconfig: Config = ConfigFactory.load() + implicit val k8s: KubernetesClient = + k8sInit(config = sconfig, appConfig = aconfig) + + val watchContinuouslyRequestTimeout: Duration = getSkuberConfig( + aconfig, + "watch-continuously.request-timeout", + durationFomConfig(aconfig), + 30.seconds) + + val deletionMonitorRepeatDelay: FiniteDuration = 1.second + + def metadata(n: Int) = + ObjectMeta(name = s"pi-$n", + labels = Map("job-kind" -> s"piTest$n", "iteration" -> s"$n")) + def labelSelector(n: Int) = + LabelSelector(LabelSelector.IsEqualRequirement("job-kind", s"piTest$n")) + + val jobs = Seq.tabulate[Future[Unit]](n = 10) { n => + val jname = s"pi-$n" + val job: Job = if (n % 3 == 0) { + // simulate a job failure + val piContainer = Container(name = "pi", + image = "nowhere/does-not-exist:latest", + command = List("/bin/bash"), + args = List("-c", "env")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job(jname).withTemplate(piTemplateSpec) + } else { + val piContainer = Container( + name = "pi", + image = "perl", + command = + List("perl", "-Mbignum=bpi", "-wle", s"print bpi(${n * 10})")) + val piSpec = Pod + .Spec() + .addContainer(piContainer) + .withRestartPolicy(RestartPolicy.Never) + val piTemplateSpec = + Pod.Template.Spec(metadata = metadata(n)).withPodSpec(piSpec) + Job(jname).withTemplate(piTemplateSpec) + } + + for { + // Execute the job with a unique pool + (_, hcp, _) <- k8s.executeJobAndWaitUntilDeleted( + job, + labelSelector(n), + podProgress, + podCompletion(k8s), + watchContinuouslyRequestTimeout, + deletionMonitorRepeatDelay, + None) + + // Shutdown the pool, if any. + _ <- hcp.fold(Future.successful(()))(_.shutdown().map(_ => ())) + } yield () + + } + + val f: Future[Unit] = + Future.foldLeft[Unit, Unit](jobs)(())((_: Unit, _: Unit) => ()) + + // Wait until done and shutdown k8s & akka. 
+ Await.result(f.flatMap { _ => + k8s.close + as.terminate().map(_ => ()) + }, Duration.Inf) + + } +} diff --git a/examples/src/main/scala/skuber/examples/job/PiJobs.scala b/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala similarity index 95% rename from examples/src/main/scala/skuber/examples/job/PiJobs.scala rename to examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala index 4db4a23b..b0678934 100644 --- a/examples/src/main/scala/skuber/examples/job/PiJobs.scala +++ b/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala @@ -28,11 +28,12 @@ import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} /** - * Demonstrates two things: - * 1) watching continuously pod events until any container status or pod status indicates a non-progress condition. - * 2) making sure that the host connection pool used for watching is shutdown to avoid the + * Demonstrates three things: + * 1) executing jobs sequentially, creating a pool for the first job and reusing it afterwards + * 2) watching continuously pod events until any container status or pod status indicates a non-progress condition. + * 3) making sure that the host connection pool used for watching is shutdown */ -object PiJobs { +object PiJobsSequential { def podProgress( ev: WatchEvent[Pod] @@ -144,7 +145,7 @@ object PiJobs { args: Array[String] ): Unit = { - implicit val as: ActorSystem = ActorSystem("PiJobs") + implicit val as: ActorSystem = ActorSystem("PiJobsSequential") implicit val ec: ExecutionContext = as.dispatcher implicit val mat: ActorMaterializer = ActorMaterializer() val sconfig: skuber.api.Configuration = defaultK8sConfig From 7b4be5441e8f56052735ad82c5d82f4cd6e2f168 Mon Sep 17 00:00:00 2001 From: Nicolas F Rouquette Date: Sun, 21 Jul 2019 21:32:52 -0700 Subject: [PATCH 5/6] Corrected subtle akka bug in example. 
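
`Flow.to(Sink.foreach(...))` keeps the Flow's materialized value (`NotUsed`), so the
examples' `printLogFlow` gave callers nothing to await and `podCompletion` could
resolve before the pod's log lines had been printed; once the job futures complete
the examples call `k8s.close` and `as.terminate()`, which can cut that output short.
Building the sink with `toMat(...)(Keep.right)` keeps the `Future[Done]` materialized
by `Sink.foreach`, and the for-comprehensions now await it
(`_ <- logSource.runWith(printLogFlow(message))`).

A minimal standalone sketch of the distinction, assuming Akka Streams 2.5.x (the
object and value names below are illustrative only, not part of skuber or the
example sources):

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Future

object MaterializedValueSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // `to` keeps the Flow's materialized value (NotUsed): there is nothing to
  // await, so a caller cannot tell when the last element has been printed.
  val fireAndForget: Sink[String, NotUsed] =
    Flow[String].to(Sink.foreach[String](line => println(line)))

  // `toMat(...)(Keep.right)` keeps the Sink's Future[Done], which completes
  // only after every element has been processed.
  val awaitable: Sink[String, Future[Done]] =
    Flow[String].toMat(Sink.foreach[String](line => println(line)))(Keep.right)

  val done: Future[Done] = Source(List("a", "b", "c")).runWith(awaitable)
  done.onComplete(_ => system.terminate())(system.dispatcher)
}
```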
--- docs/Examples.md | 35 +++++++----- .../skuber/examples/job/PiJobsParallel.scala | 53 +++++++++---------- .../examples/job/PiJobsSequential.scala | 38 +++++++------ 3 files changed, 68 insertions(+), 58 deletions(-) diff --git a/docs/Examples.md b/docs/Examples.md index cbdfe5bd..bd0f2181 100644 --- a/docs/Examples.md +++ b/docs/Examples.md @@ -164,31 +164,38 @@ For example: implicit ec: ExecutionContext, mat: ActorMaterializer): Future[Unit] = { - def printLogFlow(cntrName: String): Sink[ByteString, NotUsed] = + def printLogFlow(cntrName: String): Sink[ByteString, Future[Done]] = Flow[ByteString] .via( Framing.delimiter(ByteString("\n"), maximumFrameLength = 10000, allowTruncation = true)) .map(_.utf8String) - .to(Sink.foreach(text => println(s"[$cntrName logs] $text"))) + .toMat(Sink.foreach(text => println(s"[$cntrName logs] $text")))(Keep.right) def showContainerStateIfSuccessful(cs: Container.Status, podName: String, - message: String): Future[Unit] = - cs.state.fold[Future[Unit]](Future.successful(())) { - case s: Container.Terminated if s.exitCode == 0 => - for { - logSource <- k8s.getPodLogSource( - name = podName, - queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) - _ = logSource.runWith(printLogFlow(message)) - } yield () - case s => - println(s"[$message] No logs because of unsuccessful status: $s") - Future.successful(()) + message: String): Future[Unit] = { + val terminatedSuccessfully = cs.state.foldLeft[Boolean](false) { + case (_, s: Container.Terminated) => + 0 == s.exitCode + case (flag, _) => + flag } + if (terminatedSuccessfully) + for { + logSource <- k8s.getPodLogSource( + name = podName, + queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) + _ <- logSource.runWith(printLogFlow(message)) + } yield () + else { + println(s"$message: no output because of unsuccessful execution") + Future.successful(()) + } + } + lastPodEvent._object.status match { case None => Future.successful(()) diff --git a/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala b/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala index 27a59f4d..36221973 100644 --- a/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala +++ b/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala @@ -1,28 +1,16 @@ package skuber.examples.job -import akka.NotUsed +import akka.Done import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl._ import akka.util.ByteString import com.typesafe.config.{Config, ConfigFactory} -import skuber.api.client.{ - EventType, - KubernetesClient, - WatchEvent, - defaultK8sConfig -} +import skuber.api.client.{EventType, KubernetesClient, WatchEvent, defaultK8sConfig} import skuber.batch.Job import skuber.json.batch.format._ import skuber.json.format._ -import skuber.{ - Container, - LabelSelector, - ObjectMeta, - Pod, - RestartPolicy, - k8sInit -} +import skuber.{Container, LabelSelector, ObjectMeta, Pod, RestartPolicy, k8sInit} import scala.concurrent.duration._ import scala.collection.immutable._ @@ -91,30 +79,37 @@ object PiJobsParallel { implicit ec: ExecutionContext, mat: ActorMaterializer): Future[Unit] = { - def printLogFlow(cntrName: String): Sink[ByteString, NotUsed] = + def printLogFlow(cntrName: String): Sink[ByteString, Future[Done]] = Flow[ByteString] .via( Framing.delimiter(ByteString("\n"), maximumFrameLength = 10000, allowTruncation = true)) .map(_.utf8String) - .to(Sink.foreach(text => println(s"[$cntrName logs] $text"))) + .toMat(Sink.foreach(text => 
println(s"[$cntrName logs] $text")))(Keep.right) def showContainerStateIfSuccessful(cs: Container.Status, podName: String, - message: String): Future[Unit] = - cs.state.fold[Future[Unit]](Future.successful(())) { - case s: Container.Terminated if s.exitCode == 0 => - for { - logSource <- k8s.getPodLogSource( - name = podName, - queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) - _ = logSource.runWith(printLogFlow(message)) - } yield () - case s => - println(s"[$message] No logs because of unsuccessful status: $s") - Future.successful(()) + message: String): Future[Unit] = { + val terminatedSuccessfully = cs.state.foldLeft[Boolean](false) { + case (_, s: Container.Terminated) => + 0 == s.exitCode + case (flag, _) => + flag + } + + if (terminatedSuccessfully) + for { + logSource <- k8s.getPodLogSource( + name = podName, + queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) + _ <- logSource.runWith(printLogFlow(message)) + } yield () + else { + println(s"$message: no output because of unsuccessful execution") + Future.successful(()) } + } lastPodEvent._object.status match { case None => diff --git a/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala b/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala index b0678934..9d85cbfd 100644 --- a/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala +++ b/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala @@ -1,6 +1,6 @@ package skuber.examples.job -import akka.NotUsed +import akka.Done import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl._ @@ -90,31 +90,39 @@ object PiJobsSequential { implicit ec: ExecutionContext, mat: ActorMaterializer): Future[Unit] = { - def printLogFlow(cntrName: String): Sink[ByteString, NotUsed] = + def printLogFlow(cntrName: String): Sink[ByteString, Future[Done]] = Flow[ByteString] .via( Framing.delimiter(ByteString("\n"), maximumFrameLength = 10000, allowTruncation = true)) .map(_.utf8String) - .to(Sink.foreach(text => println(s"[$cntrName logs] $text"))) + .toMat(Sink.foreach(text => println(s"[$cntrName logs] $text")))( + Keep.right) def showContainerStateIfSuccessful(cs: Container.Status, podName: String, - message: String): Future[Unit] = - cs.state.fold[Future[Unit]](Future.successful(())) { - case s: Container.Terminated if s.exitCode == 0 => - for { - logSource <- k8s.getPodLogSource( - name = podName, - queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) - _ = logSource.runWith(printLogFlow(message)) - } yield () - case s => - println(s"[$message] No logs because of unsuccessful status: $s") - Future.successful(()) + message: String): Future[Unit] = { + val terminatedSuccessfully = cs.state.foldLeft[Boolean](false) { + case (_, s: Container.Terminated) => + 0 == s.exitCode + case (flag, _) => + flag } + if (terminatedSuccessfully) + for { + logSource <- k8s.getPodLogSource( + name = podName, + queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) + _ <- logSource.runWith(printLogFlow(message)) + } yield () + else { + println(s"$message: no output because of unsuccessful execution") + Future.successful(()) + } + } + lastPodEvent._object.status match { case None => Future.successful(()) From 473d70fe5ace22c23f54fa211ccde29b3d808a85 Mon Sep 17 00:00:00 2001 From: Nicolas F Rouquette Date: Mon, 22 Jul 2019 07:49:13 -0700 Subject: [PATCH 6/6] executeJobAndWaitUntilDeleted: delete if podCompletion is true --- .../skuber/api/client/KubernetesClient.scala | 5 +- 
.../client/impl/KubernetesClientImpl.scala | 13 ++-- .../skuber/examples/job/PiJobsParallel.scala | 66 ++++++++++++------- .../examples/job/PiJobsSequential.scala | 48 ++++++++------ 4 files changed, 80 insertions(+), 52 deletions(-) diff --git a/client/src/main/scala/skuber/api/client/KubernetesClient.scala b/client/src/main/scala/skuber/api/client/KubernetesClient.scala index 351585ba..68cc78bb 100644 --- a/client/src/main/scala/skuber/api/client/KubernetesClient.scala +++ b/client/src/main/scala/skuber/api/client/KubernetesClient.scala @@ -377,7 +377,8 @@ trait KubernetesClient { * @param job the Kubernetes job to execute * @param labelSelector the label selector for monitoring the job's pod status * @param podProgress the predicate for monitoring the pod status while satisfied before deleting the job - * @param podCompletion a callback invoked at the completion of the job's pod (successful or not), after which the job will be deleted + * @param podCompletion a callback invoked at the completion of the job's pod (successful or not), + * after which the job will be deleted if and only if the podCompletion result is true * @param watchContinuouslyRequestTimeout the delay for continuously monitoring the pod progress * @param deletionMonitorRepeatDelay the delay for continuously monitoring the job deletion * @param pool a skuber pool to reuse, if any, or to create otherwise @@ -391,7 +392,7 @@ trait KubernetesClient { job: Job, labelSelector: LabelSelector, podProgress: WatchEvent[Pod] => Boolean, - podCompletion: WatchEvent[Pod] => Future[Unit], + podCompletion: WatchEvent[Pod] => Future[Boolean], watchContinuouslyRequestTimeout: Duration, deletionMonitorRepeatDelay: FiniteDuration, pool: Option[Pool[WatchSource.Start[Pod]]], diff --git a/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala b/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala index 22bcc7f4..613bba54 100644 --- a/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala +++ b/client/src/main/scala/skuber/api/client/impl/KubernetesClientImpl.scala @@ -639,7 +639,7 @@ class KubernetesClientImpl private[client] ( job: Job, labelSelector: LabelSelector, podProgress: WatchEvent[Pod] => Boolean, - podCompletion: WatchEvent[Pod] => Future[Unit], + podCompletion: WatchEvent[Pod] => Future[Boolean], watchContinuouslyRequestTimeout: Duration, deletionMonitorRepeatDelay: FiniteDuration, pool: Option[Pool[WatchSource.Start[Pod]]], @@ -667,12 +667,11 @@ class KubernetesClientImpl private[client] ( } } } - _ <- podCompletion(lastPodEvent) - _ <- deleteWithOptions[Job]( - name = j.metadata.name, - options = - DeleteOptions(propagationPolicy = Some(DeletePropagation.Foreground))) - _ <- monitorResourceUntilUnavailable[Job](j.metadata.name, deletionMonitorRepeatDelay) + delete <- podCompletion(lastPodEvent) + _ <- if (delete) + deleteWithOptions[Job](name = j.metadata.name, options = DeleteOptions(propagationPolicy = Some(DeletePropagation.Foreground))) + .flatMap(_ => monitorResourceUntilUnavailable[Job](j.metadata.name, deletionMonitorRepeatDelay)) + else Future.successful(()) } yield (p, hcp, lastPodEvent) override def close: Unit = diff --git a/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala b/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala index 36221973..9c4ee6de 100644 --- a/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala +++ b/examples/src/main/scala/skuber/examples/job/PiJobsParallel.scala @@ -6,11 +6,23 @@ import 
akka.stream.ActorMaterializer import akka.stream.scaladsl._ import akka.util.ByteString import com.typesafe.config.{Config, ConfigFactory} -import skuber.api.client.{EventType, KubernetesClient, WatchEvent, defaultK8sConfig} +import skuber.api.client.{ + EventType, + KubernetesClient, + WatchEvent, + defaultK8sConfig +} import skuber.batch.Job import skuber.json.batch.format._ import skuber.json.format._ -import skuber.{Container, LabelSelector, ObjectMeta, Pod, RestartPolicy, k8sInit} +import skuber.{ + Container, + LabelSelector, + ObjectMeta, + Pod, + RestartPolicy, + k8sInit +} import scala.concurrent.duration._ import scala.collection.immutable._ @@ -77,7 +89,7 @@ object PiJobsParallel { def podCompletion(k8s: KubernetesClient)(lastPodEvent: WatchEvent[Pod])( implicit ec: ExecutionContext, - mat: ActorMaterializer): Future[Unit] = { + mat: ActorMaterializer): Future[Boolean] = { def printLogFlow(cntrName: String): Sink[ByteString, Future[Done]] = Flow[ByteString] @@ -86,11 +98,12 @@ object PiJobsParallel { maximumFrameLength = 10000, allowTruncation = true)) .map(_.utf8String) - .toMat(Sink.foreach(text => println(s"[$cntrName logs] $text")))(Keep.right) + .toMat(Sink.foreach(text => println(s"[$cntrName logs] $text")))( + Keep.right) def showContainerStateIfSuccessful(cs: Container.Status, podName: String, - message: String): Future[Unit] = { + message: String): Future[Boolean] = { val terminatedSuccessfully = cs.state.foldLeft[Boolean](false) { case (_, s: Container.Terminated) => 0 == s.exitCode @@ -104,36 +117,43 @@ object PiJobsParallel { name = podName, queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) _ <- logSource.runWith(printLogFlow(message)) - } yield () + } yield true else { println(s"$message: no output because of unsuccessful execution") - Future.successful(()) + Future.successful(false) } } lastPodEvent._object.status match { case None => - Future.successful(()) + Future.successful(false) case Some(s) => val podName = lastPodEvent._object.name for { - _ <- s.initContainerStatuses - .foldLeft[Future[Unit]](Future.successful(())) { - case (_, cs) => - showContainerStateIfSuccessful( - cs, - podName, - s"init/$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + delete1 <- s.initContainerStatuses + .foldLeft[Future[Boolean]](Future.successful(true)) { + case (flag, cs) => + Future.reduceLeft( + Seq( + flag, + showContainerStateIfSuccessful( + cs, + podName, + s"init/$podName (iteration=${lastPodEvent._object.metadata + .labels("iteration")})")))(_ || _) } - _ <- s.containerStatuses - .foldLeft[Future[Unit]](Future.successful(())) { - case (_, cs) => - showContainerStateIfSuccessful( - cs, - podName, - s"$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + delete2 <- s.containerStatuses + .foldLeft[Future[Boolean]](Future.successful(delete1)) { + case (flag, cs) => + Future.reduceLeft( + Seq(flag, + showContainerStateIfSuccessful( + cs, + podName, + s"$podName (iteration=${lastPodEvent._object.metadata + .labels("iteration")})")))(_ || _) } - } yield () + } yield delete2 } } diff --git a/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala b/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala index 9d85cbfd..d0df87cb 100644 --- a/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala +++ b/examples/src/main/scala/skuber/examples/job/PiJobsSequential.scala @@ -24,6 +24,7 @@ import skuber.{ k8sInit } +import scala.collection.immutable.Seq import 
scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} @@ -88,7 +89,7 @@ object PiJobsSequential { def podCompletion(k8s: KubernetesClient)(lastPodEvent: WatchEvent[Pod])( implicit ec: ExecutionContext, - mat: ActorMaterializer): Future[Unit] = { + mat: ActorMaterializer): Future[Boolean] = { def printLogFlow(cntrName: String): Sink[ByteString, Future[Done]] = Flow[ByteString] @@ -102,7 +103,7 @@ object PiJobsSequential { def showContainerStateIfSuccessful(cs: Container.Status, podName: String, - message: String): Future[Unit] = { + message: String): Future[Boolean] = { val terminatedSuccessfully = cs.state.foldLeft[Boolean](false) { case (_, s: Container.Terminated) => 0 == s.exitCode @@ -116,36 +117,43 @@ object PiJobsSequential { name = podName, queryParams = Pod.LogQueryParams(containerName = Some(cs.name))) _ <- logSource.runWith(printLogFlow(message)) - } yield () + } yield true else { println(s"$message: no output because of unsuccessful execution") - Future.successful(()) + Future.successful(false) } } lastPodEvent._object.status match { case None => - Future.successful(()) + Future.successful(false) case Some(s) => val podName = lastPodEvent._object.name for { - _ <- s.initContainerStatuses - .foldLeft[Future[Unit]](Future.successful(())) { - case (_, cs) => - showContainerStateIfSuccessful( - cs, - podName, - s"init/$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + delete1 <- s.initContainerStatuses + .foldLeft[Future[Boolean]](Future.successful(true)) { + case (flag, cs) => + Future.reduceLeft( + Seq( + flag, + showContainerStateIfSuccessful( + cs, + podName, + s"init/$podName (iteration=${lastPodEvent._object.metadata + .labels("iteration")})")))(_ || _) } - _ <- s.containerStatuses - .foldLeft[Future[Unit]](Future.successful(())) { - case (_, cs) => - showContainerStateIfSuccessful( - cs, - podName, - s"$podName (iteration=${lastPodEvent._object.metadata.labels("iteration")})") + delete2 <- s.containerStatuses + .foldLeft[Future[Boolean]](Future.successful(delete1)) { + case (flag, cs) => + Future.reduceLeft( + Seq(flag, + showContainerStateIfSuccessful( + cs, + podName, + s"$podName (iteration=${lastPodEvent._object.metadata + .labels("iteration")})")))(_ || _) } - } yield () + } yield delete2 } }