Skip to content

Commit

Permalink
Merge pull request #4360 from guardian/an/kcl-akka-stream-sonatype
Browse files Browse the repository at this point in the history
migrate kcl-akka-stream 2.1.0=>4.1.1, kcl 1.8.10=>2.6.0
  • Loading branch information
andrew-nowak authored Nov 13, 2024
2 parents 4ac9a2c + 0371ee2 commit 485f8c7
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 58 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ lazy val thrall = playProject("thrall", 9002)
pipelineStages := Seq(digest, gzip),
libraryDependencies ++= Seq(
"org.codehaus.groovy" % "groovy-json" % "3.0.7",
"com.amazonaws" % "amazon-kinesis-client" % "1.8.10",
"software.amazon.kinesis" % "amazon-kinesis-client" % "2.4.2",
"io.github.streetcontxt" %% "kcl-akka-stream" % "4.1.1",
"org.testcontainers" % "elasticsearch" % "1.19.2" % Test,
"com.google.protobuf" % "protobuf-java" % "3.19.6"
)
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.10.1
sbt.version=1.10.3
7 changes: 3 additions & 4 deletions thrall/app/ThrallComponents.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import akka.Done
import akka.stream.scaladsl.Source
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import com.contxt.kinesis.{KinesisRecord, KinesisSource}
import com.contxt.kinesis.{KinesisRecord, KinesisSource, ConsumerConfig => KclAkkaStreamConfig}
import com.gu.mediaservice.GridClient
import com.gu.mediaservice.lib.config.Services
import com.gu.mediaservice.lib.aws.{S3Ops, ThrallMessageSender}
Expand Down Expand Up @@ -49,8 +48,8 @@ class ThrallComponents(context: Context) extends GridComponents(context, new Thr

val messageSender = new ThrallMessageSender(config.thrallKinesisStreamConfig)

val highPriorityKinesisConfig: KinesisClientLibConfiguration = KinesisConfig.kinesisConfig(config.kinesisConfig)
val lowPriorityKinesisConfig: KinesisClientLibConfiguration = KinesisConfig.kinesisConfig(config.kinesisLowPriorityConfig)
val highPriorityKinesisConfig: KclAkkaStreamConfig = KinesisConfig.kinesisConfig(config.kinesisConfig)
val lowPriorityKinesisConfig: KclAkkaStreamConfig = KinesisConfig.kinesisConfig(config.kinesisLowPriorityConfig)

val uiSource: Source[KinesisRecord, Future[Done]] = KinesisSource(highPriorityKinesisConfig)
val automationSource: Source[KinesisRecord, Future[Done]] = KinesisSource(lowPriorityKinesisConfig)
Expand Down
32 changes: 21 additions & 11 deletions thrall/app/lib/ThrallConfig.scala
Original file line number Diff line number Diff line change
@@ -1,32 +1,42 @@
package lib

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.gu.mediaservice.lib.aws.AwsClientV1BuilderUtils
import com.gu.mediaservice.lib.config.{CommonConfigWithElastic, GridConfigResources, ReapableEligibilityLoader}
import com.gu.mediaservice.lib.aws.AwsClientV2BuilderUtils
import com.gu.mediaservice.lib.cleanup.ReapableEligibiltyResources
import com.gu.mediaservice.lib.config.{CommonConfigWithElastic, GridConfigResources, ReapableEligibilityLoader}
import com.gu.mediaservice.lib.elasticsearch.ReapableEligibility
import org.joda.time.DateTime
import org.joda.time.format.ISODateTimeFormat
import play.api.inject.ApplicationLifecycle
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.cloudwatch.{CloudWatchAsyncClient, CloudWatchAsyncClientBuilder}
import software.amazon.awssdk.services.dynamodb.{DynamoDbAsyncClient, DynamoDbAsyncClientBuilder}
import software.amazon.awssdk.services.kinesis.{KinesisAsyncClient, KinesisAsyncClientBuilder}
import software.amazon.kinesis.metrics.MetricsLevel

import java.net.URI
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.language.postfixOps

case class KinesisReceiverConfig(
override val awsRegion: String,
override val awsCredentials: AWSCredentialsProvider,
override val awsLocalEndpoint: Option[String],
override val awsRegionV2: Region,
override val awsCredentialsV2: AwsCredentialsProvider,
override val awsLocalEndpointUri: Option[URI],
override val isDev: Boolean,
streamName: String,
rewindFrom: Option[DateTime],
metricsLevel: MetricsLevel = MetricsLevel.DETAILED
) extends AwsClientV1BuilderUtils
) extends AwsClientV2BuilderUtils {
lazy val kinesisClient: KinesisAsyncClient = withAWSCredentialsV2(KinesisAsyncClient.builder()).build()
lazy val dynamoClient: DynamoDbAsyncClient = withAWSCredentialsV2(DynamoDbAsyncClient.builder()).build()
lazy val cloudwatchClient: CloudWatchAsyncClient = withAWSCredentialsV2(CloudWatchAsyncClient.builder()).build()
}

object KinesisReceiverConfig {
def apply(streamName: String, rewindFrom: Option[DateTime], thrallConfig: ThrallConfig): KinesisReceiverConfig = KinesisReceiverConfig(
thrallConfig.awsRegion,
thrallConfig.awsCredentials,
thrallConfig.awsLocalEndpoint,
thrallConfig.awsRegionV2,
thrallConfig.awsCredentialsV2,
thrallConfig.awsLocalEndpointUri,
thrallConfig.isDev,
streamName,
rewindFrom
Expand Down
74 changes: 34 additions & 40 deletions thrall/app/lib/kinesis/KinesisConfig.scala
Original file line number Diff line number Diff line change
@@ -1,52 +1,46 @@
package lib.kinesis

import com.contxt.kinesis.ConsumerConfig
import com.gu.mediaservice.lib.logging.GridLogging
import lib.KinesisReceiverConfig
import software.amazon.kinesis.common.{InitialPositionInStream, InitialPositionInStreamExtended}
import software.amazon.kinesis.metrics.MetricsConfig
import software.amazon.kinesis.retrieval.RetrievalConfig
import software.amazon.kinesis.retrieval.polling.PollingConfig

import java.net.InetAddress
import java.util.UUID

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration}
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
import com.gu.mediaservice.lib.logging.GridLogging
import lib.KinesisReceiverConfig
import org.joda.time.DateTime
object KinesisConfig extends GridLogging {
private val workerId = InetAddress.getLocalHost.getCanonicalHostName + ":" + UUID.randomUUID()

def kinesisConfig(config: KinesisReceiverConfig): KinesisClientLibConfiguration = {
val clientConfig = kinesisClientLibConfig(
kinesisAppName = config.streamName,
streamName = config.streamName,
config,
from = config.rewindFrom,
config.metricsLevel
)

config.awsLocalEndpoint.map(endpoint => {
logger.info(s"creating kinesis consumer with endpoint=$endpoint")
clientConfig.withKinesisEndpoint(endpoint).withDynamoDBEndpoint(endpoint)
}).getOrElse(clientConfig)
}

private def kinesisClientLibConfig(kinesisAppName: String, streamName: String, config: KinesisReceiverConfig, from: Option[DateTime], metricsLevel: MetricsLevel): KinesisClientLibConfiguration = {
val credentialsProvider = config.awsCredentials
def kinesisConfig(config: KinesisReceiverConfig): ConsumerConfig = {
val initialPosition = config.rewindFrom match {
case None => InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
case Some(position) => InitialPositionInStreamExtended.newInitialPositionAtTimestamp(position.toDate)
}
val pollingConfig = new PollingConfig(config.streamName, config.kinesisClient)
.maxRecords(100)
.idleTimeBetweenReadsInMillis(250)
val retrievalConfig = new RetrievalConfig(config.kinesisClient, config.streamName, config.streamName)
.retrievalSpecificConfig(pollingConfig)

val kinesisConfig = new KinesisClientLibConfiguration(
kinesisAppName,
streamName,
credentialsProvider,
credentialsProvider,
credentialsProvider,
workerId
).withRegionName(config.awsRegion).
withMaxRecords(100).
withIdleMillisBetweenCalls(1000).
withIdleTimeBetweenReadsInMillis(250).
withCallProcessRecordsEvenForEmptyRecordList(true).
withMetricsLevel(metricsLevel)
val metricsConfig = new MetricsConfig(config.cloudwatchClient, config.streamName)
.metricsLevel(config.metricsLevel)

from.fold(
kinesisConfig.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
){ f =>
kinesisConfig.withTimestampAtInitialPositionInStream(f.toDate)
}
val clientConfig = ConsumerConfig(
streamName = config.streamName,
appName = config.streamName,
workerId = workerId,
kinesisClient = config.kinesisClient,
dynamoClient = config.dynamoClient,
cloudwatchClient = config.cloudwatchClient,
initialPositionInStreamExtended = initialPosition,
coordinatorConfig = None,
leaseManagementConfig = None,
metricsConfig = Some(metricsConfig),
retrievalConfig = Some(retrievalConfig)
)
clientConfig
}
}
Binary file removed thrall/lib/kcl-akka-stream_2.12-2.1.0.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion thrall/test/lib/ThrallStreamProcessorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ThrallStreamProcessorTest extends AnyFunSpec with BeforeAndAfterAll with M
sequenceNumber = "",
subSequenceNumber = None,
approximateArrivalTimestamp = OffsetDateTime.now().toInstant,
encryptionType = ""
None
)

def createMigrationRecord: MigrationRecord = MigrationRecord(
Expand Down

0 comments on commit 485f8c7

Please sign in to comment.