Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e768164
#2808 update kafka to version 0.8.2
Dec 7, 2014
d9dc2bc
Merge remote-tracking branch 'upstream/master' into wip-2808-kafka-0.…
Dec 23, 2014
2e67c66
#SPARK-2808 Update to Kafka 0.8.2.0 GA from beta.
Feb 5, 2015
6953429
[SPARK-2808][Streaming][Kafka] update kafka to 0.8.2
koeninger Feb 11, 2015
77de6c2
Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
koeninger Mar 18, 2015
407382e
[SPARK-2808][Streaming][Kafka] update kafka to 0.8.2.1
koeninger Mar 18, 2015
ed02d2c
[SPARK-2808][Streaming][Kafka] move default argument for api version …
koeninger Apr 15, 2015
1d10751
Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
koeninger Apr 27, 2015
c70ee43
[SPARK-2808][Streaming][Kafka] add more asserts to test, try to figur…
koeninger Apr 28, 2015
9edab4c
[SPARK-2808][Streaming][Kafka] more shots in the dark on jenkins fail…
koeninger Apr 28, 2015
af6f3ec
[SPARK-2808][Streaming][Kafka] delay test until latest leader offset …
koeninger Apr 29, 2015
61b3464
[SPARK-2808][Streaming][Kafka] delay for second send in boundary cond…
koeninger Apr 29, 2015
3824ce3
[SPARK-2808][Streaming][Kafka] naming / comments per tdas
koeninger Apr 29, 2015
2b92d3f
[SPARK-2808][Streaming][Kafka] wait for leader offsets in the java te…
koeninger Apr 29, 2015
2712649
[SPARK-2808][Streaming][Kafka] add more logging to python test, see w…
koeninger Apr 29, 2015
115aeee
Merge branch 'master' into wip-2808-kafka-0.8.2-upgrade
koeninger Apr 29, 2015
4c4557f
[SPARK-2808][Streaming][Kafka] add even more logging to python test
koeninger Apr 30, 2015
1d896e2
[SPARK-2808][Streaming][Kafka] add even even more logging to python test
koeninger Apr 30, 2015
bb0cfe2
Changes to debug flaky streaming tests.
tdas May 1, 2015
ae12eb2
Enable only kafka streaming test
tdas May 1, 2015
4bb7e40
Fix goof up
tdas May 1, 2015
9804030
removed sleeps.
tdas May 1, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion external/kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.8.1.1</version>
<version>0.8.2.0</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.util.Random
import scala.collection.mutable.ArrayBuffer
import java.util.Properties
import kafka.api._
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import org.apache.spark.SparkException

Expand Down Expand Up @@ -212,12 +212,16 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
// scalastyle:on

/**
 * Default value for the `versionId` parameter of the consumer offset methods in this class.
 * Version 0 selects the original ZooKeeper-backed offset commit/fetch API.
 */
def defaultConsumerApiVersion: Short = 0

/** Requires Kafka >= 0.8.1.1 */
def getConsumerOffsets(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
topicAndPartitions: Set[TopicAndPartition],
versionId: Short = defaultConsumerApiVersion
): Either[Err, Map[TopicAndPartition, Long]] = {
getConsumerOffsetMetadata(groupId, topicAndPartitions).right.map { r =>
getConsumerOffsetMetadata(groupId, topicAndPartitions, versionId).right.map { r =>
r.map { kv =>
kv._1 -> kv._2.offset
}
Expand All @@ -227,10 +231,11 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
/** Requires Kafka >= 0.8.1.1 */
def getConsumerOffsetMetadata(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
topicAndPartitions: Set[TopicAndPartition],
versionId: Short = defaultConsumerApiVersion
): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
var result = Map[TopicAndPartition, OffsetMetadataAndError]()
val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq)
val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, versionId)
val errs = new Err
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp = consumer.fetchOffsets(req)
Expand All @@ -257,25 +262,28 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
/**
 * Commits a plain offset for each partition on behalf of `groupId` by wrapping every
 * offset and delegating to `setConsumerOffsetMetadata`. Requires Kafka >= 0.8.1.1.
 *
 * NOTE(review): this span is a rendered diff hunk, so the pre-change and post-change
 * lines both appear below.
 *
 * @param groupId consumer group the offsets are committed for
 * @param offsets offset to commit for each topic/partition
 * @param versionId offset commit API version; defaults to `defaultConsumerApiVersion`
 * @return the result of `setConsumerOffsetMetadata` for the wrapped offsets
 */
def setConsumerOffsets(
groupId: String,
offsets: Map[TopicAndPartition, Long]
offsets: Map[TopicAndPartition, Long],
versionId: Short = defaultConsumerApiVersion
): Either[Err, Map[TopicAndPartition, Short]] = {
setConsumerOffsetMetadata(groupId, offsets.map { kv =>
kv._1 -> OffsetMetadataAndError(kv._2)
})
// Wrap each raw offset in OffsetAndMetadata before delegating.
val meta = offsets.map { kv =>
kv._1 -> OffsetAndMetadata(kv._2)
}
setConsumerOffsetMetadata(groupId, meta, versionId)
}

/** Requires Kafka >= 0.8.1.1 */
def setConsumerOffsetMetadata(
groupId: String,
metadata: Map[TopicAndPartition, OffsetMetadataAndError]
metadata: Map[TopicAndPartition, OffsetAndMetadata],
versionId: Short = defaultConsumerApiVersion
): Either[Err, Map[TopicAndPartition, Short]] = {
var result = Map[TopicAndPartition, Short]()
val req = OffsetCommitRequest(groupId, metadata)
val req = OffsetCommitRequest(groupId, metadata, versionId)
val errs = new Err
val topicAndPartitions = metadata.keySet
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp = consumer.commitOffsets(req)
val respMap = resp.requestInfo
val respMap = resp.commitStatus
val needed = topicAndPartitions.diff(result.keySet)
needed.foreach { tp: TopicAndPartition =>
respMap.get(tp).foreach { err: Short =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
val ranges = rdd.get.asInstanceOf[HasOffsetRanges]
.offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap

kc.setConsumerOffsets(kafkaParams("group.id"), ranges)
kc.setConsumerOffsets(kafkaParams("group.id"), ranges).fold(
err => throw new Exception(err.mkString("\n")),
_ => ()
)

// this is the "0 messages" case
val rdd2 = getRdd(kc, Set(topic))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
var bindSuccess: Boolean = false
while(!bindSuccess) {
try {
val brokerProps = getBrokerConfig()
brokerConf = new KafkaConfig(brokerProps)
brokerConf = new KafkaConfig(brokerConfig)
server = new KafkaServer(brokerConf)
server.startup()
logInfo("==================== Kafka Broker Started ====================")
Expand Down Expand Up @@ -137,7 +136,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
/**
 * Creates `topic` via `AdminUtils.createTopic(zkClient, topic, 1, 1)`, waits for the
 * metadata of partition 0 to be propagated, and logs that the topic was created.
 *
 * NOTE(review): rendered diff hunk -- both the pre-change and post-change wait calls
 * appear below.
 */
def createTopic(topic: String) {
AdminUtils.createTopic(zkClient, topic, 1, 1)
// wait until metadata is propagated
waitUntilMetadataIsPropagated(topic, 0)
waitUntilMetadataIsPropagated(Seq(server), topic, 0)
logInfo(s"==================== Topic $topic Created ====================")
}

Expand All @@ -147,13 +146,13 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
}

/**
 * Sends every string in `messages` to `topic` as a `KeyedMessage` through a newly
 * created `Producer`, closes that producer, and logs the messages that were sent.
 *
 * NOTE(review): rendered diff hunk -- both producer-construction lines (pre- and
 * post-change) appear below.
 */
def sendMessages(topic: String, messages: Array[String]) {
producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
producer = new Producer[String, String](new ProducerConfig(producerConfig))
// Each string becomes one KeyedMessage for `topic`; all are passed to a single send call.
producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
producer.close()
logInfo(s"==================== Sent Messages: ${messages.mkString(", ")} ====================")
}

private def getBrokerConfig(): Properties = {
private def brokerConfig: Properties = {
val props = new Properties()
props.put("broker.id", "0")
props.put("host.name", "localhost")
Expand All @@ -165,21 +164,28 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
props
}

/**
 * Builds the producer `Properties`: `metadata.broker.list` is set to the broker's
 * `hostName:port` taken from `brokerConf`, and `serializer.class` to `StringEncoder`.
 *
 * NOTE(review): rendered diff hunk -- both the `getProducerConfig()` and the
 * `producerConfig` signature lines appear below.
 */
private def getProducerConfig(): Properties = {
private def producerConfig: Properties = {
val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
val props = new Properties()
props.put("metadata.broker.list", brokerAddr)
props.put("serializer.class", classOf[StringEncoder].getName)
props
}

/**
 * Retries until the metadata cache of every given server reports a partition state for
 * [topic, partition] whose leader id is >= 0, then returns the last leader id observed.
 *
 * NOTE(review): rendered diff hunk -- the variant that checks a single `server` with
 * `containsTopicAndPartition` and the variant that takes `servers` are interleaved below.
 */
private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(
server.apis.metadataCache.containsTopicAndPartition(topic, partition),
s"Partition [$topic, $partition] metadata not propagated after timeout"
)
private def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int): Int = {
// Stays -1 until a server reports a partition state; reassigned for every server that does.
var leader: Int = -1
// NOTE(review): this variant waits 1000 ms, whereas the variant above waits 10000 ms --
// confirm the shorter timeout is intended.
eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
assert(servers.forall { server =>
val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition)
partitionStateOpt match {
case Some(partitionState) =>
leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader
leader >= 0 // is valid broker id
case _ => false
}
}, s"Partition [$topic, $partition] metadata not propagated after timeout")
}
leader
}

class EmbeddedZookeeper(val zkConnect: String) {
Expand Down