Skip to content

Commit

Permalink
MINOR: Improve PlaintextAdminIntegrationTest#testConsumerGroups (apac…
Browse files Browse the repository at this point in the history
…he#18409)

Looking at the [history](https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.timeZoneId=Europe%2FZurich&tests.container=kafka.api.PlaintextAdminIntegrationTest&tests.test=testConsumerGroups(String%2C%20String)%5B2%5D), I found out that one source of flakiness is due to syncCommit failing with CommitFailedException. We can ignore it and retry on the next iteration.

```
[2025-01-07 10:17:00,783] ERROR [Consumer instanceId=test_instance_id_1, clientId=test_client_id, groupId=test_group_id] OffsetCommit failed for member VfImExrxT3-w_HNJcTkqnw with stale member epoch error. Last epoch sent: 2 (org.apache.kafka.clients.consumer.internals.CommitRequestManager:773)
Exception in thread "Thread-6" org.apache.kafka.clients.consumer.CommitFailedException: OffsetCommit failed with stale member epoch.The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.
        at org.apache.kafka.clients.consumer.internals.CommitRequestManager.commitSyncExceptionForError(CommitRequestManager.java:481)
        at org.apache.kafka.clients.consumer.internals.CommitRequestManager.lambda$commitSyncWithRetries$7(CommitRequestManager.java:472)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
        at org.apache.kafka.clients.consumer.internals.CommitRequestManager$OffsetCommitRequestState.onResponse(CommitRequestManager.java:776)
        at org.apache.kafka.clients.consumer.internals.CommitRequestManager$RetriableRequestState.handleClientResponse(CommitRequestManager.java:893)
        at org.apache.kafka.clients.consumer.internals.CommitRequestManager$RetriableRequestState.lambda$buildRequestWithResponseHandling$0(CommitRequestManager.java:883)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
        at org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler.onComplete(NetworkClientDelegate.java:433)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:669)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:661)
        at org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.poll(NetworkClientDelegate.java:153)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:160)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:106)
```

Reviewers: Lianet Magrans <[email protected]>
  • Loading branch information
dajac authored Jan 7, 2025
1 parent 25890e9 commit 48b522f
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.kafka.clients.HostResolver
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, GroupProtocol, KafkaConsumer, OffsetAndMetadata, ShareConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig, SslConfigs, TopicConfig}
Expand Down Expand Up @@ -1875,7 +1875,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
consumer.poll(JDuration.ofSeconds(5))
if (!consumer.assignment.isEmpty && latch.getCount > 0L)
latch.countDown()
consumer.commitSync()
try {
consumer.commitSync()
} catch {
case _: CommitFailedException => // Ignore and retry on next iteration.
}
}
} catch {
case _: InterruptException => // Suppress the output to stderr
Expand Down Expand Up @@ -2237,7 +2241,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
consumer.poll(JDuration.ofSeconds(5))
if (!consumer.assignment.isEmpty && latch.getCount > 0L)
latch.countDown()
consumer.commitSync()
try {
consumer.commitSync()
} catch {
case _: CommitFailedException => // Ignore and retry on next iteration.
}
}
} catch {
case _: InterruptException => // Suppress the output to stderr
Expand Down

0 comments on commit 48b522f

Please sign in to comment.