Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update
  • Loading branch information
uncleGen committed Jan 17, 2017
commit eaa7b15f19711b27e628cfe366fa819a46d0e450
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()

val inputStreamNameAndID = new ArrayBuffer[(String, Int)]()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: change it to `@volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil` and just set it in `start`. Don't expose a mutable `ArrayBuffer` to the caller.


var rememberDuration: Duration = null
var checkpointInProgress = false

var zeroTime: Time = null
var startTime: Time = null
var batchDuration: Duration = null
var numReceivers: Int = 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add @volatile private


def start(time: Time) {
this.synchronized {
Expand All @@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validateAtStart)
outputStreams.foreach(_.validateAtStart())
numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
inputStreams.foreach(is => inputStreamNameAndID.+=((is.name, is.id)))
inputStreams.par.foreach(_.start())
}
}
Expand Down Expand Up @@ -106,16 +111,18 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
.toArray
}

def getInputStreamName(streamId: Int): Option[String] = synchronized {
inputStreams.find(_.id == streamId).map(_.name)
}
def getReceiverNumber: Int = numReceivers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename this to `getNumReceivers` for consistency.


def getInputStreamNameAndID: ArrayBuffer[(String, Int)] = inputStreamNameAndID

def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = getOutputStreams().flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
val jobs = this.synchronized {
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
}
logDebug("Generated " + jobs.length + " jobs for time " + time)
jobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
}

def numInactiveReceivers: Int = {
ssc.graph.getReceiverInputStreams().length - numActiveReceivers
ssc.graph.getReceiverNumber - numActiveReceivers
}

def numTotalCompletedBatches: Long = synchronized {
Expand Down Expand Up @@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
}

def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
completedBatchUIData.toSeq
completedBatchUIData
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you change `toSeq` to `toIndexedSeq` to avoid exposing a mutable collection?

}

def streamName(streamId: Int): Option[String] = {
ssc.graph.getInputStreamName(streamId)
ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1)
}

/**
* Return all InputDStream Ids
*/
def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2)

/**
* Return all of the record rates for each InputDStream in each batch. The key of the return value
Expand Down