diff --git a/thrall/app/ThrallComponents.scala b/thrall/app/ThrallComponents.scala index 56debbbd35..ac6420a589 100644 --- a/thrall/app/ThrallComponents.scala +++ b/thrall/app/ThrallComponents.scala @@ -86,7 +86,7 @@ class ThrallComponents(context: Context) extends GridComponents(context, new Thr val softDeletedMetadataTable = new SoftDeletedMetadataTable(config) val thrallController = new ThrallController(es, store, migrationSourceWithSender.send, messageSender, actorSystem, auth, config.services, controllerComponents, gridClient) - val reaperController = new ReaperController(es, store, authorisation, config.persistedRootCollections, config.persistenceIdentifier, softDeletedMetadataTable, auth, config.services, controllerComponents) + val reaperController = new ReaperController(es, store, authorisation, config, actorSystem.scheduler, softDeletedMetadataTable, auth, config.services, controllerComponents) val healthCheckController = new HealthCheck(es, streamRunning.isCompleted, config, controllerComponents) val InnerServiceStatusCheckController = new InnerServiceStatusCheckController(auth, controllerComponents, config.services, wsClient) diff --git a/thrall/app/controllers/ReaperController.scala b/thrall/app/controllers/ReaperController.scala index 31e6de2e97..a120da774a 100644 --- a/thrall/app/controllers/ReaperController.scala +++ b/thrall/app/controllers/ReaperController.scala @@ -1,7 +1,7 @@ package controllers +import akka.actor.Scheduler import com.gu.mediaservice.lib.ImageIngestOperations -import com.gu.mediaservice.lib.auth.Authentication.Principal import com.gu.mediaservice.lib.auth.Permissions.DeleteImage import com.gu.mediaservice.lib.auth.{Authentication, Authorisation, BaseControllerWithLoginRedirects} import com.gu.mediaservice.lib.config.Services @@ -9,12 +9,13 @@ import com.gu.mediaservice.lib.elasticsearch.ReapableEligibility import com.gu.mediaservice.lib.logging.{GridLogging, MarkerMap} import 
com.gu.mediaservice.lib.metadata.SoftDeletedMetadataTable import com.gu.mediaservice.model.{ImageStatusRecord, SoftDeletedMetadata} -import lib.ThrallStore +import lib.{ThrallConfig, ThrallStore} import lib.elasticsearch.ElasticSearch import org.joda.time.{DateTime, DateTimeZone} -import play.api.libs.json.Json -import play.api.mvc.{ControllerComponents, Result} +import play.api.libs.json.{JsValue, Json} +import play.api.mvc.ControllerComponents +import scala.concurrent.duration.DurationInt import scala.concurrent.{ExecutionContext, Future} import scala.language.postfixOps @@ -22,15 +23,53 @@ class ReaperController( es: ElasticSearch, store: ThrallStore, authorisation: Authorisation, - persistedRootCollections: List[String], - persistenceIdentifier: String, + config: ThrallConfig, + scheduler: Scheduler, softDeletedMetadataTable: SoftDeletedMetadataTable, override val auth: Authentication, override val services: Services, override val controllerComponents: ControllerComponents, )(implicit val ec: ExecutionContext) extends BaseControllerWithLoginRedirects with GridLogging { - private def batchDeleteWrapper(count: Int)(func: (Principal, ReapableEligibility) => Future[Result]) = auth.async { request => + private val INTERVAL = 15 minutes // based on max of 1000 per reap, this interval will max out at 96,000 images per day + private val REAPS_PER_WEEK = 7.days / INTERVAL + + implicit val logMarker: MarkerMap = MarkerMap() + + private def getIsReapable = new ReapableEligibility { + override val persistedRootCollections: List[String] = config.persistedRootCollections + override val persistenceIdentifier: String = config.persistenceIdentifier + } + + // TODO do something smarter for whole controller when maybeReaperBucket is None + config.maybeReaperBucket.foreach { reaperBucket => + scheduler.scheduleAtFixedRate( + initialDelay = 0 seconds, + interval = INTERVAL, // the same constant REAPS_PER_WEEK is derived from, so the per-run quota always matches the real cadence + ){ () => + // TODO have control file in S3 bucket, exit early if reaping paused +
es.countImagesIngestedInLast(7 days)(DateTime.now(DateTimeZone.UTC)).flatMap { imagesIngestedInLast7Days => + + val imagesIngestedPer15Mins = imagesIngestedInLast7Days / REAPS_PER_WEEK + + val countOfImagesToReap = Math.min(imagesIngestedPer15Mins, 1000).toInt + + if (countOfImagesToReap == 1000) { + logger.warn(s"Reaper is reaping at maximum rate of 1000 images per $INTERVAL. If this persists, the INTERVAL will need to become more frequent.") + } + + val isReapable = getIsReapable + + Future.sequence(Seq( + doBatchSoftReap(countOfImagesToReap, deletedBy = "reaper", isReapable), + doBatchHardReap(countOfImagesToReap, deletedBy = "reaper", isReapable) + )) + } + } + } + + private def batchDeleteWrapper(count: Int)(func: (Int, String, ReapableEligibility) => Future[JsValue]) = auth.async { request => if (!authorisation.hasPermissionTo(DeleteImage)(request.user)) { Future.successful(Forbidden) } @@ -38,55 +77,57 @@ class ReaperController( Future.successful(BadRequest("Too many IDs. Maximum 1000.")) } else { - val isReapable = new ReapableEligibility { - override val persistedRootCollections: List[String] = ReaperController.this.persistedRootCollections - override val persistenceIdentifier: String = ReaperController.this.persistenceIdentifier - } func( - request.user, - isReapable - ) + count, + request.user.accessor.identity, + getIsReapable + ).map(Ok(_)) } } - def doBatchSoftReap(count: Int) = batchDeleteWrapper(count){ (user, isReapable) => Future { + private def persistedBatchDeleteOperation(deleteType: String)(doBatchDelete: => Future[JsValue]) = config.maybeReaperBucket match { + case None => Future.failed(new Exception("Reaper bucket not configured")) + case Some(reaperBucket) => doBatchDelete.map { json => + val now = DateTime.now(DateTimeZone.UTC) + val key = s"$deleteType/${now.toString("yyyy-MM-dd")}/$deleteType-${now.toString()}.json" // joda toString(pattern) renders the date folder; Predef's formatted(...) would emit the pattern text itself. TODO probably improve this path + store.client.putObject(reaperBucket, key, json.toString()) + json + } + } - implicit
val logMarker: MarkerMap = MarkerMap() + def doBatchSoftReap(count: Int) = batchDeleteWrapper(count)(doBatchSoftReap) + + def doBatchSoftReap(count: Int, deletedBy: String, isReapable: ReapableEligibility): Future[JsValue] = persistedBatchDeleteOperation("soft"){ logger.info(s"Soft deleting next $count images...") val deleteTime = DateTime.now(DateTimeZone.UTC) - val deletedBy = user.accessor.identity - val results = for { + (for { idsSoftDeletedInES: Set[String] <- es.softDeleteNextBatchOfImages(isReapable, count, SoftDeletedMetadata(deleteTime, deletedBy)) // TODO add some logging that no IDs needed deleting (might already happen inside ES function) if idsSoftDeletedInES.nonEmpty + //TODO do something with the dynamoStatuses and log per ID dynamoStatuses <- softDeletedMetadataTable.setStatuses(idsSoftDeletedInES.map( ImageStatusRecord( _, - deletedBy = user.accessor.identity, + deletedBy, deleteTime = deleteTime.toString, isDeleted = true ) )) - } yield idsSoftDeletedInES //TODO do something with the dynamoStatuses and log per ID - - val resultsJson = results.map(Json.toJson(_)) - - // FIXME write permanent log to S3 + } yield idsSoftDeletedInES).map(Json.toJson(_)) + } - resultsJson.map(Ok(_)) - }.flatten} - def doBatchHardReap(count: Int) = batchDeleteWrapper(count){ (_, isReapable) => Future { + def doBatchHardReap(count: Int) = batchDeleteWrapper(count)(doBatchHardReap) - implicit val logMarker: MarkerMap = MarkerMap() + def doBatchHardReap(count: Int, deletedBy: String, isReapable: ReapableEligibility): Future[JsValue] = persistedBatchDeleteOperation("hard"){ logger.info(s"Hard deleting next $count images...") - val results = for { + (for { idsHardDeletedFromES: Set[String] <- es.hardDeleteNextBatchOfImages(isReapable, count) // TODO add some logging that no IDs needed deleting (might already happen inside ES function) if idsHardDeletedFromES.nonEmpty @@ -101,14 +142,7 @@ class ReaperController( "thumb" -> thumbsS3Deletions.get(ImageIngestOperations.fileKeyFromId(id)), "optimisedPng" ->
pngsS3Deletions.get(ImageIngestOperations.optimisedPngKeyFromId(id)) ) - }.toMap - - val resultsJson = results.map(Json.toJson(_)) - - // FIXME write permanent log to S3 - - resultsJson.map(Ok(_)) - - }.flatten} + }.toMap).map(Json.toJson(_)) + } } diff --git a/thrall/app/lib/ThrallConfig.scala b/thrall/app/lib/ThrallConfig.scala index a9810d089d..0530e1aab8 100644 --- a/thrall/app/lib/ThrallConfig.scala +++ b/thrall/app/lib/ThrallConfig.scala @@ -33,6 +33,7 @@ class ThrallConfig(resources: GridConfigResources) extends CommonConfigWithElast val thumbnailBucket: String = string("s3.thumb.bucket") + val maybeReaperBucket: Option[String] = stringOpt("s3.reaper.bucket") val metadataTopicArn: String = string("indexed.image.sns.topic.arn") diff --git a/thrall/app/lib/elasticsearch/ElasticSearch.scala b/thrall/app/lib/elasticsearch/ElasticSearch.scala index 8141b1ff35..a92cdd6fdb 100644 --- a/thrall/app/lib/elasticsearch/ElasticSearch.scala +++ b/thrall/app/lib/elasticsearch/ElasticSearch.scala @@ -23,7 +23,7 @@ import org.joda.time.DateTime import play.api.libs.json._ import scala.annotation.nowarn -import scala.concurrent.duration.DurationInt +import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} object ImageNotDeletable extends Throwable("Image cannot be deleted") @@ -292,6 +292,11 @@ class ElasticSearch( ).map(_ => ElasticSearchUpdateResponse())) } + def countImagesIngestedInLast(duration: FiniteDuration)(now: DateTime)(implicit ex: ExecutionContext, logMarker: LogMarker): Future[Long] = executeAndLog( + ElasticDsl.count(imagesCurrentAlias).query(filters.date("uploadTime", from = now minus duration.toMillis, to = now)), // joda DateTime.minus takes millis (or a joda duration/period), not a scala FiniteDuration + s"count images in the last $duration (relative to $now)" + ).map(_.result.count) + private def getNextBatchOfImageIdsForDeletion(query: Query, count: Int, deletionType: String) (implicit ex: ExecutionContext, logMarker: LogMarker) = executeAndLog(