[reaper] run 'reaper' every 15mins (IF reaper bucket is configured) weighted based on ingestion rate of the environment (to reduce configuration)

twrichards committed Sep 25, 2023
1 parent 29a1a39 commit be3a857
Showing 4 changed files with 80 additions and 40 deletions.
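
The weighting described in the commit message works out as follows: at a 15-minute interval there are 7 days / 15 minutes = 672 reaps per week (96 per day), and each reap is budgeted its even share of the last week's ingestion, capped at 1,000. A standalone sketch of that arithmetic (the ingestion figure is made up for illustration; the real value comes from the new `countImagesIngestedInLast` query shown further down):

```scala
import scala.concurrent.duration.DurationInt

object ReapBudgetSketch extends App {
  val interval = 15.minutes
  val reapsPerWeek = 7.days / interval // Double: 672.0

  // Hypothetical stand-in for es.countImagesIngestedInLast(7 days)
  val imagesIngestedInLast7Days = 100000L

  // Spread the week's ingestion evenly across the week's reaps, capped at 1000 per reap
  val countOfImagesToReap = Math.min(imagesIngestedInLast7Days / reapsPerWeek, 1000).toInt

  println(s"reaps per week: $reapsPerWeek, images per reap: $countOfImagesToReap") // ~148 here
}
```

At the cap this comes to 96 × 1,000 = 96,000 images per day, the ceiling noted in the code comment below.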
2 changes: 1 addition & 1 deletion thrall/app/ThrallComponents.scala
@@ -86,7 +86,7 @@ class ThrallComponents(context: Context) extends GridComponents(context, new Thr
val softDeletedMetadataTable = new SoftDeletedMetadataTable(config)

val thrallController = new ThrallController(es, store, migrationSourceWithSender.send, messageSender, actorSystem, auth, config.services, controllerComponents, gridClient)
val reaperController = new ReaperController(es, store, authorisation, config.persistedRootCollections, config.persistenceIdentifier, softDeletedMetadataTable, auth, config.services, controllerComponents)
val reaperController = new ReaperController(es, store, authorisation, config, actorSystem.scheduler, softDeletedMetadataTable, auth, config.services, controllerComponents)
val healthCheckController = new HealthCheck(es, streamRunning.isCompleted, config, controllerComponents)
val InnerServiceStatusCheckController = new InnerServiceStatusCheckController(auth, controllerComponents, config.services, wsClient)

110 changes: 72 additions & 38 deletions thrall/app/controllers/ReaperController.scala
@@ -1,92 +1,133 @@
package controllers

import akka.actor.Scheduler
import com.gu.mediaservice.lib.ImageIngestOperations
import com.gu.mediaservice.lib.auth.Authentication.Principal
import com.gu.mediaservice.lib.auth.Permissions.DeleteImage
import com.gu.mediaservice.lib.auth.{Authentication, Authorisation, BaseControllerWithLoginRedirects}
import com.gu.mediaservice.lib.config.Services
import com.gu.mediaservice.lib.elasticsearch.ReapableEligibility
import com.gu.mediaservice.lib.logging.{GridLogging, MarkerMap}
import com.gu.mediaservice.lib.metadata.SoftDeletedMetadataTable
import com.gu.mediaservice.model.{ImageStatusRecord, SoftDeletedMetadata}
import lib.ThrallStore
import lib.{ThrallConfig, ThrallStore}
import lib.elasticsearch.ElasticSearch
import org.joda.time.{DateTime, DateTimeZone}
import play.api.libs.json.Json
import play.api.mvc.{ControllerComponents, Result}
import play.api.libs.json.{JsValue, Json}
import play.api.mvc.ControllerComponents

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps

class ReaperController(
es: ElasticSearch,
store: ThrallStore,
authorisation: Authorisation,
persistedRootCollections: List[String],
persistenceIdentifier: String,
config: ThrallConfig,
scheduler: Scheduler,
softDeletedMetadataTable: SoftDeletedMetadataTable,
override val auth: Authentication,
override val services: Services,
override val controllerComponents: ControllerComponents,
)(implicit val ec: ExecutionContext) extends BaseControllerWithLoginRedirects with GridLogging {

private def batchDeleteWrapper(count: Int)(func: (Principal, ReapableEligibility) => Future[Result]) = auth.async { request =>
private val INTERVAL = 15 minutes // based on max of 1000 per reap, this interval will max out at 96,000 images per day
private val REAPS_PER_WEEK = 7.days / INTERVAL

implicit val logMarker: MarkerMap = MarkerMap()

private def getIsReapable = new ReapableEligibility {
override val persistedRootCollections: List[String] = config.persistedRootCollections
override val persistenceIdentifier: String = config.persistenceIdentifier
}

// TODO do something smarter for whole controller when maybeReaperBucket is None
config.maybeReaperBucket.foreach { reaperBucket =>
scheduler.scheduleAtFixedRate(
initialDelay = 0 seconds,
interval = 15 minutes,
){ () =>
// TODO have control file in S3 bucket, exit early if reaping paused

es.countImagesIngestedInLast(7 days)(DateTime.now(DateTimeZone.UTC)).flatMap { imagesIngestedInLast7Days =>

val imagesIngestedPer15Mins = imagesIngestedInLast7Days / REAPS_PER_WEEK

val countOfImagesToReap = Math.min(imagesIngestedPer15Mins, 1000).toInt

if (countOfImagesToReap == 1000) {
logger.warn(s"Reaper is reaping at maximum rate of 1000 images per $INTERVAL. If this persists, the INTERVAL will need to become more frequent.")
}

val isReapable = getIsReapable

Future.sequence(Seq(
doBatchSoftReap(countOfImagesToReap, deletedBy = "reaper", isReapable),
doBatchHardReap(countOfImagesToReap, deletedBy = "reaper", isReapable)
))
}
}
}

private def batchDeleteWrapper(count: Int)(func: (Int, String, ReapableEligibility) => Future[JsValue]) = auth.async { request =>
if (!authorisation.hasPermissionTo(DeleteImage)(request.user)) {
Future.successful(Forbidden)
}
else if (count > 1000) {
Future.successful(BadRequest("Too many IDs. Maximum 1000."))
}
else {
val isReapable = new ReapableEligibility {
override val persistedRootCollections: List[String] = ReaperController.this.persistedRootCollections
override val persistenceIdentifier: String = ReaperController.this.persistenceIdentifier
}
func(
request.user,
isReapable
)
count,
request.user.accessor.identity,
getIsReapable
).map(Ok(_))
}
}

def doBatchSoftReap(count: Int) = batchDeleteWrapper(count){ (user, isReapable) => Future {
private def persistedBatchDeleteOperation(deleteType: String)(doBatchDelete: => Future[JsValue]) = config.maybeReaperBucket match {
case None => Future.failed(new Exception("Reaper bucket not configured"))
case Some(reaperBucket) => doBatchDelete.map { json =>
val now = DateTime.now(DateTimeZone.UTC)
val key = s"$deleteType/${now.formatted("yyyy-MM-dd")}/$deleteType-${now.toString()}.json" //TODO probably improve this path
store.client.putObject(reaperBucket, key, json.toString())
json
}
}

implicit val logMarker: MarkerMap = MarkerMap()
def doBatchSoftReap(count: Int) = batchDeleteWrapper(count)(doBatchSoftReap)

def doBatchSoftReap(count: Int, deletedBy: String, isReapable: ReapableEligibility) = persistedBatchDeleteOperation("soft"){

logger.info(s"Soft deleting next $count images...")

val deleteTime = DateTime.now(DateTimeZone.UTC)
val deletedBy = user.accessor.identity

val results = for {
(for {
idsSoftDeletedInES: Set[String] <- es.softDeleteNextBatchOfImages(isReapable, count, SoftDeletedMetadata(deleteTime, deletedBy))
// TODO add some logging that no IDs needed deleting (might already happen inside ES function)
if idsSoftDeletedInES.nonEmpty
//TODO do something with the dynamoStatuses and log per ID
dynamoStatuses <- softDeletedMetadataTable.setStatuses(idsSoftDeletedInES.map(
ImageStatusRecord(
_,
deletedBy = user.accessor.identity,
deletedBy,
deleteTime = deleteTime.toString,
isDeleted = true
)
))
} yield idsSoftDeletedInES //TODO do something with the dynamoStatuses and log per ID

val resultsJson = results.map(Json.toJson(_))

// FIXME write permanent log to S3
} yield idsSoftDeletedInES).map(Json.toJson(_))
}

resultsJson.map(Ok(_))
}.flatten}


def doBatchHardReap(count: Int) = batchDeleteWrapper(count){ (_, isReapable) => Future {
def doBatchHardReap(count: Int) = batchDeleteWrapper(count)(doBatchHardReap)

implicit val logMarker: MarkerMap = MarkerMap()
def doBatchHardReap(count: Int, deletedBy: String, isReapable: ReapableEligibility) = persistedBatchDeleteOperation("hard"){

logger.info(s"Hard deleting next $count images...")

val results = for {
(for {
idsHardDeletedFromES: Set[String] <- es.hardDeleteNextBatchOfImages(isReapable, count)
// TODO add some logging that no IDs needed deleting (might already happen inside ES function)
if idsHardDeletedFromES.nonEmpty
@@ -101,14 +142,7 @@ class ReaperController(
"thumb" -> thumbsS3Deletions.get(ImageIngestOperations.fileKeyFromId(id)),
"optimisedPng" -> pngsS3Deletions.get(ImageIngestOperations.optimisedPngKeyFromId(id))
)
}.toMap

val resultsJson = results.map(Json.toJson(_))

// FIXME write permanent log to S3

resultsJson.map(Ok(_))

}.flatten}
}.toMap).map(Json.toJson(_))
}

}
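
The scheduled reap above only starts when the optional reaper bucket is configured. A minimal, self-contained sketch of that gating pattern with the Akka scheduler (the bucket value and the work inside the closure are placeholders, not the Grid code):

```scala
import akka.actor.ActorSystem

import scala.concurrent.duration.DurationInt

object ScheduledReapSketch extends App {
  implicit val system: ActorSystem = ActorSystem("reaper-sketch")
  import system.dispatcher // ExecutionContext required by scheduleAtFixedRate

  // Stand-in for config.maybeReaperBucket; when None, nothing is scheduled at all
  val maybeReaperBucket: Option[String] = Some("example-reaper-bucket")

  maybeReaperBucket.foreach { bucket =>
    system.scheduler.scheduleAtFixedRate(initialDelay = 0.seconds, interval = 15.minutes) { () =>
      // Placeholder for the real work: count recent ingestion, derive a budget, then soft- and hard-reap
      println(s"reaping; audit records would go to s3://$bucket")
    }
  }
}
```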
1 change: 1 addition & 0 deletions thrall/app/lib/ThrallConfig.scala
@@ -33,6 +33,7 @@ class ThrallConfig(resources: GridConfigResources) extends CommonConfigWithElast

val thumbnailBucket: String = string("s3.thumb.bucket")

val maybeReaperBucket: Option[String] = stringOpt("s3.reaper.bucket")

val metadataTopicArn: String = string("indexed.image.sns.topic.arn")

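Since the scheduled reaper is entirely gated on this optional key, enabling it should only require setting the bucket in Thrall's configuration. Assuming the same properties-style source as the neighbouring s3.* keys (the bucket name here is invented):

```properties
# Omit this key (or leave it unset) to keep the scheduled reaper disabled
s3.reaper.bucket=example-grid-reaper-bucket
```
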
7 changes: 6 additions & 1 deletion thrall/app/lib/elasticsearch/ElasticSearch.scala
Expand Up @@ -23,7 +23,7 @@ import org.joda.time.DateTime
import play.api.libs.json._

import scala.annotation.nowarn
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}

object ImageNotDeletable extends Throwable("Image cannot be deleted")
@@ -292,6 +292,11 @@ class ElasticSearch(
).map(_ => ElasticSearchUpdateResponse()))
}

def countImagesIngestedInLast(duration: FiniteDuration)(now: DateTime)(implicit ex: ExecutionContext, logMarker: LogMarker): Future[Long] = executeAndLog(
ElasticDsl.count(imagesCurrentAlias).query(filters.date("uploadTime", from = now minus duration, to = now)),
s"count images in the last $duration (relative to $now)"
).map(_.result.count)

private def getNextBatchOfImageIdsForDeletion(query: Query, count: Int, deletionType: String)
(implicit ex: ExecutionContext, logMarker: LogMarker) =
executeAndLog(
