[SPARK-28144][SPARK-29294][SS] Upgrade Kafka to 2.4.0
### What changes were proposed in this pull request?

This patch upgrades Kafka to 2.4.0, which supports Scala 2.13.

There are some incompatible changes in Kafka 2.4, which this patch addresses as well (a short migration sketch follows the list):

* `ZkUtils` is removed -> Replaced with `KafkaZkClient`
* Majority of methods are removed in `AdminUtils` -> Replaced with `AdminZkClient`
* Method signature of `Scheduler.schedule` is changed (return type) -> leverage `DeterministicScheduler` to avoid implementing `ScheduledFuture`
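
For illustration only, a minimal sketch of the first two migrations. The connection string, timeouts, and topic settings below are placeholder values, not taken from this patch; the constructor and method calls mirror the ones used in the diff:

```scala
import java.util.Properties

import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.SystemTime

// Before (Kafka <= 2.3):
//   val zkUtils = ZkUtils(connectString, sessionTimeout, connectionTimeout, false)
//   AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, config)

// After (Kafka 2.4): build a KafkaZkClient and wrap it in an AdminZkClient.
val zkClient = KafkaZkClient("localhost:2181", isSecure = false,
  10000 /* session timeout ms */, 10000 /* connection timeout ms */,
  1 /* max in-flight requests */, new SystemTime())
val adminZkClient = new AdminZkClient(zkClient)
adminZkClient.createTopic("test-topic", 2 /* partitions */, 1 /* replication factor */,
  new Properties())
zkClient.close()
```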

### Why are the changes needed?

* Kafka 2.4 supports Scala 2.13

### Does this PR introduce any user-facing change?

No, as the Kafka client API is known to be compatible across versions.

### How was this patch tested?

Existing UTs

Closes apache#26960 from HeartSaVioR/SPARK-29294.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
HeartSaVioR authored and dongjoon-hyun committed Dec 21, 2019
1 parent fa47b7f commit 8384ff4
Showing 6 changed files with 68 additions and 78 deletions.
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -30,9 +30,9 @@ import scala.util.Random

import com.google.common.io.Files
import kafka.api.Request
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.{HostedPartition, KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
-import kafka.utils.ZkUtils
+import kafka.zk.KafkaZkClient
import org.apache.hadoop.minikdc.MiniKdc
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.CommonClientConfigs
@@ -44,6 +44,7 @@ import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.apache.kafka.common.utils.SystemTime
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.apache.zookeeper.server.auth.SASLAuthenticationProvider
import org.scalatest.Assertions._
@@ -81,7 +82,7 @@ class KafkaTestUtils(
private val zkSessionTimeout = 10000

private var zookeeper: EmbeddedZookeeper = _
-private var zkUtils: ZkUtils = _
+private var zkClient: KafkaZkClient = _

// Kafka broker related configurations
private val brokerHost = localCanonicalHostName
@@ -115,9 +116,9 @@ class KafkaTestUtils(
s"$brokerHost:$brokerPort"
}

-def zookeeperClient: ZkUtils = {
+def zookeeperClient: KafkaZkClient = {
assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
-Option(zkUtils).getOrElse(
+Option(zkClient).getOrElse(
throw new IllegalStateException("Zookeeper client is not yet initialized"))
}

@@ -243,7 +244,8 @@ class KafkaTestUtils(
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
-zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+zkClient = KafkaZkClient(s"$zkHost:$zkPort", isSecure = false, zkSessionTimeout,
+  zkConnectionTimeout, 1, new SystemTime())
zkReady = true
}

@@ -288,7 +290,7 @@ class KafkaTestUtils(
setupEmbeddedZookeeper()
setupEmbeddedKafkaServer()
eventually(timeout(1.minute)) {
-assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds")
+assert(zkClient.getAllBrokersInCluster.nonEmpty, "Broker was not up in 60 seconds")
}
}

@@ -335,9 +337,9 @@ class KafkaTestUtils(
}
}

-if (zkUtils != null) {
-  zkUtils.close()
-  zkUtils = null
+if (zkClient != null) {
+  zkClient.close()
+  zkClient = null
}

if (zookeeper != null) {
@@ -367,7 +369,7 @@ class KafkaTestUtils(
var created = false
while (!created) {
try {
-val newTopic = new NewTopic(topic, partitions, 1)
+val newTopic = new NewTopic(topic, partitions, 1.shortValue())
adminClient.createTopics(Collections.singleton(newTopic))
created = true
} catch {
@@ -384,7 +386,7 @@
}

def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = {
-zkUtils.getPartitionsForTopics(zkUtils.getAllTopics()).mapValues(_.size).toSeq
+zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster).mapValues(_.size).toSeq
}

/** Create a Kafka topic and wait until it is propagated to the whole cluster */
@@ -394,9 +396,9 @@

/** Delete a Kafka topic and wait until it is propagated to the whole cluster */
def deleteTopic(topic: String): Unit = {
-val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size
+val partitions = zkClient.getPartitionsForTopics(Set(topic))(topic).size
adminClient.deleteTopics(Collections.singleton(topic))
-verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server))
+verifyTopicDeletionWithRetries(topic, partitions, List(this.server))
}

/** Add new partitions to a Kafka topic */
@@ -575,15 +577,12 @@ class KafkaTestUtils(
servers: Seq[KafkaServer]): Unit = {
val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))

-import ZkUtils._
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
-assert(
-  !zkUtils.pathExists(getDeleteTopicPath(topic)),
-  s"${getDeleteTopicPath(topic)} still exists")
-assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
+assert(!zkClient.isTopicMarkedForDeletion(topic), "topic is still marked for deletion")
+assert(!zkClient.topicExists(topic), "topic still exists")
// ensure that the topic-partition has been deleted from all brokers' replica managers
assert(servers.forall(server => topicAndPartitions.forall(tp =>
-  server.replicaManager.getPartition(tp) == None)),
+  server.replicaManager.getPartition(tp) == HostedPartition.None)),
s"topic $topic still exists in the replica manager")
// ensure that logs from all replicas are deleted if delete topic is marked successful
assert(servers.forall(server => topicAndPartitions.forall(tp =>
@@ -598,13 +597,12 @@
}), s"checkpoint for topic $topic still exists")
// ensure the topic is gone
assert(
-  !zkUtils.getAllTopics().contains(topic),
+  !zkClient.getAllTopicsInCluster.contains(topic),
s"topic $topic still exists on zookeeper")
}

/** Verify topic is deleted. Retry to delete the topic if not. */
private def verifyTopicDeletionWithRetries(
-    zkUtils: ZkUtils,
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]): Unit = {
@@ -626,9 +624,9 @@
def isPropagated = server.dataPlaneRequestProcessor.metadataCache
.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
-    zkUtils.getLeaderForPartition(topic, partition).isDefined &&
-      Request.isValidBrokerId(partitionState.basePartitionState.leader) &&
-      !partitionState.basePartitionState.replicas.isEmpty
+    zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
+      Request.isValidBrokerId(partitionState.leader) &&
+      !partitionState.replicas.isEmpty

case _ =>
false
5 changes: 5 additions & 0 deletions external/kafka-0-10/pom.xml
@@ -111,6 +111,11 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
+<dependency>
+  <groupId>org.jmock</groupId>
+  <artifactId>jmock-junit4</artifactId>
+  <scope>test</scope>
+</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -27,13 +27,14 @@ import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

-import kafka.admin.AdminUtils
import kafka.api.Request
import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.ZkUtils
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.serialization.StringSerializer
+import org.apache.kafka.common.utils.SystemTime
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}

import org.apache.spark.{SparkConf, SparkException}
@@ -57,7 +58,8 @@ private[kafka010] class KafkaTestUtils extends Logging {

private var zookeeper: EmbeddedZookeeper = _

-private var zkUtils: ZkUtils = _
+private var zkClient: KafkaZkClient = _
+private var admClient: AdminZkClient = _

// Kafka broker related configurations
private val brokerHost = "127.0.0.1"
@@ -85,19 +87,27 @@ private[kafka010] class KafkaTestUtils extends Logging {
s"$brokerHost:$brokerPort"
}

-def zookeeperClient: ZkUtils = {
+def zookeeperClient: KafkaZkClient = {
assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
-Option(zkUtils).getOrElse(
+Option(zkClient).getOrElse(
throw new IllegalStateException("Zookeeper client is not yet initialized"))
}

+def adminClient: AdminZkClient = {
+  assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+  Option(admClient).getOrElse(
+    throw new IllegalStateException("Admin client is not yet initialized"))
+}

// Set up the Embedded Zookeeper server and get the proper Zookeeper port
private def setupEmbeddedZookeeper(): Unit = {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
-zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+zkClient = KafkaZkClient(s"$zkHost:$zkPort", isSecure = false, zkSessionTimeout,
+  zkConnectionTimeout, 1, new SystemTime())
+admClient = new AdminZkClient(zkClient)
zkReady = true
}

@@ -162,9 +172,9 @@ private[kafka010] class KafkaTestUtils extends Logging {
}
}

-if (zkUtils != null) {
-  zkUtils.close()
-  zkUtils = null
+if (zkClient != null) {
+  zkClient.close()
+  zkClient = null
}

if (zookeeper != null) {
@@ -175,7 +185,7 @@

/** Create a Kafka topic and wait until it is propagated to the whole cluster */
def createTopic(topic: String, partitions: Int, config: Properties): Unit = {
-AdminUtils.createTopic(zkUtils, topic, partitions, 1, config)
+adminClient.createTopic(topic, partitions, 1, config)
// wait until metadata is propagated
(0 until partitions).foreach { p =>
waitUntilMetadataIsPropagated(topic, p)
@@ -289,9 +299,9 @@ private[kafka010] class KafkaTestUtils extends Logging {
def isPropagated = server.dataPlaneRequestProcessor.metadataCache
.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
-    val leader = partitionState.basePartitionState.leader
-    val isr = partitionState.basePartitionState.isr
-    zkUtils.getLeaderForPartition(topic, partition).isDefined &&
+    val leader = partitionState.leader
+    val isr = partitionState.isr
+    zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
Request.isValidBrokerId(leader) && !isr.isEmpty
case _ =>
false
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
@@ -17,12 +17,13 @@

package org.apache.spark.streaming.kafka010.mocks

-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ScheduledFuture, TimeUnit}

import scala.collection.mutable.PriorityQueue

import kafka.utils.Scheduler
import org.apache.kafka.common.utils.Time
+import org.jmock.lib.concurrent.DeterministicScheduler

/**
* A mock scheduler that executes tasks synchronously using a mock time instance.
@@ -41,57 +42,33 @@ import org.apache.kafka.common.utils.Time
*/
private[kafka010] class MockScheduler(val time: Time) extends Scheduler {

-/* a priority queue of tasks ordered by next execution time */
-var tasks = new PriorityQueue[MockTask]()
+val scheduler = new DeterministicScheduler()

def isStarted: Boolean = true

def startup(): Unit = {}

def shutdown(): Unit = synchronized {
-  tasks.foreach(_.fun())
-  tasks.clear()
+  scheduler.runUntilIdle()
}

/**
* Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs
* when this method is called and the execution happens synchronously in the calling thread.
* If you are using the scheduler associated with a MockTime instance this call
* will be triggered automatically.
*/
-def tick(): Unit = synchronized {
-  val now = time.milliseconds
-  while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
-    /* pop and execute the task with the lowest next execution time */
-    val curr = tasks.dequeue
-    curr.fun()
-    /* if the task is periodic, reschedule it and re-enqueue */
-    if(curr.periodic) {
-      curr.nextExecution += curr.period
-      this.tasks += curr
-    }
-  }
+def tick(duration: Long, timeUnit: TimeUnit): Unit = synchronized {
+  scheduler.tick(duration, timeUnit)
}

def schedule(
name: String,
fun: () => Unit,
delay: Long = 0,
period: Long = -1,
-      unit: TimeUnit = TimeUnit.MILLISECONDS): Unit = synchronized {
-    tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
-    tick()
-  }
-
-}
-
-case class MockTask(
-    val name: String,
-    val fun: () => Unit,
-    var nextExecution: Long,
-    val period: Long) extends Ordered[MockTask] {
-  def periodic: Boolean = period >= 0
-  def compare(t: MockTask): Int = {
-    java.lang.Long.compare(t.nextExecution, nextExecution)
+      unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[_] = synchronized {
+    val runnable = new Runnable {
+      override def run(): Unit = fun()
+    }
+    if (period >= 0) {
+      scheduler.scheduleAtFixedRate(runnable, delay, period, unit)
+    } else {
+      scheduler.schedule(runnable, delay, unit)
+    }
+  }
}
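
For context, a tiny standalone sketch of how jMock's `DeterministicScheduler` runs work only when virtual time is advanced, which is what the new `tick(duration, timeUnit)` above delegates to. The task, delays, and object name here are made up for illustration; only `jmock-junit4` (added to the pom below) is assumed on the classpath:

```scala
import java.util.concurrent.TimeUnit

import org.jmock.lib.concurrent.DeterministicScheduler

object DeterministicSchedulerDemo extends App {
  val scheduler = new DeterministicScheduler()

  // A periodic task: first run after 50 ms, then every 100 ms of virtual time.
  scheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = println("periodic task ran")
  }, 50, 100, TimeUnit.MILLISECONDS)

  scheduler.tick(49, TimeUnit.MILLISECONDS)  // nothing runs yet
  scheduler.tick(1, TimeUnit.MILLISECONDS)   // first run fires at 50 ms
  scheduler.tick(200, TimeUnit.MILLISECONDS) // two more periodic runs
}
```

Everything executes synchronously in the calling thread, so tests stay deterministic without sleeping on wall-clock time.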
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
@@ -47,7 +47,7 @@ private[kafka010] class MockTime(@volatile private var currentMs: Long) extends

override def sleep(ms: Long): Unit = {
this.currentMs += ms
-  scheduler.tick()
+  scheduler.tick(ms, TimeUnit.MILLISECONDS)
}

override def waitObject(obj: Any, condition: Supplier[lang.Boolean], timeoutMs: Long): Unit =
2 changes: 1 addition & 1 deletion pom.xml
@@ -132,7 +132,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>2.3</hive.version.short>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
-<kafka.version>2.3.1</kafka.version>
+<kafka.version>2.4.0</kafka.version>
<derby.version>10.12.1.1</derby.version>
<parquet.version>1.10.1</parquet.version>
<orc.version>1.5.8</orc.version>
