@@ -13,46 +13,65 @@ class GoldilocksLargeTests extends FunSuite with SharedSparkContext{
 
 
   def testGoldilocksImplementations(
-    data: DataFrame, targetRanks: List[Long],
-    expectedResult: Map[Int, Iterable[Long]])= {
-    val iterative = GoldilocksWhileLoop.findRankStatistics(data, targetRanks)
-    val groupByKey = GoldilocksGroupByKey.findRankStatistics(data, targetRanks)
-    val firstTry = GoldilocksFirstTry.findRankStatistics(data, targetRanks)
-    val hashMap = GoldilocksWithHashMap.findRankStatistics(data, targetRanks)
-    val secondarySort = GoldilocksSecondarySort.findRankStatistics(data, targetRanks, data.rdd.partitions.length)
-    val secondarySortV2 = GoldilocksSecondarySortV2.findRankStatistics(data, targetRanks)
+    data: DataFrame, targetRanks: List[Long],
+    expectedResult: Map[Int, Iterable[Long]]) = {
+
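+    // Run all six Goldilocks implementations on the same input and target ranks.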
+    val iterative =
+      GoldilocksWhileLoop.findRankStatistics(data, targetRanks)
+    val groupByKey =
+      GoldilocksGroupByKey.findRankStatistics(data, targetRanks)
+    val firstTry =
+      GoldilocksFirstTry.findRankStatistics(data, targetRanks)
+    val hashMap =
+      GoldilocksWithHashMap.findRankStatistics(data, targetRanks)
+    val secondarySort =
+      GoldilocksSecondarySort.findRankStatistics(data, targetRanks,
+        data.rdd.partitions.length)
+    val secondarySortV2 =
+      GoldilocksSecondarySortV2.findRankStatistics(data, targetRanks)
 
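+    // Every implementation should return identical rank statistics per column.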
     expectedResult.foreach {
       case ((i, ranks)) =>
         assert(iterative(i).equals(ranks),
28- " The Iterative solution to goldilocks was incorrect for column " + i)
29- assert(groupByKey(i).equals(ranks), " Group by key solution was incorrect" )
30- assert(firstTry(i).equals(ranks), " GoldilocksFirstTry incorrect for column " + i )
31- assert(hashMap(i).equals(ranks), " GoldilocksWithhashMap incorrect for column " + i)
36+ " The Iterative solution to goldilocks was incorrect for column " + i)
37+ assert(groupByKey(i).equals(ranks),
38+ " Group by key solution was incorrect" )
39+ assert(firstTry(i).equals(ranks),
40+ " GoldilocksFirstTry incorrect for column " + i )
41+ assert(hashMap(i).equals(ranks),
42+ " GoldilocksWithhashMap incorrect for column " + i)
         assert(secondarySort(i).equals(ranks))
         assert(secondarySortV2(i).equals(ranks))
 
     }
   }
 
   test("Goldilocks on local data solution"){
-    val sqlContext = new SQLContext(sc)
-    val testRanks = List(3L, 8L)
-    val (smallTestData, result) = DataCreationUtils.createLocalTestData(5, 10, testRanks)
-    val schema = StructType(result.keys.toSeq.map(n => StructField("Column" + n.toString, DoubleType)))
-    val smallTestDF: DataFrame = sqlContext.createDataFrame(sc.makeRDD(smallTestData), schema)
-    testGoldilocksImplementations(smallTestDF, testRanks, result)
-  }
+    val sqlContext = new SQLContext(sc)
+    val testRanks = List(3L, 8L)
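+    // Generate a 5-column, 10-row dataset along with the expected values at ranks 3 and 8.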
+    val (smallTestData, result) =
+      DataCreationUtils.createLocalTestData(5, 10, testRanks)
+    val schema = StructType(
+      result.keys.toSeq.map(
+        n => StructField("Column" + n.toString, DoubleType)
+      ))
+    val smallTestDF: DataFrame =
+      sqlContext.createDataFrame(sc.makeRDD(smallTestData), schema)
+    testGoldilocksImplementations(smallTestDF, testRanks, result)
+  }
 }
 
 object DataCreationUtils {
-  def createLocalTestData(numberCols: Int, numberOfRows: Int, targetRanks: List[Long]) = {
+  def createLocalTestData(numberCols: Int, numberOfRows: Int,
+    targetRanks: List[Long]) = {
+
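+    // Each column is a random permutation of the row indices, so the value at each target rank is known up front.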
     val cols = Range(0, numberCols).toArray
     val scalers = cols.map(x => 1.0)
     val rowRange = Range(0, numberOfRows)
     val columnArray: Array[IndexedSeq[Double]] = cols.map(
       columnIndex => {
-        val columnValues = rowRange.map(x => (Math.random(), x)).sortBy(_._1).map(_._2 * scalers(columnIndex))
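+        // Shuffle the column by sorting on a random key, then scale each value.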
+        val columnValues = rowRange.map(
+          x => (Math.random(), x)).sortBy(_._1).map(_._2 * scalers(columnIndex))
         columnValues
       })
     val rows = rowRange.map(
@@ -69,22 +88,23 @@ object DataCreationUtils {
   }
 
 
-  def createDistributedData(sc: SparkContext, partitions: Int, elementsPerPartition: Int, numberOfColumns: Int) = {
+  def createDistributedData(sc: SparkContext, partitions: Int,
+    elementsPerPartition: Int, numberOfColumns: Int) = {
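+    // Each partition generates elementsPerPartition rows; the later sortByKey on random keys shuffles them globally.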
     val partitionsStart: RDD[Int] = sc.parallelize(
       Array.fill(partitions)(1))
     partitionsStart.repartition(partitions)
 
     var data: RDD[(Long, List[Int])] = partitionsStart.mapPartitionsWithIndex {
       case (partIndex, elements) =>
         val rows = Range(0, elementsPerPartition)
-          .map(x => (Math.random(), x))
-          .map {
-            case ((randomNumber, rowValue)) =>
-              (
-                randomNumber,
-                (partIndex * elementsPerPartition.toLong + rowValue, // index of element
-                 List(rowValue + partIndex * elementsPerPartition)))
-          }
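+          // Key each row by a random number so the sortByKey below produces a random row order.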
+          .map(x => (Math.random(), x))
+          .map {
+            case ((randomNumber, rowValue)) =>
+              (randomNumber,
+                // index of element
+                (partIndex * elementsPerPartition.toLong + rowValue,
+                  List(rowValue + partIndex * elementsPerPartition)))
+          }
         rows.toIterator
     }.sortByKey().values
 
@@ -93,14 +113,14 @@ object DataCreationUtils {
     val nextColumn: RDD[(Long, Int)] = partitionsStart.mapPartitionsWithIndex {
       case (partIndex, elements) =>
         val rows = Range(0, elementsPerPartition)
-          .map(x => (Math.random(), x))
-          .map {
-            case ((randomNumber, rowValue)) =>
-              (
-                randomNumber,
-                (partIndex * elementsPerPartition.toLong + rowValue, // index of element
-                 rowValue + partIndex * elementsPerPartition))
-          }
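+          // Same shuffle pattern for the extra column: random key now, global sort below.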
+          .map(x => (Math.random(), x))
+          .map {
+            case ((randomNumber, rowValue)) =>
+              (randomNumber,
+                // index of element
+                (partIndex * elementsPerPartition.toLong + rowValue,
+                  rowValue + partIndex * elementsPerPartition))
+          }
         rows.toIterator
     }.sortByKey().values
 