Skip to content

Commit

Permalink
KAFKA-18274 Failed to restart controller in testing due to closed soc…
Browse files Browse the repository at this point in the history
…ket channel [1/2] (#18310)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
peterxcli authored Dec 25, 2024
1 parent 88adb94 commit c7c1364
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, Delegati
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Utils}
Expand Down Expand Up @@ -155,6 +156,11 @@ class ControllerServer(
() => featuresPublisher.features()
)

// metrics will be set to null when closing a controller, so we should recreate it for testing
if (sharedServer.metrics == null){
sharedServer.metrics = new Metrics()
}

tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
socketServer = new SocketServer(config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.errors.{InvalidPartitionsException, PolicyViolationException, UnsupportedVersionException}
import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.quota.ClientQuotaAlteration.Op
Expand Down Expand Up @@ -122,8 +121,6 @@ class KRaftClusterTest {
val config = controller.sharedServer.controllerConfig.props
config.asInstanceOf[util.HashMap[String,String]].put(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:$port")
controller.sharedServer.controllerConfig.updateCurrentConfig(new KafkaConfig(config))
// metrics will be set to null when closing a controller, so we should recreate it for testing
controller.sharedServer.metrics = new Metrics()

// restart controller
controller.startup()
Expand Down

0 comments on commit c7c1364

Please sign in to comment.