Skip to content
This repository was archived by the owner on Dec 15, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
---
### OVERVIEW ###

HiBench is a big data benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput and system resource utilizations. It contains a set of Hadoop, Spark and streaming workloads, including Sort, WordCount, TeraSort, Sleep, SQL, PageRank, Nutch indexing, Bayes, Kmeans, NWeight and enhanced DFSIO, etc. It also contains several streaming workloads for Spark Streaming, Flink, Storm and Gearpump.
HiBench is a big data benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput and system resource utilizations. It contains a set of Hadoop, Spark and streaming workloads, including Sort, WordCount, TeraSort, Repartition, Sleep, SQL, PageRank, Nutch indexing, Bayes, Kmeans, NWeight and enhanced DFSIO, etc. It also contains several streaming workloads for Spark Streaming, Flink, Storm and Gearpump.

### Getting Started ###
* [Build HiBench](docs/build-hibench.md)
Expand All @@ -38,12 +38,16 @@ There are totally 19 workloads in HiBench. The workloads are divided into 6 cate
3. TeraSort (terasort)

TeraSort is a standard benchmark created by Jim Gray. Its input data is generated by Hadoop TeraGen example program.

4. Repartition (micro/repartition)

This workload benchmarks shuffle performance. Input data is generated by Hadoop TeraGen. It is firstly cached in memory by default, then shuffle write and read in order to repartition. The last 2 stages solely reflects shuffle's performance, excluding I/O and other compute. Note: The parameter hibench.repartition.cacheinmemory(default is true) is provided, to allow reading from storage in the 1st stage without caching
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about setting this to hibench.repartition.cacheinmemory to false by default? HiBench measures the execution time of the entire workload and calculates the throughput. Caching in memory seems to be for our own need to measure the shuffle write and shuffle read. So we need to look at the stage level execution time ourselves.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense!


4. Sleep (sleep)
5. Sleep (sleep)

This workload sleep an amount of seconds in each task to test framework scheduler.

5. enhanced DFSIO (dfsioe)
6. enhanced DFSIO (dfsioe)

Enhanced DFSIO tests the HDFS throughput of the Hadoop cluster by generating a large number of tasks performing writes and reads simultaneously. It measures the average I/O rate of each map task, the average throughput of each map task, and the aggregated throughput of HDFS cluster. Note: this benchmark doesn't have Spark corresponding implementation.

Expand Down Expand Up @@ -124,9 +128,9 @@ There are totally 19 workloads in HiBench. The workloads are divided into 6 cate

This workload reads input data from Kafka and then writes result to Kafka immediately, there is no complex business logic involved.

2. Repartition (repartition)
2. Repartition (streaming/repartition)

This workload reads input data from Kafka and changes the level of parallelism by creating more or fewer partitionstests. It tests the efficiency of data shuffle in the streaming frameworks.
This workload reads input data from Kafka and changes the level of parallelism by creating more or fewer partitions. It tests the efficiency of data shuffle in the streaming frameworks.

3. Stateful Wordcount (wordcount)

Expand Down
4 changes: 3 additions & 1 deletion bin/functions/hibench_prop_env_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@
MAP_SLEEP_TIME="hibench.sleep.mapper.seconds",
RED_SLEEP_TIME="hibench.sleep.reducer.seconds",
HADOOP_SLEEP_JAR="hibench.sleep.job.jar",
# For Sort, Terasort, Wordcount
# For Sort, Terasort, Wordcount, Repartition
DATASIZE="hibench.workload.datasize",
# For repartition
CACHE_IN_MEMORY="hibench.repartition.cacheinmemory",

# For hive related workload, data scale
PAGES="hibench.workload.pages",
Expand Down
35 changes: 35 additions & 0 deletions bin/workloads/micro/repartition/prepare/prepare.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

current_dir=`dirname "$0"`
current_dir=`cd "$current_dir"; pwd`
root_dir=${current_dir}/../../../../..
workload_config=${root_dir}/conf/workloads/micro/repartition.conf
. "${root_dir}/bin/functions/load_bench_config.sh"

enter_bench HadoopPrepareRepartition ${workload_config} ${current_dir}
show_bannar start

rmr_hdfs $INPUT_HDFS || true
START_TIME=`timestamp`
run_hadoop_job ${HADOOP_EXAMPLES_JAR} teragen \
-D mapreduce.job.maps=${NUM_MAPS} \
-D mapreduce.job.reduces=${NUM_REDS} \
${DATASIZE} ${INPUT_HDFS}
END_TIME=`timestamp`

show_bannar finish
leave_bench
36 changes: 36 additions & 0 deletions bin/workloads/micro/repartition/spark/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

current_dir=`dirname "$0"`
current_dir=`cd "$current_dir"; pwd`
root_dir=${current_dir}/../../../../..
workload_config=${root_dir}/conf/workloads/micro/repartition.conf
. "${root_dir}/bin/functions/load_bench_config.sh"

enter_bench ScalaRepartition ${workload_config} ${current_dir}
show_bannar start

rmr_hdfs $OUTPUT_HDFS || true

SIZE=`dir_size $INPUT_HDFS`
START_TIME=`timestamp`
run_spark_job com.intel.hibench.sparkbench.micro.ScalaRepartition $INPUT_HDFS $OUTPUT_HDFS $CACHE_IN_MEMORY
END_TIME=`timestamp`

gen_report ${START_TIME} ${END_TIME} ${SIZE}
show_bannar finish
leave_bench

1 change: 1 addition & 0 deletions conf/benchmarks.lst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ micro.sleep
micro.sort
micro.terasort
micro.wordcount
micro.repartition
micro.dfsioe

sql.aggregation
Expand Down
15 changes: 15 additions & 0 deletions conf/workloads/micro/repartition.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#datagen
hibench.repartition.tiny.datasize 100000
hibench.repartition.small.datasize 10000000
hibench.repartition.large.datasize 100000000
hibench.repartition.huge.datasize 1000000000
hibench.repartition.gigantic.datasize 10000000000
hibench.repartition.bigdata.datasize 60000000000
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change this to the same size defined in TeraSort to be consistent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, I made them bigger because it takes less time than Terasort. But since we have an output by default, durations should be on same level.


hibench.workload.datasize ${hibench.repartition.${hibench.scale.profile}.datasize}

# export for shell script
hibench.workload.input ${hibench.hdfs.data.dir}/Repartition/Input
hibench.workload.output ${hibench.hdfs.data.dir}/Repartition/Output

hibench.repartition.cacheinmemory true
2 changes: 1 addition & 1 deletion conf/workloads/micro/terasort.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ hibench.workload.datasize ${hibench.terasort.${hibench.scale.profile}.datasize}

# export for shell script
hibench.workload.input ${hibench.hdfs.data.dir}/Terasort/Input
hibench.workload.output ${hibench.hdfs.data.dir}/Terasort/Output
hibench.workload.output ${hibench.hdfs.data.dir}/Terasort/Output
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.hibench.sparkbench.micro

import java.util.Random

import com.intel.hibench.sparkbench.common.IOCommon
import org.apache.hadoop.examples.terasort.TeraInputFormat
import org.apache.hadoop.io.Text
import org.apache.spark._
import org.apache.spark.rdd.{CoalescedRDD, RDD, ShuffledRDD}
import org.apache.spark.storage.StorageLevel

object ScalaRepartition {

def main(args: Array[String]) {
if (args.length != 3) {
System.err.println(
s"Usage: $ScalaRepartition <INPUT_HDFS> <OUTPUT_HDFS> <CACHE_IN_MEMORY>"
)
System.exit(1)
}
val sparkConf = new SparkConf().setAppName("ScalaRepartition")
val sc = new SparkContext(sparkConf)

val data = sc.newAPIHadoopFile[Text, Text, TeraInputFormat](args(0)).map {
case (k,v) => k.copyBytes ++ v.copyBytes
}

if (args(2) == "true") {
data.persist(StorageLevel.MEMORY_ONLY)
data.count()
} else if (args(2) != "false") {
throw new IllegalArgumentException(
s"Unrecognizable parameter CACHE_IN_MEMORY: ${args(2)}, should be true or false")
}

val mapParallelism = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)
val reduceParallelism = IOCommon.getProperty("hibench.default.shuffle.parallelism")
.getOrElse((mapParallelism / 2).toString).toInt

reparition(data, reduceParallelism).foreach(_ => {})

sc.stop()
}

// Save a CoalescedRDD than RDD.repartition API
private def reparition(previous: RDD[Array[Byte]], numReducers: Int): ShuffledRDD[Int, Array[Byte], Array[Byte]] = {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[Array[Byte]]) => {
var position = (new Random(index)).nextInt(numReducers)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In spark code, I noticed hashing.byteswap32(index) is used for the seed. the hashing is removed here by purpose as there is no difference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I copied that from Spark 2.0.0, and the code you mentioned is introduced in apache/spark#18990, solving the skewed repartition when numReducers is power of 2.

items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, Array[Byte])]

// include a shuffle step so that our upstream tasks are still distributed
new ShuffledRDD[Int, Array[Byte], Array[Byte]](previous.mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numReducers))
}

}