Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oppdatert kafka config. Det er blant annet tydeligere hvilken config som gjelder for hvilket topic #555

Merged
merged 1 commit into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package no.nav.familie.ef.personhendelse.configuration

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import no.nav.familie.kafka.KafkaErrorHandler
import no.nav.person.pdl.leesah.Personhendelse
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.ObjectProvider
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.ssl.SslBundles
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.ContainerProperties.AckMode

@EnableKafka
@Configuration
Expand All @@ -20,25 +23,15 @@ class KafkaConfig {
fun kafkaAivenPersonhendelseListenerContainerFactory(
properties: KafkaProperties,
kafkaErrorHandler: KafkaErrorHandler,
sslBundles: ObjectProvider<SslBundles>,
): ConcurrentKafkaListenerContainerFactory<Long, Personhendelse> {
properties.properties[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = "true"
val factory = ConcurrentKafkaListenerContainerFactory<Long, Personhendelse>()
factory.consumerFactory = DefaultKafkaConsumerFactory(properties.buildConsumerProperties())
factory.setCommonErrorHandler(kafkaErrorHandler)
return factory
}
factory.containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE

val props = fellesProperties(properties.buildConsumerProperties(sslBundles.getIfAvailable()))
props[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java

@Bean
fun kafkaVedtakListenerContainerFactory(
properties: KafkaProperties,
kafkaErrorHandler: KafkaErrorHandler,
): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
val props = properties.buildConsumerProperties()
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = false
factory.consumerFactory = DefaultKafkaConsumerFactory(props)
factory.setCommonErrorHandler(kafkaErrorHandler)
return factory
Expand All @@ -48,15 +41,25 @@ class KafkaConfig {
fun kafkaKontantstøtteVedtakListenerContainerFactory(
properties: KafkaProperties,
kafkaErrorHandler: KafkaErrorHandler,
sslBundles: ObjectProvider<SslBundles>,
): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
val props = properties.buildConsumerProperties()
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
factory.containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE

val props = fellesProperties(properties.buildConsumerProperties(sslBundles.getIfAvailable()))
props[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = false
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java

factory.consumerFactory = DefaultKafkaConsumerFactory(props)
factory.setCommonErrorHandler(kafkaErrorHandler)
return factory
}

private fun fellesProperties(consumerProperties: MutableMap<String, Any>): MutableMap<String, Any> {
consumerProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
consumerProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1
consumerProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
consumerProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
return consumerProperties
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.slf4j.MDC
import org.springframework.beans.factory.annotation.Value
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.listener.ConsumerSeekAware
import org.springframework.kafka.support.Acknowledgment
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component
import java.util.UUID
Expand All @@ -34,6 +35,7 @@ class PersonhendelseListener(
)
fun listen(
@Payload personhendelse: Personhendelse,
ack: Acknowledgment,
) {
try {
MDC.put(MDCConstants.MDC_CALL_ID, UUID.randomUUID().toString())
Expand All @@ -52,6 +54,7 @@ class PersonhendelseListener(
} else {
if (env != "dev") throw RuntimeException("Hendelse uten personIdent mottatt for hendelseId: ${personhendelse.hendelseId}")
}
ack.acknowledge()
} catch (e: Exception) {
logger.error("Feil ved håndtering av personhendelse med hendelseId: ${personhendelse.hendelseId}")
securelogger.error(
Expand Down
6 changes: 0 additions & 6 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ spring:
type: PKCS12
location: ${KAFKA_TRUSTSTORE_PATH}
password: ${KAFKA_CREDSTORE_PASSWORD}
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
max-poll-records: 1
auto-offset-reset: latest
enable-auto-commit: false

no.nav.security.jwt:
issuer:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import no.nav.person.pdl.leesah.Personhendelse
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.kafka.support.Acknowledgment
import java.util.UUID

internal class PersonhendelseListenerTest {
private val sakClient = mockk<SakClient>()
private val personhendelseService = mockk<PersonhendelseService>(relaxed = true)

private val ack = mockk<Acknowledgment>(relaxed = true)
private lateinit var listener: PersonhendelseListener

private val personMedSak = "11111111111"
Expand All @@ -31,15 +32,15 @@ internal class PersonhendelseListenerTest {

@Test
internal fun `skal kalle på personhendelseService for hendelse`() {
listener.listen(lagPersonhendelse(personIdent = personUtenSak))
listener.listen(lagPersonhendelse(personIdent = personUtenSak), ack)

verify(exactly = 1) { personhendelseService.håndterPersonhendelse(any()) }
}

@Test
internal fun `skal kaste feil når hendelse mangler personidenter`() {
val personhendelse = lagPersonhendelse(personIdent = "")
assertThatThrownBy { listener.listen(personhendelse) }.hasMessageContaining("Hendelse uten personIdent")
assertThatThrownBy { listener.listen(personhendelse, ack) }.hasMessageContaining("Hendelse uten personIdent")

verify(exactly = 0) { personhendelseService.håndterPersonhendelse(any()) }
}
Expand Down
8 changes: 1 addition & 7 deletions src/test/resources/application-integrasjonstest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,7 @@ spring:
schema.registry.url: http://localhost:8081
security:
protocol: PLAINTEXT
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
group-id: srvc01
max-poll-records: 1
auto-offset-reset: latest
enable-auto-commit: false


AZURE_APP_WELL_KNOWN_URL: https://login.microsoftonline.com/navq.onmicrosoft.com/v2.0/.well-known/openid-configuration
AZURE_OPENID_CONFIG_TOKEN_ENDPOINT: https://login.microsoftonline.com/navq.onmicrosoft.com/oauth/v2.0/token
Expand Down
7 changes: 0 additions & 7 deletions src/test/resources/application-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ spring:
sasl:
mechanism: PLAIN
jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
group-id: srvfamilie-ef-person
max-poll-records: 1
auto-offset-reset: latest
enable-auto-commit: false


FAMILIE_INTEGRASJONER_API_URL: https://familie-integrasjoner.dev-fss-pub.nais.io
Expand Down