Skip to content

Commit d4a7cf7

Browse files
committed
[SPARK-4964] allow for use cases that need to override compute for custom kafka dstreams
1 parent c1bd6d9 commit d4a7cf7

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala

Lines changed: 5 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -66,9 +66,9 @@ class DeterministicKafkaInputDStream[
6666

6767
protected[streaming] override val checkpointData = new DeterministicKafkaInputDStreamCheckpointData
6868

69-
private val kc = new KafkaCluster(kafkaParams)
69+
protected val kc = new KafkaCluster(kafkaParams)
7070

71-
private val maxMessagesPerPartition: Option[Long] = {
71+
protected val maxMessagesPerPartition: Option[Long] = {
7272
val ratePerSec = context.sparkContext.getConf.getInt("spark.streaming.receiver.maxRate", 0)
7373
if (ratePerSec > 0) {
7474
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
@@ -78,10 +78,10 @@ class DeterministicKafkaInputDStream[
7878
}
7979
}
8080

81-
private var currentOffsets = fromOffsets
81+
protected var currentOffsets = fromOffsets
8282

8383
@tailrec
84-
private def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
84+
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
8585
val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
8686
// Either.fold would confuse @tailrec, do it manually
8787
if (o.isLeft) {
@@ -98,7 +98,7 @@ class DeterministicKafkaInputDStream[
9898
}
9999
}
100100

101-
private def clamp(
101+
protected def clamp(
102102
leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
103103
maxMessagesPerPartition.map { mmp =>
104104
leaderOffsets.map { case (tp, lo) =>

0 commit comments

Comments (0)