SPARK-1096: a space-after-comment style checker.
ScrapCodes committed Mar 27, 2014
commit 810a1d6ca32c2fa33a7e20df7077252bb09fe485
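
For context, the rule being enforced here is simple: every comment opener ("//", "/*", "/**") must be followed by a space. A checker for that rule could be sketched roughly as follows; this is a minimal standalone sketch, and the object name, regex, and output format are illustrative assumptions, not the checker actually wired into Spark's build.

// Minimal sketch (assumed names and regex): flag comment starts not followed by a space.
// Known limitation: a naive per-line scan also matches "//" inside string literals (e.g. URLs).
import scala.io.Source

object SpaceAfterCommentCheck {
  // "//", "/*" or "/**" must be followed by whitespace, another marker character, or end of line.
  private val badCommentStart = """(//|/\*{1,2})(?![\s*/]|$)""".r

  def check(path: String): Seq[(Int, String)] = {
    val source = Source.fromFile(path)
    try {
      source.getLines().zipWithIndex.collect {
        case (line, idx) if badCommentStart.findFirstIn(line).isDefined => (idx + 1, line.trim)
      }.toList
    } finally source.close()
  }

  def main(args: Array[String]): Unit =
    for (file <- args; (lineNo, text) <- check(file))
      println(s"$file:$lineNo: missing space after comment start: $text")
}

Run over a file such as core/src/main/scala/org/apache/spark/SparkEnv.scala, a checker like this would flag exactly the kind of lines this commit touches, for example "//actorSystem.awaitTermination()". The real change presumably plugs the same rule into the project's existing style checks rather than shipping a standalone script.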
core/src/main/scala/org/apache/spark/SparkEnv.scala (1 addition & 1 deletion)
@@ -81,7 +81,7 @@ class SparkEnv private[spark] (
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
-//actorSystem.awaitTermination()
+// actorSystem.awaitTermination()
}

private[spark]
@@ -167,7 +167,7 @@ extends Logging {
private var initialized = false
private var conf: SparkConf = null
def initialize(_isDriver: Boolean, conf: SparkConf) {
-TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
+TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
initialized = true
@@ -66,9 +66,9 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
// TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
// This is unfortunate, but for now we just comment it out.
workerActorSystems.foreach(_.shutdown())
-//workerActorSystems.foreach(_.awaitTermination())
+// workerActorSystems.foreach(_.awaitTermination())
masterActorSystems.foreach(_.shutdown())
-//masterActorSystems.foreach(_.awaitTermination())
+// masterActorSystems.foreach(_.awaitTermination())
masterActorSystems.clear()
workerActorSystems.clear()
}
@@ -30,7 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
* [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
*/
private[spark] trait LeaderElectionAgent extends Actor {
-//TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
+// TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
val masterActor: ActorRef
}

@@ -276,7 +276,6 @@ private[spark] class Executor(
// have left some weird state around depending on when the exception was thrown, but on
// the other hand, maybe we could detect that when future tasks fail and exit then.
logError("Exception in task ID " + taskId, t)
-//System.exit(1)
}
} finally {
// TODO: Unregister shuffle memory only for ResultTask
@@ -42,7 +42,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
}

def initialize() {
-//Add default properties in case there's no properties file
+// Add default properties in case there's no properties file
setDefaultProperties(properties)

// If spark.metrics.conf is not set, try to get file in class path
core/src/main/scala/org/apache/spark/network/Connection.scala (9 additions & 9 deletions)
@@ -48,7 +48,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
channel.socket.setTcpNoDelay(true)
channel.socket.setReuseAddress(true)
channel.socket.setKeepAlive(true)
-/*channel.socket.setReceiveBufferSize(32768) */
+/* channel.socket.setReceiveBufferSize(32768) */

@volatile private var closed = false
var onCloseCallback: Connection => Unit = null
@@ -206,12 +206,12 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,

private class Outbox {
val messages = new Queue[Message]()
-val defaultChunkSize = 65536 //32768 //16384
+val defaultChunkSize = 65536
var nextMessageToBeUsed = 0

def addMessage(message: Message) {
messages.synchronized{
-/*messages += message*/
+/* messages += message*/
messages.enqueue(message)
logDebug("Added [" + message + "] to outbox for sending to " +
"[" + getRemoteConnectionManagerId() + "]")
@@ -221,8 +221,8 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
def getChunk(): Option[MessageChunk] = {
messages.synchronized {
while (!messages.isEmpty) {
-/*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
-/*val message = messages(nextMessageToBeUsed)*/
+/* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
+/* val message = messages(nextMessageToBeUsed)*/
val message = messages.dequeue
val chunk = message.getChunkForSending(defaultChunkSize)
if (chunk.isDefined) {
@@ -262,7 +262,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,

val currentBuffers = new ArrayBuffer[ByteBuffer]()

-/*channel.socket.setSendBufferSize(256 * 1024)*/
+/* channel.socket.setSendBufferSize(256 * 1024)*/

override def getRemoteAddress() = address

@@ -355,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
}
case None => {
// changeConnectionKeyInterest(0)
-/*key.interestOps(0)*/
+/* key.interestOps(0)*/
return false
}
}
@@ -540,10 +540,10 @@ private[spark] class ReceivingConnection(
return false
}

/*logDebug("Read " + bytesRead + " bytes for the buffer")*/
/* logDebug("Read " + bytesRead + " bytes for the buffer")*/

if (currentChunk.buffer.remaining == 0) {
/*println("Filled buffer at " + System.currentTimeMillis)*/
/* println("Filled buffer at " + System.currentTimeMillis)*/
val bufferMessage = inbox.getMessageForChunk(currentChunk).get
if (bufferMessage.isCompletelyReceived) {
bufferMessage.flip
@@ -505,7 +505,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
}
}
handleMessageExecutor.execute(runnable)
-/*handleMessage(connection, message)*/
+/* handleMessage(connection, message)*/
}

private def handleClientAuthentication(
@@ -733,7 +733,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
logTrace("Sending Security [" + message + "] to [" + connManagerId + "]")
val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection())

-//send security message until going connection has been authenticated
+// send security message until going connection has been authenticated
connection.send(message)

wakeupSelector()
@@ -859,14 +859,14 @@ private[spark] object ConnectionManager {
None
})

-/*testSequentialSending(manager)*/
-/*System.gc()*/
+/* testSequentialSending(manager)*/
+/* System.gc()*/

-/*testParallelSending(manager)*/
-/*System.gc()*/
+/* testParallelSending(manager)*/
+/* System.gc()*/

-/*testParallelDecreasingSending(manager)*/
-/*System.gc()*/
+/* testParallelDecreasingSending(manager)*/
+/* System.gc()*/

testContinuousSending(manager)
System.gc()
@@ -948,7 +948,7 @@ private[spark] object ConnectionManager {
val ms = finishTime - startTime
val tput = mb * 1000.0 / ms
println("--------------------------")
/*println("Started at " + startTime + ", finished at " + finishTime) */
/* println("Started at " + startTime + ", finished at " + finishTime) */
println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
println("--------------------------")
println()
@@ -47,8 +47,8 @@ private[spark] object ConnectionManagerTest extends Logging{
val slaves = slavesFile.mkString.split("\n")
slavesFile.close()

/*println("Slaves")*/
/*slaves.foreach(println)*/
/* println("Slaves")*/
/* slaves.foreach(println)*/
val tasknum = if (args.length > 2) args(2).toInt else slaves.length
val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
val count = if (args.length > 4) args(4).toInt else 3
@@ -27,7 +27,7 @@ private[spark] object ReceiverTest {
println("Started connection manager with id = " + manager.id)

manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
/*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
/* println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
val buffer = ByteBuffer.wrap("response".getBytes)
Some(Message.createBufferMessage(buffer, msg.id))
})
@@ -50,7 +50,7 @@ private[spark] object SenderTest {
(0 until count).foreach(i => {
val dataMessage = Message.createBufferMessage(buffer.duplicate)
val startTime = System.currentTimeMillis
/*println("Started timer at " + startTime)*/
/* println("Started timer at " + startTime)*/
val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
.map { response =>
val buffer = response.asInstanceOf[BufferMessage].buffers(0)
@@ -32,7 +32,7 @@ private[spark] class FileHeader (
buf.writeInt(fileLen)
buf.writeInt(blockId.name.length)
blockId.name.foreach((x: Char) => buf.writeByte(x))
-//padding the rest of header
+// padding the rest of header
if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
} else {
@@ -753,7 +753,7 @@ class DAGScheduler(
val properties = if (stageIdToActiveJob.contains(jobId)) {
stageIdToActiveJob(stage.jobId).properties
} else {
-//this stage will be assigned to "default" pool
+// this stage will be assigned to "default" pool
null
}

@@ -172,7 +172,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
properties += ((key, value))
}
}
-//TODO (prashant) send conf instead of properties
+// TODO (prashant) send conf instead of properties
driverActor = actorSystem.actorOf(
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
@@ -284,7 +284,7 @@ object BlockFetcherIterator {
}
} catch {
case x: InterruptedException => logInfo("Copier Interrupted")
-//case _ => throw new SparkException("Exception Throw in Shuffle Copier")
+// case _ => throw new SparkException("Exception Throw in Shuffle Copier")
}
}
}
core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala (5 additions & 5 deletions)
@@ -112,7 +112,7 @@ private[spark] object ClosureCleaner extends Logging {
accessedFields(cls) = Set[String]()
for (cls <- func.getClass :: innerClasses)
getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0)
//logInfo("accessedFields: " + accessedFields)
// logInfo("accessedFields: " + accessedFields)

val inInterpreter = {
try {
@@ -139,21 +139,21 @@ private[spark] object ClosureCleaner extends Logging {
val field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
val value = field.get(obj)
//logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
// logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
field.set(outer, value)
}
}

if (outer != null) {
//logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
// logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
val field = func.getClass.getDeclaredField("$outer")
field.setAccessible(true)
field.set(func, outer)
}
}

private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
//logInfo("Creating a " + cls + " with outer = " + outer)
// logInfo("Creating a " + cls + " with outer = " + outer)
if (!inInterpreter) {
// This is a bona fide closure class, whose constructor has no effects
// other than to set its fields, so use its constructor
@@ -170,7 +170,7 @@ private[spark] object ClosureCleaner extends Logging {
val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
val obj = newCtor.newInstance().asInstanceOf[AnyRef]
if (outer != null) {
//logInfo("3: Setting $outer on " + cls + " to " + outer);
// logInfo("3: Setting $outer on " + cls + " to " + outer);
val field = cls.getDeclaredField("$outer")
field.setAccessible(true)
field.set(obj, outer)
@@ -49,7 +49,7 @@ private[akka] class IndestructibleActorSystemImpl(
if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
"ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
-//shutdown() //TODO make it configurable
+// shutdown() //TODO make it configurable
} else {
fallbackHandler.uncaughtException(thread, cause)
}
core/src/main/scala/org/apache/spark/util/MutablePair.scala (2 additions & 2 deletions)
@@ -24,8 +24,8 @@ package org.apache.spark.util
* @param _1 Element 1 of this MutablePair
* @param _2 Element 2 of this MutablePair
*/
-case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1,
-@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2]
+case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T1,
+@specialized(Int, Long, Double, Char, Boolean/* , AnyRef*/) T2]
(var _1: T1, var _2: T2)
extends Product2[T1, T2]
{
core/src/test/scala/org/apache/spark/AccumulatorSuite.scala (3 additions & 3 deletions)
@@ -66,7 +66,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte

test ("add value to collection accumulators") {
val maxI = 1000
-for (nThreads <- List(1, 10)) { //test single & multi-threaded
+for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val d = sc.parallelize(1 to maxI)
@@ -83,7 +83,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte

test ("value not readable in tasks") {
val maxI = 1000
-for (nThreads <- List(1, 10)) { //test single & multi-threaded
+for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val d = sc.parallelize(1 to maxI)
@@ -124,7 +124,7 @@ class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkConte

test ("localValue readable in tasks") {
val maxI = 1000
-for (nThreads <- List(1, 10)) { //test single & multi-threaded
+for (nThreads <- List(1, 10)) { // test single & multi-threaded
sc = new SparkContext("local[" + nThreads + "]", "test")
val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
core/src/test/scala/org/apache/spark/CheckpointSuite.scala (0 additions & 1 deletion)
@@ -432,7 +432,6 @@ object CheckpointSuite {
// This is a custom cogroup function that does not use mapValues like
// the PairRDDFunctions.cogroup()
def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
//println("First = " + first + ", second = " + second)
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
part
@@ -146,7 +146,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet

assert(intercept[SparkException]{ arrs.distinct() }.getMessage.contains("array"))
// We can't catch all usages of arrays, since they might occur inside other collections:
-//assert(fails { arrPairs.distinct() })
+// assert(fails { arrPairs.distinct() })
assert(intercept[SparkException]{ arrPairs.partitionBy(new HashPartitioner(2)) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
@@ -111,7 +111,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
sc.addSparkListener(new StatsReportListener)
-//just to make sure some of the tasks take a noticeable amount of time
+// just to make sure some of the tasks take a noticeable amount of time
val w = {i:Int =>
if (i == 0)
Thread.sleep(100)
core/src/test/scala/org/apache/spark/util/UtilsSuite.scala (1 addition & 1 deletion)
@@ -40,7 +40,7 @@ class UtilsSuite extends FunSuite {
}

test("copyStream") {
-//input array initialization
+// input array initialization
val bytes = Array.ofDim[Byte](9000)
Random.nextBytes(bytes)

@@ -53,7 +53,6 @@ object LocalALS {
for (i <- 0 until M; j <- 0 until U) {
r.set(i, j, blas.ddot(ms(i), us(j)))
}
//println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
sqrt(sumSqs / (M * U))
@@ -61,7 +61,7 @@ object SimpleSkewedGroupByTest {

println("RESULT: " + pairs1.groupByKey(numReducers).count)
// Print how many keys each reducer got (for debugging)
//println("RESULT: " + pairs1.groupByKey(numReducers)
// println("RESULT: " + pairs1.groupByKey(numReducers)
// .map{case (k,v) => (k, v.size)}
// .collectAsMap)

@@ -54,7 +54,6 @@ object SparkALS {
for (i <- 0 until M; j <- 0 until U) {
r.set(i, j, blas.ddot(ms(i), us(j)))
}
//println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
sqrt(sumSqs / (M * U))
@@ -34,8 +34,6 @@ object SparkHdfsLR {
case class DataPoint(x: Vector, y: Double)

def parsePoint(line: String): DataPoint = {
-//val nums = line.split(' ').map(_.toDouble)
-//return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
val tok = new java.util.StringTokenizer(line, " ")
var y = tok.nextToken.toDouble
var x = new Array[Double](D)