Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
update
  • Loading branch information
uncleGen committed Jan 18, 2017
commit e51623c007b9faf2ba4fe7c92ad138b0c9c2a8c1
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()

val inputStreamNameAndID = new ArrayBuffer[(String, Int)]()
@volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil

var rememberDuration: Duration = null
var checkpointInProgress = false

var zeroTime: Time = null
var startTime: Time = null
var batchDuration: Duration = null
var numReceivers: Int = 0
@volatile private var numReceivers: Int = 0

def start(time: Time) {
this.synchronized {
Expand All @@ -50,7 +50,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validateAtStart())
numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
inputStreams.foreach(is => inputStreamNameAndID.+=((is.name, is.id)))
inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
inputStreams.par.foreach(_.start())
}
}
Expand Down Expand Up @@ -111,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
.toArray
}

def getReceiverNumber: Int = numReceivers
def getNumReceivers: Int = numReceivers

def getInputStreamNameAndID: ArrayBuffer[(String, Int)] = inputStreamNameAndID
def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID

def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
}

def numInactiveReceivers: Int = {
ssc.graph.getReceiverNumber - numActiveReceivers
ssc.graph.getNumReceivers - numActiveReceivers
}

def numTotalCompletedBatches: Long = synchronized {
Expand Down Expand Up @@ -197,7 +197,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
}

def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
completedBatchUIData
completedBatchUIData.toIndexedSeq
}

def streamName(streamId: Int): Option[String] = {
Expand Down