Skip to content

Commit

Permalink
[reaper] add doBatchSoftReap and doBatchHardReap endpoints to `th…
Browse files Browse the repository at this point in the history
…rall`
  • Loading branch information
twrichards committed Sep 22, 2023
1 parent 633dbd6 commit 29a1a39
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.gu.mediaservice.lib

import com.amazonaws.services.s3.model.{DeleteObjectsRequest, MultiObjectDeleteException}

import java.io.File
import com.gu.mediaservice.lib.config.CommonConfig
import com.gu.mediaservice.lib.aws.S3Object
Expand All @@ -9,6 +11,8 @@ import org.joda.time.DateTime

import scala.concurrent.Future

import scala.jdk.CollectionConverters.iterableAsScalaIterableConverter

object ImageIngestOperations {
// Derives the S3 key for an image id: each of the first six characters of the
// id becomes a path segment, followed by the full id
// (e.g. "abcdef123" -> "a/b/c/d/e/f/abcdef123").
def fileKeyFromId(id: String): String = {
  val shardedPrefix = id.take(6).map(_.toString).mkString("/")
  s"$shardedPrefix/$id"
}

Expand Down Expand Up @@ -42,9 +46,29 @@ class ImageIngestOperations(imageBucket: String, thumbnailBucket: String, config
storeImage(imageBucket, optimisedPngKeyFromId(storableImage.id), storableImage.file, Some(storableImage.mimeType),
overwrite = true)

/**
  * Deletes the given keys from `bucket` in a single S3 batch request.
  *
  * @return a map from each requested key to whether its deletion succeeded.
  *         When S3 reports a partial failure (MultiObjectDeleteException),
  *         only the keys listed in the exception's errors are marked false.
  */
private def bulkDelete(bucket: String, keys: Seq[String]): Future[Map[String, Boolean]] = Future {
  try {
    client.deleteObjects(
      new DeleteObjectsRequest(bucket).withKeys(keys: _*)
    )
    // No exception means S3 deleted every requested key.
    keys.map(_ -> true).toMap
  } catch {
    case partialFailure: MultiObjectDeleteException =>
      // FIXME log partial failure
      // getErrors lists the keys that FAILED to delete, so success is the
      // NEGATION of membership (the previous code had this inverted and
      // reported failed keys as deleted).
      val failedKeys = partialFailure.getErrors.asScala.map(_.getKey).toSet
      keys.map(key => key -> !failedKeys.contains(key)).toMap
  }
}

def deleteOriginal(id: String)(implicit logMarker: LogMarker): Future[Unit] = if(isVersionedS3) deleteVersionedImage(imageBucket, fileKeyFromId(id)) else deleteImage(imageBucket, fileKeyFromId(id))
def deleteOriginals(ids: Set[String]) = bulkDelete(imageBucket, ids.map(fileKeyFromId).toSeq)
def deleteThumbnail(id: String)(implicit logMarker: LogMarker): Future[Unit] = deleteImage(thumbnailBucket, fileKeyFromId(id))
def deletePng(id: String)(implicit logMarker: LogMarker): Future[Unit] = deleteImage(imageBucket, optimisedPngKeyFromId(id))
def deleteThumbnails(ids: Set[String]) = bulkDelete(thumbnailBucket, ids.map(fileKeyFromId).toSeq)
def deletePNG(id: String)(implicit logMarker: LogMarker): Future[Unit] = deleteImage(imageBucket, optimisedPngKeyFromId(id))
def deletePNGs(ids: Set[String]) = bulkDelete(imageBucket, ids.map(optimisedPngKeyFromId).toSeq)

def doesOriginalExist(id: String): Boolean =
client.doesObjectExist(imageBucket, fileKeyFromId(id))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class SoftDeletedMetadataTable(config: CommonConfig) extends DynamoDB[ImageStatu
ScanamoAsync.exec(client)(softDeletedMetadataTable.put(imageStatus))
}

// Persists a whole batch of image soft-delete status records to the
// soft-deleted metadata Dynamo table in one Scanamo batch-put operation.
def setStatuses(imageStatuses: Set[ImageStatusRecord])(implicit ex: ExecutionContext) = {
  val batchPut = softDeletedMetadataTable.putAll(imageStatuses)
  ScanamoAsync.exec(client)(batchPut)
}

def updateStatus(imageId: String, isDeleted: Boolean)(implicit ex: ExecutionContext) = {
val updateExpression = set('isDeleted -> isDeleted)
ScanamoAsync.exec(client)(
Expand Down
8 changes: 6 additions & 2 deletions thrall/app/ThrallComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import com.gu.mediaservice.GridClient
import com.gu.mediaservice.lib.config.Services
import com.gu.mediaservice.lib.aws.{S3Ops, ThrallMessageSender}
import com.gu.mediaservice.lib.management.InnerServiceStatusCheckController
import com.gu.mediaservice.lib.metadata.SoftDeletedMetadataTable
import com.gu.mediaservice.lib.play.GridComponents
import com.typesafe.scalalogging.StrictLogging
import controllers.{AssetsComponents, HealthCheck, ThrallController}
import controllers.{AssetsComponents, HealthCheck, ReaperController, ThrallController}
import lib._
import lib.elasticsearch._
import lib.kinesis.{KinesisConfig, ThrallEventConsumer}
Expand Down Expand Up @@ -82,9 +83,12 @@ class ThrallComponents(context: Context) extends GridComponents(context, new Thr
)
val syncCheckerStream: Future[Done] = syncChecker.run()

val softDeletedMetadataTable = new SoftDeletedMetadataTable(config)

val thrallController = new ThrallController(es, store, migrationSourceWithSender.send, messageSender, actorSystem, auth, config.services, controllerComponents, gridClient)
val reaperController = new ReaperController(es, store, authorisation, config.persistedRootCollections, config.persistenceIdentifier, softDeletedMetadataTable, auth, config.services, controllerComponents)
val healthCheckController = new HealthCheck(es, streamRunning.isCompleted, config, controllerComponents)
val InnerServiceStatusCheckController = new InnerServiceStatusCheckController(auth, controllerComponents, config.services, wsClient)

override lazy val router = new Routes(httpErrorHandler, thrallController, healthCheckController, management, InnerServiceStatusCheckController, assets)
override lazy val router = new Routes(httpErrorHandler, thrallController, reaperController, healthCheckController, management, InnerServiceStatusCheckController, assets)
}
114 changes: 114 additions & 0 deletions thrall/app/controllers/ReaperController.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package controllers

import com.gu.mediaservice.lib.ImageIngestOperations
import com.gu.mediaservice.lib.auth.Authentication.Principal
import com.gu.mediaservice.lib.auth.Permissions.DeleteImage
import com.gu.mediaservice.lib.auth.{Authentication, Authorisation, BaseControllerWithLoginRedirects}
import com.gu.mediaservice.lib.config.Services
import com.gu.mediaservice.lib.elasticsearch.ReapableEligibility
import com.gu.mediaservice.lib.logging.{GridLogging, MarkerMap}
import com.gu.mediaservice.lib.metadata.SoftDeletedMetadataTable
import com.gu.mediaservice.model.{ImageStatusRecord, SoftDeletedMetadata}
import lib.ThrallStore
import lib.elasticsearch.ElasticSearch
import org.joda.time.{DateTime, DateTimeZone}
import play.api.libs.json.Json
import play.api.mvc.{ControllerComponents, Result}

import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps

// Admin endpoints for the "reaper": batch soft-reap (mark images as deleted in
// ES and record the status in Dynamo) and batch hard-reap (delete from ES and
// remove the S3 originals/thumbnails/optimised PNGs). Both endpoints require
// the DeleteImage permission and cap a batch at 1000 images.
class ReaperController(
es: ElasticSearch,
store: ThrallStore,
authorisation: Authorisation,
persistedRootCollections: List[String],
persistenceIdentifier: String,
softDeletedMetadataTable: SoftDeletedMetadataTable,
override val auth: Authentication,
override val services: Services,
override val controllerComponents: ControllerComponents,
)(implicit val ec: ExecutionContext) extends BaseControllerWithLoginRedirects with GridLogging {

// Shared guard for both batch endpoints: rejects callers lacking the
// DeleteImage permission (403) and batches over 1000 (400), otherwise hands
// the authenticated principal plus a freshly built reapability filter to
// `func`.
// NOTE(review): `count` is not validated to be positive — zero or negative
// values fall through to `func`; confirm that is intended.
private def batchDeleteWrapper(count: Int)(func: (Principal, ReapableEligibility) => Future[Result]) = auth.async { request =>
if (!authorisation.hasPermissionTo(DeleteImage)(request.user)) {
Future.successful(Forbidden)
}
else if (count > 1000) {
Future.successful(BadRequest("Too many IDs. Maximum 1000."))
}
else {
// Bridges this controller's config values into the ES-side eligibility query.
val isReapable = new ReapableEligibility {
override val persistedRootCollections: List[String] = ReaperController.this.persistedRootCollections
override val persistenceIdentifier: String = ReaperController.this.persistenceIdentifier
}
func(
request.user,
isReapable
)
}
}

// Soft-reaps the next `count` reapable images: stamps soft-delete metadata
// (who/when) in ES, mirrors the status into the Dynamo table, and responds
// with the JSON array of affected image IDs.
def doBatchSoftReap(count: Int) = batchDeleteWrapper(count){ (user, isReapable) => Future {

implicit val logMarker: MarkerMap = MarkerMap()

logger.info(s"Soft deleting next $count images...")

val deleteTime = DateTime.now(DateTimeZone.UTC)
val deletedBy = user.accessor.identity

val results = for {
idsSoftDeletedInES: Set[String] <- es.softDeleteNextBatchOfImages(isReapable, count, SoftDeletedMetadata(deleteTime, deletedBy))
// TODO add some logging that no IDs needed deleting (might already happen inside ES function)
// NOTE(review): a failing filter on a Future produces a FAILED Future
// (NoSuchElementException), so an empty batch makes this endpoint error
// rather than return an empty list — confirm that is acceptable.
if idsSoftDeletedInES.nonEmpty
dynamoStatuses <- softDeletedMetadataTable.setStatuses(idsSoftDeletedInES.map(
ImageStatusRecord(
_,
deletedBy = user.accessor.identity,
deleteTime = deleteTime.toString,
isDeleted = true
)
))
} yield idsSoftDeletedInES //TODO do something with the dynamoStatuses and log per ID

val resultsJson = results.map(Json.toJson(_))

// FIXME write permanent log to S3

resultsJson.map(Ok(_))
}.flatten}


// Hard-reaps the next `count` images (those soft-deleted long enough ago, per
// the ES query): removes them from ES, then deletes originals, thumbnails and
// optimised PNGs from S3. Responds with, per image ID, which stores reported
// a deletion result (None = no S3 result recorded for that key).
// NOTE(review): the three S3 deletions run sequentially although they look
// independent; they could be started in parallel if desired.
def doBatchHardReap(count: Int) = batchDeleteWrapper(count){ (_, isReapable) => Future {

implicit val logMarker: MarkerMap = MarkerMap()

logger.info(s"Hard deleting next $count images...")

val results = for {
idsHardDeletedFromES: Set[String] <- es.hardDeleteNextBatchOfImages(isReapable, count)
// TODO add some logging that no IDs needed deleting (might already happen inside ES function)
// NOTE(review): as in doBatchSoftReap above, an empty set fails this
// Future via the filter rather than yielding an empty response — confirm.
if idsHardDeletedFromES.nonEmpty
mainImagesS3Deletions <- store.deleteOriginals(idsHardDeletedFromES)
thumbsS3Deletions <- store.deleteThumbnails(idsHardDeletedFromES)
pngsS3Deletions <- store.deletePNGs(idsHardDeletedFromES)
} yield idsHardDeletedFromES.map { id =>
// TODO log per ID
id -> Map(
"ES" -> Some(true), // since this is list of IDs deleted from ES
"mainImage" -> mainImagesS3Deletions.get(ImageIngestOperations.fileKeyFromId(id)),
"thumb" -> thumbsS3Deletions.get(ImageIngestOperations.fileKeyFromId(id)),
"optimisedPng" -> pngsS3Deletions.get(ImageIngestOperations.optimisedPngKeyFromId(id))
)
}.toMap

val resultsJson = results.map(Json.toJson(_))

// FIXME write permanent log to S3

resultsJson.map(Ok(_))

}.flatten}

}
109 changes: 89 additions & 20 deletions thrall/app/lib/elasticsearch/ElasticSearch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package lib.elasticsearch

import akka.actor.Scheduler
import com.gu.mediaservice.lib.ImageFields
import com.gu.mediaservice.lib.elasticsearch.{ElasticSearchClient, ElasticSearchConfig, ElasticSearchExecutions, Running}
import com.gu.mediaservice.lib.elasticsearch.filters
import com.gu.mediaservice.lib.elasticsearch.{ElasticSearchClient, ElasticSearchConfig, ElasticSearchExecutions, ReapableEligibility, Running}
import com.gu.mediaservice.lib.formatting.printDateTime
import com.gu.mediaservice.lib.logging.{LogMarker, MarkerMap}
import com.gu.mediaservice.model._
Expand All @@ -11,12 +12,12 @@ import com.gu.mediaservice.model.usage.Usage
import com.gu.mediaservice.syntax._
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.requests.script.Script
import com.sksamuel.elastic4s.requests.searches.SearchResponse
import com.sksamuel.elastic4s.requests.searches.{SearchRequest, SearchResponse}
import com.sksamuel.elastic4s.requests.searches.queries.Query
import com.sksamuel.elastic4s.requests.searches.queries.compound.BoolQuery
import com.sksamuel.elastic4s.requests.searches.sort.SortOrder
import com.sksamuel.elastic4s.requests.update.{UpdateRequest, UpdateResponse}
import com.sksamuel.elastic4s.{Executor, Functor, Handler, Response}
import com.sksamuel.elastic4s.requests.update.UpdateRequest
import com.sksamuel.elastic4s.{ElasticDsl, Executor, Functor, Handler, Response}
import lib.ThrallMetrics
import org.joda.time.DateTime
import play.api.libs.json._
Expand All @@ -42,24 +43,24 @@ class ElasticSearch(
lazy val replicas: Int = config.replicas


def migrationAwareUpdater(
requestFromIndexName: String => UpdateRequest,
def migrationAwareUpdater[REQUEST, RESPONSE](
requestFromIndexName: String => REQUEST,
logMessageFromIndexName: String => String,
notFoundSuccessful: Boolean = false,
)(implicit
ex: ExecutionContext,
functor: Functor[Future],
executor: Executor[Future],
handler: Handler[UpdateRequest, UpdateResponse],
manifest: Manifest[UpdateResponse],
handler: Handler[REQUEST, RESPONSE],
manifest: Manifest[RESPONSE],
logMarkers: LogMarker
): Future[Response[UpdateResponse]] = {
): Future[Response[RESPONSE]] = {
// if doc does not exist in migration index, ignore (ie. mark as successful).
// coalesce all other errors.
val runForCurrentIndex: Future[Option[Response[UpdateResponse]]] = executeAndLog(requestFromIndexName(imagesCurrentAlias), logMessageFromIndexName(imagesCurrentAlias), notFoundSuccessful).map(Some(_))
val runForCurrentIndex: Future[Option[Response[RESPONSE]]] = executeAndLog(requestFromIndexName(imagesCurrentAlias), logMessageFromIndexName(imagesCurrentAlias), notFoundSuccessful).map(Some(_))
// Update requests to the alias throw if the alias does not exist, but the exception is very generic and not cause is not obvious
// ("index names must be all upper case")
val runForMigrationIndex: Future[Option[Response[UpdateResponse]]] = migrationStatus match {
val runForMigrationIndex: Future[Option[Response[RESPONSE]]] = migrationStatus match {
case _: Running => executeAndLog(requestFromIndexName(imagesMigrationAlias), logMessageFromIndexName(imagesMigrationAlias), notFoundSuccessful = true).map(Some(_))
case _ => Future.successful(None)
}
Expand Down Expand Up @@ -252,25 +253,32 @@ class ElasticSearch(
).map(_ => ElasticSearchUpdateResponse()))
}

def applySoftDelete(id: String, softDeletedMetadata: SoftDeletedMetadata, lastModified: DateTime)
(implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = {
// Builds the painless Script that stamps `softDeletedMetadata` onto a
// document's source, with the script's `lastModified` param set to the
// soft-delete time.
private def softDeletedMetadataAsPainlessScript(softDeletedMetadata: SoftDeletedMetadata) = {
val applySoftDeleteScript = "ctx._source.softDeletedMetadata = params.softDeletedMetadata;"
// NOTE(review): JsDefined(...).toOption is always Some, so `.orNull` can
// never fire here — the wrapping looks redundant; confirm before simplifying.
val softDeletedMetadataParameter = JsDefined(Json.toJson(softDeletedMetadata)).toOption.map(asNestedMap).orNull

prepareScript(
applySoftDeleteScript,
lastModified = softDeletedMetadata.deleteTime,
("softDeletedMetadata", softDeletedMetadataParameter)
)
}

def applySoftDelete(id: String, softDeletedMetadata: SoftDeletedMetadata, lastModified: DateTime)
(implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = {

List(migrationAwareUpdater(
requestFromIndexName = indexName => prepareUpdateRequest(
indexName,
id,
applySoftDeleteScript,
lastModified,
("softDeletedMetadata", softDeletedMetadataParameter)
softDeletedMetadataAsPainlessScript(softDeletedMetadata),
),
logMessageFromIndexName = indexName => s"ES7 soft delete image $id in $indexName by ${softDeletedMetadata.deletedBy}"
).map(_ => ElasticSearchUpdateResponse()))
}

def applyUnSoftDelete(id: String, lastModified: DateTime)
(implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = {
(implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = {
val applyUnSoftDeleteScript = "ctx._source.remove(\"softDeletedMetadata\");"

List(migrationAwareUpdater(
Expand All @@ -280,10 +288,68 @@ class ElasticSearch(
applyUnSoftDeleteScript,
lastModified
),
logMessageFromIndexName = indexName => s"ES7 un soft delete image $id"
logMessageFromIndexName = indexName => s"ES7 un soft delete image $id in $indexName"
).map(_ => ElasticSearchUpdateResponse()))
}

// Searches the current index for the oldest `count` image IDs matching
// `query`, oldest-first by uploadTime. Only IDs are fetched (stored field
// "_id"); `deletionType` ("soft"/"hard") is used purely for log messages.
private def getNextBatchOfImageIdsForDeletion(query: Query, count: Int, deletionType: String)
(implicit ex: ExecutionContext, logMarker: LogMarker) = {
val searchRequest = ElasticDsl.search(imagesCurrentAlias) // current index is sufficient for producing the list of IDs to delete
.query(query)
.storedFields("_id")
.sortByFieldAsc("uploadTime")
.size(count)
val logMessage = s"ES7 searching for oldest $count images to $deletionType delete"
executeAndLog(searchRequest, logMessage).map { response =>
response.result.hits.hits.map(_.id).toSet
}
}

// Finds the oldest `count` reapable images that are not yet soft deleted,
// then stamps `softDeletedMetadata` on them in every live index (current and,
// if a migration is running, migration). Returns the affected image IDs.
// NOTE(review): the ID search and the updateByQuery are two separate,
// non-atomic operations — documents could change in between; confirm that
// window is acceptable for the reaper's purposes.
def softDeleteNextBatchOfImages(isReapable: ReapableEligibility, count: Int, softDeletedMetadata: SoftDeletedMetadata)
(implicit ex: ExecutionContext, logMarker: LogMarker): Future[Set[String]] = {

val query = must(
isReapable.query,
filters.existsOrMissing("softDeletedMetadata", exists = false) // not already soft deleted
)

for {
ids <- getNextBatchOfImageIdsForDeletion(query, count, "soft")
// unfortunately 'updateByQuery' doesn't return the affected IDs so can't do this whole thing in one operation - https://github.com/elastic/elasticsearch/issues/48624
_ <- migrationAwareUpdater(
requestFromIndexName = indexName =>
updateByQuery(
indexName,
idsQuery(ids),
).script(softDeletedMetadataAsPainlessScript(softDeletedMetadata)),
logMessageFromIndexName = indexName => s"ES7 soft delete $count images in $indexName by ${softDeletedMetadata.deletedBy}"
)
// TODO check that the number of soft deleted images matches the number of ids
} yield ids
}

// Finds the oldest `count` reapable images that were soft deleted more than
// two weeks ago, then deletes their documents from every live index via
// deleteByQuery. Returns the affected image IDs.
// NOTE(review): DateTime.now here uses the JVM default time zone, while
// deleteTime is written as a UTC ISO string elsewhere, and the range
// comparison runs against that string field — confirm the field mapping and
// zone make this two-week cutoff behave as intended.
def hardDeleteNextBatchOfImages(isReapable: ReapableEligibility, count: Int)
(implicit ex: ExecutionContext, logMarker: LogMarker): Future[Set[String]] = {

val query = must(
isReapable.query,
filters.existsOrMissing("softDeletedMetadata", exists = true), // already soft deleted
rangeQuery("softDeletedMetadata.deleteTime").lt(DateTime.now.minusWeeks(2).toString) // soft deleted more than 2 weeks ago
)

for {
ids <- getNextBatchOfImageIdsForDeletion(query, count, "hard")
// unfortunately 'deleteByQuery' doesn't return the affected IDs so can't do this whole thing in one operation - https://github.com/elastic/elasticsearch/issues/45460
// NOTE(review): as with soft delete, search-then-delete is non-atomic.
_ <- migrationAwareUpdater(
requestFromIndexName = indexName =>
deleteByQuery(
indexName,
idsQuery(ids),
),
logMessageFromIndexName = indexName => s"ES7 hard delete $count images in $indexName"
)
// TODO check that the number of hard deleted images matches the number of ids
} yield ids
}

def getInferredSyndicationRightsImages(photoshoot: Photoshoot, excludedImageId: Option[String])
(implicit ex: ExecutionContext, logMarker: LogMarker): Future[List[Image]] = { // TODO could be a Seq
val inferredSyndicationRights = not(termQuery("syndicationRights.isInferred", false)) // Using 'not' to include nulls
Expand Down Expand Up @@ -447,8 +513,11 @@ class ElasticSearch(
private def prepareScript(scriptSource: String, lastModified: DateTime, params: (String, Object)*) =
Script(script = scriptSource).lang("painless").param("lastModified", printDateTime(lastModified)).params(params)

private def prepareUpdateRequest(indexName: String, id: String, scriptSource: String, lastModified: DateTime, params: (String, Object)*) =
updateById(indexName, id).script(prepareScript(scriptSource, lastModified, params:_*))
// Builds an updateById request that runs the given pre-built painless script.
private def prepareUpdateRequest(indexName: String, id: String, script: Script): UpdateRequest =
updateById(indexName, id).script(script)

// Convenience overload: compiles `scriptSource` (with lastModified and params)
// into a Script via prepareScript, then delegates to the overload above.
private def prepareUpdateRequest(indexName: String, id: String, scriptSource: String, lastModified: DateTime, params: (String, Object)*): UpdateRequest =
prepareUpdateRequest(indexName, id, prepareScript(scriptSource, lastModified, params:_*))

def addImageLease(id: String, lease: MediaLease, lastModified: DateTime)
(implicit ex: ExecutionContext, logMarker: LogMarker): List[Future[ElasticSearchUpdateResponse]] = {
Expand Down
2 changes: 1 addition & 1 deletion thrall/app/lib/kinesis/MessageProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class MessageProcessor(
_: ElasticSearchDeleteResponse =>
store.deleteOriginal(message.id)
store.deleteThumbnail(message.id)
store.deletePng(message.id)
store.deletePNG(message.id)
// metadataEditorNotifications.publishImageDeletion(message.id) // let's not delete from Dynamo as user edits might be useful if we restore from replica
EsResponse(s"Image deleted: ${message.id}")
} recoverWith {
Expand Down
Loading

0 comments on commit 29a1a39

Please sign in to comment.