This repository was archived by the owner on Dec 15, 2025. It is now read-only.
Merged
14 changes: 9 additions & 5 deletions README.md
@@ -13,7 +13,7 @@
---
### OVERVIEW ###

HiBench is a big data benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput and system resource utilizations. It contains a set of Hadoop, Spark and streaming workloads, including Sort, WordCount, TeraSort, Sleep, SQL, PageRank, Nutch indexing, Bayes, Kmeans, NWeight and enhanced DFSIO, etc. It also contains several streaming workloads for Spark Streaming, Flink, Storm and Gearpump.
HiBench is a big data benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput and system resource utilizations. It contains a set of Hadoop, Spark and streaming workloads, including Sort, WordCount, TeraSort, Repartition, Sleep, SQL, PageRank, Nutch indexing, Bayes, Kmeans, NWeight and enhanced DFSIO, etc. It also contains several streaming workloads for Spark Streaming, Flink, Storm and Gearpump.

### Getting Started ###
* [Build HiBench](docs/build-hibench.md)
@@ -38,12 +38,16 @@ There are totally 19 workloads in HiBench. The workloads are divided into 6 cate
3. TeraSort (terasort)

TeraSort is a standard benchmark created by Jim Gray. Its input data is generated by Hadoop TeraGen example program.

4. Repartition (micro/repartition)

This workload benchmarks shuffle performance. Input data is generated by Hadoop TeraGen. The workload assigns each record to a post-shuffle partition in round-robin order, starting from a randomly chosen partition, then performs the shuffle write and read so that records are repartitioned evenly. Two parameters make it possible to eliminate data source and sink I/O: hibench.repartition.cacheinmemory (default: false) controls whether the input is first cached in memory, and hibench.repartition.disableoutput (default: false) controls whether writing the result to storage is skipped.

4. Sleep (sleep)
5. Sleep (sleep)

This workload sleeps for a configured number of seconds in each task to test the framework scheduler.

5. enhanced DFSIO (dfsioe)
6. enhanced DFSIO (dfsioe)

Enhanced DFSIO tests the HDFS throughput of the Hadoop cluster by generating a large number of tasks performing writes and reads simultaneously. It measures the average I/O rate of each map task, the average throughput of each map task, and the aggregated throughput of the HDFS cluster. Note: this benchmark has no corresponding Spark implementation.

@@ -124,9 +128,9 @@ There are totally 19 workloads in HiBench. The workloads are divided into 6 cate

This workload reads input data from Kafka and then immediately writes the result back to Kafka; no complex business logic is involved.

2. Repartition (repartition)
2. Repartition (streaming/repartition)

This workload reads input data from Kafka and changes the level of parallelism by creating more or fewer partitionstests. It tests the efficiency of data shuffle in the streaming frameworks.
This workload reads input data from Kafka and changes the level of parallelism by creating more or fewer partitions. It tests the efficiency of data shuffle in the streaming frameworks.

3. Stateful Wordcount (wordcount)

5 changes: 4 additions & 1 deletion bin/functions/hibench_prop_env_mapping.py
@@ -67,8 +67,11 @@
MAP_SLEEP_TIME="hibench.sleep.mapper.seconds",
RED_SLEEP_TIME="hibench.sleep.reducer.seconds",
HADOOP_SLEEP_JAR="hibench.sleep.job.jar",
# For Sort, Terasort, Wordcount
# For Sort, Terasort, Wordcount, Repartition
DATASIZE="hibench.workload.datasize",
# For repartition
CACHE_IN_MEMORY="hibench.repartition.cacheinmemory",
DISABLE_OUTPUT="hibench.repartition.disableoutput",

# For hive related workload, data scale
PAGES="hibench.workload.pages",
35 changes: 35 additions & 0 deletions bin/workloads/micro/repartition/prepare/prepare.sh
@@ -0,0 +1,35 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

current_dir=`dirname "$0"`
current_dir=`cd "$current_dir"; pwd`
root_dir=${current_dir}/../../../../..
workload_config=${root_dir}/conf/workloads/micro/repartition.conf
. "${root_dir}/bin/functions/load_bench_config.sh"

enter_bench HadoopPrepareRepartition ${workload_config} ${current_dir}
show_bannar start

rmr_hdfs $INPUT_HDFS || true
START_TIME=`timestamp`
run_hadoop_job ${HADOOP_EXAMPLES_JAR} teragen \
-D mapreduce.job.maps=${NUM_MAPS} \
-D mapreduce.job.reduces=${NUM_REDS} \
${DATASIZE} ${INPUT_HDFS}
END_TIME=`timestamp`

show_bannar finish
leave_bench
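
For a sense of scale: TeraGen takes a row count rather than a byte count, and each row is 100 bytes (a 10-byte key plus a 90-byte value, which matches the `slice(0, 10)` / `slice(10, 100)` split in `ScalaRepartition`). The sketch below estimates the input volume that `${DATASIZE}` produces for each profile value in `conf/workloads/micro/repartition.conf`. The helper name `input_bytes` is illustrative only and is not part of HiBench.

```python
# Each TeraGen row is 100 bytes: a 10-byte key followed by a 90-byte value.
TERAGEN_ROW_BYTES = 100

# Row counts copied from conf/workloads/micro/repartition.conf in this PR.
PROFILE_ROWS = {
    "tiny": 3200,
    "small": 3200000,
    "large": 32000000,
    "huge": 320000000,
    "gigantic": 3200000000,
    "bigdata": 6000000000,
}


def input_bytes(profile):
    """Approximate TeraGen output size in bytes for a profile (hypothetical helper)."""
    return PROFILE_ROWS[profile] * TERAGEN_ROW_BYTES


if __name__ == "__main__":
    for name in PROFILE_ROWS:
        print(f"{name:>8}: {input_bytes(name) / 1e9:.4f} GB")
```

The figures ignore HDFS replication and any per-file overhead, so treat them as a lower bound on the storage actually consumed.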
36 changes: 36 additions & 0 deletions bin/workloads/micro/repartition/spark/run.sh
@@ -0,0 +1,36 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

current_dir=`dirname "$0"`
current_dir=`cd "$current_dir"; pwd`
root_dir=${current_dir}/../../../../..
workload_config=${root_dir}/conf/workloads/micro/repartition.conf
. "${root_dir}/bin/functions/load_bench_config.sh"

enter_bench ScalaRepartition ${workload_config} ${current_dir}
show_bannar start

rmr_hdfs $OUTPUT_HDFS || true

SIZE=`dir_size $INPUT_HDFS`
START_TIME=`timestamp`
run_spark_job com.intel.hibench.sparkbench.micro.ScalaRepartition $INPUT_HDFS $OUTPUT_HDFS $CACHE_IN_MEMORY $DISABLE_OUTPUT
END_TIME=`timestamp`

gen_report ${START_TIME} ${END_TIME} ${SIZE}
show_bannar finish
leave_bench

1 change: 1 addition & 0 deletions conf/benchmarks.lst
@@ -2,6 +2,7 @@ micro.sleep
micro.sort
micro.terasort
micro.wordcount
micro.repartition
micro.dfsioe

sql.aggregation
16 changes: 16 additions & 0 deletions conf/workloads/micro/repartition.conf
@@ -0,0 +1,16 @@
#datagen
hibench.repartition.tiny.datasize 3200
hibench.repartition.small.datasize 3200000
hibench.repartition.large.datasize 32000000
hibench.repartition.huge.datasize 320000000
hibench.repartition.gigantic.datasize 3200000000
hibench.repartition.bigdata.datasize 6000000000

hibench.workload.datasize ${hibench.repartition.${hibench.scale.profile}.datasize}

# export for shell script
hibench.workload.input ${hibench.hdfs.data.dir}/Repartition/Input
hibench.workload.output ${hibench.hdfs.data.dir}/Repartition/Output

hibench.repartition.cacheinmemory false
hibench.repartition.disableoutput false
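
The `hibench.workload.datasize` line above relies on nested `${...}` references: the inner `${hibench.scale.profile}` has to be expanded before the outer key can be looked up. The following is a minimal sketch of that innermost-first expansion, not HiBench's actual config loader. The `resolve` function and the sample `props` dictionary are assumptions made for illustration, and `hibench.scale.profile` is assumed to be defined elsewhere in the configuration.

```python
import re

# Matches an innermost ${...} reference: the name contains no "$", "{" or "}".
_INNER_REF = re.compile(r"\$\{([^${}]+)\}")


def resolve(value, props, max_passes=20):
    """Expand ${key} references, innermost first, until none remain."""
    for _ in range(max_passes):
        match = _INNER_REF.search(value)
        if match is None:
            return value
        key = match.group(1)
        if key not in props:
            raise KeyError(f"undefined property: {key}")
        # Splice the looked-up value in place of the reference and rescan.
        value = value[:match.start()] + props[key] + value[match.end():]
    raise ValueError("too many nested substitutions (possible cycle)")


# Hypothetical property set; the profile value is assumed, not taken from this PR.
props = {
    "hibench.scale.profile": "large",
    "hibench.repartition.large.datasize": "32000000",
}

datasize = resolve(
    "${hibench.repartition.${hibench.scale.profile}.datasize}", props
)

if __name__ == "__main__":
    print(datasize)
```

Switching `hibench.scale.profile` to another profile name changes which `hibench.repartition.<profile>.datasize` entry is selected, with no edit to the workload file itself.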
2 changes: 1 addition & 1 deletion conf/workloads/micro/terasort.conf
@@ -10,4 +10,4 @@ hibench.workload.datasize ${hibench.terasort.${hibench.scale.profile}.datasize}

# export for shell script
hibench.workload.input ${hibench.hdfs.data.dir}/Terasort/Input
hibench.workload.output ${hibench.hdfs.data.dir}/Terasort/Output
hibench.workload.output ${hibench.hdfs.data.dir}/Terasort/Output
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.hibench.sparkbench.micro

import java.util.Random

import com.intel.hibench.sparkbench.common.IOCommon
import org.apache.hadoop.examples.terasort.{TeraInputFormat, TeraOutputFormat}
import org.apache.hadoop.io.Text
import org.apache.spark._
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.storage.StorageLevel

import scala.util.hashing

object ScalaRepartition {

def main(args: Array[String]): Unit = {
if (args.length != 4) {
// Plain string literal: interpolating $ScalaRepartition would print the object's toString, not its name.
System.err.println(
"Usage: ScalaRepartition <INPUT_HDFS> <OUTPUT_HDFS> <CACHE_IN_MEMORY> <DISABLE_OUTPUT>"
)
System.exit(1)
}
val cache = toBoolean(args(2), "CACHE_IN_MEMORY")
val disableOutput = toBoolean(args(3), "DISABLE_OUTPUT")

val sparkConf = new SparkConf().setAppName("ScalaRepartition")
val sc = new SparkContext(sparkConf)

val data = sc.newAPIHadoopFile[Text, Text, TeraInputFormat](args(0)).map {
case (k,v) => k.copyBytes ++ v.copyBytes
}

if (cache) {
data.persist(StorageLevel.MEMORY_ONLY)
data.count()
}

val mapParallelism = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)
val reduceParallelism = IOCommon.getProperty("hibench.default.shuffle.parallelism")
.getOrElse((mapParallelism / 2).toString).toInt

val postShuffle = repartition(data, reduceParallelism)
if (disableOutput) {
postShuffle.foreach(_ => {})
} else {
postShuffle.map {
case (_, v) => (new Text(v.slice(0, 10)), new Text(v.slice(10, 100)))
}.saveAsNewAPIHadoopFile[TeraOutputFormat](args(1))
}

sc.stop()
}

// Parse a boolean argument; on failure, report which parameter was invalid.
private def toBoolean(str: String, parameterName: String): Boolean = {
try {
str.toBoolean
} catch {
case e: IllegalArgumentException =>
// Keep the original exception as the cause so the stack trace is not lost.
throw new IllegalArgumentException(
s"Unrecognizable parameter ${parameterName}: ${str}, should be true or false", e)
}
}

// Build the ShuffledRDD directly, avoiding the extra CoalescedRDD that the RDD.repartition API would create
private def repartition(previous: RDD[Array[Byte]], numReducers: Int): ShuffledRDD[Int, Array[Byte], Array[Byte]] = {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[Array[Byte]]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numReducers)
// [Review comment, Collaborator] (new Random(hashing.byteswap32(index))) ?
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, Array[Byte])]

// include a shuffle step so that our upstream tasks are still distributed
new ShuffledRDD[Int, Array[Byte], Array[Byte]](previous.mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numReducers))
}

}
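
To illustrate why `distributePartition` yields an even spread, here is a small Python model of the same idea: each map partition picks a pseudo-random starting position, tags its records with consecutive integer keys, and the partitioner takes the key modulo the number of reducers. This is a sketch under stated assumptions, not a port of the Scala code. It seeds `random.Random` directly with the partition index instead of `byteswap32(index)`, so the starting offsets differ from Spark's, and all function names are hypothetical.

```python
import random
from collections import Counter


def distribute_partition(index, items, num_reducers):
    """Tag each item with a consecutive integer key, starting from a
    pseudo-random offset seeded by the map partition index."""
    position = random.Random(index).randrange(num_reducers)
    for item in items:
        position += 1
        yield position, item


def target_partition(key, num_reducers):
    """For a non-negative integer key, hash partitioning reduces to key modulo
    the partition count."""
    return key % num_reducers


def repartition_counts(map_partitions, num_reducers):
    """Count how many records each reducer would receive."""
    counts = Counter()
    for index, items in enumerate(map_partitions):
        for key, _ in distribute_partition(index, items, num_reducers):
            counts[target_partition(key, num_reducers)] += 1
    return counts


if __name__ == "__main__":
    maps = [list(range(10)), list(range(7)), list(range(3))]
    print(sorted(repartition_counts(maps, 4).items()))
```

Because keys within one map partition are consecutive, that partition sends each reducer either the floor or the ceiling of its record count divided by `num_reducers`. The random starting offset keeps the leftover records from always landing on the same reducers.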