Commit
Merge branch 'split-kafka-service-consumer'
tillkuhn committed Sep 25, 2024
2 parents cb7400e + 416bcad commit f00b49b
Showing 7 changed files with 184 additions and 121 deletions.
132 changes: 132 additions & 0 deletions kotlin/src/main/kotlin/net/timafe/angkor/service/EventConsumer.kt
@@ -0,0 +1,132 @@
package net.timafe.angkor.service

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import jakarta.annotation.PostConstruct
import net.timafe.angkor.config.AppProperties
import net.timafe.angkor.domain.Event
import net.timafe.angkor.security.SecurityUtils
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import java.time.Duration
import java.util.*
import java.util.concurrent.TimeUnit

/**
* Service implementation for consuming [Event] records from Kafka
*/
@Service
class EventConsumer(
// private val repo: EventRepository,
private val objectMapper: ObjectMapper,
private val appProps: AppProperties,
private val kafkaProperties: KafkaProperties,
private val eventService: EventService,

) {
// Populated by init
lateinit var consumerProps: Properties

private val log = LoggerFactory.getLogger(javaClass)
val logPrefix = "[KafkaConsumer]"

@PostConstruct
fun init() {
log.info("[Kafka] Event Consumer initialized with kafkaSupport=${eventService.kafkaEnabled()} producerBootstrapServers=${kafkaProperties.bootstrapServers}")
// https://kafka.apache.org/documentation.html#consumerconfigs
this.consumerProps = Properties()
this.consumerProps.putAll(eventService.kafkaBaseProperties())
// Consumer-only props that would raise a warning if set on a producer
this.consumerProps["group.id"] = "${appProps.kafka.topicPrefix}hase"
this.consumerProps["enable.auto.commit"] = "true"
this.consumerProps["auto.commit.interval.ms"] = "1000"
this.consumerProps["auto.offset.reset"] = "earliest"
this.consumerProps["session.timeout.ms"] = "30000"
this.consumerProps["key.deserializer"] = StringDeserializer::class.java.name
this.consumerProps["value.deserializer"] = StringDeserializer::class.java.name

}

// CAUTION: Each call of consumeMessages() requires an active DB connection from the pool,
// so keep the rate low (default 600s = 10min) to give Hikari CP time to scale down to 0.
// fixedRateString also supports ${my.delay.property} placeholders (escape the $ or the
// Kotlin compiler complains); since timeUnit = TimeUnit.SECONDS below, values are seconds,
// not milliseconds. Make sure @EnableScheduling is active (see AsyncConfig).

// #{__listener.publicTopic}: starting with version 2.1.2, SpEL expressions support the special
// token __listener, a pseudo bean name that represents the current bean instance within which
// this annotation exists.
// https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#annotation-properties
// https://stackoverflow.com/a/27817678/4292075
@Scheduled(
fixedRateString = "\${app.kafka.fixed-rate-seconds}",
initialDelay = 20,
timeUnit = TimeUnit.SECONDS,
)
// @Scheduled(fixedRateString = "300000", initialDelay = 20000)
fun consumeMessages() {

// https://www.tutorialspoint.com/apache_kafka/apache_kafka_consumer_group_example.htm
// https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html
val consumer: KafkaConsumer<String, String> = KafkaConsumer<String, String>(this.consumerProps)
val topics = listOf("imagine", "audit", "system", "app").map { "${appProps.kafka.topicPrefix}$it" }
log.debug(" {} I'm here to consume new Kafka Messages from topics {}", logPrefix, topics)
consumer.subscribe(topics)
var (received, persisted) = listOf(0, 0)
val records = consumer.poll(Duration.ofMillis(10L * 1000))
if (!records.isEmpty) {
// Lazy invoke authenticate which is marked @Transactional
// Advantage: We don't need a transaction if there are no events to persist,
// so we can keep the active connection pool count at zero (nice for neon db)
eventService.authenticate()
}
for (record in records) {
val eventVal = record.value()
log.info("$logPrefix Polled record #$received topic=${record.topic()}, partition/offset=${record.partition()}/${record.offset()}, key=${record.key()}, value=$eventVal")
try {
val parsedEvent: Event = objectMapper
.reader()
.withoutFeatures(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.readValue(eventVal, Event::class.java)

parsedEvent.topic = record.topic().removePrefix(appProps.kafka.topicPrefix)
parsedEvent.partition = record.partition()
parsedEvent.offset = record.offset()
parsedEvent.id = computeMessageId(record.headers())
eventService.save(parsedEvent)
persisted++
} catch (e: Exception) {
log.warn("$logPrefix Cannot parse $eventVal to Event: ${e.message}")
}
received++
}
if (received > 0) {
log.info("$logPrefix Polled $received records ($persisted persisted), see you again at a fixed rate")
} else {
log.trace("$logPrefix No records to poll in this run")
}
consumer.close()
}

fun computeMessageId(headers: Headers): UUID {
for (header in headers) {
if (header.key().toString() == "messageId") {
val mid = String(header.value())
val midUUID = SecurityUtils.safeConvertToUUID(mid)
return if (midUUID == null) {
log.warn("$logPrefix Could not convert messageId $mid to UUID, generating new one")
UUID.randomUUID()
} else {
log.debug("{} using messageId from header {}", logPrefix, midUUID)
midUUID
}
}
}
val newId = UUID.randomUUID()
log.warn("$logPrefix Could not find messageId in any header, generated $newId")
return newId
}
}
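
A note on resource handling in the new class: `consumer.close()` at the end of `consumeMessages()` is only reached when the poll/parse loop completes normally, so an unexpected exception (for example from `eventService.authenticate()`) would leak the consumer and its connections. Below is a minimal sketch of the same poll pattern wrapped in Kotlin's `use`, which closes the consumer on every exit path; the `handle` callback is a hypothetical stand-in for the parse-and-persist step above:

```kotlin
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.time.Duration
import java.util.Properties

fun pollOnce(
    consumerProps: Properties,
    topics: List<String>,
    handle: (ConsumerRecord<String, String>) -> Unit,
) {
    // KafkaConsumer implements Closeable, so `use` guarantees close()
    // even if subscribe(), poll(), or handle() throws
    KafkaConsumer<String, String>(consumerProps).use { consumer ->
        consumer.subscribe(topics)
        val records = consumer.poll(Duration.ofSeconds(10))
        records.forEach(handle)
    }
}
```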
111 changes: 14 additions & 97 deletions kotlin/src/main/kotlin/net/timafe/angkor/service/EventService.kt
@@ -1,6 +1,5 @@
package net.timafe.angkor.service

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import jakarta.annotation.PostConstruct
@@ -11,24 +10,18 @@ import net.timafe.angkor.domain.enums.EntityType
import net.timafe.angkor.domain.enums.EventTopic
import net.timafe.angkor.repo.EventRepository
import net.timafe.angkor.security.SecurityUtils
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.core.env.Environment
import org.springframework.core.env.Profiles
import org.springframework.scheduling.annotation.Async
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.security.core.context.SecurityContextHolder
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
import java.time.Duration
import java.util.*
import java.util.concurrent.TimeUnit


/**
@@ -47,7 +40,6 @@ class EventService(

// Kafka properties that will be populated by init() method
lateinit var producerProps: Properties
lateinit var consumerProps: Properties

override fun entityType(): EntityType {
return EntityType.Event
@@ -61,31 +53,14 @@
// "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
// val jaasCfg = String.format(jaasTemplate, appProps.kafka.saslUsername, appProps.kafka.saslPassword)

val baseProps = Properties()
baseProps["bootstrap.servers"] = kafkaProperties.bootstrapServers
baseProps["security.protocol"] = kafkaProperties.security.protocol
baseProps["sasl.mechanism"] = kafkaProperties.properties["sasl.mechanism"]?:throw IllegalArgumentException("sasl.mechanism not configured")
baseProps["sasl.jaas.config"] = kafkaProperties.properties["sasl.jaas.config"]?:throw IllegalArgumentException("sasl.jaas.config not configured")

// https://kafka.apache.org/documentation.html#producerconfigs
this.producerProps = Properties()
this.producerProps.putAll(baseProps)
this.producerProps.putAll(kafkaBaseProperties())
this.producerProps["key.serializer"] = StringSerializer::class.java.name
this.producerProps["value.serializer"] = StringSerializer::class.java.name
// default 1, leader will write the record to its local log but not await full acknowledgement from all followers
this.producerProps["acks"] = "1"

// https://kafka.apache.org/documentation.html#consumerconfigs
this.consumerProps = Properties()
this.consumerProps.putAll(baseProps)
// Consumer props which will raise a warning if used for producer
this.consumerProps["group.id"] = "${appProps.kafka.topicPrefix}hase"
this.consumerProps["enable.auto.commit"] = "true"
this.consumerProps["auto.commit.interval.ms"] = "1000"
this.consumerProps["auto.offset.reset"] = "earliest"
this.consumerProps["session.timeout.ms"] = "30000"
this.consumerProps["key.deserializer"] = StringDeserializer::class.java.name
this.consumerProps["value.deserializer"] = StringDeserializer::class.java.name
// https://www.confluent.de/blog/5-things-every-kafka-developer-should-know/#tip-3-cooperative-rebalancing
// Avoid “stop-the-world” consumer group re-balances by using cooperative re-balancing
// this.consumerProps["partition.assignment.strategy"] = CooperativeStickyAssignor::class.java.name
@@ -134,64 +109,23 @@
}
}

// CAUTION: each call of consumeMessages requires an active DB Connection from the Pool
// Value increased to 300000 (5min) to increase the time that hikari cp can be scaled to 0
// durations are in milliseconds. also supports ${my.delay.property} (escape with \ or kotlin compiler complains)
// 600000 = 10 Minutes make sure @EnableScheduling is active in AsyncConfig 600000 = 10 min, 3600000 = 1h

// #{__listener.publicTopic}" Starting with version 2.1.2, the SpEL expressions support a special token: __listener. It is a pseudo bean name
// that represents the current bean instance within which this annotation exists.
// https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#annotation-properties
// https://stackoverflow.com/a/27817678/4292075
@Scheduled(
fixedRateString = "\${app.kafka.fixed-rate-seconds}",
initialDelay = 20,
timeUnit = TimeUnit.SECONDS,
)
// @Scheduled(fixedRateString = "300000", initialDelay = 20000)
@Transactional
fun consumeMessages() {
// @Scheduled runs without Auth Context, so we use a special ServiceAccountToken here
// Other services annotated with @Scheduled run without an Auth Context,
// so we use a special ServiceAccountToken here, but we need a transaction
@Transactional(readOnly = true)
fun authenticate() {
SecurityContextHolder.getContext().authentication = userService.getServiceAccountToken(this.javaClass)
}

val logPrefix = "[KafkaConsumer]"
// https://www.tutorialspoint.com/apache_kafka/apache_kafka_consumer_group_example.htm
// https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html
val consumer: KafkaConsumer<String, String> = KafkaConsumer<String, String>(this.consumerProps)
val topics = listOf("imagine", "audit", "system", "app").map { "${appProps.kafka.topicPrefix}$it" }
log.debug(" {} I'm here to consume new Kafka Messages from topics {}", logPrefix, topics)
consumer.subscribe(topics)
var (received, persisted) = listOf(0, 0)
val records = consumer.poll(Duration.ofMillis(10L * 1000))
for (record in records) {
val eventVal = record.value()
log.info("$logPrefix Polled record #$received topic=${record.topic()}, partition/offset=${record.partition()}/${record.offset()}, key=${record.key()}, value=$eventVal")
try {
val parsedEvent: Event = objectMapper
.reader()
.withoutFeatures(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.readValue(eventVal, Event::class.java)

parsedEvent.topic = record.topic().removePrefix(appProps.kafka.topicPrefix)
parsedEvent.partition = record.partition()
parsedEvent.offset = record.offset()
parsedEvent.id = computeMessageId(record.headers())
super.save(parsedEvent)
persisted++
} catch (e: Exception) {
log.warn("$logPrefix Cannot parse $eventVal to Event: ${e.message}")
}
received++
}
if (received > 0) {
log.info("$logPrefix Polled $received records ($persisted persisted), see you again at a fixed rate")
} else {
log.trace("$logPrefix No records to poll in this run")
}
consumer.close()
fun kafkaBaseProperties(): Properties {
val baseProps = Properties()
baseProps["bootstrap.servers"] = kafkaProperties.bootstrapServers
baseProps["security.protocol"] = kafkaProperties.security.protocol
baseProps["sasl.mechanism"] = kafkaProperties.properties["sasl.mechanism"] ?: throw IllegalArgumentException("sasl.mechanism not configured")
baseProps["sasl.jaas.config"] = kafkaProperties.properties["sasl.jaas.config"] ?: throw IllegalArgumentException("sasl.jaas.config not configured")
return baseProps
}

private fun kafkaEnabled(): Boolean {
fun kafkaEnabled(): Boolean {
val appEnabled = appProps.kafka.enabled
val notTest = env.acceptsProfiles(Profiles.of("!" + Constants.PROFILE_TEST))
return appEnabled && notTest
@@ -230,22 +164,5 @@ class EventService(
return items
}

fun computeMessageId(headers: Headers): UUID {
for (header in headers) {
if (header.key().toString() == "messageId") {
val mid = String(header.value())
val midUUID = SecurityUtils.safeConvertToUUID(mid)
return if (midUUID == null) {
log.warn("${logPrefix()} Could not convert messageId $mid to UUID, generating new one")
UUID.randomUUID()
} else {
log.debug("{} using messageId from header {}", logPrefix(), midUUID)
midUUID
}
}
}
log.warn("${logPrefix()} Could not find messageId in any header, generating new onw")
return UUID.randomUUID()
}

}
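
The essence of this split: the SASL/bootstrap settings formerly duplicated between producer and consumer setup now live in the single `kafkaBaseProperties()` above, and `EventService` (producer) and the new `EventConsumer` each layer only their role-specific keys on top. A standalone sketch of that layering, with placeholder connection values where the real code reads Spring's `KafkaProperties`:

```kotlin
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

// Placeholder for the shared settings EventService derives from KafkaProperties
fun kafkaBaseProperties(): Properties = Properties().apply {
    put("bootstrap.servers", "localhost:9092") // assumption: local broker
    put("security.protocol", "PLAINTEXT")      // the real code configures SASL + JAAS
}

fun producerProperties(): Properties = Properties().apply {
    putAll(kafkaBaseProperties())
    put("key.serializer", StringSerializer::class.java.name)
    put("value.serializer", StringSerializer::class.java.name)
    put("acks", "1") // leader-only acknowledgement, as in EventService.init()
}

fun consumerProperties(topicPrefix: String): Properties = Properties().apply {
    putAll(kafkaBaseProperties())
    put("group.id", "${topicPrefix}hase") // consumer-only key; Kafka warns if set on a producer
    put("key.deserializer", StringDeserializer::class.java.name)
    put("value.deserializer", StringDeserializer::class.java.name)
}
```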
5 changes: 4 additions & 1 deletion kotlin/src/main/resources/application.yml
@@ -24,7 +24,7 @@ app:
import-folder: /tmp/upload
metrics:
basic-auth-user: prometheus
basic-auth-password:
basic-auth-password: ""
kafka:
fixed-rate-seconds: 600 # 600s=10min
logging:
@@ -225,6 +225,9 @@ spring:
username: angkor_dev
password:

app:
kafka:
fixed-rate-seconds: 15 # dev override; default is 600s=10min
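
For context on the units: `fixed-rate-seconds` feeds the `fixedRateString` of the `@Scheduled` annotation in `EventConsumer`, which declares `timeUnit = TimeUnit.SECONDS`, so 600 really is ten minutes and the dev override polls every 15 seconds. A minimal sketch of that wiring (hypothetical `DemoJob`, same property name):

```kotlin
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import java.util.concurrent.TimeUnit

@Component
class DemoJob {
    // app.kafka.fixed-rate-seconds resolves to 600 (default) or 15 (dev);
    // timeUnit = SECONDS makes Spring interpret it as seconds, not milliseconds
    @Scheduled(
        fixedRateString = "\${app.kafka.fixed-rate-seconds}",
        initialDelay = 20,
        timeUnit = TimeUnit.SECONDS,
    )
    fun tick() = println("polling Kafka…")
}
```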

---
# spring 'prod' profile overwrites, non-sensitive test-specific values only
2 changes: 1 addition & 1 deletion terraform/files/appctl.sh
@@ -93,7 +93,7 @@ fi
# todo: https://docs.docker.com/compose/environment-variables/set-environment-variables/#use-the-env_file-attribute
if [[ "$*" == *pull-secrets* ]] || [[ "$*" == *update* ]] || [[ "$*" == *all* ]]; then
# vlt login; vlt apps list
secrets_store="runtime-secrets"
secrets_store="rt-secrets"
env_file="${WORKDIR}/.env_secrets"
echo "# Generated - DO NOT EDIT. Secrets pulled from HCP $secrets_store by appctl.sh" >"$env_file"
logit "Pulling secrets from HCP $secrets_store"
4 changes: 2 additions & 2 deletions terraform/main.tf
@@ -107,8 +107,8 @@ module "cognito" {
# Setup secret Vault(s), see https://portal.cloud.hashicorp.com/
module "runtime_secrets" {
source = "./modules/secrets"
vault_secrets_app_name = "runtime-secrets"
vault_secrets_app_description = "${var.appid} Runtime Secrets"
vault_secrets_app_name = "rt-secrets"
vault_secrets_app_description = "${var.appid} Runtime Secrets managed by terraform"
upper_key = true
secrets = [
{
