
Commit 803aa2c

[SPARK-2808][Streaming][Kafka] code cleanup per TD
1 parent e6dfaf6 commit 803aa2c

4 files changed: 17 additions & 20 deletions

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala

Lines changed: 1 addition & 6 deletions

@@ -38,11 +38,6 @@ private[spark]
 class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
 
-  /** Constructor that takes a Java map */
-  def this(kafkaParams: java.util.Map[String, String]) {
-    this(kafkaParams.asScala.toMap)
-  }
-
   // ConsumerConfig isn't serializable
   @transient private var _config: SimpleConsumerConfig = null
 
@@ -227,7 +222,7 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
   // scalastyle:on
 
   // this 0 here indicates api version, in this case the original ZK backed api.
-  def defaultConsumerApiVersion: Short = 0
+  private def defaultConsumerApiVersion: Short = 0
 
   /** Requires Kafka >= 0.8.1.1 */
   def getConsumerOffsets(
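
With the Java-friendly constructor removed, a caller holding a java.util.Map now performs the conversion itself. A minimal caller-side sketch, assuming only the remaining Map[String, String] constructor; it mirrors the asScala.toMap call the deleted constructor performed internally:

import scala.collection.JavaConverters._

// jParams stands in for whatever java.util.Map a Java API hands over.
val jParams = new java.util.HashMap[String, String]()
jParams.put("metadata.broker.list", "localhost:9092")

// Convert to an immutable Scala Map before constructing KafkaCluster,
// exactly as the removed constructor did internally.
val kc = new KafkaCluster(jParams.asScala.toMap)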

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala

Lines changed: 13 additions & 14 deletions

@@ -229,7 +229,7 @@ private class KafkaTestUtils extends Logging {
     tryAgain(1)
   }
 
-  /** wait until the leader offset for the given topic/partition equals the specified offset */
+  /** Wait until the leader offset for the given topic/partition equals the specified offset */
   def waitUntilLeaderOffset(
       topic: String,
       partition: Int,
@@ -245,20 +245,19 @@ private class KafkaTestUtils extends Logging {
   }
 
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+      case Some(partitionState) =>
+        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+        ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+          leaderAndInSyncReplicas.isr.size >= 1
+
+      case _ =>
+        false
+    }
     eventually(Time(10000), Time(100)) {
-      assert(
-        server.apis.metadataCache.getPartitionInfo(topic, partition) match {
-          case Some(partitionState) =>
-            val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-            ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
-              Request.isValidBrokerId(leaderAndIsr.leader) &&
-              leaderAndIsr.isr.size >= 1
-
-          case _ =>
-            false
-        },
-        s"Partition [$topic, $partition] metadata not propagated after timeout"
-      )
+      assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
     }
   }
 
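
The rewrite of waitUntilMetadataIsPropagated does not change behavior; it names the polled condition so the eventually block shrinks to a single readable assert. A minimal sketch of the same poll-until-timeout idea, with a hypothetical waitUntil standing in for the test harness's eventually(Time(10000), Time(100)):

// Poll `condition` every `intervalMs` until it holds or `timeoutMs` elapses.
def waitUntil(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (!condition) {
    if (System.currentTimeMillis() > deadline) {
      throw new AssertionError(s"condition not met after $timeoutMs ms")
    }
    Thread.sleep(intervalMs)
  }
}

// As in the patch: name the predicate once, then assert on it.
var checks = 0
def isPropagated: Boolean = { checks += 1; checks >= 3 }  // stand-in condition
waitUntil(timeoutMs = 10000, intervalMs = 100)(isPropagated)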

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala

Lines changed: 1 addition & 0 deletions

@@ -101,6 +101,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
 
     val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
 
+    // make sure consumer offsets are committed before the next getRdd call
     kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
      err => throw new Exception(err.mkString("\n")),
      _ => ()
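
The call the new comment documents folds setConsumerOffsets' Either-shaped result: throw with the accumulated errors on the left, do nothing on the right. A minimal sketch of the pattern with a plain Either (the Seq[String] error type is an assumption; KafkaCluster's actual Err alias differs):

// Either.fold takes two functions: the first handles Left (failure),
// the second handles Right (success).
def commitOrFail(result: Either[Seq[String], Unit]): Unit =
  result.fold(
    errs => throw new Exception(errs.mkString("\n")), // surface all errors at once
    _ => ()                                           // success: nothing to do
  )

commitOrFail(Right(()))                   // returns normally
// commitOrFail(Left(Seq("no leader")))   // would throw with the joined messages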

python/pyspark/streaming/tests.py

Lines changed: 2 additions & 0 deletions

@@ -631,6 +631,7 @@ def test_kafka_direct_stream(self):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
         self._validateStreamResult(sendData, stream)
@@ -645,6 +646,7 @@ def test_kafka_direct_stream_from_offset(self):
 
         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
 
         stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
         self._validateStreamResult(sendData, stream)
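
Both Python tests now block until the partition leader's offset reaches sum(sendData.values()), i.e. the total number of messages sent, before creating the direct stream; the direct stream fixes its offset ranges from the leader's offsets at creation time, so messages still in flight would otherwise be silently excluded. A sketch of what such a wait reduces to, with a hypothetical latestLeaderOffset stub in place of a real leader query:

// Hypothetical stub for a leader-offset lookup; a real version would query
// the partition leader (as KafkaTestUtils.waitUntilLeaderOffset does).
// Here it just advances so the example terminates.
var simulatedOffset = 0L
def latestLeaderOffset(topic: String, partition: Int): Long = {
  simulatedOffset += 6
  simulatedOffset
}

def waitUntilLeaderOffset(topic: String, partition: Int, target: Long): Unit = {
  val deadline = System.currentTimeMillis() + 10000
  while (latestLeaderOffset(topic, partition) < target) {
    require(System.currentTimeMillis() < deadline, s"offset $target not reached in time")
    Thread.sleep(100)
  }
}

// The target mirrors the tests: total messages sent across all keys.
val sendData = Map("a" -> 3L, "b" -> 5L, "c" -> 10L)
waitUntilLeaderOffset("topic", 0, sendData.values.sum) // waits for offset 18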
