diff --git a/README.md b/README.md
index 478f89219..37612f7e3 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@
 ---
 ### OVERVIEW ###
 
-HiBench is a big data benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput and system resource utilizations. It contains a set of Hadoop, Spark and streaming workloads, including Sort, WordCount, TeraSort, Sleep, SQL, PageRank, Nutch indexing, Bayes, Kmeans, NWeight and enhanced DFSIO, etc. It also contains several streaming workloads for Spark Streaming, Flink, Storm and Gearpump.
+HiBench is a big data benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput and system resource utilizations. It contains a set of Hadoop, Spark and streaming workloads, including Sort, WordCount, TeraSort, Repartition, Sleep, SQL, PageRank, Nutch indexing, Bayes, Kmeans, NWeight and enhanced DFSIO, etc. It also contains several streaming workloads for Spark Streaming, Flink, Storm and Gearpump.
 
 ### Getting Started ###
  * [Build HiBench](docs/build-hibench.md)
@@ -38,12 +38,16 @@ There are totally 19 workloads in HiBench. The workloads are divided into 6 cate
 3. TeraSort (terasort)
 
     TeraSort is a standard benchmark created by Jim Gray. Its input data is generated by Hadoop TeraGen example program.
+
+4. Repartition (micro/repartition)
+
+    This workload benchmarks shuffle performance. Input data is generated by Hadoop TeraGen. The workload randomly selects the post-shuffle partition for each record, performs the shuffle write and read, and repartitions the records evenly. Two parameters provide options to eliminate the data source and sink I/O: hibench.repartition.cacheinmemory (default: false) and hibench.repartition.disableoutput (default: false), which control whether to 1) cache the input in memory before the shuffle and 2) skip writing the result to storage.
 
-4. Sleep (sleep)
+5. Sleep (sleep)
 
     This workload sleep an amount of seconds in each task to test framework scheduler.
 
-5. enhanced DFSIO (dfsioe)
+6. enhanced DFSIO (dfsioe)
 
     Enhanced DFSIO tests the HDFS throughput of the Hadoop cluster by generating a large number of tasks performing writes and reads simultaneously. It measures the average I/O rate of each map task, the average throughput of each map task, and the aggregated throughput of HDFS cluster. Note: this benchmark doesn't have Spark corresponding implementation.
@@ -124,9 +128,9 @@ There are totally 19 workloads in HiBench. The workloads are divided into 6 cate
 
     This workload reads input data from Kafka and then writes result to Kafka immediately, there is no complex business logic involved.
 
-2. Repartition (repartition)
+2. Repartition (streaming/repartition)
 
-    This workload reads input data from Kafka and changes the level of parallelism by creating more or fewer partitionstests. It tests the efficiency of data shuffle in the streaming frameworks.
+    This workload reads input data from Kafka and changes the level of parallelism by creating more or fewer partitions. It tests the efficiency of data shuffle in the streaming frameworks.
 
 3. Stateful Wordcount (wordcount)
diff --git a/bin/functions/hibench_prop_env_mapping.py b/bin/functions/hibench_prop_env_mapping.py
index a3b91d064..f17a5cad9 100644
--- a/bin/functions/hibench_prop_env_mapping.py
+++ b/bin/functions/hibench_prop_env_mapping.py
@@ -67,8 +67,11 @@
     MAP_SLEEP_TIME="hibench.sleep.mapper.seconds",
     RED_SLEEP_TIME="hibench.sleep.reducer.seconds",
     HADOOP_SLEEP_JAR="hibench.sleep.job.jar",
-    # For Sort, Terasort, Wordcount
+    # For Sort, Terasort, Wordcount, Repartition
    DATASIZE="hibench.workload.datasize",
+    # For repartition
+    CACHE_IN_MEMORY="hibench.repartition.cacheinmemory",
+    DISABLE_OUTPUT="hibench.repartition.disableoutput",
 
     # For hive related workload, data scale
     PAGES="hibench.workload.pages",
diff --git a/bin/workloads/micro/repartition/prepare/prepare.sh b/bin/workloads/micro/repartition/prepare/prepare.sh
new file mode 100755
index 000000000..22419d129
--- /dev/null
+++ b/bin/workloads/micro/repartition/prepare/prepare.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+current_dir=`dirname "$0"`
+current_dir=`cd "$current_dir"; pwd`
+root_dir=${current_dir}/../../../../..
+workload_config=${root_dir}/conf/workloads/micro/repartition.conf
+. "${root_dir}/bin/functions/load_bench_config.sh"
+
+enter_bench HadoopPrepareRepartition ${workload_config} ${current_dir}
+show_bannar start
+
+rmr_hdfs $INPUT_HDFS || true
+START_TIME=`timestamp`
+run_hadoop_job ${HADOOP_EXAMPLES_JAR} teragen \
+    -D mapreduce.job.maps=${NUM_MAPS} \
+    -D mapreduce.job.reduces=${NUM_REDS} \
+    ${DATASIZE} ${INPUT_HDFS}
+END_TIME=`timestamp`
+
+show_bannar finish
+leave_bench
diff --git a/bin/workloads/micro/repartition/spark/run.sh b/bin/workloads/micro/repartition/spark/run.sh
new file mode 100755
index 000000000..75d2ca440
--- /dev/null
+++ b/bin/workloads/micro/repartition/spark/run.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+current_dir=`dirname "$0"`
+current_dir=`cd "$current_dir"; pwd`
+root_dir=${current_dir}/../../../../..
+workload_config=${root_dir}/conf/workloads/micro/repartition.conf
+. "${root_dir}/bin/functions/load_bench_config.sh"
+
+enter_bench ScalaRepartition ${workload_config} ${current_dir}
+show_bannar start
+
+rmr_hdfs $OUTPUT_HDFS || true
+
+SIZE=`dir_size $INPUT_HDFS`
+START_TIME=`timestamp`
+run_spark_job com.intel.hibench.sparkbench.micro.ScalaRepartition $INPUT_HDFS $OUTPUT_HDFS $CACHE_IN_MEMORY $DISABLE_OUTPUT
+END_TIME=`timestamp`
+
+gen_report ${START_TIME} ${END_TIME} ${SIZE}
+show_bannar finish
+leave_bench
diff --git a/conf/benchmarks.lst b/conf/benchmarks.lst
index f8b8fedba..df09bb4b0 100644
--- a/conf/benchmarks.lst
+++ b/conf/benchmarks.lst
@@ -2,6 +2,7 @@ micro.sleep
 micro.sort
 micro.terasort
 micro.wordcount
+micro.repartition
 micro.dfsioe
 
 sql.aggregation
diff --git a/conf/workloads/micro/repartition.conf b/conf/workloads/micro/repartition.conf
new file mode 100644
index 000000000..59204f94f
--- /dev/null
+++ b/conf/workloads/micro/repartition.conf
@@ -0,0 +1,16 @@
+#datagen
+hibench.repartition.tiny.datasize 3200
+hibench.repartition.small.datasize 3200000
+hibench.repartition.large.datasize 32000000
+hibench.repartition.huge.datasize 320000000
+hibench.repartition.gigantic.datasize 3200000000
+hibench.repartition.bigdata.datasize 6000000000
+
+hibench.workload.datasize ${hibench.repartition.${hibench.scale.profile}.datasize}
+
+# export for shell script
+hibench.workload.input ${hibench.hdfs.data.dir}/Repartition/Input
+hibench.workload.output ${hibench.hdfs.data.dir}/Repartition/Output
+
+hibench.repartition.cacheinmemory false
+hibench.repartition.disableoutput false
diff --git a/conf/workloads/micro/terasort.conf b/conf/workloads/micro/terasort.conf
index 48efc9469..92986dfd0 100644
--- a/conf/workloads/micro/terasort.conf
+++ b/conf/workloads/micro/terasort.conf
@@ -10,4 +10,4 @@ hibench.workload.datasize ${hibench.terasort.${hibench.scale.profile}.datasize}
 
 # export for shell script
 hibench.workload.input ${hibench.hdfs.data.dir}/Terasort/Input
-hibench.workload.output ${hibench.hdfs.data.dir}/Terasort/Output
\ No newline at end of file
+hibench.workload.output ${hibench.hdfs.data.dir}/Terasort/Output
diff --git a/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala
new file mode 100644
index 000000000..f8efc6966
--- /dev/null
+++ b/sparkbench/micro/src/main/scala/com/intel/sparkbench/micro/ScalaRepartition.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.hibench.sparkbench.micro
+
+import java.util.Random
+
+import com.intel.hibench.sparkbench.common.IOCommon
+import org.apache.hadoop.examples.terasort.{TeraInputFormat, TeraOutputFormat}
+import org.apache.hadoop.io.Text
+import org.apache.spark._
+import org.apache.spark.rdd.{RDD, ShuffledRDD}
+import org.apache.spark.storage.StorageLevel
+
+import scala.util.hashing
+
+object ScalaRepartition {
+
+  def main(args: Array[String]) {
+    if (args.length != 4) {
+      System.err.println(
+        s"Usage: $ScalaRepartition <INPUT_HDFS> <OUTPUT_HDFS> <CACHE_IN_MEMORY> <DISABLE_OUTPUT>"
+      )
+      System.exit(1)
+    }
+    val cache = toBoolean(args(2), "CACHE_IN_MEMORY")
+    val disableOutput = toBoolean(args(3), "DISABLE_OUTPUT")
+
+    val sparkConf = new SparkConf().setAppName("ScalaRepartition")
+    val sc = new SparkContext(sparkConf)
+
+    val data = sc.newAPIHadoopFile[Text, Text, TeraInputFormat](args(0)).map {
+      case (k, v) => k.copyBytes ++ v.copyBytes
+    }
+
+    if (cache) {
+      data.persist(StorageLevel.MEMORY_ONLY)
+      data.count()
+    }
+
+    val mapParallelism = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)
+    val reduceParallelism = IOCommon.getProperty("hibench.default.shuffle.parallelism")
+      .getOrElse((mapParallelism / 2).toString).toInt
+
+    val postShuffle = repartition(data, reduceParallelism)
+    if (disableOutput) {
+      postShuffle.foreach(_ => {})
+    } else {
+      postShuffle.map {
+        case (_, v) => (new Text(v.slice(0, 10)), new Text(v.slice(10, 100)))
+      }.saveAsNewAPIHadoopFile[TeraOutputFormat](args(1))
+    }
+
+    sc.stop()
+  }
+
+  // Wraps String.toBoolean to give a more helpful message when the parameter cannot be parsed
+  private def toBoolean(str: String, parameterName: String): Boolean = {
+    try {
+      str.toBoolean
+    } catch {
+      case e: IllegalArgumentException =>
+        throw new IllegalArgumentException(
+          s"Unrecognizable parameter ${parameterName}: ${str}, should be true or false")
+    }
+  }
+
+  // Compared to the RDD.repartition API, this saves the extra CoalescedRDD on top of the ShuffledRDD
+  private def repartition(previous: RDD[Array[Byte]], numReducers: Int): ShuffledRDD[Int, Array[Byte], Array[Byte]] = {
+    /** Distributes elements evenly across output partitions, starting from a random partition. */
+    val distributePartition = (index: Int, items: Iterator[Array[Byte]]) => {
+      var position = new Random(hashing.byteswap32(index)).nextInt(numReducers)
+      items.map { t =>
+        // Note that the hash code of the key will just be the key itself. The HashPartitioner
+        // will mod it with the number of total partitions.
+        position = position + 1
+        (position, t)
+      }
+    } : Iterator[(Int, Array[Byte])]
+
+    // include a shuffle step so that our upstream tasks are still distributed
+    new ShuffledRDD[Int, Array[Byte], Array[Byte]](previous.mapPartitionsWithIndex(distributePartition),
+      new HashPartitioner(numReducers))
+  }
+
+}
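
For reviewers who want to try the new workload, here is a minimal usage sketch based on the files added in this patch. The two script paths and the repartition properties come straight from the diff above; the scale profile (hibench.scale.profile in conf/hibench.conf), bin/run_all.sh and the report/hibench.report location refer to the existing HiBench setup and assume an already configured Hadoop/Spark deployment with the default report directory.

```bash
# Data scale: repartition.conf maps hibench.scale.profile to a TeraGen row
# count, e.g. the "large" profile generates 32000000 records of 100 bytes each.
#
# Optional switches added by this patch (conf/workloads/micro/repartition.conf):
#   hibench.repartition.cacheinmemory  true   # cache the input in memory before the shuffle
#   hibench.repartition.disableoutput  true   # skip writing the shuffled result to storage

# Generate the TeraGen input on HDFS.
bin/workloads/micro/repartition/prepare/prepare.sh

# Run the Spark shuffle benchmark (com.intel.hibench.sparkbench.micro.ScalaRepartition).
bin/workloads/micro/repartition/spark/run.sh

# Duration and throughput are appended to the HiBench report (default location).
cat report/hibench.report
```

Since micro.repartition is now listed in conf/benchmarks.lst, bin/run_all.sh should pick the workload up as well.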