diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 16bbc6db641ca..95500037c1473 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -29,10 +29,10 @@
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<properties>
  <sbt.project.name>sql-kafka-0-10</sbt.project.name>
- <kafka.version>0.10.0.1</kafka.version>
+ <kafka.version>2.0.0</kafka.version>
</properties>
<packaging>jar</packaging>
- <name>Kafka 0.10 Source for Structured Streaming</name>
+ <name>Kafka 0.10+ Source for Structured Streaming</name>
<url>http://spark.apache.org/</url>
@@ -73,6 +73,20 @@
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
+ <exclusions>
+   <exclusion>
+     <groupId>com.fasterxml.jackson.core</groupId>
+     <artifactId>jackson-core</artifactId>
+   </exclusion>
+   <exclusion>
+     <groupId>com.fasterxml.jackson.core</groupId>
+     <artifactId>jackson-databind</artifactId>
+   </exclusion>
+   <exclusion>
+     <groupId>com.fasterxml.jackson.core</groupId>
+     <artifactId>jackson-annotations</artifactId>
+   </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
@@ -80,6 +94,12 @@
<version>3.2</version>
<scope>test</scope>
</dependency>
+ <dependency>
+   <groupId>org.eclipse.jetty</groupId>
+   <artifactId>jetty-servlet</artifactId>
+   <version>${jetty.version}</version>
+   <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
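
Note: the Jackson exclusions and the jetty-servlet test dependency above are Maven changes to this module's pom. For readers who think in sbt terms, a rough, hypothetical equivalent of the exclusions (illustrative only; the real build goes through the pom) would be:

// build.sbt sketch (not part of this patch)
libraryDependencies += ("org.apache.kafka" %% "kafka" % "2.0.0" % Test)
  .exclude("com.fasterxml.jackson.core", "jackson-core")
  .exclude("com.fasterxml.jackson.core", "jackson-databind")
  .exclude("com.fasterxml.jackson.core", "jackson-annotations")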
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index aab8ec42189fb..ea2a2a84d22c6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -42,6 +42,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.default.api.timeout.ms", "3000")
.option("subscribePattern", s"$topicPrefix-.*")
.option("failOnDataLoss", "false")
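
Note: options prefixed with "kafka." are stripped of the prefix and passed straight to the underlying Kafka consumer, so kafka.default.api.timeout.ms maps to the client's default.api.timeout.ms, a setting introduced by the 2.0.0 consumer with a 60-second default; the tests lower it to 3 seconds so calls against deleted topics fail fast. A minimal sketch of a reader configured this way, assuming a SparkSession in scope and a broker at a placeholder address:

// Sketch only: mirrors the test options above; the address and topic pattern are placeholders.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("kafka.metadata.max.age.ms", "1")
  .option("kafka.default.api.timeout.ms", "3000")  // cap blocking metadata/offset calls at 3s
  .option("subscribePattern", "topic-.*")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")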
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index c6412eac97dba..0584386d81bd9 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -290,6 +290,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.default.api.timeout.ms", "3000")
.option("subscribePattern", s"$topicPrefix-.*")
.option("failOnDataLoss", "false")
@@ -467,6 +468,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.default.api.timeout.ms", "3000")
.option("subscribe", topic)
// If a topic is deleted and we try to poll data starting from offset 0,
// the Kafka consumer will just block until timeout and return an empty result.
@@ -1103,6 +1105,7 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
.option("kafka.metadata.max.age.ms", "1")
.option("subscribePattern", "stress.*")
.option("failOnDataLoss", "false")
+ .option("kafka.default.api.timeout.ms", "3000")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
@@ -1173,7 +1176,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
// 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at
// least 30 seconds.
props.put("log.cleaner.backoff.ms", "100")
- props.put("log.segment.bytes", "40")
+ // The RecordBatch V2 format is larger because it carries fields for transactional writes.
+ props.put("log.segment.bytes", "70")
props.put("log.retention.bytes", "40")
props.put("log.retention.check.interval.ms", "100")
props.put("delete.retention.ms", "10")
@@ -1215,6 +1219,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
+ .option("kafka.default.api.timeout.ms", "3000")
.option("subscribePattern", "failOnDataLoss.*")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 75245943c4936..82294905c24b9 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -29,12 +29,15 @@ import scala.util.Random
import kafka.admin.AdminUtils
import kafka.api.Request
-import kafka.common.TopicAndPartition
-import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.scalatest.concurrent.Eventually._
@@ -61,6 +64,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
private var zookeeper: EmbeddedZookeeper = _
private var zkUtils: ZkUtils = _
+ private var adminClient: AdminClient = null
// Kafka broker related configurations
private val brokerHost = "localhost"
@@ -113,17 +117,23 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()
- brokerPort = server.boundPort()
+ brokerPort = server.boundPort(new ListenerName("PLAINTEXT"))
(server, brokerPort)
}, new SparkConf(), "KafkaBroker")
brokerReady = true
+ val props = new Properties()
+ props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort")
+ adminClient = AdminClient.create(props)
}
/** setup the whole embedded servers, including Zookeeper and Kafka brokers */
def setup(): Unit = {
setupEmbeddedZookeeper()
setupEmbeddedKafkaServer()
+ eventually(timeout(60.seconds)) {
+ assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds")
+ }
}
/** Teardown the whole servers, including Kafka broker and Zookeeper */
@@ -203,7 +213,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
/** Add new partitions to a Kafka topic */
def addPartitions(topic: String, partitions: Int): Unit = {
- AdminUtils.addPartitions(zkUtils, topic, partitions)
+ adminClient.createPartitions(
+ Map(topic -> NewPartitions.increaseTo(partitions)).asJava,
+ new CreatePartitionsOptions)
// wait until metadata is propagated
(0 until partitions).foreach { p =>
waitUntilMetadataIsPropagated(topic, p)
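
Note: partitions are now added through the Java AdminClient created in setupEmbeddedKafkaServer() rather than AdminUtils. A self-contained sketch of that call path, with a placeholder bootstrap address and topic (not taken from this patch):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}

val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)
try {
  // Grow "some-topic" to 4 partitions; all().get() surfaces failures, e.g. when the
  // requested count is not greater than the current partition count.
  admin.createPartitions(
    Map("some-topic" -> NewPartitions.increaseTo(4)).asJava,
    new CreatePartitionsOptions).all().get()
} finally {
  admin.close()
}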
@@ -296,6 +308,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
props.put("replica.socket.timeout.ms", "1500")
props.put("delete.topic.enable", "true")
props.put("offsets.topic.num.partitions", "1")
+ props.put("offsets.topic.replication.factor", "1")
+ props.put("group.initial.rebalance.delay.ms", "10")
// Can not use properties.putAll(propsMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
withBrokerProps.foreach { case (k, v) => props.put(k, v) }
@@ -327,7 +341,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]): Unit = {
- val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+ val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
import ZkUtils._
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
@@ -337,7 +351,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
// ensure that the topic-partition has been deleted from all brokers' replica managers
assert(servers.forall(server => topicAndPartitions.forall(tp =>
- server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+ server.replicaManager.getPartition(tp) == None)),
s"topic $topic still exists in the replica manager")
// ensure that logs from all replicas are deleted if delete topic is marked successful
assert(servers.forall(server => topicAndPartitions.forall(tp =>
@@ -345,8 +359,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
s"topic $topic still exists in log mananger")
// ensure that topic is removed from all cleaner offsets
assert(servers.forall(server => topicAndPartitions.forall { tp =>
- val checkpoints = server.getLogManager().logDirs.map { logDir =>
- new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+ val checkpoints = server.getLogManager().liveLogDirs.map { logDir =>
+ new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
}), s"checkpoint for topic $topic still exists")
@@ -379,11 +393,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
- val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-
zkUtils.getLeaderForPartition(topic, partition).isDefined &&
- Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
- leaderAndInSyncReplicas.isr.nonEmpty
+ Request.isValidBrokerId(partitionState.basePartitionState.leader) &&
+ !partitionState.basePartitionState.replicas.isEmpty
case _ =>
false