diff --git a/core/src/main/scala/cleanup/CleanupCron.scala b/core/src/main/scala/cleanup/CleanupCron.scala index 8b7e5b41..bced1e6c 100644 --- a/core/src/main/scala/cleanup/CleanupCron.scala +++ b/core/src/main/scala/cleanup/CleanupCron.scala @@ -46,6 +46,8 @@ object CleanupCron { private val status = DeploymentStatus.all.toList.filter(_ != DeploymentStatus.Terminated).toNel.yolo("can't be empty") + private val routables = DeploymentStatus.routable.toList + /* Gets all deployments (exluding terminated) and routing graph for a namespace */ def getDeploymentsForNamespace(ns: Namespace): StoreOpF[Vector[(Namespace,DeploymentCtx)]] = for { @@ -56,7 +58,7 @@ object CleanupCron { } yield dcx.map(d => (ns,d)) /* Gets all deployment (excluding terminated) and routing graph for each namespace within a Datacenter. */ - def getDeploymentsForDatacenter(dc: Datacenter): StoreOpF[Vector[(Datacenter,Namespace,DeploymentCtx,RoutingGraph)]] = + def getDeploymentsForDatacenter(dc: Datacenter): StoreOpF[Vector[CleanupRow]] = for { _ <- log.debug(s"cleanup cron running for ${dc.name}").point[StoreOpF] ns <- StoreOp.listNamespacesForDatacenter(dc.name) @@ -65,12 +67,17 @@ object CleanupCron { } yield ds.flatMap { case (ns, dcx) => gr.find(_._1 == ns).map(x => (dc,ns,dcx,x._2)) } /* Gets all deployments (excluding terminated) for all datacenters and namespaces */ - def getDeployments(cfg: NelsonConfig): Task[Vector[(Datacenter,Namespace,DeploymentCtx,RoutingGraph)]] = + def getDeployments(cfg: NelsonConfig): Task[Vector[CleanupRow]] = runs(cfg.storage, cfg.datacenters.toVector.traverseM(dc => getDeploymentsForDatacenter(dc))) - def process(cfg: NelsonConfig): Process[Task, (Datacenter,Namespace,Deployment)] = - Process.eval(getDeployments(cfg)).flatMap(Process.emitAll) - .through(ExpirationPolicyProcess.expirationProcess(cfg)) + def routable(d: DeploymentCtx): Boolean = + routables.exists(_ == d.status) + + def process(cfg: NelsonConfig): Process[Task, CleanupRow] = + Process.eval(getDeployments(cfg)).flatMap(Process.emitAll) + .map(a => if (routable(a._3)) \/.right(a) else \/.left(a)) + .throughO(ExpirationPolicyProcess.expirationProcess(cfg)) // only apply expiration policy to the rhs + .map(_.fold(identity, identity)) .filter(x => GarbageCollector.expired(x._3)) // mark only expired deployments .through(GarbageCollector.mark(cfg)) diff --git a/core/src/main/scala/cleanup/ExpirationPolicy.scala b/core/src/main/scala/cleanup/ExpirationPolicy.scala index 43f7879f..60365e20 100644 --- a/core/src/main/scala/cleanup/ExpirationPolicy.scala +++ b/core/src/main/scala/cleanup/ExpirationPolicy.scala @@ -209,11 +209,11 @@ object ExpirationPolicyProcess { .map(_ => d.copy(exp = Some(i))) } - def expirationProcess(cfg: NelsonConfig): Channel[Task,(Datacenter,Namespace,DeploymentCtx,RoutingGraph),(Datacenter,Namespace,DeploymentCtx)] = + def expirationProcess(cfg: NelsonConfig): Channel[Task, CleanupRow, CleanupRow] = channel.lift { case (dc, ns, d, graph) => applyPolicyToDeployment(d, graph)(cfg.cleanup.extendTTL) - .map(ext => runs(cfg.storage, updateExpiration(d,ext)).map(d => (dc,ns,d))) - .getOrElse(Task.now((dc,ns,d))) + .map(ext => runs(cfg.storage, updateExpiration(d,ext)).map(d => (dc,ns,d,graph))) + .getOrElse(Task.now((dc,ns,d,graph))) } } diff --git a/core/src/main/scala/cleanup/GarbageCollector.scala b/core/src/main/scala/cleanup/GarbageCollector.scala index 0faa03c0..0ff5180f 100644 --- a/core/src/main/scala/cleanup/GarbageCollector.scala +++ b/core/src/main/scala/cleanup/GarbageCollector.scala @@ -51,11 +51,11 @@ object GarbageCollector { * against the case where the ExpirationPolicy process fails * and GC eagerly marks deployments that it shouldn't. */ - def mark(cfg: NelsonConfig): Channel[Task,(Datacenter,Namespace,DeploymentCtx),(Datacenter,Namespace,Deployment)] = { + def mark(cfg: NelsonConfig): Channel[Task, CleanupRow, CleanupRow] = { import Json._ import audit.AuditableInstances._ - channel.lift { case (dc, ns, d) => - runs(cfg.storage, markAsGarbage(d.deployment).map(d => (dc, ns, d))) <* + channel.lift { case (dc, ns, d, gr) => + runs(cfg.storage, markAsGarbage(d.deployment).map(_ => (dc, ns, d, gr))) <* cfg.auditor.write(d.deployment, audit.GarbageAction) } } diff --git a/core/src/main/scala/cleanup/Reaper.scala b/core/src/main/scala/cleanup/Reaper.scala index 7b27b62b..64ea6d19 100644 --- a/core/src/main/scala/cleanup/Reaper.scala +++ b/core/src/main/scala/cleanup/Reaper.scala @@ -40,9 +40,9 @@ object Reaper { * scheduler was used to initially place the deployment * to delete the running job. */ - def reap(cfg: NelsonConfig): Sink[Task, (Datacenter,Namespace,Deployment)] = - sink.lift { case (dc, ns, d) => - destroy(dc,ns,d)(dc.workflow)(cfg) + def reap(cfg: NelsonConfig): Sink[Task, CleanupRow] = + sink.lift { case (dc, ns, d, gr) => + destroy(dc,ns,d.deployment)(dc.workflow)(cfg) .map { _ => destroySuccessCounter.labels(ns.name.asString).inc() } .handleWith { // this is a Sink and the end of the world, so we need to handle NonFatal to keep Processes running diff --git a/core/src/main/scala/cleanup/package.scala b/core/src/main/scala/cleanup/package.scala new file mode 100644 index 00000000..363a3193 --- /dev/null +++ b/core/src/main/scala/cleanup/package.scala @@ -0,0 +1,24 @@ +//: ---------------------------------------------------------------------------- +//: Copyright (C) 2017 Verizon. All Rights Reserved. +//: +//: Licensed under the Apache License, Version 2.0 (the "License"); +//: you may not use this file except in compliance with the License. +//: You may obtain a copy of the License at +//: +//: http://www.apache.org/licenses/LICENSE-2.0 +//: +//: Unless required by applicable law or agreed to in writing, software +//: distributed under the License is distributed on an "AS IS" BASIS, +//: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//: See the License for the specific language governing permissions and +//: limitations under the License. +//: +//: ---------------------------------------------------------------------------- +package nelson + +package object cleanup { + import Datacenter.Namespace + import routing.RoutingGraph + + type CleanupRow = (Datacenter, Namespace, DeploymentCtx, RoutingGraph) +} diff --git a/core/src/test/scala/CleanupSpec.scala b/core/src/test/scala/CleanupSpec.scala index 6341f12f..d8b2e415 100644 --- a/core/src/test/scala/CleanupSpec.scala +++ b/core/src/test/scala/CleanupSpec.scala @@ -60,4 +60,23 @@ class CleanupSpec extends NelsonSuite { val status = runs(config.storage, StoreOp.getDeploymentStatus(su.id)).run status should equal(Some(DeploymentStatus.Ready)) } + + it should "run the entire cleanup pipeline and not apply an expiration policy to non routable deployments" in { + import cleanup._ + val st = StackName("search", Version(2,2,2), "aaaa") + val su = runs(config.storage, StoreOp.findDeployment(st)).run.get + + runs(config.storage, StoreOp.createDeploymentStatus(su.id, DeploymentStatus.Warming, None)).run + val statusBefore = runs(config.storage, StoreOp.getDeploymentStatus(su.id)).run + statusBefore should equal(Some(DeploymentStatus.Warming)) + + val exp = java.time.Instant.now().plusSeconds(1000) + + runs(config.storage, StoreOp.createDeploymentExpiration(su.id, exp)).run + + CleanupCron.process(config).runLog.run + + val expAfter = runs(config.storage, StoreOp.findDeploymentExpiration(su.id)).run + expAfter should equal(Some(exp)) + } } diff --git a/core/src/test/scala/GarbageCollectorSpec.scala b/core/src/test/scala/GarbageCollectorSpec.scala index 53de02d2..eed82f00 100644 --- a/core/src/test/scala/GarbageCollectorSpec.scala +++ b/core/src/test/scala/GarbageCollectorSpec.scala @@ -44,6 +44,8 @@ class GarbageCollectorSpec extends NelsonSuite with BeforeAndAfterEach { val dc = config.datacenters.head + val gr = quiver.empty[RoutingNode,Unit,RoutePath] + it should "should identify expired deployments" in { val st = StackName("search", Version(1,1,0), "foo") val su = runs(config.storage, StoreOp.findDeployment(st)).run.get @@ -66,7 +68,7 @@ class GarbageCollectorSpec extends NelsonSuite with BeforeAndAfterEach { val ctx = DeploymentCtx(su, Ready, Some(Instant.now().minusSeconds(1000))) val ns = runs(config.storage, StoreOp.listNamespacesForDatacenter(testName)).run.head - Process.eval(Task.now(((dc,ns,ctx)))).through(GarbageCollector.mark(config)).runLog.run + Process.eval(Task.now(((dc,ns,ctx,gr)))).through(GarbageCollector.mark(config)).runLog.run val status = runs(config.storage, StoreOp.getDeploymentStatus(su.id)).run status should equal(Some(DeploymentStatus.Garbage)) @@ -89,7 +91,7 @@ class GarbageCollectorSpec extends NelsonSuite with BeforeAndAfterEach { Context(Vector(), RoutingNode(ind), (), Vector()) & Context(Vector((RoutePath(ind,"","",80,80),RoutingNode(su))), RoutingNode(su), (), Vector()) - Process.eval(Task.now(((dc,ns,ctx,g)))) + Process.eval(Task.now((dc,ns,ctx,g))) .through(ExpirationPolicyProcess.expirationProcess(config)) .filter(d => GarbageCollector.expired(d._3)) .through(GarbageCollector.mark(config)).runLog.run diff --git a/core/src/test/scala/ReaperSpec.scala b/core/src/test/scala/ReaperSpec.scala index bbc8115a..cdb71cf1 100644 --- a/core/src/test/scala/ReaperSpec.scala +++ b/core/src/test/scala/ReaperSpec.scala @@ -36,16 +36,18 @@ class ReaperSpec extends NelsonSuite with BeforeAndAfterEach { val dc = config.datacenters.head + val gr = quiver.empty[routing.RoutingNode,Unit,routing.RoutePath] + it should "mark deployment as terminated" in { val st = StackName("search", Version(1,1,0), "foo") val sn = ServiceName("search", st.version.toFeatureVersion) - val su = runs(config.storage, StoreOp.findDeployment(st)).run.get + val dep = runs(config.storage, StoreOp.findDeployment(st)).run.get + + val ctx = DeploymentCtx(dep, DeploymentStatus.Garbage, Some(java.time.Instant.now)) - runs(config.storage, StoreOp.createDeploymentStatus(su.id, DeploymentStatus.Garbage, None)).run - val ns = runs(config.storage, StoreOp.listNamespacesForDatacenter(testName)).run.head - Process.emit((dc,ns,su)).to(Reaper.reap(config)).take(1).runLog.run + Process.eval(Task.now((dc,dep.namespace,ctx,gr))).to(Reaper.reap(config)).take(1).runLog.run - val status = runs(config.storage, StoreOp.getDeploymentStatus(su.id)).run + val status = runs(config.storage, StoreOp.getDeploymentStatus(dep.id)).run status should equal(Some(DeploymentStatus.Terminated)) } }