Merged
Changes from 1 commit
1 change: 1 addition & 0 deletions bin/functions/hibench_prop_env_mapping.py

@@ -71,6 +71,7 @@
    DATASIZE="hibench.workload.datasize",
    # For repartition
    CACHE_IN_MEMORY="hibench.repartition.cacheinmemory",
+   DISABLE_OUTPUT="hibench.repartition.disableoutput",

    # For hive related workload, data scale
    PAGES="hibench.workload.pages",

2 changes: 1 addition & 1 deletion bin/workloads/micro/repartition/spark/run.sh

@@ -27,7 +27,7 @@ rmr_hdfs $OUTPUT_HDFS || true

SIZE=`dir_size $INPUT_HDFS`
START_TIME=`timestamp`
-run_spark_job com.intel.hibench.sparkbench.micro.ScalaRepartition $INPUT_HDFS $OUTPUT_HDFS $CACHE_IN_MEMORY
+run_spark_job com.intel.hibench.sparkbench.micro.ScalaRepartition $INPUT_HDFS $OUTPUT_HDFS $CACHE_IN_MEMORY $DISABLE_OUTPUT
END_TIME=`timestamp`

gen_report ${START_TIME} ${END_TIME} ${SIZE}
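With the extra $DISABLE_OUTPUT argument, run.sh now passes four positional parameters, and the Scala job (see ScalaRepartition.scala below) fails fast if either boolean flag is malformed. A minimal standalone sketch of that parsing behavior, mirroring the toBoolean helper this PR adds (the object name and sample values are illustrative only):

object ToBooleanSketch {
  // Mirrors ScalaRepartition.toBoolean: rewraps the parse failure with the
  // offending parameter name so the launcher error is self-explanatory.
  private def toBoolean(str: String, parameterName: String): Boolean =
    try str.toBoolean
    catch {
      case _: IllegalArgumentException =>
        throw new IllegalArgumentException(
          s"Unrecognizable parameter ${parameterName}: ${str}, should be true or false")
    }

  def main(args: Array[String]): Unit = {
    println(toBoolean("true", "CACHE_IN_MEMORY"))   // true
    println(toBoolean("False", "DISABLE_OUTPUT"))   // false; String.toBoolean parses case-insensitively
    toBoolean("maybe", "DISABLE_OUTPUT")            // throws: Unrecognizable parameter DISABLE_OUTPUT: maybe, ...
  }
}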
5 changes: 3 additions & 2 deletions conf/workloads/micro/repartition.conf

@@ -1,5 +1,5 @@
#datagen
-hibench.repartition.tiny.datasize 32000
+hibench.repartition.tiny.datasize 3200
hibench.repartition.small.datasize 3200000
hibench.repartition.large.datasize 32000000
hibench.repartition.huge.datasize 320000000

@@ -12,4 +12,5 @@ hibench.workload.datasize ${hibench.repartition.${hibench.scale.profile}.datasize}
hibench.workload.input ${hibench.hdfs.data.dir}/Repartition/Input
hibench.workload.output ${hibench.hdfs.data.dir}/Repartition/Output

-hibench.repartition.cacheinmemory false
+hibench.repartition.cacheinmemory false
+hibench.repartition.disableoutput false
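Taken together, the three configuration changes wire the new switch end to end: hibench.repartition.disableoutput defaults to false in repartition.conf, hibench_prop_env_mapping.py exposes it to the launcher as DISABLE_OUTPUT, and run.sh forwards it as the job's fourth argument. For comparison, a hedged sketch of how the flag could instead be read inside the job through IOCommon, the accessor ScalaRepartition already uses for hibench.default.shuffle.parallelism (illustrative only; the PR deliberately passes the flag on the command line):

import com.intel.hibench.sparkbench.common.IOCommon

// Illustrative alternative, not what this PR does: read the flag straight from
// the HiBench property. The getOrElse default mirrors the
// "hibench.repartition.disableoutput false" line added above.
val disableOutput: Boolean =
  IOCommon.getProperty("hibench.repartition.disableoutput")
    .getOrElse("false")
    .toBoolean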
ScalaRepartition.scala

@@ -20,50 +20,70 @@ package com.intel.hibench.sparkbench.micro
import java.util.Random

import com.intel.hibench.sparkbench.common.IOCommon
-import org.apache.hadoop.examples.terasort.TeraInputFormat
+import org.apache.hadoop.examples.terasort.{TeraInputFormat, TeraOutputFormat}
import org.apache.hadoop.io.Text
import org.apache.spark._
-import org.apache.spark.rdd.{CoalescedRDD, RDD, ShuffledRDD}
+import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.storage.StorageLevel

+import scala.util.hashing

object ScalaRepartition {

  def main(args: Array[String]) {
-    if (args.length != 3) {
+    if (args.length != 4) {
      System.err.println(
-        s"Usage: $ScalaRepartition <INPUT_HDFS> <OUTPUT_HDFS> <CACHE_IN_MEMORY>"
+        s"Usage: $ScalaRepartition <INPUT_HDFS> <OUTPUT_HDFS> <CACHE_IN_MEMORY> <DISABLE_OUTPUT>"
      )
      System.exit(1)
    }
+    val cache = toBoolean(args(2), ("CACHE_IN_MEMORY"))
+    val disableOutput = toBoolean(args(3), ("DISABLE_OUTPUT"))

    val sparkConf = new SparkConf().setAppName("ScalaRepartition")
    val sc = new SparkContext(sparkConf)

    val data = sc.newAPIHadoopFile[Text, Text, TeraInputFormat](args(0)).map {
      case (k,v) => k.copyBytes ++ v.copyBytes
    }

-    if (args(2) == "true") {
+    if (cache) {
      data.persist(StorageLevel.MEMORY_ONLY)
      data.count()
-    } else if (args(2) != "false") {
-      throw new IllegalArgumentException(
-        s"Unrecognizable parameter CACHE_IN_MEMORY: ${args(2)}, should be true or false")
    }

    val mapParallelism = sc.getConf.getInt("spark.default.parallelism", sc.defaultParallelism)
    val reduceParallelism = IOCommon.getProperty("hibench.default.shuffle.parallelism")
      .getOrElse((mapParallelism / 2).toString).toInt

-    reparition(data, reduceParallelism).foreach(_ => {})
+    val postShuffle = repartition(data, reduceParallelism)
+    if (disableOutput) {
+      postShuffle.foreach(_ => {})
+    } else {
+      postShuffle.map {
+        case (_, v) => (new Text(v.slice(0, 10)), new Text(v.slice(10, 100)))
+      }.saveAsNewAPIHadoopFile[TeraOutputFormat](args(1))
+    }

    sc.stop()
  }

+  // Gives a clearer error message when a boolean parameter cannot be parsed
+  private def toBoolean(str: String, parameterName: String): Boolean = {
+    try {
+      str.toBoolean
+    } catch {
+      case e: IllegalArgumentException =>
+        throw new IllegalArgumentException(
+          s"Unrecognizable parameter ${parameterName}: ${str}, should be true or false")
+    }
+  }

  // Unlike the RDD.repartition API, this avoids creating an extra CoalescedRDD on top of the ShuffledRDD
-  private def reparition(previous: RDD[Array[Byte]], numReducers: Int): ShuffledRDD[Int, Array[Byte], Array[Byte]] = {
+  private def repartition(previous: RDD[Array[Byte]], numReducers: Int): ShuffledRDD[Int, Array[Byte], Array[Byte]] = {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[Array[Byte]]) => {
-      var position = (new Random(index)).nextInt(numReducers)
+      var position = new Random(hashing.byteswap32(index)).nextInt(numReducers)

Inline review comment from a Collaborator on the line above:

    (new Random(hashing.byteswap32(index))) ?

      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
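Two notes on the new code path. When output is enabled, the map before saveAsNewAPIHadoopFile splits each shuffled 100-byte TeraSort record back into its 10-byte key and 90-byte value so TeraOutputFormat can write them. And the reviewer's question above concerns the new seed: seeding java.util.Random with raw consecutive partition indices makes the first nextInt draw nearly identical across partitions, so every map task would start emitting to the same reducer; scrambling the seed with scala.util.hashing.byteswap32 decorrelates the starting positions, mirroring what Spark's own RDD.repartition does. A small runnable sketch of the difference (the object name is illustrative, and exact values depend on the JDK's Random implementation):

import java.util.Random
import scala.util.hashing

// Compares the starting reducer chosen from raw vs. byteswap32-scrambled seeds.
// Raw consecutive seeds tend to yield the same first nextInt(numReducers), the
// skew this change avoids; scrambled seeds give a noticeably better spread.
object SeedSpreadSketch {
  def main(args: Array[String]): Unit = {
    val numReducers = 8
    val raw       = (0 until numReducers).map(i => new Random(i).nextInt(numReducers))
    val scrambled = (0 until numReducers).map(i => new Random(hashing.byteswap32(i)).nextInt(numReducers))
    println(s"raw seeds:        $raw")
    println(s"byteswap32 seeds: $scrambled")
  }
}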