Commit 326ff3c

add some tests
1 parent 38bb727 commit 326ff3c

4 files changed, +86 −5 lines changed

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala

Lines changed: 3 additions & 1 deletion

@@ -32,7 +32,7 @@ import kafka.consumer.{ConsumerConfig, SimpleConsumer}
  * NOT zookeeper servers, specified in host1:port1,host2:port2 form
  */
 class KafkaCluster(val kafkaParams: Map[String, String]) {
-  type Err = ArrayBuffer[Throwable]
+  import KafkaCluster.Err
 
   val seedBrokers: Array[(String, Int)] =
     kafkaParams.get("metadata.broker.list")
@@ -287,6 +287,8 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
 }
 
 object KafkaCluster {
+  type Err = ArrayBuffer[Throwable]
+
   /** Make a consumer config without requiring group.id or zookeeper.connect,
     * since communicating with brokers also needs common settings such as timeout
     */
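
Moving the Err alias from the class body into the companion object means other files can name the error type as KafkaCluster.Err without first constructing a KafkaCluster instance, while code inside the class keeps the short name via the new import. A minimal sketch of that pattern (the connectErrs field and describe helper are illustrative, not part of this commit):

    import scala.collection.mutable.ArrayBuffer

    class KafkaCluster(val kafkaParams: Map[String, String]) {
      import KafkaCluster.Err   // short name still available inside the class

      // hypothetical accumulator, just to show the alias used from the class body
      private val connectErrs: Err = new ArrayBuffer[Throwable]
    }

    object KafkaCluster {
      // the alias now lives on the companion, so callers can reference
      // KafkaCluster.Err with no instance in scope
      type Err = ArrayBuffer[Throwable]
    }

    object ErrUsage {
      // hypothetical caller in another file, typed against the companion's alias
      def describe(errs: KafkaCluster.Err): String = errs.mkString("\n")
    }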

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala

Lines changed: 3 additions & 3 deletions

@@ -79,15 +79,15 @@ class KafkaRDD[
   override def compute(thePart: Partition, context: TaskContext) = {
     val part = thePart.asInstanceOf[KafkaRDDPartition]
     if (part.fromOffset >= part.untilOffset) {
-      log.warn("Beginning offset is same or after ending offset" +
+      log.warn("Beginning offset is same or after ending offset " +
         s"skipping ${part.topic} ${part.partition}")
       Iterator.empty
     } else {
       new NextIterator[R] {
         context.addTaskCompletionListener{ context => closeIfNeeded() }
 
         val kc = new KafkaCluster(kafkaParams)
-        log.info(s"Computing topic ${part.topic}, partition ${part.partition}" +
+        log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
           s"offsets ${part.fromOffset} -> ${part.untilOffset}")
         val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
           .newInstance(kc.config.props)
@@ -97,7 +97,7 @@ class KafkaRDD[
           .asInstanceOf[Decoder[V]]
         val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition).fold(
           errs => throw new Exception(
-            s"Couldn't connect to leader for topic ${part.topic} ${part.partition}:" +
+            s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
             errs.mkString("\n")),
           consumer => consumer
         )
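
All three edits in this file add a missing trailing space where a log or exception message is built from string literals concatenated across two lines; without it the fragments run together. A quick REPL-style sketch of the difference, using stand-in values:

    val topic = "topic1"
    val partition = 0

    // before: no separator between the concatenated fragments
    val before = "Beginning offset is same or after ending offset" +
      s"skipping $topic $partition"
    // => "Beginning offset is same or after ending offsetskipping topic1 0"

    // after: the trailing space keeps the message readable
    val after = "Beginning offset is same or after ending offset " +
      s"skipping $topic $partition"
    // => "Beginning offset is same or after ending offset skipping topic1 0"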
external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.kafka.KafkaStreamSuiteBase
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var sc: SparkContext = _
+  before {
+    setupKafka()
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop
+      sc = null
+    }
+    tearDownKafka()
+  }
+
+  test("Kafka RDD") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    sc = new SparkContext(sparkConf)
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+      "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+
+    val kc = new KafkaCluster(kafkaParams)
+
+    val rdd = getRdd(kc, Set(topic))
+    assert(rdd.isDefined)
+    assert(rdd.get.countByValue.size === sent.size)
+
+    kc.setConsumerOffsets(kafkaParams("group.id"), rdd.get.untilOffsets)
+
+    val rdd2 = getRdd(kc, Set(topic))
+    assert(rdd2.isDefined)
+    assert(rdd2.get.count === 0)
+  }
+
+  private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
+    val groupId = kc.kafkaParams("group.id")
+    for {
+      topicPartitions <- kc.getPartitions(topics).right.toOption
+      from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+        kc.getEarliestLeaderOffsets(topicPartitions).right.toOption)
+      until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
+    } yield {
+      new KafkaRDD[String, String, StringDecoder, StringDecoder, String](
+        sc, kc.kafkaParams, from, until, mmd => mmd.message)
+    }
+  }
+}
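
The getRdd helper above chains three cluster metadata lookups. Each KafkaCluster call returns an Either with accumulated errors on the left, and .right.toOption collapses it to an Option, so the whole for-comprehension yields None as soon as any step fails (falling back to the earliest leader offsets when no consumer offsets are stored yet). The same composition pattern in isolation, as a REPL-style sketch with hypothetical stand-in functions:

    // stand-ins for the cluster calls; each can fail with a list of errors
    def partitionsFor(topics: Set[String]): Either[List[Throwable], Set[String]] =
      if (topics.nonEmpty) Right(topics) else Left(List(new Exception("no topics")))

    def offsetsFor(parts: Set[String]): Either[List[Throwable], Map[String, Long]] =
      Right(parts.map(_ -> 0L).toMap)

    val range = for {
      parts <- partitionsFor(Set("topic1")).right.toOption   // None if Left
      from  <- offsetsFor(parts).right.toOption
    } yield (parts, from)
    // range is Some(...) only if every lookup succeeded, mirroring rdd.isDefined in the test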

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -56,7 +56,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
   private val zkSessionTimeout = 6000
   private var zookeeper: EmbeddedZookeeper = _
   private var zkPort: Int = 0
-  private var brokerPort = 9092
+  protected var brokerPort = 9092
   private var brokerConf: KafkaConfig = _
   private var server: KafkaServer = _
   private var producer: Producer[String, String] = _
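
Widening brokerPort from private to protected is what lets the new KafkaRDDSuite, which extends KafkaStreamSuiteBase, read the port when building its metadata.broker.list parameter; a private member is invisible to subclasses. A minimal sketch of the visibility rule, with simplified stand-in names:

    abstract class SuiteBase {
      private var hiddenPort = 9092      // not visible to subclasses
      protected var brokerPort = 9092    // readable (and writable) from subclasses
    }

    class DerivedSuite extends SuiteBase {
      val brokerList = s"localhost:$brokerPort"   // compiles because brokerPort is protected
      // val bad = s"localhost:$hiddenPort"       // would not compile: hiddenPort is private
    }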
