diff --git a/build.sbt b/build.sbt index 665e2d408830..8d1a882f2c22 100644 --- a/build.sbt +++ b/build.sbt @@ -29,6 +29,7 @@ val common = library("common") awsSts, awsSqs, awsSsm, + eTagCachingS3, contentApiClient, enumeratumPlayJson, filters, @@ -70,8 +71,6 @@ val common = library("common") capiAws, okhttp, ) ++ jackson, - ) - .settings( TestAssets / mappings ~= filterAssets, ) diff --git a/common/app/common/metrics.scala b/common/app/common/metrics.scala index 2bf6dbfc9fd8..6e94833cf673 100644 --- a/common/app/common/metrics.scala +++ b/common/app/common/metrics.scala @@ -164,6 +164,7 @@ object FaciaPressMetrics { val FrontPressContentSizeLite = SamplerMetric("front-press-content-size-lite", StandardUnit.Bytes) val FrontDecodingLatency = DurationMetric("front-decoding-latency", StandardUnit.Milliseconds) val FrontDownloadLatency = DurationMetric("front-download-latency", StandardUnit.Milliseconds) + val FrontNotModifiedDownloadLatency = DurationMetric("front-not-modified-download-latency", StandardUnit.Milliseconds) } object EmailSubsciptionMetrics { diff --git a/common/app/concurrent/FutureSemaphore.scala b/common/app/concurrent/FutureSemaphore.scala deleted file mode 100644 index d8db0ef11033..000000000000 --- a/common/app/concurrent/FutureSemaphore.scala +++ /dev/null @@ -1,23 +0,0 @@ -package concurrent - -import java.util.concurrent.Semaphore - -import scala.concurrent.{ExecutionContext, Future} - -class FutureSemaphore(maxOperations: Int) { - private val semaphore = new Semaphore(maxOperations) - - def execute[A](task: => Future[A])(implicit ec: ExecutionContext): Future[A] = { - if (semaphore.tryAcquire()) { - val resultF = task - resultF.onComplete(_ => semaphore.release()) - resultF - } else { - Future.failed(new FutureSemaphore.TooManyOperationsInProgress()) - } - } -} - -object FutureSemaphore { - class TooManyOperationsInProgress extends Exception("Too many operations in progress, cannot execute task") -} diff --git a/common/app/metrics/FrontendMetrics.scala b/common/app/metrics/FrontendMetrics.scala index ac7a660fd7a0..04ccc570d53d 100644 --- a/common/app/metrics/FrontendMetrics.scala +++ b/common/app/metrics/FrontendMetrics.scala @@ -1,11 +1,12 @@ package metrics -import java.util.concurrent.atomic.AtomicLong - import com.amazonaws.services.cloudwatch.model.StandardUnit import common.{Box, StopWatch} import model.diagnostics.CloudWatch import org.joda.time.DateTime + +import java.util.concurrent.TimeUnit.MILLISECONDS +import java.util.concurrent.atomic.AtomicLong import scala.concurrent.Future import scala.util.Try @@ -130,6 +131,8 @@ final case class DurationMetric(override val name: String, override val metricUn def record(dataPoint: DurationDataPoint): Unit = dataPoints.alter(dataPoint :: _) def recordDuration(timeInMillis: Double): Unit = record(DurationDataPoint(timeInMillis, Option(DateTime.now))) + def recordDuration(duration: java.time.Duration): Unit = + recordDuration(duration.toNanos.toDouble / MILLISECONDS.toNanos(1)) override def isEmpty: Boolean = dataPoints.get().isEmpty } diff --git a/common/app/services/S3.scala b/common/app/services/S3.scala index 08dfe3a034ac..c2d618c89fd7 100644 --- a/common/app/services/S3.scala +++ b/common/app/services/S3.scala @@ -1,17 +1,18 @@ package services -import java.io._ -import java.util.zip.{GZIPInputStream, GZIPOutputStream} - -import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client} import com.amazonaws.services.s3.model.CannedAccessControlList.{Private, PublicRead} import com.amazonaws.services.s3.model._ +import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client} import com.amazonaws.util.StringInputStream +import com.gu.etagcaching.aws.s3.ObjectId import common.GuLogging import conf.Configuration import model.PressedPageType import org.joda.time.DateTime +import services.S3.logS3ExceptionWithDevHint +import java.io._ +import java.util.zip.GZIPOutputStream import scala.io.{Codec, Source} trait S3 extends GuLogging { @@ -27,8 +28,8 @@ trait S3 extends GuLogging { private def withS3Result[T](key: String)(action: S3Object => T): Option[T] = client.flatMap { client => + val objectId = ObjectId(bucket, key) try { - val request = new GetObjectRequest(bucket, key) val result = client.getObject(request) log.info(s"S3 got ${result.getObjectMetadata.getContentLength} bytes from ${result.getKey}") @@ -44,13 +45,10 @@ trait S3 extends GuLogging { } } catch { case e: AmazonS3Exception if e.getStatusCode == 404 => - log.warn("not found at %s - %s" format (bucket, key)) + log.warn(s"not found at ${objectId.s3Uri}") None case e: AmazonS3Exception => - val errorMsg = s"Unable to fetch S3 object (key: $key)" - val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus." - log.error(errorMsg, e) - println(errorMsg + " \n" + hintMsg) + logS3ExceptionWithDevHint(objectId, e) None case e: Exception => throw e @@ -91,11 +89,6 @@ trait S3 extends GuLogging { putGzipped(key, value, contentType, Private) } - def getGzipped(key: String)(implicit codec: Codec): Option[String] = - withS3Result(key) { result => - Source.fromInputStream(new GZIPInputStream(result.getObjectContent)).mkString - } - private def putGzipped( key: String, value: String, @@ -148,7 +141,14 @@ trait S3 extends GuLogging { } } -object S3 extends S3 +object S3 extends S3 { + def logS3ExceptionWithDevHint(s3ObjectId: ObjectId, e: Exception): Unit = { + val errorMsg = s"Unable to fetch S3 object (${s3ObjectId.s3Uri})" + val hintMsg = "Hint: your AWS credentials might be missing or expired. You can fetch new ones using Janus." + log.error(errorMsg, e) + println(errorMsg + " \n" + hintMsg) + } +} object S3FrontsApi extends S3 { diff --git a/common/app/services/fronts/FrontJsonFapi.scala b/common/app/services/fronts/FrontJsonFapi.scala index 63e2dff45483..a9103e68e567 100644 --- a/common/app/services/fronts/FrontJsonFapi.scala +++ b/common/app/services/fronts/FrontJsonFapi.scala @@ -1,70 +1,67 @@ package services.fronts -import common.{FaciaPressMetrics, GuLogging} -import concurrent.{BlockingOperations, FutureSemaphore} +import com.gu.etagcaching.ETagCache +import com.gu.etagcaching.FreshnessPolicy.AlwaysWaitForRefreshedValue +import com.gu.etagcaching.aws.s3.ObjectId +import com.gu.etagcaching.aws.sdkv2.s3.S3ObjectFetching +import com.gu.etagcaching.aws.sdkv2.s3.response.Transformer.Bytes +import common.FaciaPressMetrics.{FrontDecodingLatency, FrontDownloadLatency, FrontNotModifiedDownloadLatency} +import common.GuLogging import conf.Configuration -import metrics.DurationMetric +import metrics.DurationMetric.withMetrics import model.{PressedPage, PressedPageType} import play.api.libs.json.Json -import services.S3 +import services.S3.logS3ExceptionWithDevHint +import services._ +import software.amazon.awssdk.services.s3.model.S3Exception +import utils.AWSv2.S3Async +import java.util.zip.GZIPInputStream +import scala.concurrent.duration.DurationInt import scala.concurrent.{ExecutionContext, Future} +import scala.util.Using trait FrontJsonFapi extends GuLogging { + implicit val executionContext: ExecutionContext lazy val stage: String = Configuration.facia.stage.toUpperCase val bucketLocation: String - val parallelJsonPresses = 32 - val futureSemaphore = new FutureSemaphore(parallelJsonPresses) - def blockingOperations: BlockingOperations + private def s3ObjectIdFor(path: String, prefix: String): ObjectId = + ObjectId( + S3.bucket, + s"$bucketLocation/${path.replaceAll("""\+""", "%2B")}/fapi/pressed.v2$prefix.json", + ) - private def getAddressForPath(path: String, prefix: String): String = - s"$bucketLocation/${path.replaceAll("""\+""", "%2B")}/fapi/pressed.v2$prefix.json" - - def get(path: String, pageType: PressedPageType)(implicit - executionContext: ExecutionContext, - ): Future[Option[PressedPage]] = - errorLoggingF(s"FrontJsonFapi.get $path") { - pressedPageFromS3(getAddressForPath(path, pageType.suffix)) - } - - private def parsePressedPage( - jsonStringOpt: Option[String], - )(implicit executionContext: ExecutionContext): Future[Option[PressedPage]] = - futureSemaphore.execute { - blockingOperations.executeBlocking { - jsonStringOpt.map { jsonString => - DurationMetric.withMetrics(FaciaPressMetrics.FrontDecodingLatency) { - // This operation is run in the thread pool since it is very CPU intensive - Json.parse(jsonString).as[PressedPage] - } + private val pressedPageCache: ETagCache[ObjectId, PressedPage] = new ETagCache( + S3ObjectFetching(S3Async, Bytes) + .timing( + successWith = FrontDownloadLatency.recordDuration, + notModifiedWith = FrontNotModifiedDownloadLatency.recordDuration, + ) + .thenParsing { bytes => + withMetrics(FrontDecodingLatency) { + Using(new GZIPInputStream(bytes.asInputStream()))(Json.parse(_).as[PressedPage]).get } - } - } + }, + AlwaysWaitForRefreshedValue, + _.maximumSize(180).expireAfterAccess(1.hour), + ) - private def loadPressedPageFromS3(path: String) = - blockingOperations.executeBlocking { - DurationMetric.withMetrics(FaciaPressMetrics.FrontDownloadLatency) { - S3.getGzipped(path) + def get(path: String, pageType: PressedPageType): Future[Option[PressedPage]] = + errorLoggingF(s"FrontJsonFapi.get $path") { + val objectId = s3ObjectIdFor(path, pageType.suffix) + pressedPageCache.get(objectId).map(Some(_)).recover { + case s3Exception: S3Exception => + logS3ExceptionWithDevHint(objectId, s3Exception) + None } } - - private def pressedPageFromS3( - path: String, - )(implicit executionContext: ExecutionContext): Future[Option[PressedPage]] = - errorLoggingF(s"FrontJsonFapi.pressedPageFromS3 $path") { - for { - s3FrontData <- loadPressedPageFromS3(path) - pressedPage <- parsePressedPage(s3FrontData) - } yield pressedPage - } - } -class FrontJsonFapiLive(val blockingOperations: BlockingOperations) extends FrontJsonFapi { +class FrontJsonFapiLive(implicit val executionContext: ExecutionContext) extends FrontJsonFapi { override val bucketLocation: String = s"$stage/frontsapi/pressed/live" } -class FrontJsonFapiDraft(val blockingOperations: BlockingOperations) extends FrontJsonFapi { +class FrontJsonFapiDraft(implicit val executionContext: ExecutionContext) extends FrontJsonFapi { override val bucketLocation: String = s"$stage/frontsapi/pressed/draft" } diff --git a/common/app/utils/AWSv2.scala b/common/app/utils/AWSv2.scala new file mode 100644 index 000000000000..6f45d7a291a3 --- /dev/null +++ b/common/app/utils/AWSv2.scala @@ -0,0 +1,25 @@ +package utils + +import software.amazon.awssdk.auth.credentials._ +import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.regions.Region.EU_WEST_1 +import software.amazon.awssdk.services.s3.{S3AsyncClient, S3AsyncClientBuilder} + +object AWSv2 { + val region: Region = EU_WEST_1 + + def credentialsForDevAndProd(devProfile: String, prodCreds: AwsCredentialsProvider): AwsCredentialsProviderChain = + AwsCredentialsProviderChain.of(prodCreds, ProfileCredentialsProvider.builder().profileName(devProfile).build()) + + lazy val credentials: AwsCredentialsProvider = + credentialsForDevAndProd("frontend", InstanceProfileCredentialsProvider.create()) + + def build[T, B <: AwsClientBuilder[B, T]](builder: B): T = + builder.credentialsProvider(credentials).region(region).build() + + val S3Async: S3AsyncClient = build[S3AsyncClient, S3AsyncClientBuilder]( + S3AsyncClient.builder().httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(250)), + ) +} diff --git a/common/test/package.scala b/common/test/package.scala index 7ed6ee4fa8f7..7f611c4985d5 100644 --- a/common/test/package.scala +++ b/common/test/package.scala @@ -170,12 +170,9 @@ trait WithTestCSRF { trait WithTestFrontJsonFapi { // need a front api that stores S3 locally so it can run without deps in the unit tests - class TestFrontJsonFapi(override val blockingOperations: BlockingOperations) - extends FrontJsonFapiLive(blockingOperations) { + class TestFrontJsonFapi extends FrontJsonFapiLive()(ExecutionContext.global) { - override def get(path: String, pageType: PressedPageType)(implicit - executionContext: ExecutionContext, - ): Future[Option[PressedPage]] = { + override def get(path: String, pageType: PressedPageType): Future[Option[PressedPage]] = { recorder.load(path, Map()) { super.get(path, pageType) } @@ -197,7 +194,5 @@ trait WithTestFrontJsonFapi { } } - lazy val actorSystem = ActorSystem() - lazy val blockingOperations = new BlockingOperations(actorSystem) - lazy val fapi = new TestFrontJsonFapi(blockingOperations) + lazy val fapi = new TestFrontJsonFapi() } diff --git a/facia/app/AppLoader.scala b/facia/app/AppLoader.scala index a15f21d52999..eaf994e2c4bd 100644 --- a/facia/app/AppLoader.scala +++ b/facia/app/AppLoader.scala @@ -25,12 +25,15 @@ import services._ import services.fronts.{FrontJsonFapiDraft, FrontJsonFapiLive} import router.Routes +import scala.concurrent.ExecutionContext + class AppLoader extends FrontendApplicationLoader { override def buildComponents(context: Context): FrontendComponents = new BuiltInComponentsFromContext(context) with AppComponents } trait FapiServices { + implicit val executionContext: ExecutionContext def wsClient: WSClient def actorSystem: ActorSystem lazy val frontJsonFapiLive = wire[FrontJsonFapiLive] @@ -68,6 +71,7 @@ trait AppComponents extends FrontendComponents with FaciaControllers with FapiSe override lazy val appMetrics = ApplicationMetrics( FaciaPressMetrics.FrontDecodingLatency, FaciaPressMetrics.FrontDownloadLatency, + FaciaPressMetrics.FrontNotModifiedDownloadLatency, DCRMetrics.DCRLatencyMetric, DCRMetrics.DCRRequestCountMetric, ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4a6ed9d1fe98..7c46380a3165 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -22,6 +22,7 @@ object Dependencies { val awsEc2 = "com.amazonaws" % "aws-java-sdk-ec2" % awsVersion val awsKinesis = "com.amazonaws" % "aws-java-sdk-kinesis" % awsVersion val awsS3 = "com.amazonaws" % "aws-java-sdk-s3" % awsVersion + val eTagCachingS3 = "com.gu.etag-caching" %% "aws-s3-sdk-v2" % "1.0.4" val awsSes = "com.amazonaws" % "aws-java-sdk-ses" % awsVersion val awsSns = "com.amazonaws" % "aws-java-sdk-sns" % awsVersion val awsSts = "com.amazonaws" % "aws-java-sdk-sts" % awsVersion