Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,26 +70,26 @@ class KinesisBackedBlockRDDPartition(
*/
private[kinesis]
class KinesisBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
sc: SparkContext,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This matches the change made to c1bc4f4#diff-3fdb38876be7c77f4568be49edba4c02L38

val regionName: String,
val endpointUrl: String,
@transient blockIds: Array[BlockId],
@transient private val _blockIds: Array[BlockId],
@transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
@transient isBlockIdValid: Array[Boolean] = Array.empty,
@transient private val isBlockIdValid: Array[Boolean] = Array.empty,
val retryTimeoutMs: Int = 10000,
val messageHandler: Record => T = KinesisUtils.defaultMessageHandler _,
val awsCredentialsOption: Option[SerializableAWSCredentials] = None
) extends BlockRDD[T](sc, blockIds) {
) extends BlockRDD[T](sc, _blockIds) {

require(blockIds.length == arrayOfseqNumberRanges.length,
require(_blockIds.length == arrayOfseqNumberRanges.length,
"Number of blockIds is not equal to the number of sequence number ranges")

override def isValid(): Boolean = true

override def getPartitions: Array[Partition] = {
Array.tabulate(blockIds.length) { i =>
Array.tabulate(_blockIds.length) { i =>
val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
new KinesisBackedBlockRDDPartition(i, blockIds(i), isValid, arrayOfseqNumberRanges(i))
new KinesisBackedBlockRDDPartition(i, _blockIds(i), isValid, arrayOfseqNumberRanges(i))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.streaming.{Duration, StreamingContext, Time}

private[kinesis] class KinesisInputDStream[T: ClassTag](
@transient _ssc: StreamingContext,
_ssc: StreamingContext,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this matches the change that was made to FlumePollingInputDStream: c1bc4f4#diff-0a2347be859b78b562e3e8e8e17a1f1fL49

streamName: String,
endpointUrl: String,
regionName: String,
Expand Down