diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 16e1efd39d182..a73b650aac2b5 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -26,6 +26,7 @@ import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, Delegati import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.message.ApiMessageType.ListenerType import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.utils.{LogContext, Utils} @@ -155,6 +156,11 @@ class ControllerServer( () => featuresPublisher.features() ) + // metrics will be set to null when closing a controller, so we should recreate it for testing + if (sharedServer.metrics == null){ + sharedServer.metrics = new Metrics() + } + tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) socketServer = new SocketServer(config, diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 1d3021b0bbcf7..85238c3439e69 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -31,7 +31,6 @@ import org.apache.kafka.common.config.ConfigResource.Type import org.apache.kafka.common.errors.{InvalidPartitionsException, PolicyViolationException, UnsupportedVersionException} import org.apache.kafka.common.message.DescribeClusterRequestData import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord} -import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.quota.ClientQuotaAlteration.Op @@ -122,8 +121,6 @@ class KRaftClusterTest { val config = controller.sharedServer.controllerConfig.props config.asInstanceOf[util.HashMap[String,String]].put(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:$port") controller.sharedServer.controllerConfig.updateCurrentConfig(new KafkaConfig(config)) - // metrics will be set to null when closing a controller, so we should recreate it for testing - controller.sharedServer.metrics = new Metrics() // restart controller controller.startup()