From 3c6007cf2b240f9c1efba4bba60b49545843024b Mon Sep 17 00:00:00 2001 From: Chenzhao Guo Date: Thu, 19 Mar 2020 11:24:35 +0800 Subject: [PATCH 1/8] Add repartition workload --- README.md | 10 ++-- bin/functions/hibench_prop_env_mapping.py | 2 +- .../micro/repartition/prepare/prepare.sh | 35 ++++++++++++ bin/workloads/micro/repartition/spark/run.sh | 36 +++++++++++++ conf/benchmarks.lst | 1 + conf/workloads/micro/repartition.conf | 13 +++++ conf/workloads/micro/terasort.conf | 2 +- .../sparkbench/micro/ScalaRepartition.scala | 53 +++++++++++++++++++ 8 files changed, 147 insertions(+), 5 deletions(-) create mode 100755 bin/workloads/micro/repartition/prepare/prepare.sh create mode 100755 bin/workloads/micro/repartition/spark/run.sh create mode 100644 conf/workloads/micro/repartition.conf create mode 100644 sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala diff --git a/README.md b/README.md index 478f89219..c97e316ad 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ --- ### OVERVIEW ### -HiBench is a big data benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput and system resource utilizations. It contains a set of Hadoop, Spark and streaming workloads, including Sort, WordCount, TeraSort, Sleep, SQL, PageRank, Nutch indexing, Bayes, Kmeans, NWeight and enhanced DFSIO, etc. It also contains several streaming workloads for Spark Streaming, Flink, Storm and Gearpump. +HiBench is a big data benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput and system resource utilizations. It contains a set of Hadoop, Spark and streaming workloads, including Sort, WordCount, TeraSort, Repartition, Sleep, SQL, PageRank, Nutch indexing, Bayes, Kmeans, NWeight and enhanced DFSIO, etc. It also contains several streaming workloads for Spark Streaming, Flink, Storm and Gearpump. ### Getting Started ### * [Build HiBench](docs/build-hibench.md) @@ -38,6 +38,10 @@ There are totally 19 workloads in HiBench. The workloads are divided into 6 cate 3. TeraSort (terasort) TeraSort is a standard benchmark created by Jim Gray. Its input data is generated by Hadoop TeraGen example program. + +4. Repartition (micro/repartition) + + This workload benchmarks shuffle performance. Input data is generated by Hadoop TeraGen. It is firstly cached in memory, then shuffle write and read in order to repartition. The last 2 stages solely reflects shuffle's performance, excluding I/O and other compute. 4. Sleep (sleep) @@ -124,9 +128,9 @@ There are totally 19 workloads in HiBench. The workloads are divided into 6 cate This workload reads input data from Kafka and then writes result to Kafka immediately, there is no complex business logic involved. -2. Repartition (repartition) +2. Repartition (streaming/repartition) - This workload reads input data from Kafka and changes the level of parallelism by creating more or fewer partitionstests. It tests the efficiency of data shuffle in the streaming frameworks. + This workload reads input data from Kafka and changes the level of parallelism by creating more or fewer partitions. It tests the efficiency of data shuffle in the streaming frameworks. 3. 
Stateful Wordcount (wordcount) diff --git a/bin/functions/hibench_prop_env_mapping.py b/bin/functions/hibench_prop_env_mapping.py index a3b91d064..13028fb7f 100644 --- a/bin/functions/hibench_prop_env_mapping.py +++ b/bin/functions/hibench_prop_env_mapping.py @@ -67,7 +67,7 @@ MAP_SLEEP_TIME="hibench.sleep.mapper.seconds", RED_SLEEP_TIME="hibench.sleep.reducer.seconds", HADOOP_SLEEP_JAR="hibench.sleep.job.jar", - # For Sort, Terasort, Wordcount + # For Sort, Terasort, Wordcount, Repartition DATASIZE="hibench.workload.datasize", # For hive related workload, data scale diff --git a/bin/workloads/micro/repartition/prepare/prepare.sh b/bin/workloads/micro/repartition/prepare/prepare.sh new file mode 100755 index 000000000..22419d129 --- /dev/null +++ b/bin/workloads/micro/repartition/prepare/prepare.sh @@ -0,0 +1,35 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +current_dir=`dirname "$0"` +current_dir=`cd "$current_dir"; pwd` +root_dir=${current_dir}/../../../../.. +workload_config=${root_dir}/conf/workloads/micro/repartition.conf +. "${root_dir}/bin/functions/load_bench_config.sh" + +enter_bench HadoopPrepareRepartition ${workload_config} ${current_dir} +show_bannar start + +rmr_hdfs $INPUT_HDFS || true +START_TIME=`timestamp` +run_hadoop_job ${HADOOP_EXAMPLES_JAR} teragen \ + -D mapreduce.job.maps=${NUM_MAPS} \ + -D mapreduce.job.reduces=${NUM_REDS} \ + ${DATASIZE} ${INPUT_HDFS} +END_TIME=`timestamp` + +show_bannar finish +leave_bench diff --git a/bin/workloads/micro/repartition/spark/run.sh b/bin/workloads/micro/repartition/spark/run.sh new file mode 100755 index 000000000..0e4ac49f8 --- /dev/null +++ b/bin/workloads/micro/repartition/spark/run.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +current_dir=`dirname "$0"` +current_dir=`cd "$current_dir"; pwd` +root_dir=${current_dir}/../../../../.. +workload_config=${root_dir}/conf/workloads/micro/repartition.conf +. 
"${root_dir}/bin/functions/load_bench_config.sh" + +enter_bench ScalaRepartition ${workload_config} ${current_dir} +show_bannar start + +rmr_hdfs $OUTPUT_HDFS || true + +SIZE=`dir_size $INPUT_HDFS` +START_TIME=`timestamp` +run_spark_job com.intel.hibench.sparkbench.micro.ScalaRepartition $INPUT_HDFS $OUTPUT_HDFS +END_TIME=`timestamp` + +gen_report ${START_TIME} ${END_TIME} ${SIZE} +show_bannar finish +leave_bench + diff --git a/conf/benchmarks.lst b/conf/benchmarks.lst index f8b8fedba..df09bb4b0 100644 --- a/conf/benchmarks.lst +++ b/conf/benchmarks.lst @@ -2,6 +2,7 @@ micro.sleep micro.sort micro.terasort micro.wordcount +micro.repartition micro.dfsioe sql.aggregation diff --git a/conf/workloads/micro/repartition.conf b/conf/workloads/micro/repartition.conf new file mode 100644 index 000000000..2ce1864f6 --- /dev/null +++ b/conf/workloads/micro/repartition.conf @@ -0,0 +1,13 @@ +#datagen +hibench.repartition.tiny.datasize 100000 +hibench.repartition.small.datasize 10000000 +hibench.repartition.large.datasize 100000000 +hibench.repartition.huge.datasize 1000000000 +hibench.repartition.gigantic.datasize 10000000000 +hibench.repartition.bigdata.datasize 60000000000 + +hibench.workload.datasize ${hibench.repartition.${hibench.scale.profile}.datasize} + +# export for shell script +hibench.workload.input ${hibench.hdfs.data.dir}/Repartition/Input +hibench.workload.output ${hibench.hdfs.data.dir}/Repartition/Output diff --git a/conf/workloads/micro/terasort.conf b/conf/workloads/micro/terasort.conf index 48efc9469..92986dfd0 100644 --- a/conf/workloads/micro/terasort.conf +++ b/conf/workloads/micro/terasort.conf @@ -10,4 +10,4 @@ hibench.workload.datasize ${hibench.terasort.${hibench.scale.profile}.datasize} # export for shell script hibench.workload.input ${hibench.hdfs.data.dir}/Terasort/Input -hibench.workload.output ${hibench.hdfs.data.dir}/Terasort/Output \ No newline at end of file +hibench.workload.output ${hibench.hdfs.data.dir}/Terasort/Output diff --git a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala new file mode 100644 index 000000000..f50686831 --- /dev/null +++ b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.intel.hibench.sparkbench.micro + +import com.intel.hibench.sparkbench.common.IOCommon +import org.apache.hadoop.examples.terasort.TeraInputFormat +import org.apache.hadoop.io.Text +import org.apache.spark._ +import org.apache.spark.storage.StorageLevel + +object ScalaRepartition { + + def main(args: Array[String]) { + if (args.length != 2) { + System.err.println( + s"Usage: $ScalaRepartition " + ) + System.exit(1) + } + val sparkConf = new SparkConf().setAppName("ScalaRepartition") + val sc = new SparkContext(sparkConf) + + val data = sc.newAPIHadoopFile[Text, Text, TeraInputFormat](args(0)).map { + case (k,v) => (k.copyBytes, v.copyBytes) + } + + data.persist(StorageLevel.MEMORY_ONLY) + data.count() + + val mapParallelism = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism) + val reduceParallelism = IOCommon.getProperty("hibench.default.shuffle.parallelism") + .getOrElse((mapParallelism / 2).toString).toInt + + data.repartition(reduceParallelism).foreach(_ => {}) + + sc.stop() + } +} From 14334dea36982c7977c24a62baf626270c4b6970 Mon Sep 17 00:00:00 2001 From: Chenzhao Guo Date: Thu, 19 Mar 2020 11:29:29 +0800 Subject: [PATCH 2/8] Docs --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c97e316ad..47b9065f5 100644 --- a/README.md +++ b/README.md @@ -43,11 +43,11 @@ There are totally 19 workloads in HiBench. The workloads are divided into 6 cate This workload benchmarks shuffle performance. Input data is generated by Hadoop TeraGen. It is firstly cached in memory, then shuffle write and read in order to repartition. The last 2 stages solely reflects shuffle's performance, excluding I/O and other compute. -4. Sleep (sleep) +5. Sleep (sleep) This workload sleep an amount of seconds in each task to test framework scheduler. -5. enhanced DFSIO (dfsioe) +6. enhanced DFSIO (dfsioe) Enhanced DFSIO tests the HDFS throughput of the Hadoop cluster by generating a large number of tasks performing writes and reads simultaneously. It measures the average I/O rate of each map task, the average throughput of each map task, and the aggregated throughput of HDFS cluster. Note: this benchmark doesn't have Spark corresponding implementation. 
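[Illustration only, not part of the patch series] Patch 1 above measures shuffle cost by splitting the job into three stages: read-and-cache, shuffle write, and shuffle read. The sketch below distils that pattern into a self-contained Spark job so the intent is easier to see; it uses sc.textFile, a placeholder HDFS path and a hard-coded partition count where the real workload uses TeraInputFormat, HiBench config keys and arguments supplied by run.sh, so those substitutions are assumptions for readability rather than HiBench code.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object RepartitionSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("RepartitionSketch"))

        // Stage 1: read the input once and pin it in executor memory; count() only
        // forces materialization so the later stages do no source I/O.
        val data = sc.textFile("hdfs:///HiBench/Repartition/Input")   // placeholder path
        data.persist(StorageLevel.MEMORY_ONLY)
        data.count()

        // Stages 2 and 3: repartition() inserts a full shuffle (write, then read);
        // the no-op foreach triggers execution without adding compute or sink I/O,
        // so the time of these two stages is dominated by the shuffle itself.
        data.repartition(100).foreach(_ => ())                        // placeholder parallelism

        sc.stop()
      }
    }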
From 3b6a0fb0732d176a0f71f0eb3a857605e4e8ea38 Mon Sep 17 00:00:00 2001 From: Chenzhao Guo Date: Mon, 23 Mar 2020 17:29:15 +0800 Subject: [PATCH 3/8] Not using RDD[Tuple2] so as to leverage unsafe shuffle writer --- .../scala/com/intel/sparkbench/micro/ScalaRepartition.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala index f50686831..fd6d84c25 100644 --- a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala +++ b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala @@ -36,7 +36,7 @@ object ScalaRepartition { val sc = new SparkContext(sparkConf) val data = sc.newAPIHadoopFile[Text, Text, TeraInputFormat](args(0)).map { - case (k,v) => (k.copyBytes, v.copyBytes) + case (k,v) => k.copyBytes ++ v.copyBytes } data.persist(StorageLevel.MEMORY_ONLY) From bf801cd0169d5a3bf6b2ff02442789152ed93dfd Mon Sep 17 00:00:00 2001 From: Chenzhao Guo Date: Mon, 23 Mar 2020 18:14:40 +0800 Subject: [PATCH 4/8] Add a parameter to switch on/off cache --- README.md | 2 +- bin/functions/hibench_prop_env_mapping.py | 2 ++ bin/workloads/micro/repartition/spark/run.sh | 2 +- conf/workloads/micro/repartition.conf | 2 ++ .../intel/sparkbench/micro/ScalaRepartition.scala | 13 +++++++++---- 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 47b9065f5..97a759231 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ There are totally 19 workloads in HiBench. The workloads are divided into 6 cate 4. Repartition (micro/repartition) - This workload benchmarks shuffle performance. Input data is generated by Hadoop TeraGen. It is firstly cached in memory, then shuffle write and read in order to repartition. The last 2 stages solely reflects shuffle's performance, excluding I/O and other compute. + This workload benchmarks shuffle performance. Input data is generated by Hadoop TeraGen. It is firstly cached in memory by default, then shuffle write and read in order to repartition. The last 2 stages solely reflects shuffle's performance, excluding I/O and other compute. Note: The parameter hibench.repartition.cacheinmemory(default is true) is provided, to allow reading from storage in the 1st stage without caching 5. 
Sleep (sleep) diff --git a/bin/functions/hibench_prop_env_mapping.py b/bin/functions/hibench_prop_env_mapping.py index 13028fb7f..ed34883db 100644 --- a/bin/functions/hibench_prop_env_mapping.py +++ b/bin/functions/hibench_prop_env_mapping.py @@ -69,6 +69,8 @@ HADOOP_SLEEP_JAR="hibench.sleep.job.jar", # For Sort, Terasort, Wordcount, Repartition DATASIZE="hibench.workload.datasize", + # For repartition + CACHE_IN_MEMORY="hibench.repartition.cacheinmemory", # For hive related workload, data scale PAGES="hibench.workload.pages", diff --git a/bin/workloads/micro/repartition/spark/run.sh b/bin/workloads/micro/repartition/spark/run.sh index 0e4ac49f8..a22e21092 100755 --- a/bin/workloads/micro/repartition/spark/run.sh +++ b/bin/workloads/micro/repartition/spark/run.sh @@ -27,7 +27,7 @@ rmr_hdfs $OUTPUT_HDFS || true SIZE=`dir_size $INPUT_HDFS` START_TIME=`timestamp` -run_spark_job com.intel.hibench.sparkbench.micro.ScalaRepartition $INPUT_HDFS $OUTPUT_HDFS +run_spark_job com.intel.hibench.sparkbench.micro.ScalaRepartition $INPUT_HDFS $OUTPUT_HDFS $CACHE_IN_MEMORY END_TIME=`timestamp` gen_report ${START_TIME} ${END_TIME} ${SIZE} diff --git a/conf/workloads/micro/repartition.conf b/conf/workloads/micro/repartition.conf index 2ce1864f6..a8de6d8d7 100644 --- a/conf/workloads/micro/repartition.conf +++ b/conf/workloads/micro/repartition.conf @@ -11,3 +11,5 @@ hibench.workload.datasize ${hibench.repartition.${hibench.scale.profile}.datasi # export for shell script hibench.workload.input ${hibench.hdfs.data.dir}/Repartition/Input hibench.workload.output ${hibench.hdfs.data.dir}/Repartition/Output + +hibench.repartition.cacheinmemory true \ No newline at end of file diff --git a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala index fd6d84c25..33ea97901 100644 --- a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala +++ b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala @@ -26,9 +26,9 @@ import org.apache.spark.storage.StorageLevel object ScalaRepartition { def main(args: Array[String]) { - if (args.length != 2) { + if (args.length != 3) { System.err.println( - s"Usage: $ScalaRepartition " + s"Usage: $ScalaRepartition " ) System.exit(1) } @@ -39,8 +39,13 @@ object ScalaRepartition { case (k,v) => k.copyBytes ++ v.copyBytes } - data.persist(StorageLevel.MEMORY_ONLY) - data.count() + if (args(2) == "true") { + data.persist(StorageLevel.MEMORY_ONLY) + data.count() + } else if (args(2) != "false") { + throw new IllegalArgumentException( + s"Unrecognizable parameter CACHE_IN_MEMORY: ${args(2)}, should be true or false") + } val mapParallelism = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism) val reduceParallelism = IOCommon.getProperty("hibench.default.shuffle.parallelism") From 708e8e67a5c8cfad7da94955061e8d2124019ad6 Mon Sep 17 00:00:00 2001 From: Chenzhao Guo Date: Mon, 23 Mar 2020 18:30:02 +0800 Subject: [PATCH 5/8] Save RDDs in reduce stage --- .../sparkbench/micro/ScalaRepartition.scala | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala index 33ea97901..09700905e 100644 --- a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala +++ 
b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala @@ -17,10 +17,13 @@ package com.intel.hibench.sparkbench.micro +import java.util.Random + import com.intel.hibench.sparkbench.common.IOCommon import org.apache.hadoop.examples.terasort.TeraInputFormat import org.apache.hadoop.io.Text import org.apache.spark._ +import org.apache.spark.rdd.{CoalescedRDD, RDD, ShuffledRDD} import org.apache.spark.storage.StorageLevel object ScalaRepartition { @@ -51,8 +54,27 @@ object ScalaRepartition { val reduceParallelism = IOCommon.getProperty("hibench.default.shuffle.parallelism") .getOrElse((mapParallelism / 2).toString).toInt - data.repartition(reduceParallelism).foreach(_ => {}) + reparition(data, reduceParallelism).foreach(_ => {}) sc.stop() } + + // Save a CoalescedRDD than RDD.repartition API + private def reparition(previous: RDD[Array[Byte]], numReducers: Int): ShuffledRDD[Int, Array[Byte], Array[Byte]] = { + /** Distributes elements evenly across output partitions, starting from a random partition. */ + val distributePartition = (index: Int, items: Iterator[Array[Byte]]) => { + var position = (new Random(index)).nextInt(numReducers) + items.map { t => + // Note that the hash code of the key will just be the key itself. The HashPartitioner + // will mod it with the number of total partitions. + position = position + 1 + (position, t) + } + } : Iterator[(Int, Array[Byte])] + + // include a shuffle step so that our upstream tasks are still distributed + new ShuffledRDD[Int, Array[Byte], Array[Byte]](previous.mapPartitionsWithIndex(distributePartition), + new HashPartitioner(numReducers)) + } + } From 296e5edfa8801f477d161abddf80a912548d95af Mon Sep 17 00:00:00 2001 From: Chenzhao Guo Date: Mon, 30 Mar 2020 15:21:47 +0800 Subject: [PATCH 6/8] Change default --- README.md | 2 +- conf/workloads/micro/repartition.conf | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 97a759231..37612f7e3 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ There are totally 19 workloads in HiBench. The workloads are divided into 6 cate 4. Repartition (micro/repartition) - This workload benchmarks shuffle performance. Input data is generated by Hadoop TeraGen. It is firstly cached in memory by default, then shuffle write and read in order to repartition. The last 2 stages solely reflects shuffle's performance, excluding I/O and other compute. Note: The parameter hibench.repartition.cacheinmemory(default is true) is provided, to allow reading from storage in the 1st stage without caching + This workload benchmarks shuffle performance. Input data is generated by Hadoop TeraGen. The workload randomly selects the post-shuffle partition for each record, performs shuffle write and read, evenly repartitioning the records. There are 2 parameters providing options to eliminate data source & sink I/Os: hibench.repartition.cacheinmemory(default: false) and hibench.repartition.disableOutput(default: false), controlling whether or not to 1) cache the input in memory at first 2) write the result to storage 5. 
Sleep (sleep) diff --git a/conf/workloads/micro/repartition.conf b/conf/workloads/micro/repartition.conf index a8de6d8d7..53443bdd7 100644 --- a/conf/workloads/micro/repartition.conf +++ b/conf/workloads/micro/repartition.conf @@ -1,10 +1,10 @@ #datagen -hibench.repartition.tiny.datasize 100000 -hibench.repartition.small.datasize 10000000 -hibench.repartition.large.datasize 100000000 -hibench.repartition.huge.datasize 1000000000 -hibench.repartition.gigantic.datasize 10000000000 -hibench.repartition.bigdata.datasize 60000000000 +hibench.repartition.tiny.datasize 32000 +hibench.repartition.small.datasize 3200000 +hibench.repartition.large.datasize 32000000 +hibench.repartition.huge.datasize 320000000 +hibench.repartition.gigantic.datasize 3200000000 +hibench.repartition.bigdata.datasize 6000000000 hibench.workload.datasize ${hibench.repartition.${hibench.scale.profile}.datasize} @@ -12,4 +12,4 @@ hibench.workload.datasize ${hibench.repartition.${hibench.scale.profile}.datasi hibench.workload.input ${hibench.hdfs.data.dir}/Repartition/Input hibench.workload.output ${hibench.hdfs.data.dir}/Repartition/Output -hibench.repartition.cacheinmemory true \ No newline at end of file +hibench.repartition.cacheinmemory false \ No newline at end of file From b5477ba0181a7d20b3a2b94a0459a7ec2d96f061 Mon Sep 17 00:00:00 2001 From: Chenzhao Guo Date: Mon, 30 Mar 2020 16:51:48 +0800 Subject: [PATCH 7/8] Write result or not configurable --- bin/functions/hibench_prop_env_mapping.py | 1 + bin/workloads/micro/repartition/spark/run.sh | 2 +- conf/workloads/micro/repartition.conf | 5 ++- .../sparkbench/micro/ScalaRepartition.scala | 42 ++++++++++++++----- 4 files changed, 36 insertions(+), 14 deletions(-) diff --git a/bin/functions/hibench_prop_env_mapping.py b/bin/functions/hibench_prop_env_mapping.py index ed34883db..f17a5cad9 100644 --- a/bin/functions/hibench_prop_env_mapping.py +++ b/bin/functions/hibench_prop_env_mapping.py @@ -71,6 +71,7 @@ DATASIZE="hibench.workload.datasize", # For repartition CACHE_IN_MEMORY="hibench.repartition.cacheinmemory", + DISABLE_OUTPUT="hibench.repartition.disableoutput", # For hive related workload, data scale PAGES="hibench.workload.pages", diff --git a/bin/workloads/micro/repartition/spark/run.sh b/bin/workloads/micro/repartition/spark/run.sh index a22e21092..160e2ba3b 100755 --- a/bin/workloads/micro/repartition/spark/run.sh +++ b/bin/workloads/micro/repartition/spark/run.sh @@ -27,7 +27,7 @@ rmr_hdfs $OUTPUT_HDFS || true SIZE=`dir_size $INPUT_HDFS` START_TIME=`timestamp` -run_spark_job com.intel.hibench.sparkbench.micro.ScalaRepartition $INPUT_HDFS $OUTPUT_HDFS $CACHE_IN_MEMORY +run_spark_job com.intel.hibench.sparkbench.micro.ScalaRepartition $INPUT_HDFS $OUTPUT_HDFS $CACHE_IN_MEMORY $DISABLE_OUTPUT END_TIME=`timestamp` gen_report ${START_TIME} ${END_TIME} ${SIZE} diff --git a/conf/workloads/micro/repartition.conf b/conf/workloads/micro/repartition.conf index 53443bdd7..59204f94f 100644 --- a/conf/workloads/micro/repartition.conf +++ b/conf/workloads/micro/repartition.conf @@ -1,5 +1,5 @@ #datagen -hibench.repartition.tiny.datasize 32000 +hibench.repartition.tiny.datasize 3200 hibench.repartition.small.datasize 3200000 hibench.repartition.large.datasize 32000000 hibench.repartition.huge.datasize 320000000 @@ -12,4 +12,5 @@ hibench.workload.datasize ${hibench.repartition.${hibench.scale.profile}.datasi hibench.workload.input ${hibench.hdfs.data.dir}/Repartition/Input hibench.workload.output ${hibench.hdfs.data.dir}/Repartition/Output -hibench.repartition.cacheinmemory 
false \ No newline at end of file +hibench.repartition.cacheinmemory false +hibench.repartition.disableoutput false diff --git a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala index 09700905e..f8efc6966 100644 --- a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala +++ b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala @@ -20,21 +20,26 @@ package com.intel.hibench.sparkbench.micro import java.util.Random import com.intel.hibench.sparkbench.common.IOCommon -import org.apache.hadoop.examples.terasort.TeraInputFormat +import org.apache.hadoop.examples.terasort.{TeraInputFormat, TeraOutputFormat} import org.apache.hadoop.io.Text import org.apache.spark._ -import org.apache.spark.rdd.{CoalescedRDD, RDD, ShuffledRDD} +import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.storage.StorageLevel +import scala.util.hashing + object ScalaRepartition { def main(args: Array[String]) { - if (args.length != 3) { + if (args.length != 4) { System.err.println( - s"Usage: $ScalaRepartition " + s"Usage: $ScalaRepartition " ) System.exit(1) } + val cache = toBoolean(args(2), ("CACHE_IN_MEMORY")) + val disableOutput = toBoolean(args(3), ("DISABLE_OUTPUT")) + val sparkConf = new SparkConf().setAppName("ScalaRepartition") val sc = new SparkContext(sparkConf) @@ -42,28 +47,43 @@ object ScalaRepartition { case (k,v) => k.copyBytes ++ v.copyBytes } - if (args(2) == "true") { + if (cache) { data.persist(StorageLevel.MEMORY_ONLY) data.count() - } else if (args(2) != "false") { - throw new IllegalArgumentException( - s"Unrecognizable parameter CACHE_IN_MEMORY: ${args(2)}, should be true or false") } val mapParallelism = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism) val reduceParallelism = IOCommon.getProperty("hibench.default.shuffle.parallelism") .getOrElse((mapParallelism / 2).toString).toInt - reparition(data, reduceParallelism).foreach(_ => {}) + val postShuffle = repartition(data, reduceParallelism) + if (disableOutput) { + postShuffle.foreach(_ => {}) + } else { + postShuffle.map { + case (_, v) => (new Text(v.slice(0, 10)), new Text(v.slice(10, 100))) + }.saveAsNewAPIHadoopFile[TeraOutputFormat](args(1)) + } sc.stop() } + // More hints on Exceptions + private def toBoolean(str: String, parameterName: String): Boolean = { + try { + str.toBoolean + } catch { + case e: IllegalArgumentException => + throw new IllegalArgumentException( + s"Unrecognizable parameter ${parameterName}: ${str}, should be true or false") + } + } + // Save a CoalescedRDD than RDD.repartition API - private def reparition(previous: RDD[Array[Byte]], numReducers: Int): ShuffledRDD[Int, Array[Byte], Array[Byte]] = { + private def repartition(previous: RDD[Array[Byte]], numReducers: Int): ShuffledRDD[Int, Array[Byte], Array[Byte]] = { /** Distributes elements evenly across output partitions, starting from a random partition. */ val distributePartition = (index: Int, items: Iterator[Array[Byte]]) => { - var position = (new Random(index)).nextInt(numReducers) + var position = new Random(hashing.byteswap32(index)).nextInt(numReducers) items.map { t => // Note that the hash code of the key will just be the key itself. The HashPartitioner // will mod it with the number of total partitions. 
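[Illustration only, not part of the patch series] Patch 7's repartition() builds the ShuffledRDD directly instead of calling RDD.repartition, which, as the in-code comment notes, would wrap the shuffle in an extra CoalescedRDD in the reduce stage. The distributePartition closure gives each record an Int key that simply increments from a per-partition random start (seeded with byteswap32 of the partition index, presumably so that consecutive partition indices do not produce correlated seeds), and HashPartitioner(numReducers) then maps that key to the key modulo numReducers, taken non-negative. The local sketch below mimics that assignment for a single map partition to show why the result is an even spread across reducers; the record count and reducer count are made-up example values.

    import java.util.Random
    import scala.util.hashing

    object DistributeSketch {
      def main(args: Array[String]): Unit = {
        val numReducers = 8
        val mapPartitionIndex = 3          // stand-in for the map-side partition id
        var position = new Random(hashing.byteswap32(mapPartitionIndex)).nextInt(numReducers)

        // Assign 10000 records the way distributePartition does, then apply the
        // HashPartitioner rule (non-negative key mod numReducers) to get reducer ids.
        val reducerIds = (1 to 10000).map { _ =>
          position += 1
          ((position % numReducers) + numReducers) % numReducers
        }

        // Consecutive keys cycle through 0..numReducers-1, so every reducer receives
        // exactly 10000 / 8 = 1250 of this partition's records.
        reducerIds.groupBy(identity).toSeq.sortBy(_._1).foreach {
          case (r, ids) => println(s"reducer $r <- ${ids.size} records")
        }
      }
    }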
From fcf13d4be4dc47e980b2f8b5a971d4bbacb0a34e Mon Sep 17 00:00:00 2001
From: Chenzhao Guo
Date: Tue, 31 Mar 2020 11:09:29 +0800
Subject: [PATCH 8/8] Travis?

---
 bin/workloads/micro/repartition/spark/run.sh | 1 -
 1 file changed, 1 deletion(-)

diff --git a/bin/workloads/micro/repartition/spark/run.sh b/bin/workloads/micro/repartition/spark/run.sh
index 160e2ba3b..75d2ca440 100755
--- a/bin/workloads/micro/repartition/spark/run.sh
+++ b/bin/workloads/micro/repartition/spark/run.sh
@@ -33,4 +33,3 @@ END_TIME=`timestamp`
 gen_report ${START_TIME} ${END_TIME} ${SIZE}
 show_bannar finish
 leave_bench
-
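[Illustration only, not part of the patch series] With the series applied, the workload is driven like the other micro benchmarks: generate TeraGen input with the prepare script, then launch the Spark job with run.sh; the two knobs added in patches 4 and 7 let a run exclude source and sink I/O so that only the shuffle is timed. The sketch below assumes the usual HiBench convention of setting properties in conf/hibench.conf and uses example values; only the two script paths and the three property names come from the patches themselves.

    # pick one of the data scales defined in conf/workloads/micro/repartition.conf
    echo "hibench.scale.profile              large" >> conf/hibench.conf
    # optional: cache the input in memory and skip writing output (defaults: false / false)
    echo "hibench.repartition.cacheinmemory  true"  >> conf/hibench.conf
    echo "hibench.repartition.disableoutput  true"  >> conf/hibench.conf

    # generate the TeraGen input, then run the repartition job
    bin/workloads/micro/repartition/prepare/prepare.sh
    bin/workloads/micro/repartition/spark/run.sh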