Skip to content

Commit

Permalink
KAFKA-17986 Fix ConsumerRebootstrapTest and ProducerRebootstrapTest (a…
Browse files Browse the repository at this point in the history
…pache#18175)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
peterxcli authored Jan 9, 2025
1 parent fcd98da commit a116753
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

class ConsumerRebootstrapTest extends RebootstrapTest {
@Disabled("KAFKA-17986")
@ParameterizedTest(name = RebootstrapTestName)
@MethodSource(Array("rebootstrapTestParams"))
def testRebootstrap(quorum: String, groupProtocol: String, useRebootstrapTriggerMs: Boolean): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package kafka.api

import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

class ProducerRebootstrapTest extends RebootstrapTest {
@Disabled("KAFKA-17986")
@ParameterizedTest(name = "{displayName}.quorum=kraft.useRebootstrapTriggerMs={0}")
@ValueSource(booleans = Array(false, true))
def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
// It's ok to shut the leader down, cause the reelection is small enough to the producer timeout.
server1.shutdown()
server1.awaitShutdown()

Expand Down
14 changes: 13 additions & 1 deletion core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import kafka.server.{KafkaBroker, KafkaConfig}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.junit.jupiter.api.{BeforeEach, TestInfo}

import java.util.Properties

Expand All @@ -29,10 +30,21 @@ abstract class RebootstrapTest extends AbstractConsumerTest {
def server0: KafkaBroker = serverForId(0).get
def server1: KafkaBroker = serverForId(1).get

@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.doSetup(testInfo, createOffsetsTopic = true)

// Enable unclean leader election for the test topic
val topicProps = new Properties
topicProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")

// create the test topic with all the brokers as replicas
createTopic(topic, 2, brokerCount, adminClientConfig = this.adminClientConfig, topicConfig = topicProps)
}

override def generateConfigs: Seq[KafkaConfig] = {
val overridingProps = new Properties()
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, brokerCount.toString)
overridingProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")

// In this test, fixed ports are necessary, because brokers must have the
// same port after the restart.
Expand Down

0 comments on commit a116753

Please sign in to comment.