Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions external/kafka-0-10-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<properties>
<sbt.project.name>sql-kafka-0-10</sbt.project.name>
<kafka.version>0.10.0.1</kafka.version>
<kafka.version>2.0.0</kafka.version>
</properties>
<packaging>jar</packaging>
<name>Kafka 0.10 Source for Structured Streaming</name>
<name>Kafka 0.10+ Source for Structured Streaming</name>
<url>http://spark.apache.org/</url>

<dependencies>
Expand Down Expand Up @@ -73,13 +73,33 @@
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
<version>3.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("kafka.default.api.timeout.ms", "3000")
.option("subscribePattern", s"$topicPrefix-.*")
.option("failOnDataLoss", "false")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("kafka.default.api.timeout.ms", "3000")
.option("subscribePattern", s"$topicPrefix-.*")
.option("failOnDataLoss", "false")

Expand Down Expand Up @@ -467,6 +468,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("kafka.default.api.timeout.ms", "3000")
.option("subscribe", topic)
// If a topic is deleted and we try to poll data starting from offset 0,
// the Kafka consumer will just block until timeout and return an empty result.
Expand Down Expand Up @@ -1103,6 +1105,7 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
.option("kafka.metadata.max.age.ms", "1")
.option("subscribePattern", "stress.*")
.option("failOnDataLoss", "false")
.option("kafka.default.api.timeout.ms", "3000")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
Expand Down Expand Up @@ -1173,7 +1176,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
// 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at
// least 30 seconds.
props.put("log.cleaner.backoff.ms", "100")
props.put("log.segment.bytes", "40")
// RecordBatch V2 is larger than the older message formats because it supports
// transactional writes, so the segment size has to be increased accordingly.
props.put("log.segment.bytes", "70")
props.put("log.retention.bytes", "40")
props.put("log.retention.check.interval.ms", "100")
props.put("delete.retention.ms", "10")
Expand Down Expand Up @@ -1215,6 +1219,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("kafka.default.api.timeout.ms", "3000")
.option("subscribePattern", "failOnDataLoss.*")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ import scala.util.Random

import kafka.admin.AdminUtils
import kafka.api.Request
import kafka.common.TopicAndPartition
import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.scalatest.concurrent.Eventually._
Expand All @@ -61,6 +64,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
private var zookeeper: EmbeddedZookeeper = _

private var zkUtils: ZkUtils = _
private var adminClient: AdminClient = null

// Kafka broker related configurations
private val brokerHost = "localhost"
Expand Down Expand Up @@ -113,17 +117,23 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()
brokerPort = server.boundPort()
brokerPort = server.boundPort(new ListenerName("PLAINTEXT"))
(server, brokerPort)
}, new SparkConf(), "KafkaBroker")

brokerReady = true
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort")
adminClient = AdminClient.create(props)
}

/**
 * Set up the whole set of embedded servers: Zookeeper first, then the Kafka broker.
 *
 * After both are started, waits for up to 60 seconds until `zkUtils` reports at least
 * one broker in the cluster, and fails the assertion if none appears in that time.
 */
def setup(): Unit = {
setupEmbeddedZookeeper()
setupEmbeddedKafkaServer()
// Retry the check until a broker is visible through zkUtils, giving up after 60 seconds.
eventually(timeout(60.seconds)) {
assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds")
}
}

/** Teardown the whole servers, including Kafka broker and Zookeeper */
Expand Down Expand Up @@ -203,7 +213,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L

/** Add new partitions to a Kafka topic */
def addPartitions(topic: String, partitions: Int): Unit = {
AdminUtils.addPartitions(zkUtils, topic, partitions)
adminClient.createPartitions(
Map(topic -> NewPartitions.increaseTo(partitions)).asJava,
new CreatePartitionsOptions)
// wait until metadata is propagated
(0 until partitions).foreach { p =>
waitUntilMetadataIsPropagated(topic, p)
Expand Down Expand Up @@ -296,6 +308,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
props.put("replica.socket.timeout.ms", "1500")
props.put("delete.topic.enable", "true")
props.put("offsets.topic.num.partitions", "1")
props.put("offsets.topic.replication.factor", "1")
props.put("group.initial.rebalance.delay.ms", "10")
// Cannot use properties.putAll(propsMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
withBrokerProps.foreach { case (k, v) => props.put(k, v) }
Expand Down Expand Up @@ -327,7 +341,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]): Unit = {
val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))

import ZkUtils._
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
Expand All @@ -337,16 +351,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
// ensure that the topic-partition has been deleted from all brokers' replica managers
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
server.replicaManager.getPartition(tp) == None)),
s"topic $topic still exists in the replica manager")
// ensure that logs from all replicas are deleted if delete topic is marked successful
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.getLogManager().getLog(tp).isEmpty)),
s"topic $topic still exists in log mananger")
// ensure that topic is removed from all cleaner offsets
assert(servers.forall(server => topicAndPartitions.forall { tp =>
val checkpoints = server.getLogManager().logDirs.map { logDir =>
new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
val checkpoints = server.getLogManager().liveLogDirs.map { logDir =>
new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
}), s"checkpoint for topic $topic still exists")
Expand Down Expand Up @@ -379,11 +393,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr

zkUtils.getLeaderForPartition(topic, partition).isDefined &&
Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
leaderAndInSyncReplicas.isr.nonEmpty
Request.isValidBrokerId(partitionState.basePartitionState.leader) &&
!partitionState.basePartitionState.replicas.isEmpty

case _ =>
false
Expand Down