Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fb9a42d
add two implementations (sparse and dense) for UnsafeArrayData
kiszk Jun 14, 2016
d931428
fix failures of testsuite
kiszk Jun 15, 2016
9777a2d
fix errors of unit tests
kiszk Jun 15, 2016
000eda4
fix failures of unit tests
kiszk Jun 15, 2016
804f081
make DenseID public
kiszk Jun 23, 2016
e6fb261
Use one implementation approach
kiszk Jun 25, 2016
a313084
fix test failures
kiszk Jun 25, 2016
68d92f7
fix test failures
kiszk Jun 25, 2016
7f2da14
update test suite
kiszk Jun 25, 2016
2f26f6f
fix scala style error
kiszk Jun 25, 2016
ccef63c
revert changes
kiszk Jun 25, 2016
c4f1b5e
addressed comments
kiszk Jun 28, 2016
34a5c6a
add benchmark
kiszk Jun 28, 2016
7a77b20
fix scala style error
kiszk Jun 28, 2016
7b0d4da
addressed comments
kiszk Jul 1, 2016
b4eac29
addressed comments
kiszk Jul 2, 2016
eecf6bd
fix parameters of Platform.OFFSET
kiszk Jul 3, 2016
d88a25a
update benchmark results
kiszk Jul 3, 2016
db15432
add test cases
kiszk Jul 3, 2016
3fa7052
addressed comments
kiszk Jul 4, 2016
4c094c2
addressed comments
kiszk Jul 6, 2016
9887171
update test cases
kiszk Jul 6, 2016
9fe7ad0
address comments
kiszk Jul 7, 2016
e4b4b52
address comments for test cases and benchmark
kiszk Jul 7, 2016
585ca7b
addressed comments
kiszk Jul 8, 2016
9933a06
addressed review comments
kiszk Aug 6, 2016
919e832
fixed test failures
kiszk Aug 7, 2016
0886e3a
update test suites
kiszk Aug 9, 2016
c385bf4
align each of variable length elements to 8 bytes
kiszk Aug 18, 2016
c8813db
fixed test failures
kiszk Aug 20, 2016
aa7cfdb
fixed test failures
kiszk Sep 9, 2016
0b7867b
address review comments
kiszk Sep 20, 2016
ab9a16a
address review comments
kiszk Sep 20, 2016
515701b
address review comments
kiszk Sep 20, 2016
8169abd
change benchmark size
kiszk Sep 26, 2016
e356a79
addressed comments
kiszk Sep 26, 2016
2ef6e3b
update performance results
kiszk Sep 26, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
align each of variable length elements to 8 bytes
  • Loading branch information
kiszk committed Sep 26, 2016
commit c385bf485af2ed33465aae906abd8246b512a5e2
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,20 @@
/**
* An Unsafe implementation of Array which is backed by raw memory instead of Java objects.
*
* Each array has four parts: [numElements][null bits][values or offset][variable length portion]
* Each array has four parts:
* [numElements][null bits][values or offset&length][variable length portion]
*
* The `numElements` is 8 bytes storing the number of elements of this array.
*
* In the `null bits` region, we store 1 bit per element, representing whether an element is null.
* Its total size is ceil(numElements / 8) bytes, and it is aligned to 8-byte word boundaries.
* Its total size is ceil(numElements / 8) bytes, and it is aligned to 8-byte boundaries.
*
* In the `values or offset` region, we store the content of elements. For fields that hold
* In the `values or offset&length` region, we store the content of elements. For fields that hold
* fixed-length primitive types, such as long, double, or int, we store the value directly
* in the field. For fields with non-primitive or variable-length values, we store a relative
* offset (w.r.t. the base address of the array) that points to the beginning of
* the variable-length field into int. It can only be calculated by knowing the total bytes of
* the array. Its length can be got by subtracting 2 adjacent offsets,
* the variable-length field, together with its length (the two are combined into a single long).
* In the variable-length portion, each element is aligned to an 8-byte boundary.
*
* Instances of `UnsafeArrayData` act as pointers to row data stored in this format.
*/
Expand Down Expand Up @@ -116,14 +117,6 @@ public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
this.elementOffset = baseOffset + calculateHeaderPortionInBytes(this.numElements);
}

// Returns the byte length of the variable-length element at `ordinal`, computed as the
// distance from its stored offset to the next element's offset, or to the end of the
// array (`sizeInBytes`) when it is the last element.
private int getSize(int ordinal) {
  final boolean isLastElement = ordinal == numElements - 1;
  // The end bound is read before the element's own offset.
  final int endOffset = isLastElement ? sizeInBytes : getInt(ordinal + 1);
  return endOffset - getInt(ordinal);
}

@Override
public boolean isNullAt(int ordinal) {
assertIndexIsValid(ordinal);
Expand Down Expand Up @@ -232,16 +225,18 @@ public Decimal getDecimal(int ordinal, int precision, int scale) {
@Override
public UTF8String getUTF8String(int ordinal) {
if (isNullAt(ordinal)) return null;
final int offset = getInt(ordinal);
final int size = getSize(ordinal);
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
}

@Override
public byte[] getBinary(int ordinal) {
if (isNullAt(ordinal)) return null;
final int offset = getInt(ordinal);
final int size = getSize(ordinal);
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
final byte[] bytes = new byte[size];
Platform.copyMemory(baseObject, baseOffset + offset, bytes, Platform.BYTE_ARRAY_OFFSET, size);
return bytes;
Expand All @@ -250,7 +245,8 @@ public byte[] getBinary(int ordinal) {
@Override
public CalendarInterval getInterval(int ordinal) {
if (isNullAt(ordinal)) return null;
final int offset = getInt(ordinal);
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int months = (int) Platform.getLong(baseObject, baseOffset + offset);
final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
return new CalendarInterval(months, microseconds);
Expand All @@ -259,8 +255,9 @@ public CalendarInterval getInterval(int ordinal) {
@Override
public UnsafeRow getStruct(int ordinal, int numFields) {
if (isNullAt(ordinal)) return null;
final int offset = getInt(ordinal);
final int size = getSize(ordinal);
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
final UnsafeRow row = new UnsafeRow(numFields);
row.pointTo(baseObject, baseOffset + offset, size);
return row;
Expand All @@ -269,8 +266,9 @@ public UnsafeRow getStruct(int ordinal, int numFields) {
@Override
public UnsafeArrayData getArray(int ordinal) {
if (isNullAt(ordinal)) return null;
final int offset = getInt(ordinal);
final int size = getSize(ordinal);
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
final UnsafeArrayData array = new UnsafeArrayData();
array.pointTo(baseObject, baseOffset + offset, size);
return array;
Expand All @@ -279,8 +277,9 @@ public UnsafeArrayData getArray(int ordinal) {
@Override
public UnsafeMapData getMap(int ordinal) {
if (isNullAt(ordinal)) return null;
final int offset = getInt(ordinal);
final int size = getSize(ordinal);
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
final UnsafeMapData map = new UnsafeMapData();
map.pointTo(baseObject, baseOffset + offset, size);
return map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,41 @@ public void initialize(BufferHolder holder, int numElements, int elementSize) {
this.startingOffset = holder.cursor;

// Grows the global buffer ahead for header and fixed size data.
holder.grow(headerInBytes + elementSize * numElements);
int fixedPartLength = ((elementSize * numElements + 7) / 8) * 8;
holder.grow(headerInBytes + fixedPartLength);

// Write numElements and clear out null bits to header
Platform.putLong(holder.buffer, startingOffset, numElements);
for (int i = 8; i < headerInBytes; i += 8) {
Platform.putLong(holder.buffer, startingOffset + i, 0L);
}
holder.cursor += (headerInBytes + elementSize * numElements);
holder.cursor += (headerInBytes + fixedPartLength);
}

// Returns the position of the fixed-size slot for the element at `ordinal`: the array's
// starting offset, plus the header, plus `ordinal` slots of `elementSize` bytes each.
private long getElementOffset(int ordinal, int elementSize) {
  // Widen to long before multiplying so the product cannot wrap around in 32-bit
  // arithmetic when ordinal * elementSize exceeds Integer.MAX_VALUE.
  return startingOffset + headerInBytes + (long) ordinal * elementSize;
}

public void setOffset(int ordinal) {
write(ordinal, holder.cursor - startingOffset);
// Stores, in the slot for `ordinal`, a single long whose upper 32 bits hold the offset of
// `currentCursor` relative to the start of this array and whose lower bits hold `size`.
public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
  final long packed = ((currentCursor - startingOffset) << 32) | size;
  write(ordinal, packed);
}

// Pads the buffer with zero bytes so that data of length `numBytes` ends on an 8-byte
// boundary. Grows the buffer first when padding is needed; does nothing when `numBytes`
// is already a multiple of 8.
public void alignToEightBytes(int numBytes) {
  final int misalignment = numBytes & 0x07;
  if (misalignment == 0) {
    return;
  }

  final int padding = 8 - misalignment;
  holder.grow(padding);

  // Write the zero padding one byte at a time, advancing the cursor as we go.
  int written = 0;
  while (written < padding) {
    Platform.putByte(holder.buffer, holder.cursor, (byte) 0);
    holder.cursor++;
    written++;
  }
}

private void setNullBit(int ordinal) {
Expand Down Expand Up @@ -182,10 +201,10 @@ public void write(int ordinal, Decimal input, int precision, int scale) {
// Write the bytes to the variable length portion.
Platform.copyMemory(
bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length);
setOffset(ordinal);
write(ordinal, ((long)(holder.cursor - startingOffset) << 32) | ((long) bytes.length));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should abstract it into a method, like UnsafeRowWriter.setOffsetAndSize

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, done


// move the cursor forward.
holder.cursor += bytes.length;
// move the cursor forward with 8-bytes boundary
holder.cursor += ((bytes.length + 7) / 8) * 8;
}
} else {
setNull(ordinal);
Expand All @@ -194,31 +213,34 @@ public void write(int ordinal, Decimal input, int precision, int scale) {

public void write(int ordinal, UTF8String input) {
final int numBytes = input.numBytes();
final int bufferLength = ((numBytes + 7) / 8) * 8; // 8-bytes boundary

// grow the global buffer before writing data.
holder.grow(numBytes);
holder.grow(bufferLength);

// Write the bytes to the variable length portion.
input.writeToMemory(holder.buffer, holder.cursor);

setOffset(ordinal);
write(ordinal, ((long)(holder.cursor - startingOffset) << 32) | ((long) numBytes));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setOffsetAndSize? it's not the only place, please fix all of them


// move the cursor forward.
holder.cursor += numBytes;
holder.cursor += bufferLength;
}

public void write(int ordinal, byte[] input) {
final int bufferLength = ((input.length + 7) / 8) * 8; // 8-bytes boundary

// grow the global buffer before writing data.
holder.grow(input.length);
holder.grow(bufferLength);

// Write the bytes to the variable length portion.
Platform.copyMemory(
input, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, input.length);

setOffset(ordinal);
write(ordinal, ((long)(holder.cursor - startingOffset) << 32) | ((long) input.length));

// move the cursor forward.
holder.cursor += input.length;
holder.cursor += bufferLength;
}

public void write(int ordinal, CalendarInterval input) {
Expand All @@ -229,7 +251,7 @@ public void write(int ordinal, CalendarInterval input) {
Platform.putLong(holder.buffer, holder.cursor, input.months);
Platform.putLong(holder.buffer, holder.cursor + 8, input.microseconds);

setOffset(ordinal);
write(ordinal, ((long)(holder.cursor - startingOffset) << 32) | ((long) 16));

// move the cursor forward.
holder.cursor += 16;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,26 +192,33 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
val elementOrOffsetSize = et match {
case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS => 8
case _ if ctx.isPrimitiveType(jt) => et.defaultSize
case _ => 4 // we need 4 bytes to store offset
case _ => 8 // we need 8 bytes to store offset and length
}

val tmpCursor = ctx.freshName("tmpCursor")
val writeElement = et match {
case t: StructType =>
s"""
$arrayWriter.setOffset($index);
final int $tmpCursor = $bufferHolder.cursor;
${writeStructToBuffer(ctx, element, t.map(_.dataType), bufferHolder)}
$arrayWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor);
$arrayWriter.alignToEightBytes($bufferHolder.cursor - $tmpCursor);
"""

case a @ ArrayType(et, _) =>
s"""
$arrayWriter.setOffset($index);
final int $tmpCursor = $bufferHolder.cursor;
${writeArrayToBuffer(ctx, element, et, bufferHolder)}
$arrayWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor);
$arrayWriter.alignToEightBytes($bufferHolder.cursor - $tmpCursor);
"""

case m @ MapType(kt, vt, _) =>
s"""
$arrayWriter.setOffset($index);
final int $tmpCursor = $bufferHolder.cursor;
${writeMapToBuffer(ctx, element, kt, vt, bufferHolder)}
$arrayWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor);
$arrayWriter.alignToEightBytes($bufferHolder.cursor - $tmpCursor);
"""

case t: DecimalType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ private[columnar] case class ARRAY(dataType: ArrayType)

override def actualSize(row: InternalRow, ordinal: Int): Int = {
val unsafeArray = getField(row, ordinal)
4 + unsafeArray.getSizeInBytes
8 + unsafeArray.getSizeInBytes
}

override def append(value: UnsafeArrayData, buffer: ByteBuffer): Unit = {
Expand Down Expand Up @@ -640,7 +640,7 @@ private[columnar] case class MAP(dataType: MapType)

override def actualSize(row: InternalRow, ordinal: Int): Int = {
val unsafeMap = getField(row, ordinal)
4 + unsafeMap.getSizeInBytes
8 + unsafeMap.getSizeInBytes
}

override def append(value: UnsafeMapData, buffer: ByteBuffer): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4)
checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8)
checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5)
checkActualSize(ARRAY_TYPE, Array[Any](1), 8 + 8 + 8)
checkActualSize(MAP_TYPE, Map(1 -> "a"), 8 + (8 + 8 + 8) + (8 + 8 + 4 + 1))
checkActualSize(ARRAY_TYPE, Array[Any](1), 8 + 8 + 8 + 8)
checkActualSize(MAP_TYPE, Map(1 -> "a"), 8 + (8 + 8 + 8 + 8) + (8 + 8 + 8 + 8))
checkActualSize(STRUCT_TYPE, Row("hello"), 28)
}

Expand Down