Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15793 Fix ZkMigrationIntegrationTest#testMigrateTopicDeletions #17004

Merged
merged 27 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d9f41f2
wait for topic to be created
mumrah Aug 26, 2024
4663c5f
select the core submodule
mumrah Aug 26, 2024
d05e93d
turn up logging
mumrah Aug 26, 2024
c843d43
increase timeouts
mumrah Aug 26, 2024
af06377
try repeating the test
mumrah Aug 26, 2024
a7a6bab
add a deflake action
mumrah Aug 26, 2024
2e3257a
really increase the timeout
mumrah Aug 26, 2024
791b36f
undo annotation changes
mumrah Aug 27, 2024
977971a
remove annotation param
mumrah Aug 27, 2024
3d39557
more test logs
mumrah Aug 27, 2024
63afeb5
Merge remote-tracking branch 'origin/trunk' into gh-deflake-testMigra…
mumrah Aug 27, 2024
af4dba3
Revert changes
mumrah Aug 27, 2024
c5fc814
remove annotation param name
mumrah Aug 28, 2024
02bf995
use different timeouts
mumrah Aug 28, 2024
ceacced
Merge remote-tracking branch 'origin/trunk' into gh-deflake-testMigra…
mumrah Aug 31, 2024
635896f
increase logging to info temporarily
mumrah Aug 31, 2024
1fc62ce
remove 3.4 case
mumrah Aug 31, 2024
a70ce0f
Revert "increase logging to info temporarily"
mumrah Aug 31, 2024
f3e589f
Merge remote-tracking branch 'origin/trunk' into gh-deflake-testMigra…
mumrah Aug 31, 2024
3f027e0
pr feedback
mumrah Sep 2, 2024
498711e
increase overall test timeout to 10m
mumrah Sep 2, 2024
6c997a3
increase logging
mumrah Sep 2, 2024
c292b43
Revert "increase logging"
mumrah Sep 3, 2024
4dc1435
increase logging
mumrah Sep 3, 2024
2f71f5f
ignore non-fatal faults
mumrah Sep 5, 2024
0358527
add log inside topicsAllDeleted
mumrah Sep 5, 2024
2af2478
add wait on ISR
mumrah Sep 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/deflake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
- name: Test
timeout-minutes: 60
run: |
./gradlew --build-cache --scan --continue \
./gradlew --info --build-cache --scan --continue \
-PtestLoggingEvents=started,passed,skipped,failed \
-PignoreFailures=true -PmaxParallelForks=2 \
-Pkafka.cluster.test.repeat=${{ inputs.test-repeat }} \
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN
log4j.logger.kafka=INFO
log4j.logger.org.apache.kafka=INFO

mumrah marked this conversation as resolved.
Show resolved Hide resolved
# zkclient can be verbose; during debugging it is common to adjust it separately
log4j.logger.org.apache.zookeeper=WARN
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ import scala.jdk.CollectionConverters._
object ZkMigrationIntegrationTest {
def zkClustersForAllMigrationVersions(): java.util.List[ClusterConfig] = {
Seq(
MetadataVersion.IBP_3_4_IV0,
MetadataVersion.IBP_3_5_IV2,
MetadataVersion.IBP_3_6_IV2,
MetadataVersion.IBP_3_7_IV0,
Expand All @@ -92,7 +91,7 @@ object ZkMigrationIntegrationTest {
}

@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@Timeout(300)
@Timeout(600)
class ZkMigrationIntegrationTest {

val log: Logger = LoggerFactory.getLogger(classOf[ZkMigrationIntegrationTest])
Expand Down Expand Up @@ -296,13 +295,29 @@ class ZkMigrationIntegrationTest {
def testMigrateTopicDeletions(zkCluster: ClusterInstance): Unit = {
// Create some topics in ZK mode
var admin = zkCluster.createAdminClient()
val newTopics = new util.ArrayList[NewTopic]()
newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort))
newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort))
newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get(300, TimeUnit.SECONDS)
admin.close()
try {
val newTopics = new util.ArrayList[NewTopic]()
newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort))
newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort))
newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get(61, TimeUnit.SECONDS)
TestUtils.waitUntilTrue(() => {
val topicDescribe = admin.describeTopics(Seq("test-topic-1", "test-topic-2", "test-topic-3").asJava)
if (topicDescribe.topicNameValues() == null || topicDescribe.topicNameValues().size() < 3) {
false
} else {
topicDescribe.topicNameValues().values().stream().allMatch {
topic => topic.get(62, TimeUnit.SECONDS).partitions().stream().allMatch(part => {
part.leader() != null && part.isr().size() == 3
})
}
}
}, msg="waiting for topics to be available", waitTimeMs=303000)
} finally {
mumrah marked this conversation as resolved.
Show resolved Hide resolved
admin.close()
}

val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient

// Bootstrap the ZK cluster ID into KRaft
Expand Down Expand Up @@ -339,7 +354,7 @@ class ZkMigrationIntegrationTest {
zkClient.createDeleteTopicPath("test-topic-3")

zkCluster.waitForReadyBrokers()
readyFuture.get(60, TimeUnit.SECONDS)
readyFuture.get(64, TimeUnit.SECONDS)

// Only continue with the test if there are some pending deletions to verify. If there are not any pending
// deletions, this will mark the test as "skipped" instead of failed.
Expand All @@ -352,12 +367,13 @@ class ZkMigrationIntegrationTest {
TestUtils.waitUntilTrue(
() => zkClient.getOrCreateMigrationState(ZkMigrationLeadershipState.EMPTY).initialZkMigrationComplete(),
"Timed out waiting for migration to complete",
30000)
65000)

// At this point, some of the topics may have been deleted by ZK controller and the rest will be
// implicitly deleted by the KRaft controller and removed from the ZK brokers as stray partitions
def topicsAllDeleted(admin: Admin): Boolean = {
val topics = admin.listTopics().names().get(60, TimeUnit.SECONDS)
val topics = admin.listTopics().names().get(66, TimeUnit.SECONDS)
log.info("Topics are {}", topics)
topics.retainAll(util.Arrays.asList(
"test-topic-1", "test-topic-2", "test-topic-3"
))
Expand All @@ -369,18 +385,19 @@ class ZkMigrationIntegrationTest {
TestUtils.waitUntilTrue(
() => topicsAllDeleted(admin),
"Timed out waiting for topics to be deleted",
30000,
307000,
1000)
log.info("Topics were deleted. Now re-creating them.")

val newTopics = new util.ArrayList[NewTopic]()
newTopics.add(new NewTopic("test-topic-1", 2, 3.toShort))
newTopics.add(new NewTopic("test-topic-2", 1, 3.toShort))
newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
val createTopicResult = admin.createTopics(newTopics)
createTopicResult.all().get(60, TimeUnit.SECONDS)
createTopicResult.all().get(68, TimeUnit.SECONDS)

def topicsAllRecreated(admin: Admin): Boolean = {
val topics = admin.listTopics().names().get(60, TimeUnit.SECONDS)
val topics = admin.listTopics().names().get(69, TimeUnit.SECONDS)
topics.retainAll(util.Arrays.asList(
"test-topic-1", "test-topic-2", "test-topic-3"
))
Expand All @@ -391,21 +408,22 @@ class ZkMigrationIntegrationTest {
TestUtils.waitUntilTrue(
() => topicsAllRecreated(admin),
"Timed out waiting for topics to be created",
30000,
70000,
1000)

TestUtils.retry(300000) {
log.info("Topics were re-created. Now waiting for consistent topic state.")
TestUtils.retry(311000) {
// Need a retry here since topic metadata may be inconsistent between brokers
val topicDescriptions = try {
admin.describeTopics(util.Arrays.asList(
"test-topic-1", "test-topic-2", "test-topic-3"
)).topicNameValues().asScala.map { case (name, description) =>
name -> description.get(60, TimeUnit.SECONDS)
name -> description.get(72, TimeUnit.SECONDS)
}.toMap
} catch {
case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => Map.empty[String, TopicDescription]
case t: Throwable => fail("Error describing topics", t.getCause)
}
log.debug("Topic describe: {}", topicDescriptions);

assertEquals(2, topicDescriptions("test-topic-1").partitions().size())
assertEquals(1, topicDescriptions("test-topic-2").partitions().size())
Expand All @@ -417,14 +435,13 @@ class ZkMigrationIntegrationTest {
})
}

val absentTopics = admin.listTopics().names().get(60, TimeUnit.SECONDS).asScala
val absentTopics = admin.listTopics().names().get(73, TimeUnit.SECONDS).asScala
assertTrue(absentTopics.contains("test-topic-1"))
assertTrue(absentTopics.contains("test-topic-2"))
assertTrue(absentTopics.contains("test-topic-3"))
}

admin.close()
} finally {
admin.close()
shutdownInSequence(zkCluster, kraftCluster)
}
}
Expand Down Expand Up @@ -1153,6 +1170,7 @@ class ZkMigrationIntegrationTest {

/**
 * Tears down both clusters used by a migration test in a fixed order:
 * every ZK-cluster broker is shut down first, then the KRaft cluster is
 * closed, and finally the ZK cluster itself is stopped.
 *
 * @param zkCluster    the ZK-based cluster instance whose brokers are shut down and which is then stopped
 * @param kraftCluster the KRaft test kit that is closed after the brokers are down
 */
def shutdownInSequence(zkCluster: ClusterInstance, kraftCluster: KafkaClusterTestKit): Unit = {
// Shut down each broker of the ZK cluster before touching the KRaft cluster.
zkCluster.brokerIds().forEach(zkCluster.shutdownBroker(_))
// Tell the KRaft cluster's non-fatal fault handler to ignore faults from here on.
// NOTE(review): presumably so that faults raised during teardown do not fail the test — confirm.
kraftCluster.nonFatalFaultHandler().setIgnore(true)
kraftCluster.close()
zkCluster.stop()
}
Expand Down