
Commit 677dc8d

Start fixing a lot of long lines
1 parent 5d1895e commit 677dc8d

File tree

15 files changed: +191 -99 lines changed


src/main/scala/com/high-performance-spark-examples/dataframe/HappyPandas.scala

Lines changed: 19 additions & 7 deletions
@@ -1,5 +1,6 @@
 /**
- * Happy Panda Example for DataFrames. Computes the % of happy pandas. Very contrived.
+ * Happy Panda Example for DataFrames. This computes the % of happy pandas and
+ * is a very contrived example (sorry!).
  */
 package com.highperformancespark.examples.dataframe
 
@@ -30,7 +31,8 @@ object HappyPandas {
     val session = SparkSession.builder()
       .enableHiveSupport()
       .getOrCreate()
-    // Import the implicits, unlike in core Spark the implicits are defined on the context
+    // Import the implicits, unlike in core Spark the implicits are defined
+    // on the context.
     import session.implicits._
     //end::createSparkSession[]
     session
@@ -42,7 +44,8 @@ object HappyPandas {
   def sqlContext(sc: SparkContext): SQLContext = {
     //tag::createSQLContext[]
     val sqlContext = new SQLContext(sc)
-    // Import the implicits, unlike in core Spark the implicits are defined on the context
+    // Import the implicits, unlike in core Spark the implicits are defined
+    // on the context.
     import sqlContext.implicits._
     //end::createSQLContext[]
     sqlContext
@@ -54,7 +57,8 @@ object HappyPandas {
   def hiveContext(sc: SparkContext): HiveContext = {
     //tag::createHiveContext[]
     val hiveContext = new HiveContext(sc)
-    // Import the implicits, unlike in core Spark the implicits are defined on the context
+    // Import the implicits, unlike in core Spark the implicits are defined
+    // on the context.
     import hiveContext.implicits._
     //end::createHiveContext[]
     hiveContext
@@ -63,7 +67,8 @@ object HappyPandas {
   /**
    * Illustrate loading some JSON data.
    */
-  def loadDataSimple(sc: SparkContext, session: SparkSession, path: String): DataFrame = {
+  def loadDataSimple(sc: SparkContext, session: SparkSession, path: String):
+      DataFrame = {
     //tag::loadPandaJSONSimple[]
     val df1 = session.read.json(path)
     //end::loadPandaJSONSimple[]
@@ -94,7 +99,11 @@ object HappyPandas {
    * @param happyPandas number of happy pandas in this place
    * @param totalPandas total number of pandas in this place
    */
-  case class PandaInfo(place: String, pandaType: String, happyPandas: Integer, totalPandas: Integer)
+  case class PandaInfo(
+    place: String,
+    pandaType: String,
+    happyPandas: Integer,
+    totalPandas: Integer)
 
   /**
    * Gets the percentage of happy pandas per place.
@@ -103,7 +112,10 @@ object HappyPandas {
    * @return Returns DataFrame of (place, percentage of happy pandas)
    */
   def happyPandasPercentage(pandaInfo: DataFrame): DataFrame = {
-    pandaInfo.select(pandaInfo("place"), (pandaInfo("happyPandas") / pandaInfo("totalPandas")).as("percentHappy"))
+    pandaInfo.select(
+      pandaInfo("place"),
+      (pandaInfo("happyPandas") / pandaInfo("totalPandas")).as("percentHappy")
+    )
   }
 
   //tag::encodePandaType[]
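
For readers skimming the diff, the wrapped happyPandasPercentage select is just a ratio over two integer columns. A minimal usage sketch (not part of the commit; assumes a SparkSession named session with import session.implicits._ in scope):

// Illustrative sketch only - the data and names below are assumptions, not repo code.
val pandaInfo = Seq(
  ("toronto", "giant", 1, 2),
  ("san diego", "red", 2, 3)
).toDF("place", "pandaType", "happyPandas", "totalPandas")

// Same shape as the wrapped select in the diff: place plus the happy-panda ratio.
val percentHappy = pandaInfo.select(
  pandaInfo("place"),
  (pandaInfo("happyPandas") / pandaInfo("totalPandas")).as("percentHappy"))
percentHappy.show()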

src/main/scala/com/high-performance-spark-examples/errors/throws.scala

Lines changed: 8 additions & 4 deletions
@@ -7,7 +7,8 @@ object Throws {
   def throwInner(sc: SparkContext) = {
     //tag::throwInner1[]
     val data = sc.parallelize(List(1, 2, 3))
-    val transform1 = data.map(x => x/0) // Will throw an exception when forced to evaluate
+    // Will throw an exception when forced to evaluate
+    val transform1 = data.map(x => x/0)
     val transform2 = transform1.map(x => x + 1)
     transform2.collect() // Forces evaluation
     //end::throwInner1[]
@@ -17,7 +18,8 @@ object Throws {
     //tag::throwOuter1[]
     val data = sc.parallelize(List(1, 2, 3))
     val transform1 = data.map(x => x + 1)
-    val transform2 = transform1.map(x => x/0) // Will throw an exception when forced to evaluate
+    // Will throw an exception when forced to evaluate
+    val transform2 = transform1.map(x => x/0)
     transform2.collect() // Forces evaluation
     //end::throwOuter1[]
   }
@@ -35,15 +37,17 @@ object Throws {
   //tag::badEx3[]
   def throwInner2(sc: SparkContext) = {
     val data = sc.parallelize(List(1, 2, 3))
-    val transform1 = data.map(divZero) // Will throw an exception when forced to evaluate
+    // Will throw an exception when forced to evaluate
+    val transform1 = data.map(divZero)
     val transform2 = transform1.map(add1)
     transform2.collect() // Forces evaluation
   }
 
   def throwOuter2(sc: SparkContext) = {
     val data = sc.parallelize(List(1, 2, 3))
     val transform1 = data.map(add1)
-    val transform2 = transform1.map(divZero) // Will throw an exception when forced to evaluate
+    // Will throw an exception when forced to evaluate
+    val transform2 = transform1.map(divZero)
     transform2.collect() // Forces evaluation
   }
   //end::badEx3
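
The point of these throwInner/throwOuter examples is unchanged by the re-wrap: the failing map raises nothing when it is defined, only when an action forces evaluation. A small sketch of observing that, assuming a live SparkContext named sc:

import scala.util.Try

val data = sc.parallelize(List(1, 2, 3))
val willFail = data.map(x => x / 0)      // no exception yet - transformations are lazy
val stillLazy = willFail.map(x => x + 1) // still no exception
// The SparkException (wrapping the ArithmeticException) only appears here:
val result = Try(stillLazy.collect())
println(result.isFailure) // true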

src/main/scala/com/high-performance-spark-examples/ml/CustomPipeline.scala

Lines changed: 12 additions & 7 deletions
@@ -34,7 +34,8 @@ class HardCodedWordCountStage(override val uid: String) extends Transformer {
     val idx = schema.fieldIndex("happy_pandas")
     val field = schema.fields(idx)
     if (field.dataType != StringType) {
-      throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
+      throw new Exception(
+        s"Input type ${field.dataType} did not match input type StringType")
     }
     // Add the return field
     schema.add(StructField("happy_panda_counts", IntegerType, false))
@@ -71,7 +72,8 @@ class ConfigurableWordCount(override val uid: String) extends Transformer {
     val idx = schema.fieldIndex($(inputCol))
     val field = schema.fields(idx)
     if (field.dataType != StringType) {
-      throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
+      throw new Exception(
+        s"Input type ${field.dataType} did not match input type StringType")
     }
     // Add the return field
     schema.add(StructField($(outputCol), IntegerType, false))
@@ -91,7 +93,8 @@ trait SimpleIndexerParams extends Params {
   final val outputCol = new Param[String](this, "outputCol", "The output column")
 }
 
-class SimpleIndexer(override val uid: String) extends Estimator[SimpleIndexerModel] with SimpleIndexerParams {
+class SimpleIndexer(override val uid: String)
+    extends Estimator[SimpleIndexerModel] with SimpleIndexerParams {
 
   def setInputCol(value: String) = set(inputCol, value)
 
@@ -108,7 +111,8 @@ class SimpleIndexer(override val uid: String) extends Estimator[SimpleIndexerMod
     val idx = schema.fieldIndex($(inputCol))
     val field = schema.fields(idx)
     if (field.dataType != StringType) {
-      throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
+      throw new Exception(
+        s"Input type ${field.dataType} did not match input type StringType")
     }
     // Add the return field
     schema.add(StructField($(outputCol), IntegerType, false))
@@ -122,8 +126,8 @@ class SimpleIndexer(override val uid: String) extends Estimator[SimpleIndexerMod
     }
   }
 
-class SimpleIndexerModel(
-  override val uid: String, words: Array[String]) extends Model[SimpleIndexerModel] with SimpleIndexerParams {
+class SimpleIndexerModel(override val uid: String, words: Array[String])
+    extends Model[SimpleIndexerModel] with SimpleIndexerParams {
 
   override def copy(extra: ParamMap): SimpleIndexerModel = {
     defaultCopy(extra)
@@ -137,7 +141,8 @@ class SimpleIndexerModel(
     val idx = schema.fieldIndex($(inputCol))
     val field = schema.fields(idx)
     if (field.dataType != StringType) {
-      throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
+      throw new Exception(
+        s"Input type ${field.dataType} did not match input type StringType")
     }
     // Add the return field
     schema.add(StructField($(outputCol), IntegerType, false))
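
Every hunk in this file re-wraps the same transformSchema validation pattern: look up the input column, reject anything that is not a StringType, then append the output field. A standalone sketch of that pattern (validateAndAddCount is a hypothetical helper, not part of the commit):

import org.apache.spark.sql.types._

def validateAndAddCount(
    schema: StructType, inputCol: String, outputCol: String): StructType = {
  val field = schema.fields(schema.fieldIndex(inputCol))
  if (field.dataType != StringType) {
    throw new Exception(
      s"Input type ${field.dataType} did not match input type StringType")
  }
  // Append the integer output column to the schema.
  schema.add(StructField(outputCol, IntegerType, false))
}

// e.g. validateAndAddCount(df.schema, "happy_pandas", "happy_panda_counts")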

src/main/scala/com/high-performance-spark-examples/ml/SimpleNaiveBayes.scala

Lines changed: 6 additions & 5 deletions
@@ -32,8 +32,8 @@ class SimpleNaiveBayes(val uid: String)
     import ds.sparkSession.implicits._
     ds.cache()
     // Note: you can use getNumClasses & extractLabeledPoints to get an RDD instead
-    // Using the RDD approach is common when integrating with legacy machine learning code
-    // or iterative algorithms which can create large query plans.
+    // Using the RDD approach is common when integrating with legacy machine
+    // learning code or iterative algorithms which can create large query plans.
     // Compute the number of documents
     val numDocs = ds.count
     // Get the number of classes.
@@ -116,9 +116,10 @@ case class SimpleNaiveBayesModel(
   val onesVec = Vectors.dense(Array.fill(theta.numCols)(1.0))
   val negThetaSum: Array[Double] = negTheta.multiply(onesVec).toArray
 
-  // Here is the prediciton functionality you need to implement - for ClassificationModels
-  // transform automatically wraps this - but if you might benefit from broadcasting your model or
-  // other optimizations you can also override transform.
+  // Here is the prediciton functionality you need to implement - for
+  // ClassificationModels transform automatically wraps this.
+  // If you might benefit from broadcasting your model or other optimizations you
+  // can override transform and place your desired logic there.
   def predictRaw(features: Vector): Vector = {
     // Toy implementation - use BLAS or similar instead
     // the summing of the three vectors but the functionality isn't exposed.

src/main/scala/com/high-performance-spark-examples/ml/SimplePipeline.scala

Lines changed: 4 additions & 2 deletions
@@ -92,14 +92,16 @@ object SimplePipeline {
 
   def reverseStringIndexer(sbModel: StringIndexerModel) = {
     //tag::indexToString[]
-    // Construct the inverse of the model to go from index-to-string after prediction.
+    // Construct the inverse of the model to go from index-to-string
+    // after prediction.
     val sbInverse = new IndexToString()
     sbInverse.setInputCol("prediction")
     sbInverse.setLabels(sbModel.labels)
     //end::indexToString[]
     // Or if meta data is present
     //tag::indexToStringMD[]
-    // Construct the inverse of the model to go from index-to-string after prediction.
+    // Construct the inverse of the model to go from
+    // index-to-string after prediction.
     val sbInverseMD = new IndexToString()
     sbInverseMD.setInputCol("prediction")
     //end::indexToStringMD[]
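
The two comments being wrapped describe the same round trip: a StringIndexerModel maps labels to indices, and an IndexToString seeded with that model's labels maps predictions back. A sketch under those assumptions (df and predictions are hypothetical DataFrames: df with a string "label" column, predictions with a numeric "prediction" column):

import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val indexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("labelIndex")
val indexerModel = indexer.fit(df)

// Invert using the labels captured by the fitted StringIndexerModel.
val inverse = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(indexerModel.labels)
val withLabels = inverse.transform(predictions)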

src/main/scala/com/high-performance-spark-examples/mllib/GoldilocksMLlib.scala

Lines changed: 6 additions & 3 deletions
@@ -10,7 +10,8 @@ import org.apache.spark.rdd.RDD
 //tag::imports[]
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
+import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS,
+  LogisticRegressionModel}
 // Rename Vector to SparkVector to avoid conflicts with Scala's Vector class
 import org.apache.spark.mllib.linalg.{Vector => SparkVector}
 import org.apache.spark.mllib.regression.LabeledPoint
@@ -109,13 +110,15 @@ object GoldilocksMLlib {
     // Vector size is 100 - we use this to build a transformer on top of WVM that
     // works on sentences.
     val vectorSize = 100
-    // The transform function works on a per-word basis, but we have sentences as input.
+    // The transform function works on a per-word basis, but we have
+    // sentences as input.
     tokenized.map{words =>
       // If there is nothing in the sentence output a null vector
      if (words.isEmpty) {
        Vectors.sparse(vectorSize, Array.empty[Int], Array.empty[Double])
      } else {
-        // If there are sentences construct a running sum of the vectors for each word
+        // If there are sentences construct a running sum of the
+        // vectors for each word
        val sum = Array[Double](vectorSize)
        words.foreach { word =>
          blas.daxpy(
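
The truncated hunk above is building a sentence vector by summing per-word Word2Vec vectors with BLAS. A self-contained sketch of that accumulation, assuming a fitted mllib Word2VecModel named model, 100-dimensional vectors, and that every word is in the model's vocabulary:

import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.spark.mllib.linalg.{Vector => SparkVector, Vectors}

val vectorSize = 100
def sentenceVector(words: Seq[String]): SparkVector = {
  if (words.isEmpty) {
    Vectors.sparse(vectorSize, Array.empty[Int], Array.empty[Double])
  } else {
    val sum = new Array[Double](vectorSize)
    // sum += 1.0 * wordVector, for every word in the sentence
    words.foreach { word =>
      blas.daxpy(vectorSize, 1.0, model.transform(word).toArray, 1, sum, 1)
    }
    // Scale by 1/numWords to turn the running sum into an average.
    blas.dscal(vectorSize, 1.0 / words.size, sum, 1)
    Vectors.dense(sum)
  }
}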

src/main/scala/com/high-performance-spark-examples/native/PipeExample.scala

Lines changed: 4 additions & 2 deletions
@@ -25,12 +25,14 @@ object PipeExample {
     // Copy our script to the worker nodes with sc.addFile
     // Add file requires absolute paths
     val distScriptName = "ghinfo.pl"
-    val localScript = System.getProperty("user.dir") + "/src/main/perl/" + distScriptName
+    val userDir = System.getProperty("user.dir")
+    val localScript = s"${userDir}/src/main/perl/${distScriptName}"
     val addedFile = sc.addFile(localScript)
 
     // Pass enviroment variables to our worker
     val enviromentVars = Map("user" -> "apache", "repo" -> "spark")
-    val result = input.map(x => x.toString).pipe(SparkFiles.get(distScriptName), enviromentVars)
+    val result = input.map(x => x.toString)
+      .pipe(SparkFiles.get(distScriptName), enviromentVars)
     // Parse the results
     result.map{record =>
       val elems: Array[String] = record.split(" ")
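
For the pipe call being re-wrapped: each RDD element is written to the external process's stdin, one per line, and each line the process writes to stdout becomes an element of the resulting RDD[String], with the supplied map exported as environment variables. A minimal sketch of the same mechanics with a plain shell command instead of the Perl script (assumes a live SparkContext sc):

val sample = sc.parallelize(Seq("spark", "pandas", "pipe"))
// awk reads each element on stdin and prints its length on stdout.
val lineLengths = sample.pipe(Seq("awk", "{print length($0)}"))
lineLengths.collect().foreach(println)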

src/main/scala/com/high-performance-spark-examples/perf/SimplePerfTest.scala

Lines changed: 9 additions & 4 deletions
@@ -37,16 +37,20 @@ object SimplePerfTest {
     run(sc, sparkSession, scalingFactor, size)
   }
 
-  def run(sc: SparkContext, session: SparkSession, scalingFactor: Long, size: Int) = {
+  def run(sc: SparkContext, session: SparkSession,
+      scalingFactor: Long, size: Int) = {
     import session.implicits._
-    val inputRDD = GenerateScalingData.generateFullGoldilocks(sc, scalingFactor, size)
+    val inputRDD = GenerateScalingData.generateFullGoldilocks(
+      sc, scalingFactor, size)
     val pairRDD = inputRDD.map(p => (p.zip.toInt, p.attributes(0)))
     pairRDD.cache()
     pairRDD.count()
     val rddTimeings = 1.to(10).map(x => time(testOnRDD(pairRDD)))
     val groupTimeings = 1.to(10).map(x => time(groupOnRDD(pairRDD)))
     val df = inputRDD.toDF()
-    val inputDataFrame = df.select(df("zip").cast(IntegerType), df("attributes")(0).as("fuzzyness").cast(DoubleType))
+    val inputDataFrame = df.select(
+      df("zip").cast(IntegerType),
+      df("attributes")(0).as("fuzzyness").cast(DoubleType))
     inputDataFrame.cache()
     inputDataFrame.count()
     val dataFrameTimeings = 1.to(10).map(x => time(testOnDataFrame(inputDataFrame)))
@@ -56,7 +60,8 @@ object SimplePerfTest {
   }
 
   def testOnRDD(rdd: RDD[(Int, Double)]) = {
-    rdd.map{case (x, y) => (x, (y, 1))}.reduceByKey{case (x, y) => (x._1 + y._1, x._2 + y._2)}.count()
+    rdd.map{case (x, y) => (x, (y, 1))}
+      .reduceByKey{case (x, y) => (x._1 + y._1, x._2 + y._2)}.count()
   }
 
   def groupOnRDD(rdd: RDD[(Int, Double)]) = {
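
The re-wrapped testOnRDD line uses the usual (sum, count) accumulator pattern with reduceByKey. A sketch of extending that pattern to a per-key mean, assuming an RDD[(Int, Double)] named pairRDD (illustrative only, not part of the commit):

val means = pairRDD
  .map { case (k, v) => (k, (v, 1)) }
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
  .mapValues { case (sum, count) => sum / count }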

src/main/scala/com/high-performance-spark-examples/streaming/DStream.scala

Lines changed: 3 additions & 2 deletions
@@ -1,5 +1,5 @@
 /**
- * Happy Panda Example for DataFrames. Computes the % of happy pandas. Very contrived.
+ * Streaming Pandas Example with the old DStream APIs.
  */
 package com.highperformancespark.examples.streaming
 
@@ -42,7 +42,8 @@ object DStreamExamples {
     //end::sscRecover[]
   }
 
-  def fileAPIExample(ssc: StreamingContext, path: String): DStream[(Long, String)] = {
+  def fileAPIExample(ssc: StreamingContext, path: String):
+      DStream[(Long, String)] = {
     //tag::file[]
     // You don't need to write the types of the InputDStream but it for illustration
     val inputDStream: InputDStream[(LongWritable, Text)] =
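
The re-wrapped fileAPIExample signature returns a DStream[(Long, String)] built from a Hadoop file stream of (LongWritable, Text). A sketch of that conversion, assuming a StreamingContext named ssc and a directory path to watch (not repo code):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

val inputDStream: InputDStream[(LongWritable, Text)] =
  ssc.fileStream[LongWritable, Text, TextInputFormat](path)
// Convert the Hadoop writables into plain (Long, String) pairs.
val converted: DStream[(Long, String)] =
  inputDStream.map { case (offset, line) => (offset.get(), line.toString) }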

src/main/scala/com/high-performance-spark-examples/tools/FilterInvalidPandas.scala

Lines changed: 4 additions & 2 deletions
@@ -13,15 +13,17 @@ import com.typesafe.scalalogging.LazyLogging
 
 object FilterInvalidPandas extends LazyLogging {
 
-  def filterInvalidPandas(sc: SparkContext, invalidPandas: List[Long], input: RDD[RawPanda]) = {
+  def filterInvalidPandas(sc: SparkContext, invalidPandas: List[Long],
+      input: RDD[RawPanda]) = {
     //tag::broadcast[]
     val invalid = HashSet() ++ invalidPandas
     val invalidBroadcast = sc.broadcast(invalid)
     input.filter{panda => !invalidBroadcast.value.contains(panda.id)}
     //end::broadcast[]
   }
 
-  def filterInvalidPandasWithLogs(sc: SparkContext, invalidPandas: List[Long], input: RDD[RawPanda]) = {
+  def filterInvalidPandasWithLogs(sc: SparkContext, invalidPandas: List[Long],
+      input: RDD[RawPanda]) = {
     //tag::broadcastAndLog[]
     val invalid = HashSet() ++ invalidPandas
     val invalidBroadcast = sc.broadcast(invalid)
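
Both wrapped methods broadcast the small invalid-id set and filter the RDD against it, so only the set is shipped to executors. A hypothetical call site, assuming a SparkContext sc and an RDD[RawPanda] named pandas:

val invalidIds = List(1L, 7L, 42L)
val validPandas = FilterInvalidPandas.filterInvalidPandas(sc, invalidIds, pandas)
// Only the broadcast HashSet travels with the tasks; the RDD is never collected.
validPandas.count()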
