[SPARK-22830] [Improvements] Requested changes addressed
chetkhatri committed Dec 19, 2017
commit 8e0e4ee3ffacc7ebdf3d5bc1bcd73674288efa3e
@@ -42,7 +42,7 @@ object BroadcastTest {
val arr1 = (0 until num).toArray

for (i <- 0 until 3) {
println(s"Iteration ${i}")
println(s"Iteration $i")
println("===========")
val startTime = System.nanoTime
val barr1 = sc.broadcast(arr1)
@@ -95,22 +95,22 @@ object DFSReadWriteTest {
def main(args: Array[String]): Unit = {
parseArgs(args)

println(s"Performing local word count")
println("Performing local word count")
val fileContents = readFile(localFilePath.toString())
val localWordCount = runLocalWordCount(fileContents)

println(s"Creating SparkSession")
println("Creating SparkSession")
val spark = SparkSession
.builder
.appName("DFS Read Write Test")
.getOrCreate()

println(s"Writing local file to DFS")
println("Writing local file to DFS")
val dfsFilename = s"${dfsDirPath}/dfs_read_write_test"
val fileRDD = spark.sparkContext.parallelize(fileContents)
fileRDD.saveAsTextFile(dfsFilename)

println(s"Reading file from DFS and running Word Count")
println("Reading file from DFS and running Word Count")
val readFileRDD = spark.sparkContext.textFile(dfsFilename)

val dfsWordCount = readFileRDD
@@ -125,11 +125,11 @@ object DFSReadWriteTest {
spark.stop()

if (localWordCount == dfsWordCount) {
println(s"Success! Local Word Count ($localWordCount)
and DFS Word Count ($dfsWordCount) agree.")
println(s"Success! Local Word Count $localWordCount and " +
s"DFS Word Count $dfsWordCount agree.")
} else {
println(s"Failure! Local Word Count ($localWordCount)
and DFS Word Count ($dfsWordCount) disagree.")
println(s"Failure! Local Word Count $localWordCount " +
s"and DFS Word Count $dfsWordCount disagree.")
}

}
@@ -29,18 +29,18 @@ import org.apache.spark.util.Utils
object DriverSubmissionTest {
def main(args: Array[String]) {
if (args.length < 1) {
println(s"Usage: DriverSubmissionTest <seconds-to-sleep>")
println("Usage: DriverSubmissionTest <seconds-to-sleep>")
System.exit(0)
}
val numSecondsToSleep = args(0).toInt

val env = System.getenv()
val properties = Utils.getSystemProperties

println(s"Environment variables containing SPARK_TEST:")
println("Environment variables containing SPARK_TEST:")
env.asScala.filter { case (k, _) => k.contains("SPARK_TEST")}.foreach(println)

println(s"System properties containing spark.test:")
println("System properties containing spark.test:")
properties.filter { case (k, _) => k.toString.contains("spark.test") }.foreach(println)

for (i <- 1 until numSecondsToSleep) {
@@ -26,7 +26,7 @@ object HdfsTest {
/** Usage: HdfsTest [file] */
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println(s"Usage: HdfsTest <file>")
System.err.println("Usage: HdfsTest <file>")
System.exit(1)
}
val spark = SparkSession
@@ -39,7 +39,7 @@ object HdfsTest {
val start = System.currentTimeMillis()
for (x <- mapped) { x + 2 }
val end = System.currentTimeMillis()
println(s"Iteration ${iter} took ${(end-start)} ${ms}")
println(s"Iteration ${iter} took ${(end-start)} ms")
Member (HyukjinKwon):
Let's just write as $iter and $end-start

Contributor Author (chetkhatri):
@HyukjinKwon $end-start won't work, since end and start are two different variables. I made the changes.
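To illustrate the point raised in this thread, here is a minimal, hypothetical Scala sketch (not part of the diff; the object name and values are invented for demonstration) of why $end-start cannot replace ${end - start}: the s interpolator binds only the identifier immediately following $, and braces are required for full expressions, while plain identifiers such as i need no braces at all.

// Hypothetical, standalone sketch (not from the PR) illustrating the interpolation
// rules discussed above; the values are invented for demonstration.
object InterpolationDemo {
  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    val end = start + 150

    // `$` binds only the identifier that immediately follows it, so "-start"
    // is kept as literal text rather than parsed as a subtraction.
    println(s"$end-start")          // e.g. "1513641600150-start"

    // Braces are required to interpolate a full expression.
    println(s"${end - start} ms")   // "150 ms"

    // For a plain identifier the braces are redundant, hence `${i}` can be `$i`.
    val i = 3
    println(s"Iteration ${i}")      // "Iteration 3"
    println(s"Iteration $i")        // "Iteration 3"
  }
}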

}
spark.stop()
}
@@ -110,7 +110,7 @@ object LocalALS {
F = f.toInt
ITERATIONS = iters.toInt
case _ =>
System.err.println(s"Usage: LocalALS <M> <U> <F> <iters>")
System.err.println("Usage: LocalALS <M> <U> <F> <iters>")
System.exit(1)
}

@@ -58,10 +58,10 @@ object LocalFileLR {

// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println(s"Initial w: ${w}")
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println(s"On iteration ${i}")
println(s"On iteration $i")
val gradient = DenseVector.zeros[Double](D)
for (p <- points) {
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
@@ -71,7 +71,7 @@
}

fileSrc.close()
println(s"Final w: ${w}")
println(s"Final w: $w")
}
}
// scalastyle:on println
@@ -88,7 +88,7 @@ object LocalKMeans {
kPoints.put(i, iter.next())
}

println(s"Initial centers: ${kPoints}")
println(s"Initial centers: $kPoints")

while(tempDist > convergeDist) {
val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
@@ -114,7 +114,7 @@
}
}

println(s"Final centers: ${kPoints}")
println(s"Final centers: $kPoints")
}
}
// scalastyle:on println
@@ -61,10 +61,10 @@ object LocalLR {
val data = generateData
// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println(s"Initial w: ${w}")
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println(s"On iteration ${i}")
println(s"On iteration $i")
val gradient = DenseVector.zeros[Double](D)
for (p <- data) {
val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y
@@ -73,7 +73,7 @@
w -= gradient
}

println(s"Final w: ${w}")
println(s"Final w: $w")
}
}
// scalastyle:on println
@@ -60,7 +60,7 @@ object SparkHdfsLR {
def main(args: Array[String]) {

if (args.length < 2) {
System.err.println(s"Usage: SparkHdfsLR <file> <iters>")
System.err.println("Usage: SparkHdfsLR <file> <iters>")
System.exit(1)
}

@@ -79,17 +79,17 @@ object SparkHdfsLR {

// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println(s"Initial w: ${w}")
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println(s"On iteration ${i}")
println(s"On iteration $i")
val gradient = points.map { p =>
p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
}.reduce(_ + _)
w -= gradient
}

println(s"Final w: ${w}")
println(s"Final w: $w")
spark.stop()
}
}
@@ -60,7 +60,7 @@ object SparkKMeans {
def main(args: Array[String]) {

if (args.length < 3) {
System.err.println(s"Usage: SparkKMeans <file> <k> <convergeDist>")
System.err.println("Usage: SparkKMeans <file> <k> <convergeDist>")
System.exit(1)
}

@@ -95,10 +95,10 @@ object SparkKMeans {
for (newP <- newPoints) {
kPoints(newP._1) = newP._2
}
println(s"Finished iteration (delta = ${tempDist})")
println(s"Finished iteration (delta = $tempDist)")
}

println(s"Final centers:")
println("Final centers:")
kPoints.foreach(println)
spark.stop()
}
@@ -73,17 +73,17 @@ object SparkLR {

// Initialize w to a random value
val w = DenseVector.fill(D) {2 * rand.nextDouble - 1}
println(s"Initial w: ${w}")
println(s"Initial w: $w")

for (i <- 1 to ITERATIONS) {
println(s"On iteration ${i}")
println(s"On iteration $i")
val gradient = points.map { p =>
p.x * (1 / (1 + exp(-p.y * (w.dot(p.x)))) - 1) * p.y
}.reduce(_ + _)
w -= gradient
}

println(s"Final w: ${w}")
println(s"Final w: $w")

spark.stop()
}
@@ -49,7 +49,7 @@ object SparkPageRank {

def main(args: Array[String]) {
if (args.length < 1) {
System.err.println(s"Usage: SparkPageRank <file> <iter>")
System.err.println("Usage: SparkPageRank <file> <iter>")
System.exit(1)
}

@@ -51,7 +51,7 @@ object Analytics extends Logging {
val optionsList = args.drop(2).map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
- case _ => throw new IllegalArgumentException(s"Invalid argument: ${arg}")
+ case _ => throw new IllegalArgumentException(s"Invalid argument: $arg")
}
}
val options = mutable.Map(optionsList: _*)
@@ -77,14 +77,14 @@ object Analytics extends Logging {
val numIterOpt = options.remove("numIter").map(_.toInt)

options.foreach {
- case (opt, _) => throw new IllegalArgumentException(s"Invalid option: ${opt}")
+ case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
}

println("======================================")
println("| PageRank |")
println("======================================")

val sc = new SparkContext(conf.setAppName(s"PageRank(${fname})"))
val sc = new SparkContext(conf.setAppName(s"PageRank($fname)"))

val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
numEdgePartitions = numEPart,
@@ -103,22 +103,22 @@ object Analytics extends Logging {
println(s"GRAPHX: Total rank: ${pr.map(_._2).reduce(_ + _)}")

if (!outFname.isEmpty) {
logWarning(s"Saving pageranks of pages to ${outFname}")
logWarning(s"Saving pageranks of pages to $outFname")
pr.map { case (id, r) => id + "\t" + r }.saveAsTextFile(outFname)
}

sc.stop()

case "cc" =>
options.foreach {
- case (opt, _) => throw new IllegalArgumentException(s"Invalid option: ${opt}")
+ case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
}

println("======================================")
println("| Connected Components |")
println("======================================")

val sc = new SparkContext(conf.setAppName(s"ConnectedComponents(${fname})"))
val sc = new SparkContext(conf.setAppName(s"ConnectedComponents($fname)"))
val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
numEdgePartitions = numEPart,
edgeStorageLevel = edgeStorageLevel,
@@ -131,14 +131,14 @@ object Analytics extends Logging {

case "triangles" =>
options.foreach {
- case (opt, _) => throw new IllegalArgumentException(s"Invalid option: ${opt}")
+ case (opt, _) => throw new IllegalArgumentException(s"Invalid option: $opt")
}

println("======================================")
println("| Triangle Count |")
println("======================================")

val sc = new SparkContext(conf.setAppName(s"TriangleCount(${fname})"))
val sc = new SparkContext(conf.setAppName(s"TriangleCount($fname)"))
val graph = GraphLoader.edgeListFile(sc, fname,
canonicalOrientation = true,
numEdgePartitions = numEPart,