38 changes: 26 additions & 12 deletions docs/structured-streaming-kafka-integration.md
@@ -150,16 +150,25 @@ The following options must be set for the Kafka source.

<table class="table">
<tr><th>Option</th><th>value</th><th>meaning</th></tr>
<tr>
<td>assign</td>
<td>json string {"topicA":[0,1],"topicB":[2,4]}</td>
<td>Specific TopicPartitions to consume.
Only one of "assign", "subscribe" or "subscribePattern"
options can be specified for Kafka source.</td>
</tr>
<tr>
<td>subscribe</td>
<td>A comma-separated list of topics</td>
<td>The topic list to subscribe. Only one of "subscribe" and "subscribePattern" options can be
specified for Kafka source.</td>
<td>The topic list to subscribe.
Only one of "assign", "subscribe" or "subscribePattern"
options can be specified for Kafka source.</td>
</tr>
<tr>
<td>subscribePattern</td>
<td>Java regex string</td>
<td>The pattern used to subscribe the topic. Only one of "subscribe" and "subscribePattern"
<td>The pattern used to subscribe to topic(s).
Only one of "assign, "subscribe" or "subscribePattern"
options can be specified for Kafka source.</td>
</tr>
<tr>
@@ -174,16 +183,21 @@ The following configurations are optional:
<table class="table">
<tr><th>Option</th><th>value</th><th>default</th><th>meaning</th></tr>
<tr>
<td>startingOffset</td>
<td>["earliest", "latest"]</td>
<td>"latest"</td>
<td>The start point when a query is started, either "earliest" which is from the earliest offset,
or "latest" which is just from the latest offset. Note: This only applies when a new Streaming q
uery is started, and that resuming will always pick up from where the query left off.</td>
<td>startingOffsets</td>
<td>earliest, latest, or json string
{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
</td>
<td>latest</td>
<td>The start point when a query is started, either "earliest" which is from the earliest offsets,
"latest" which is just from the latest offsets, or a json string specifying a starting offset for
each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
Note: This only applies when a new Streaming query is started, and that resuming will always pick
up from where the query left off. Newly discovered partitions during a query will start at
earliest.</td>
</tr>
<tr>
<td>failOnDataLoss</td>
<td>[true, false]</td>
<td>true or false</td>
<td>true</td>
<td>Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or
offsets are out of range). This may be a false alarm. You can disable it when it doesn't work
Expand Down Expand Up @@ -215,10 +229,10 @@ Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.

Note that the following Kafka params cannot be set and the Kafka source will throw an exception:
- **group.id**: Kafka source will create a unique group id for each query automatically.
- **auto.offset.reset**: Set the source option `startingOffset` to `earliest` or `latest` to specify
- **auto.offset.reset**: Set the source option `startingOffsets` to specify
where to start instead. Structured Streaming manages which offsets are consumed internally, rather
than relying on the Kafka consumer to do it. This will ensure that no data is missed when new
topics/partitions are dynamically subscribed. Note that `startingOffset` only applies when a new
topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new
Streaming query is started, and that resuming will always pick up from where the query left off.
- **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
DataFrame operations to explicitly deserialize the keys.
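
As an illustration of how these options fit together, a query that subscribes to two topics and starts from explicit per-partition offsets might be created as in the following sketch (assuming an existing SparkSession `spark`; broker addresses, topic names, and offsets are placeholders):

    // Minimal sketch only: host/port, topics, and offsets are illustrative.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
      .option("subscribe", "topicA,topicB")
      // In the json form, -2 means earliest and -1 means latest; every assigned
      // TopicPartition must be listed.
      .option("startingOffsets", """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}""")
      .load()

    // Keys and values arrive as binary; deserialize them with DataFrame operations.
    val kv = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")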
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import java.io.Writer

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node._
import org.apache.kafka.common.TopicPartition

/**
* Utilities for converting Kafka related objects to and from json.
*/
private object JsonUtils {
  private val mapper = new ObjectMapper()

  /**
   * Read TopicPartitions from json string
   */
  def partitions(str: String): Array[TopicPartition] = {
    try {
      val res = new ArrayBuffer[TopicPartition]()
      val topics = mapper.readTree(str).fields
      while (topics.hasNext) {
        val node = topics.next
        val topic = node.getKey
        val parts = node.getValue.elements
        while (parts.hasNext) {
          res.append(new TopicPartition(topic, parts.next().asInt))
        }
      }
      res.toArray
    } catch {
      case NonFatal(x) =>
        throw new IllegalArgumentException(
          s"""Expected e.g. {"topicA":[0,1],"topicB":[0,1]}, got $str""")
    }
  }

  /**
   * Write TopicPartitions as json
   */
  def partitions(partitions: Iterable[TopicPartition], writer: Writer): Unit = {
    val root = mapper.createObjectNode()
    partitions.foreach { tp =>
      var topic = root.get(tp.topic)
      if (null == topic) {
        root.set(tp.topic, mapper.createArrayNode())
        topic = root.get(tp.topic)
      }
      topic.asInstanceOf[ArrayNode].add(tp.partition)
    }
    mapper.writeValue(writer, root)
  }

  /**
   * Read per-TopicPartition offsets from json string
   */
  def partitionOffsets(str: String): Map[TopicPartition, Long] = {
    try {
      val res = new HashMap[TopicPartition, Long]
      val topics = mapper.readTree(str).fields
      while (topics.hasNext) {
        val node = topics.next
        val topic = node.getKey
        val parts = node.getValue.fields
        while (parts.hasNext) {
          val node = parts.next
          val part = node.getKey.toInt
          val offset = node.getValue.asLong
          res += new TopicPartition(topic, part) -> offset
        }
      }
      res.toMap
    } catch {
      case NonFatal(x) =>
        throw new IllegalArgumentException(
          s"""Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got $str""")
    }
  }

  /**
   * Write per-TopicPartition offsets as json
   */
  def partitionOffsets(partitionOffsets: Map[TopicPartition, Long], writer: Writer): Unit = {
    val root = mapper.createObjectNode()
    partitionOffsets.foreach { case (tp, off) =>
      var topic = root.get(tp.topic)
      if (null == topic) {
        root.set(tp.topic, mapper.createObjectNode())
        topic = root.get(tp.topic)
      }
      topic.asInstanceOf[ObjectNode].set(tp.partition.toString, new LongNode(off))
    }
    mapper.writeValue(writer, root)
  }
}
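
As a quick illustration of the json formats these helpers accept and produce (a sketch only, not part of this change, and assuming the calling code lives in the same org.apache.spark.sql.kafka010 package since JsonUtils is private):

    import java.io.StringWriter

    // Parse per-partition offsets; -1 and -2 are passed through as written.
    val offsets = JsonUtils.partitionOffsets(
      """{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}""")
    // offsets now maps TopicPartition(topicA, 0) -> 23, (topicA, 1) -> -1, (topicB, 0) -> -2

    // Serialize them back to json in the same {"topic":{"partition":offset}} shape.
    val writer = new StringWriter()
    JsonUtils.partitionOffsets(offsets, writer)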
@@ -22,7 +22,7 @@ import java.{util => ju}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer, OffsetOutOfRangeException}
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

@@ -82,7 +82,7 @@ private[kafka010] case class KafkaSource(
    executorKafkaParams: ju.Map[String, Object],
    sourceOptions: Map[String, String],
    metadataPath: String,
    startFromEarliestOffset: Boolean,
    startingOffsets: StartingOffsets,
    failOnDataLoss: Boolean)
  extends Source with Logging {

@@ -110,10 +110,10 @@ private[kafka010] case class KafkaSource(
  private lazy val initialPartitionOffsets = {
    val metadataLog = new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
    metadataLog.get(0).getOrElse {
      val offsets = if (startFromEarliestOffset) {
        KafkaSourceOffset(fetchEarliestOffsets())
      } else {
        KafkaSourceOffset(fetchLatestOffsets())
      val offsets = startingOffsets match {
        case EarliestOffsets => KafkaSourceOffset(fetchEarliestOffsets())
        case LatestOffsets => KafkaSourceOffset(fetchLatestOffsets())
        case SpecificOffsets(p) => KafkaSourceOffset(fetchSpecificStartingOffsets(p))
      }
      metadataLog.add(0, offsets)
      logInfo(s"Initial offsets: $offsets")
@@ -231,6 +231,42 @@ private[kafka010] case class KafkaSource(

  override def toString(): String = s"KafkaSource[$consumerStrategy]"

  /**
   * Set consumer position to specified offsets, making sure all assignments are set.
   */
  private def fetchSpecificStartingOffsets(
      partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
    withRetriesWithoutInterrupt {
      // Poll to get the latest assigned partitions
      consumer.poll(0)
      val partitions = consumer.assignment()
      consumer.pause(partitions)
      assert(partitions.asScala == partitionOffsets.keySet,
        "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
          "Use -1 for latest, -2 for earliest, if you don't care.\n" +
          s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}")
      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")

      partitionOffsets.foreach {
        case (tp, -1) => consumer.seekToEnd(ju.Arrays.asList(tp))
        case (tp, -2) => consumer.seekToBeginning(ju.Arrays.asList(tp))
        case (tp, off) => consumer.seek(tp, off)
      }
      val result = partitionOffsets.map {
        case (tp, _) => tp -> consumer.position(tp)
      }
      partitionOffsets.foreach {
        case (tp, off) if off != -1 && off != -2 =>
          if (result(tp) != off) {
            reportDataLoss(
              s"startingOffsets for $tp was $off but consumer reset to earliest ${result(tp)}")
          }
        case _ =>
          // no real way to check that beginning or end is reasonable
      }
      result
    }

  /**
   * Fetch the earliest offsets of partitions.
   */
@@ -273,7 +309,7 @@ private[kafka010] case class KafkaSource(
    consumer.poll(0)
    val partitions = consumer.assignment()
    consumer.pause(partitions)
    logDebug(s"\tPartitioned assigned to consumer: $partitions")
    logDebug(s"\tPartitions assigned to consumer: $partitions")

    // Get the earliest offset of each partition
    consumer.seekToBeginning(partitions)
@@ -317,6 +353,8 @@ private[kafka010] case class KafkaSource(
try {
  result = Some(body)
} catch {
  case x: OffsetOutOfRangeException =>
    reportDataLoss(x.getMessage)
  case NonFatal(e) =>
    lastException = e
    logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
@@ -373,6 +411,17 @@ private[kafka010] object KafkaSource {
    def createConsumer(): Consumer[Array[Byte], Array[Byte]]
  }

  case class AssignStrategy(partitions: Array[TopicPartition], kafkaParams: ju.Map[String, Object])
    extends ConsumerStrategy {
    override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {
      val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
      consumer.assign(ju.Arrays.asList(partitions: _*))
      consumer
    }

    override def toString: String = s"Assign[${partitions.mkString(", ")}]"
  }
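  // Illustrative note (not part of this change): the partitions array expected here could be
  // produced from the "assign" option's json value via the JsonUtils helper, e.g.
  //   val parts = JsonUtils.partitions("""{"topicA":[0,1],"topicB":[2,4]}""")
  //   AssignStrategy(parts, kafkaParams)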

  case class SubscribeStrategy(topics: Seq[String], kafkaParams: ju.Map[String, Object])
    extends ConsumerStrategy {
    override def createConsumer(): Consumer[Array[Byte], Array[Byte]] = {