Changes from 1 commit
Parquet Read Benchmark
sameeragarwal committed Mar 16, 2016
commit 4fa94a221e0da8d27e1c1c35fbc06243711d34f3
@@ -236,21 +236,6 @@ public final Object get(int ordinal, DataType dataType) {
}
}

/**
* Marks a given row as "filtered" if one of its attributes is part of a non-nullable column
*
* @return true if a given rowId can be filtered
*/
public boolean shouldSkipRow(int rowId) {
  for (int ordinal : nullFilteredColumns) {
    if (columns[ordinal].getIsNull(rowId)) {
      filteredRows[rowId] = true;
      break;
    }
  }
  return filteredRows[rowId];
}

/**
* Returns an iterator over the rows in this batch. This skips rows that are filtered out.
*/
@@ -262,7 +247,7 @@ public Iterator<Row> rowIterator() {

@Override
public boolean hasNext() {
while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId] && shouldSkipRow(rowId)) {
while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
++rowId;
}
return rowId < maxRows;
@@ -302,11 +287,23 @@ public void reset() {
}

/**
* Sets the number of rows that are valid.
* Sets the number of rows that are valid. Additionally, marks a row as "filtered" if any of
* its values in a null-filtered (non-nullable) column is NULL.
*/
public void setNumRows(int numRows) {
assert(numRows <= this.capacity);
assert (numRows <= this.capacity);
Contributor:
did you mean to add a space?
this.numRows = numRows;

for (int ordinal : nullFilteredColumns) {
  if (columns[ordinal].numNulls != 0) {
    for (int rowId = 0; rowId < numRows; rowId++) {
      if (!filteredRows[rowId] && columns[ordinal].getIsNull(rowId)) {
        filteredRows[rowId] = true;
        ++numRowsFiltered;
      }
    }
  }
}
}

/**
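For context, here is a minimal sketch of how a caller is expected to drive the new null-filtering path, using only methods that appear in this diff (filterNullsInColumn, resultBatch, nextBatch, rowIterator); the reader setup mirrors the benchmark added below, and the variable names and column ordinals are illustrative:

// Sketch, assuming `reader` is an UnsafeRowParquetRecordReader initialized on a
// Parquet file whose first two columns should be treated as non-nullable.
val batch = reader.resultBatch()
batch.filterNullsInColumn(0)
batch.filterNullsInColumn(1)
while (reader.nextBatch()) {
  // Filling the batch ends in setNumRows(), which after this change pre-marks any
  // row whose null-filtered columns contain a NULL, so rowIterator() skips it
  // without the per-row shouldSkipRow() check that this commit removes.
  val it = batch.rowIterator()
  while (it.hasNext) {
    val row = it.next()  // only rows with non-NULL values in columns 0 and 1
    // ... consume row ...
  }
}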
@@ -199,7 +199,6 @@ private[sql] case class DataSourceScan(
// never requires UnsafeRow as input.
override protected def doProduce(ctx: CodegenContext): String = {
val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
val columnarBatchRowClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row"
val input = ctx.freshName("input")
val idx = ctx.freshName("batchIdx")
val batch = ctx.freshName("batch")
@@ -227,12 +226,10 @@
| private void $scanBatches() throws java.io.IOException {
| while (true) {
| int numRows = $batch.numRows();
| java.util.Iterator<$columnarBatchRowClz> rowIterator = $batch.rowIterator();
| if ($idx == 0) $numOutputRows.add(numRows);
|
| while (!shouldStop() && rowIterator.hasNext()) {
| InternalRow $row = ($columnarBatchRowClz)rowIterator.next();
| $idx++;
| while (!shouldStop() && $idx < numRows) {
| InternalRow $row = $batch.getRow($idx++);
| ${consume(ctx, columns1).trim}
| }
| if (shouldStop()) return;
@@ -242,7 +239,6 @@
| break;
| }
| $batch = ($columnarBatchClz)$input.next();
| // $batch.filterNullsInColumn()
| $idx = 0;
| }
| }""".stripMargin)
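Written out as straight-line code, the regenerated scan loop above simply indexes into the batch instead of materializing a row iterator. A rough Scala rendering, where `batch` stands for the current ColumnarBatch and the final comment marks the spot filled by consume(ctx, columns1):

// import org.apache.spark.sql.catalyst.InternalRow
var idx = 0
val numRows = batch.numRows()
while (idx < numRows) {
  val row: InternalRow = batch.getRow(idx)  // direct indexed access, no iterator
  idx += 1
  // ... generated per-row consume code runs here ...
}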
@@ -27,12 +27,6 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {

setupTestData()

test("test null filtering") {
println(sql("select count(*) from testData where value is not NULL AND key > 5").collect())
}

test("range/filter should be combined") {
val df = sqlContext.range(10).filter("id = 1").selectExpr("id + 1")
val plan = df.queryExecution.executedPlan
@@ -299,10 +299,112 @@ object ParquetReadBenchmark {
}
}

def stringWithNullsScanBenchmark(values: Int, fractionOfNulls: Double): Unit = {
  withTempPath { dir =>
    withTempTable("t1", "tempTable") {
      sqlContext.range(values).registerTempTable("t1")
      sqlContext.sql(s"select IF(rand(1) < $fractionOfNulls, NULL, cast(id as STRING)) as c1, " +
        s"IF(rand(2) < $fractionOfNulls, NULL, cast(id as STRING)) as c2 from t1")
        .write.parquet(dir.getCanonicalPath)
      sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")

      val benchmark = new Benchmark("String with Nulls Scan", values)

      benchmark.addCase("SQL Parquet Vectorized") { iter =>
        sqlContext.sql("select sum(length(c2)) from tempTable where c1 is " +
          "not NULL and c2 is not NULL").collect()
      }

      val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
      benchmark.addCase("PR Vectorized") { num =>
        var sum = 0
        files.map(_.asInstanceOf[String]).foreach { p =>
          val reader = new UnsafeRowParquetRecordReader
          try {
            reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
            val batch = reader.resultBatch()
            while (reader.nextBatch()) {
              val rowIterator = batch.rowIterator()
              while (rowIterator.hasNext) {
                val row = rowIterator.next()
                val value = row.getUTF8String(0)
                if (!row.isNullAt(0) && !row.isNullAt(1)) sum += value.numBytes()
              }
            }
          } finally {
            reader.close()
          }
        }
      }

      benchmark.addCase("PR Vectorized (Null Filtering)") { num =>
        var sum = 0L
        files.map(_.asInstanceOf[String]).foreach { p =>
          val reader = new UnsafeRowParquetRecordReader
          try {
            reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
            val batch = reader.resultBatch()
            batch.filterNullsInColumn(0)
            batch.filterNullsInColumn(1)
            while (reader.nextBatch()) {
              val rowIterator = batch.rowIterator()
              while (rowIterator.hasNext) {
                sum += rowIterator.next().getUTF8String(0).numBytes()
              }
            }
          } finally {
            reader.close()
          }
        }
      }

/*
=======================
Fraction of NULLs: 0
=======================

Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
String with Nulls Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized 1164 / 1333 9.0 111.0 1.0X
PR Vectorized 809 / 882 13.0 77.1 1.4X
PR Vectorized (Null Filtering) 723 / 800 14.5 69.0 1.6X

=======================
Fraction of NULLs: 0.5
=======================

Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
String with Nulls Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized 983 / 1001 10.7 93.8 1.0X
PR Vectorized 699 / 728 15.0 66.7 1.4X
PR Vectorized (Null Filtering) 722 / 746 14.5 68.9 1.4X

Contributor:
Instead of commenting it this way, can you put the fraction in the benchmark name?
e.g. String with Nulls Scan (95%)

=======================
Fraction of NULLs: 0.95
=======================

Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
String with Nulls Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized 332 / 343 31.6 31.6 1.0X
PR Vectorized 177 / 180 59.1 16.9 1.9X
PR Vectorized (Null Filtering) 168 / 175 62.4 16.0 2.0X
*/

      benchmark.run()
    }
  }
}
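A possible way to pick up the reviewer's suggestion above, so the null fraction shows up in the results table itself rather than only in a comment (a hypothetical tweak, not part of this commit; it reuses the fractionOfNulls and values parameters already in scope):

// e.g. "String with Nulls Scan (95%)" for fractionOfNulls = 0.95
val percentNulls = (fractionOfNulls * 100).toInt
val benchmark = new Benchmark(s"String with Nulls Scan ($percentNulls%)", values)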

def main(args: Array[String]): Unit = {
  intScanBenchmark(1024 * 1024 * 15)
  intStringScanBenchmark(1024 * 1024 * 10)
  stringDictionaryScanBenchmark(1024 * 1024 * 10)
  partitionTableScanBenchmark(1024 * 1024 * 15)
  for (fractionOfNulls <- List(0.0, 0.50, 0.95)) {
    stringWithNullsScanBenchmark(1024 * 1024 * 10, fractionOfNulls)
  }
}
}