Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fb9a42d
add two implementations (sparse and dense) for UnsafeArrayData
kiszk Jun 14, 2016
d931428
fix failures of testsuite
kiszk Jun 15, 2016
9777a2d
fix errors of unit tests
kiszk Jun 15, 2016
000eda4
fix failures of unit tests
kiszk Jun 15, 2016
804f081
make DenseID public
kiszk Jun 23, 2016
e6fb261
Use one implementation approach
kiszk Jun 25, 2016
a313084
fix test failures
kiszk Jun 25, 2016
68d92f7
fix test failures
kiszk Jun 25, 2016
7f2da14
update test suite
kiszk Jun 25, 2016
2f26f6f
fix scala style error
kiszk Jun 25, 2016
ccef63c
revert changes
kiszk Jun 25, 2016
c4f1b5e
addressed comments
kiszk Jun 28, 2016
34a5c6a
add benchmark
kiszk Jun 28, 2016
7a77b20
fix scala style error
kiszk Jun 28, 2016
7b0d4da
addressed comments
kiszk Jul 1, 2016
b4eac29
addressed comments
kiszk Jul 2, 2016
eecf6bd
fix parameters of Platform.OFFSET
kiszk Jul 3, 2016
d88a25a
update benchmark results
kiszk Jul 3, 2016
db15432
add test cases
kiszk Jul 3, 2016
3fa7052
addressed comments
kiszk Jul 4, 2016
4c094c2
addressed comments
kiszk Jul 6, 2016
9887171
update test cases
kiszk Jul 6, 2016
9fe7ad0
address comments
kiszk Jul 7, 2016
e4b4b52
address comments for test cases and benchmark
kiszk Jul 7, 2016
585ca7b
addressed comments
kiszk Jul 8, 2016
9933a06
addressed review comments
kiszk Aug 6, 2016
919e832
fixed test failures
kiszk Aug 7, 2016
0886e3a
update test suites
kiszk Aug 9, 2016
c385bf4
align each of variable length elements to 8 bytes
kiszk Aug 18, 2016
c8813db
fixed test failures
kiszk Aug 20, 2016
aa7cfdb
fixed test failures
kiszk Sep 9, 2016
0b7867b
address review comments
kiszk Sep 20, 2016
ab9a16a
address review comments
kiszk Sep 20, 2016
515701b
address review comments
kiszk Sep 20, 2016
8169abd
change benchmark size
kiszk Sep 26, 2016
e356a79
addressed comments
kiszk Sep 26, 2016
2ef6e3b
update performance results
kiszk Sep 26, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
addressed comments
  • Loading branch information
kiszk committed Sep 26, 2016
commit b4eac29ebc8ea7b2c0e9e5717fbbbf13f653a4fb
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
* In the `values or offset` region, we store the content of elements. For fields that hold
* fixed-length primitive types, such as long, double, or int, we store the value directly
* in the field. For fields with non-primitive or variable-length values, we store a relative
* offset (w.r.t. the base address of the row) that points to the beginning of the variable-length
* field, and length (they are combined into a long).
* offset (w.r.t. the base address of the array) that points to the beginning of
* the variable-length field, and length (they are combined into a long).
*
* Instances of `UnsafeArrayData` act as pointers to row data stored in this format.
*/
Expand Down Expand Up @@ -301,6 +301,7 @@ public boolean equals(Object other) {
}
return false;
}

public void writeToMemory(Object target, long targetOffset) {
Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
}
Expand Down Expand Up @@ -387,50 +388,52 @@ public double[] toDoubleArray() {
return values;
}

private static UnsafeArrayData fromPrimitiveArray(Object arr, int length, final int elementSize) {
final int headerSize = calculateHeaderPortionInBytes(length);
if (length > (Integer.MAX_VALUE - headerSize) / elementSize) {
private static UnsafeArrayData fromPrimitiveArray(
Object arr, int offset, int length, int elementSize) {
final long headerSize = calculateHeaderPortionInBytes(length);
final long valueRegionSize = (long)elementSize * (long)length;
final long allocationSize = (headerSize + valueRegionSize + 7) / 8;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's confusing to use size for all the names, how about headerInBytes, valueRegionInBytes, totalSizeInWords

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated these names

if (allocationSize > (long)Integer.MAX_VALUE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, why cast to long here? It's ok to compare long and int directly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed cast

throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " +
"it's too big.");
}

final int valueRegionSize = elementSize * length;
final byte[] data = new byte[valueRegionSize + headerSize];
final long[] data = new long[(int)allocationSize];

Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET, length);
Platform.copyMemory(arr, Platform.INT_ARRAY_OFFSET, data,
Platform.BYTE_ARRAY_OFFSET + headerSize, valueRegionSize);

UnsafeArrayData result = new UnsafeArrayData();
result.pointTo(data, Platform.BYTE_ARRAY_OFFSET, valueRegionSize + headerSize);
result.pointTo(data, Platform.BYTE_ARRAY_OFFSET, (int)allocationSize * 8);
return result;
}

public static UnsafeArrayData fromPrimitiveArray(boolean[] arr) {
return fromPrimitiveArray(arr, arr.length, 1);
return fromPrimitiveArray(arr, Platform.BYTE_ARRAY_OFFSET, arr.length, 1);
}

public static UnsafeArrayData fromPrimitiveArray(byte[] arr) {
return fromPrimitiveArray(arr, arr.length, 1);
return fromPrimitiveArray(arr, Platform.BYTE_ARRAY_OFFSET, arr.length, 1);
}

public static UnsafeArrayData fromPrimitiveArray(short[] arr) {
return fromPrimitiveArray(arr, arr.length, 2);
return fromPrimitiveArray(arr, Platform.SHORT_ARRAY_OFFSET, arr.length, 2);
}

public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
return fromPrimitiveArray(arr, arr.length, 4);
return fromPrimitiveArray(arr, Platform.INT_ARRAY_OFFSET, arr.length, 4);
}

public static UnsafeArrayData fromPrimitiveArray(long[] arr) {
return fromPrimitiveArray(arr, arr.length, 8);
return fromPrimitiveArray(arr, Platform.LONG_ARRAY_OFFSET, arr.length, 8);
}

public static UnsafeArrayData fromPrimitiveArray(float[] arr) {
return fromPrimitiveArray(arr, arr.length, 4);
return fromPrimitiveArray(arr, Platform.FLOAT_ARRAY_OFFSET, arr.length, 4);
}

public static UnsafeArrayData fromPrimitiveArray(double[] arr) {
return fromPrimitiveArray(arr, arr.length, 8);
return fromPrimitiveArray(arr, Platform.DOUBLE_ARRAY_OFFSET, arr.length, 8);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ private void assertIndexIsValid(int index) {
}

public void initialize(BufferHolder holder, int numElements, int fixedElementSize) {
// We need 4 bytes to store numElements in header
this.numElements = numElements;
this.headerInBytes = calculateHeaderPortionInBytes(numElements);

Expand All @@ -60,14 +61,14 @@ public void initialize(BufferHolder holder, int numElements, int fixedElementSiz

// Initialize information in header
Platform.putInt(holder.buffer, startingOffset, numElements);
Arrays.fill(holder.buffer, startingOffset + 4 - Platform.BYTE_ARRAY_OFFSET,
startingOffset + headerInBytes - Platform.BYTE_ARRAY_OFFSET, (byte)0);

for (int i = 4; i < headerInBytes; i += 8) {
Platform.putLong(holder.buffer, startingOffset + i, 0L);
}
holder.cursor += (headerInBytes + fixedElementSize * numElements);
}

private long getElementOffset(int ordinal, int scale) {
return startingOffset + headerInBytes + ordinal * scale;
private long getElementOffset(int ordinal, int elementSize) {
return startingOffset + headerInBytes + ordinal * elementSize;
}

public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
Expand All @@ -77,10 +78,6 @@ public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
write(ordinal, offsetAndSize);
}

public void setNullAt(int ordinal) {
throw new UnsupportedOperationException("setNullAt() is not supported");
}

private void setNullBit(int ordinal) {
assertIndexIsValid(ordinal);
BitSetMethods.set(holder.buffer, startingOffset + 4, ordinal);
Expand Down Expand Up @@ -187,7 +184,7 @@ public void write(int ordinal, Decimal input, int precision, int scale) {
// Write the bytes to the variable length portion.
Platform.copyMemory(
bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length);
write(ordinal, ((long)(holder.cursor - startingOffset) << 32) | ((long) bytes.length));
setOffsetAndSize(ordinal, holder.cursor, bytes.length);
holder.cursor += bytes.length;
}
} else {
Expand All @@ -204,7 +201,7 @@ public void write(int ordinal, UTF8String input) {
// Write the bytes to the variable length portion.
input.writeToMemory(holder.buffer, holder.cursor);

write(ordinal, ((long)(holder.cursor - startingOffset) << 32) | ((long) numBytes));
setOffsetAndSize(ordinal, holder.cursor, numBytes);

// move the cursor forward.
holder.cursor += numBytes;
Expand All @@ -218,7 +215,7 @@ public void write(int ordinal, byte[] input) {
Platform.copyMemory(
input, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, input.length);

write(ordinal, ((long)(holder.cursor - startingOffset) << 32) | ((long) input.length));
setOffsetAndSize(ordinal, holder.cursor, input.length);

// move the cursor forward.
holder.cursor += input.length;
Expand All @@ -232,7 +229,7 @@ public void write(int ordinal, CalendarInterval input) {
Platform.putLong(holder.buffer, holder.cursor, input.months);
Platform.putLong(holder.buffer, holder.cursor + 8, input.microseconds);

write(ordinal, ((long)(holder.cursor - startingOffset) << 32) | ((long) 16));
setOffsetAndSize(ordinal, holder.cursor, 16);

// move the cursor forward.
holder.cursor += 16;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,10 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro

val jt = ctx.javaType(et)

val fixedElementSize = et match {
val elementOrOffsetSize = et match {
case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS => 8
case _ if ctx.isPrimitiveType(jt) => et.defaultSize
case _ => 8
case _ => 8 // we need 8 bytes to store offset and length for variable-length types]
}

val tmpCursor = ctx.freshName("tmpCursor")
Expand Down Expand Up @@ -232,7 +232,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
${writeUnsafeData(ctx, s"((UnsafeArrayData) $input)", bufferHolder)}
} else {
final int $numElements = $input.numElements();
$arrayWriter.initialize($bufferHolder, $numElements, $fixedElementSize);
$arrayWriter.initialize($bufferHolder, $numElements, $elementOrOffsetSize);

for (int $index = 0; $index < $numElements; $index++) {
if ($input.isNullAt($index)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.benchmark

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter}
import org.apache.spark.unsafe.Platform
Expand All @@ -32,11 +33,6 @@ import org.apache.spark.util.Benchmark
*/
class UnsafeArrayDataBenchmark extends BenchmarkBase {

new SparkConf()
.setMaster("local[1]")
.setAppName("microbenchmark")
.set("spark.driver.memory", "3g")

def calculateHeaderPortionInBytes(count: Int) : Int = {
// Use this assignment for SPARK-15962
// val size = 4 + 4 * count
Expand All @@ -47,12 +43,11 @@ class UnsafeArrayDataBenchmark extends BenchmarkBase {
def readUnsafeArray(iters: Int): Unit = {
val count = 1024 * 1024 * 16

val intUnsafeArray = new UnsafeArrayData
var intResult: Int = 0
val intSize = calculateHeaderPortionInBytes(count) + 4 * count
val intBuffer = new Array[Byte](intSize)
Platform.putInt(intBuffer, Platform.BYTE_ARRAY_OFFSET, count)
intUnsafeArray.pointTo(intBuffer, Platform.BYTE_ARRAY_OFFSET, intSize)
val intBuffer = new Array[Int](count)
val intEncoder = ExpressionEncoder[Array[Int]].resolveAndBind()
val intInternalRow = intEncoder.toRow(intBuffer)
val intUnsafeArray = intInternalRow.getArray(0)
val readIntArray = { i: Int =>
var n = 0
while (n < iters) {
Expand All @@ -68,12 +63,11 @@ class UnsafeArrayDataBenchmark extends BenchmarkBase {
}
}

val doubleUnsafeArray = new UnsafeArrayData
var doubleResult: Double = 0
val doubleSize = calculateHeaderPortionInBytes(count) + 8 * count
val doubleBuffer = new Array[Byte](doubleSize)
Platform.putInt(doubleBuffer, Platform.BYTE_ARRAY_OFFSET, count)
doubleUnsafeArray.pointTo(doubleBuffer, Platform.BYTE_ARRAY_OFFSET, doubleSize)
val doubleBuffer = new Array[Double](count)
val doubleEncoder = ExpressionEncoder[Array[Double]].resolveAndBind()
val doubleInternalRow = doubleEncoder.toRow(doubleBuffer)
val doubleUnsafeArray = doubleInternalRow.getArray(0)
val readDoubleArray = { i: Int =>
var n = 0
while (n < iters) {
Expand All @@ -94,16 +88,6 @@ class UnsafeArrayDataBenchmark extends BenchmarkBase {
benchmark.addCase("Double")(readDoubleArray)
benchmark.run
/*
Without SPARK-15962
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Read UnsafeArrayData: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Int 370 / 471 454.0 2.2 1.0X
Double 351 / 466 477.5 2.1 1.1X
*/
/*
With SPARK-15962
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Read UnsafeArrayData: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
Expand Down Expand Up @@ -161,16 +145,6 @@ class UnsafeArrayDataBenchmark extends BenchmarkBase {
benchmark.addCase("Double")(writeDoubleArray)
benchmark.run
/*
Without SPARK-15962
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Write UnsafeArrayData: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Int 337 / 407 498.0 2.0 1.0X
Double 458 / 496 366.2 2.7 0.7X
*/
/*
With SPARK-15962
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Write UnsafeArrayData: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
Expand Down Expand Up @@ -216,16 +190,6 @@ class UnsafeArrayDataBenchmark extends BenchmarkBase {
benchmark.addCase("Double")(readDoubleArray)
benchmark.run
/*
Without SPARK-15962
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Get primitive array from UnsafeArrayData: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Int 218 / 256 288.1 3.5 1.0X
Double 318 / 539 198.0 5.1 0.7X
*/
/*
With SPARK-15962
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Get primitive array from UnsafeArrayData: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
Expand Down Expand Up @@ -263,16 +227,6 @@ class UnsafeArrayDataBenchmark extends BenchmarkBase {
benchmark.addCase("Double")(createDoubleArray)
benchmark.run
/*
Without SPARK-15962
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Create UnsafeArrayData from primitive array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Int 343 / 437 183.6 5.4 1.0X
Double 322 / 505 195.6 5.1 1.1X
*/
/*
With SPARK-15962
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.0.4-301.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Create UnsafeArrayData from primitive array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
Expand Down