Skip to content

Commit

Permalink
cleanup process only applies expiration policy to routable deployments
Browse files Browse the repository at this point in the history
  • Loading branch information
kaiserpelagic committed Oct 3, 2017
1 parent 9c757a2 commit 3aabbd0
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 21 deletions.
17 changes: 12 additions & 5 deletions core/src/main/scala/cleanup/CleanupCron.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ object CleanupCron {
private val status =
DeploymentStatus.all.toList.filter(_ != DeploymentStatus.Terminated).toNel.yolo("can't be empty")

private val routables = DeploymentStatus.routable.toList

/* Gets all deployments (exluding terminated) and routing graph for a namespace */
def getDeploymentsForNamespace(ns: Namespace): StoreOpF[Vector[(Namespace,DeploymentCtx)]] =
for {
Expand All @@ -56,7 +58,7 @@ object CleanupCron {
} yield dcx.map(d => (ns,d))

/* Gets all deployment (excluding terminated) and routing graph for each namespace within a Datacenter. */
def getDeploymentsForDatacenter(dc: Datacenter): StoreOpF[Vector[(Datacenter,Namespace,DeploymentCtx,RoutingGraph)]] =
def getDeploymentsForDatacenter(dc: Datacenter): StoreOpF[Vector[CleanupRow]] =
for {
_ <- log.debug(s"cleanup cron running for ${dc.name}").point[StoreOpF]
ns <- StoreOp.listNamespacesForDatacenter(dc.name)
Expand All @@ -65,12 +67,17 @@ object CleanupCron {
} yield ds.flatMap { case (ns, dcx) => gr.find(_._1 == ns).map(x => (dc,ns,dcx,x._2)) }

/* Gets all deployments (excluding terminated) for all datacenters and namespaces */
def getDeployments(cfg: NelsonConfig): Task[Vector[(Datacenter,Namespace,DeploymentCtx,RoutingGraph)]] =
def getDeployments(cfg: NelsonConfig): Task[Vector[CleanupRow]] =
runs(cfg.storage, cfg.datacenters.toVector.traverseM(dc => getDeploymentsForDatacenter(dc)))

def process(cfg: NelsonConfig): Process[Task, (Datacenter,Namespace,Deployment)] =
Process.eval(getDeployments(cfg)).flatMap(Process.emitAll)
.through(ExpirationPolicyProcess.expirationProcess(cfg))
def routable(d: DeploymentCtx): Boolean =
routables.exists(_ == d.status)

def process(cfg: NelsonConfig): Process[Task, CleanupRow] =
Process.eval(getDeployments(cfg)).flatMap(Process.emitAll)
.map(a => if (routable(a._3)) \/.right(a) else \/.left(a))
.throughO(ExpirationPolicyProcess.expirationProcess(cfg)) // only apply expiration policy to the rhs
.map(_.fold(identity, identity))
.filter(x => GarbageCollector.expired(x._3)) // mark only expired deployments
.through(GarbageCollector.mark(cfg))

Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/cleanup/ExpirationPolicy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,11 @@ object ExpirationPolicyProcess {
.map(_ => d.copy(exp = Some(i)))
}

def expirationProcess(cfg: NelsonConfig): Channel[Task,(Datacenter,Namespace,DeploymentCtx,RoutingGraph),(Datacenter,Namespace,DeploymentCtx)] =
def expirationProcess(cfg: NelsonConfig): Channel[Task, CleanupRow, CleanupRow] =
channel.lift { case (dc, ns, d, graph) =>
applyPolicyToDeployment(d, graph)(cfg.cleanup.extendTTL)
.map(ext => runs(cfg.storage, updateExpiration(d,ext)).map(d => (dc,ns,d)))
.getOrElse(Task.now((dc,ns,d)))
.map(ext => runs(cfg.storage, updateExpiration(d,ext)).map(d => (dc,ns,d,graph)))
.getOrElse(Task.now((dc,ns,d,graph)))
}
}

6 changes: 3 additions & 3 deletions core/src/main/scala/cleanup/GarbageCollector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ object GarbageCollector {
* against the case where the ExpirationPolicy process fails
* and GC eagerly marks deployments that it shouldn't.
*/
def mark(cfg: NelsonConfig): Channel[Task,(Datacenter,Namespace,DeploymentCtx),(Datacenter,Namespace,Deployment)] = {
def mark(cfg: NelsonConfig): Channel[Task, CleanupRow, CleanupRow] = {
import Json._
import audit.AuditableInstances._
channel.lift { case (dc, ns, d) =>
runs(cfg.storage, markAsGarbage(d.deployment).map(d => (dc, ns, d))) <*
channel.lift { case (dc, ns, d, gr) =>
runs(cfg.storage, markAsGarbage(d.deployment).map(_ => (dc, ns, d, gr))) <*
cfg.auditor.write(d.deployment, audit.GarbageAction)
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/cleanup/Reaper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ object Reaper {
* scheduler was used to initially place the deployment
* to delete the running job.
*/
def reap(cfg: NelsonConfig): Sink[Task, (Datacenter,Namespace,Deployment)] =
sink.lift { case (dc, ns, d) =>
destroy(dc,ns,d)(dc.workflow)(cfg)
def reap(cfg: NelsonConfig): Sink[Task, CleanupRow] =
sink.lift { case (dc, ns, d, gr) =>
destroy(dc,ns,d.deployment)(dc.workflow)(cfg)
.map { _ => destroySuccessCounter.labels(ns.name.asString).inc() }
.handleWith {
// this is a Sink and the end of the world, so we need to handle NonFatal to keep Processes running
Expand Down
24 changes: 24 additions & 0 deletions core/src/main/scala/cleanup/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//: ----------------------------------------------------------------------------
//: Copyright (C) 2017 Verizon. All Rights Reserved.
//:
//: Licensed under the Apache License, Version 2.0 (the "License");
//: you may not use this file except in compliance with the License.
//: You may obtain a copy of the License at
//:
//: http://www.apache.org/licenses/LICENSE-2.0
//:
//: Unless required by applicable law or agreed to in writing, software
//: distributed under the License is distributed on an "AS IS" BASIS,
//: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//: See the License for the specific language governing permissions and
//: limitations under the License.
//:
//: ----------------------------------------------------------------------------
package nelson

package object cleanup {
import Datacenter.Namespace
import routing.RoutingGraph

type CleanupRow = (Datacenter, Namespace, DeploymentCtx, RoutingGraph)
}
19 changes: 19 additions & 0 deletions core/src/test/scala/CleanupSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,23 @@ class CleanupSpec extends NelsonSuite {
val status = runs(config.storage, StoreOp.getDeploymentStatus(su.id)).run
status should equal(Some(DeploymentStatus.Ready))
}

it should "run the entire cleanup pipeline and not apply an expiration policy to non routable deployments" in {
import cleanup._
val st = StackName("search", Version(2,2,2), "aaaa")
val su = runs(config.storage, StoreOp.findDeployment(st)).run.get

runs(config.storage, StoreOp.createDeploymentStatus(su.id, DeploymentStatus.Warming, None)).run
val statusBefore = runs(config.storage, StoreOp.getDeploymentStatus(su.id)).run
statusBefore should equal(Some(DeploymentStatus.Warming))

val exp = java.time.Instant.now().plusSeconds(1000)

runs(config.storage, StoreOp.createDeploymentExpiration(su.id, exp)).run

CleanupCron.process(config).runLog.run

val expAfter = runs(config.storage, StoreOp.findDeploymentExpiration(su.id)).run
expAfter should equal(Some(exp))
}
}
6 changes: 4 additions & 2 deletions core/src/test/scala/GarbageCollectorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class GarbageCollectorSpec extends NelsonSuite with BeforeAndAfterEach {

val dc = config.datacenters.head

val gr = quiver.empty[RoutingNode,Unit,RoutePath]

it should "should identify expired deployments" in {
val st = StackName("search", Version(1,1,0), "foo")
val su = runs(config.storage, StoreOp.findDeployment(st)).run.get
Expand All @@ -66,7 +68,7 @@ class GarbageCollectorSpec extends NelsonSuite with BeforeAndAfterEach {
val ctx = DeploymentCtx(su, Ready, Some(Instant.now().minusSeconds(1000)))

val ns = runs(config.storage, StoreOp.listNamespacesForDatacenter(testName)).run.head
Process.eval(Task.now(((dc,ns,ctx)))).through(GarbageCollector.mark(config)).runLog.run
Process.eval(Task.now(((dc,ns,ctx,gr)))).through(GarbageCollector.mark(config)).runLog.run

val status = runs(config.storage, StoreOp.getDeploymentStatus(su.id)).run
status should equal(Some(DeploymentStatus.Garbage))
Expand All @@ -89,7 +91,7 @@ class GarbageCollectorSpec extends NelsonSuite with BeforeAndAfterEach {
Context(Vector(), RoutingNode(ind), (), Vector()) &
Context(Vector((RoutePath(ind,"","",80,80),RoutingNode(su))), RoutingNode(su), (), Vector())

Process.eval(Task.now(((dc,ns,ctx,g))))
Process.eval(Task.now((dc,ns,ctx,g)))
.through(ExpirationPolicyProcess.expirationProcess(config))
.filter(d => GarbageCollector.expired(d._3))
.through(GarbageCollector.mark(config)).runLog.run
Expand Down
12 changes: 7 additions & 5 deletions core/src/test/scala/ReaperSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ class ReaperSpec extends NelsonSuite with BeforeAndAfterEach {

val dc = config.datacenters.head

val gr = quiver.empty[routing.RoutingNode,Unit,routing.RoutePath]

it should "mark deployment as terminated" in {
val st = StackName("search", Version(1,1,0), "foo")
val sn = ServiceName("search", st.version.toFeatureVersion)
val su = runs(config.storage, StoreOp.findDeployment(st)).run.get
val dep = runs(config.storage, StoreOp.findDeployment(st)).run.get

val ctx = DeploymentCtx(dep, DeploymentStatus.Garbage, Some(java.time.Instant.now))

runs(config.storage, StoreOp.createDeploymentStatus(su.id, DeploymentStatus.Garbage, None)).run
val ns = runs(config.storage, StoreOp.listNamespacesForDatacenter(testName)).run.head
Process.emit((dc,ns,su)).to(Reaper.reap(config)).take(1).runLog.run
Process.eval(Task.now((dc,dep.namespace,ctx,gr))).to(Reaper.reap(config)).take(1).runLog.run

val status = runs(config.storage, StoreOp.getDeploymentStatus(su.id)).run
val status = runs(config.storage, StoreOp.getDeploymentStatus(dep.id)).run
status should equal(Some(DeploymentStatus.Terminated))
}
}

0 comments on commit 3aabbd0

Please sign in to comment.