refactor
gengliangwang committed May 28, 2018
commit bbe6925297ccc21c357362d25192990469b34099
@@ -32,8 +32,12 @@ import org.apache.spark.util.{Benchmark, Utils}
  */
 object ParquetWriteBenchmark {
   val tempTable = "temp"
-  val format = "orc"
-  val table = "t"
+  val format = "parquet"
+  val numRows = 1024 * 1024 * 15
   val conf = new SparkConf()
+  val benchmark = new Benchmark(s"$format writer benchmark", numRows)
 
   conf.set("spark.sql.parquet.compression.codec", "snappy")
 
   val spark = SparkSession.builder
@@ -45,11 +49,6 @@ object ParquetWriteBenchmark {
   // Set default configs. Individual cases will change them if necessary.
   spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
 
-  def withTempPath(f: File => Unit): Unit = {
-    val path = Utils.createTempDir()
-    path.delete()
-    try f(path) finally Utils.deleteRecursively(path)
-  }
-
   def withTempTable(tableNames: String*)(f: => Unit): Unit = {
     try f finally tableNames.foreach(spark.catalog.dropTempView)
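Note: the withTempTable/withTable helpers kept above follow Scala's loan pattern: run the caller's block, then drop the temp views or tables in a finally so cleanup happens even if a benchmark case throws. A minimal standalone sketch of that pattern (illustrative only, not part of this commit; withCleanup is a hypothetical name):

  // Loan pattern: cleanup always runs, as in withTempTable/withTable above.
  def withCleanup[A](resource: A)(cleanup: A => Unit)(body: A => Unit): Unit = {
    try body(resource) finally cleanup(resource)
  }

  // Usage, analogous to withTempTable("temp") { ... }:
  //   withCleanup("temp")(name => spark.catalog.dropTempView(name)) { name =>
  //     spark.range(100).createOrReplaceTempView(name)
  //   }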
@@ -63,83 +62,53 @@
     }
   }
 
-  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
-    val (keys, values) = pairs.unzip
-    val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
-    (keys, values).zipped.foreach(spark.conf.set)
-    try f finally {
-      keys.zip(currentValues).foreach {
-        case (key, Some(value)) => spark.conf.set(key, value)
-        case (key, None) => spark.conf.unset(key)
-      }
+  def writeInt(table: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"create table $table(c1 INT) using $format")
+    benchmark.addCase("Output Single Int Column") { _ =>
+      spark.sql(s"INSERT overwrite table $table select cast(id as INT) as c1 from $tempTable")
     }
   }

-  def runSQL(values: Int, name: String, sql: String, table: String = "t"): Unit = {
-    withTempTable(tempTable) {
-      spark.range(values).createOrReplaceTempView(tempTable)
-      val benchmark = new Benchmark(name, values)
-      benchmark.addCase("Parquet Writer") { _ =>
-        withTable(table) {
-          spark.sql(sql)
-        }
-      }
-      benchmark.run()
+  def writeIntString(table: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"create table $table(c1 INT, c2 STRING) using $format")
+    benchmark.addCase("Output Int and String Column") { _ =>
+      spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " +
+        s"c1, cast(id as STRING) as c2 from $tempTable")
     }
   }

-  def intWriteBenchmark(values: Int): Unit = {
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Output Single Int Column:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    Parquet Writer                        2536 / 2610          6.2         161.3       1.0X
-    */
-    runSQL(values = values,
-      name = "Output Single Int Column",
-      sql = s"create table t using $format as select cast(id as INT) as id from $tempTable")
-  }
-
-  def intStringWriteBenchmark(values: Int): Unit = {
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Output Int and String Column:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    Parquet Writer                        4644 / 4673          2.3         442.9       1.0X
-    */
-    runSQL(values = values,
-      name = "Output Int and String Column",
-      sql = s"create table t using $format as select cast(id as INT)" +
-        s" as c1, cast(id as STRING) as c2 from $tempTable")
-  }
-
-  def partitionTableWriteBenchmark(values: Int): Unit = {
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Partitioned Table:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ---------------------------------------------------------------------------------------------
-    Parquet Writer                        4163 / 4173          3.8         264.7       1.0X
-    */
-    runSQL(values = values,
-      name = "Partitioned Table",
-      sql = s"create table t using $format partitioned by (p) as select id % 2 as p," +
-        s" cast(id as INT) as id from $tempTable")
+  def writePartition(table: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"create table $table(p INT, id INT) using $format PARTITIONED BY (p)")
+    benchmark.addCase("Output Partitions") { _ =>
+      spark.sql(s"INSERT overwrite table $table select cast(id as INT) as id," +
+        s" cast(id % 2 as INT) as p from $tempTable")
+    }
   }

-  def clusteredTableWriteBenchmark(values: Int): Unit = {
-    runSQL(values = values,
-      name = "Clustered Table",
-      sql = s"create table t using $format CLUSTERED by (p) INTO 8 buckets as select id as p," +
-        s" cast(id as INT) as id from $tempTable")
+  def writeBucket(table: String, benchmark: Benchmark): Unit = {
+    spark.sql(s"create table $table(c1 INT, c2 INT) using $format CLUSTERED BY (c2) INTO 2 BUCKETS")
+    benchmark.addCase("Output Buckets") { _ =>
+      spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " +
+        s"c1, cast(id as INT) as c2 from $tempTable")
+    }
   }

   def main(args: Array[String]): Unit = {
-    intWriteBenchmark(1024 * 1024 * 15)
-    intStringWriteBenchmark(1024 * 1024 * 10)
-    partitionTableWriteBenchmark(1024 * 1024 * 15)
-    clusteredTableWriteBenchmark(1024 * 1024 * 15)
+    val tableInt = "tableInt"
+    val tableIntString = "tableIntString"
+    val tablePartition = "tablePartition"
+    val tableBucket = "tableBucket"
+    withTempTable(tempTable) {
+      spark.range(numRows).createOrReplaceTempView(tempTable)
+      withTable(tableInt, tableIntString, tablePartition, tableBucket) {
+        val benchmark = new Benchmark(s"$format writer benchmark", numRows)
+        writeInt(tableInt, benchmark)
+        writeIntString(tableIntString, benchmark)
+        writePartition(tablePartition, benchmark)
+        writeBucket(tableBucket, benchmark)
+        benchmark.run()
+      }
+    }
   }
 }
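For orientation, a condensed sketch of the structure this refactor lands on: each write* helper only registers a case on a shared Benchmark, and the single run() in main executes everything and prints one combined results table, instead of the four separate reports the old per-method benchmarks produced. The sketch assumes a Spark dev build where the internal org.apache.spark.util.Benchmark utility imported by this file is available (it lives in Spark's test sources); the table name t1 and the row count are illustrative, not taken from the commit.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.util.Benchmark

  object WriteBenchmarkSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder
        .master("local[1]")
        .appName("write-benchmark-sketch")
        .getOrCreate()
      val numRows = 1024 * 1024
      spark.range(numRows).createOrReplaceTempView("temp")
      spark.sql("CREATE TABLE t1(c1 INT) USING parquet")

      // addCase only registers the closure; nothing executes until run(),
      // which times every registered case and prints a single table.
      val benchmark = new Benchmark("parquet writer benchmark", numRows)
      benchmark.addCase("Output Single Int Column") { _ =>
        spark.sql("INSERT OVERWRITE TABLE t1 SELECT CAST(id AS INT) AS c1 FROM temp")
      }
      benchmark.run()

      spark.sql("DROP TABLE IF EXISTS t1")
      spark.catalog.dropTempView("temp")
    }
  }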