Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
480a74a
Initial import of code from Databricks unsafe utils repo.
JoshRosen Apr 17, 2015
ab68e08
Begin merging the UTF8String implementations.
JoshRosen Apr 18, 2015
f03e9c1
Play around with Unsafe implementations of more string methods.
JoshRosen Apr 18, 2015
5d55cef
Add skeleton for Row implementation.
JoshRosen Apr 18, 2015
8a8f9df
Add skeleton for GeneratedAggregate integration.
JoshRosen Apr 18, 2015
1ff814d
Add reminder to free memory on iterator completion
JoshRosen Apr 18, 2015
53ba9b7
Start prototyping Java Row -> UnsafeRow converters
JoshRosen Apr 19, 2015
fc4c3a8
Sketch how the converters will be used in UnsafeGeneratedAggregate
JoshRosen Apr 19, 2015
1a483c5
First version that passes some aggregation tests:
JoshRosen Apr 19, 2015
079f1bf
Some clarification of the BytesToBytesMap.lookup() / set() contract.
JoshRosen Apr 19, 2015
f764d13
Simplify address + length calculation in Location.
JoshRosen Apr 19, 2015
c754ae1
Now that the store*() contract has been strengthened, we can remove an…
JoshRosen Apr 19, 2015
ae39694
Add finalizer as "cleanup method of last resort"
JoshRosen Apr 19, 2015
c7f0b56
Reuse UnsafeRow pointer in UnsafeRowConverter
JoshRosen Apr 20, 2015
62ab054
Optimize for fact that get() is only called on String columns.
JoshRosen Apr 20, 2015
c55bf66
Free buffer once iterator has been fully consumed.
JoshRosen Apr 20, 2015
738fa33
Add feature flag to guard UnsafeGeneratedAggregate
JoshRosen Apr 20, 2015
c1b3813
Fix bug in UnsafeMemoryAllocator.free():
JoshRosen Apr 20, 2015
7df6008
Optimizations related to zeroing out memory:
JoshRosen Apr 21, 2015
58ac393
Use UNSAFE allocator in GeneratedAggregate (TODO: make this configura…
JoshRosen Apr 21, 2015
d2bb986
Update to implement new Row methods added upstream
JoshRosen Apr 22, 2015
b3eaccd
Extract aggregation map into its own class.
JoshRosen Apr 22, 2015
bade966
Comment update (bumping to refresh GitHub cache...)
JoshRosen Apr 22, 2015
d85eeff
Add basic sanity test for UnsafeFixedWidthAggregationMap
JoshRosen Apr 22, 2015
1f4b716
Merge Unsafe code into the regular GeneratedAggregate, guarded by a
JoshRosen Apr 22, 2015
92d5a06
Address a number of minor code review comments.
JoshRosen Apr 23, 2015
628f936
Use ints instead of longs for indexing.
JoshRosen Apr 23, 2015
23a440a
Bump up default hash map size
JoshRosen Apr 23, 2015
765243d
Enable optional performance metrics for hash map.
JoshRosen Apr 23, 2015
b26f1d3
Fix bug in murmur hash implementation.
JoshRosen Apr 23, 2015
49aed30
More long -> int conversion.
JoshRosen Apr 23, 2015
29a7575
Remove debug logging
JoshRosen Apr 24, 2015
ef6b3d3
Fix a bunch of FindBugs and IntelliJ inspections
JoshRosen Apr 24, 2015
06e929d
More warning cleanup
JoshRosen Apr 24, 2015
854201a
Import and comment cleanup
JoshRosen Apr 24, 2015
f3dcbfe
More mod replacement
JoshRosen Apr 24, 2015
afe8dca
Some Javadoc cleanup
JoshRosen Apr 24, 2015
a95291e
Cleanups to string handling code
JoshRosen Apr 24, 2015
31eaabc
Lots of TODO and doc cleanup.
JoshRosen Apr 24, 2015
6ffdaa1
Null handling improvements in UnsafeRow.
JoshRosen Apr 24, 2015
9c19fc0
Add configuration options for heap vs. offheap
JoshRosen Apr 24, 2015
cde4132
Add missing pom.xml
JoshRosen Apr 26, 2015
0925847
Disable MiMa checks for new unsafe module
JoshRosen Apr 27, 2015
a8e4a3f
Introduce MemoryManager interface; add to SparkEnv.
JoshRosen Apr 28, 2015
b45f070
Don't redundantly store the offset from key to value, since we can co…
JoshRosen Apr 28, 2015
162caf7
Fix test compilation
JoshRosen Apr 28, 2015
3ca84b2
Only zero the used portion of groupingKeyConversionScratchSpace
JoshRosen Apr 28, 2015
529e571
Measure timeSpentResizing in nanoseconds instead of milliseconds.
JoshRosen Apr 28, 2015
ce3c565
More comments, formatting, and code cleanup.
JoshRosen Apr 28, 2015
78a5b84
Add logging to MemoryManager
JoshRosen Apr 28, 2015
a19e066
Rename unsafe Java test suites to match Scala test naming convention.
JoshRosen Apr 28, 2015
de5e001
Fix debug vs. trace in logging message.
JoshRosen Apr 28, 2015
6e4b192
Remove an unused method from ByteArrayMethods.
JoshRosen Apr 28, 2015
70a39e4
Split MemoryManager into ExecutorMemoryManager and TaskMemoryManager:
JoshRosen Apr 28, 2015
50e9671
Throw memory leak warning even in case of error; add warning about co…
JoshRosen Apr 29, 2015
017b2dc
Remove BytesToBytesMap.finalize()
JoshRosen Apr 29, 2015
1bc36cc
Refactor UnsafeRowConverter to avoid unnecessary boxing.
JoshRosen Apr 29, 2015
81f34f8
Follow 'place children last' convention for GeneratedAggregate
JoshRosen Apr 29, 2015
eeee512
Add converters for Null, Boolean, Byte, and Short columns.
JoshRosen Apr 29, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Lots of TODO and doc cleanup.
  • Loading branch information
JoshRosen committed Apr 24, 2015
commit 31eaabcddcc5e3dda88a70645a28d476f853849f
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
import org.apache.spark.sql.types.UTF8String;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.string.UTF8StringMethods;

// TODO: pick a better name for this class, since this is potentially confusing.
// Maybe call it UnsafeMutableRow?

/**
* An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
Expand All @@ -58,6 +54,7 @@ public final class UnsafeRow implements MutableRow {

private Object baseObject;
private long baseOffset;
/** The number of fields in this row, used for calculating the bitset width (and in assertions) */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: if we have comments, let's add a blank line

private int numFields;
/** The width of the null tracking bit set, in bytes */
private int bitSetWidthInBytes;
Expand All @@ -74,7 +71,7 @@ private long getFieldOffset(int ordinal) {
}

public static int calculateBitSetWidthInBytes(int numFields) {
return ((numFields / 64) + ((numFields % 64 == 0 ? 0 : 1))) * 8;
return ((numFields / 64) + (numFields % 64 == 0 ? 0 : 1)) * 8;
}

/**
Expand Down Expand Up @@ -211,7 +208,6 @@ public void setFloat(int ordinal, float value) {

@Override
public void setString(int ordinal, String value) {
// TODO: need to ensure that array has been suitably sized.
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -240,23 +236,14 @@ public Object get(int i) {
assertIndexIsValid(i);
assert (schema != null) : "Schema must be defined when calling generic get() method";
final DataType dataType = schema.fields()[i].dataType();
// The ordering of these `if` statements is intentional: internally, it looks like this only
// gets invoked in JoinedRow when trying to access UTF8String columns. It's extremely unlikely
// that internal code will call this on non-string-typed columns, but we support that anyways
// just for the sake of completeness.
// TODO: complete this for the remaining types?
// UnsafeRow is only designed to be invoked by internal code, which only invokes this generic
// get() method when trying to access UTF8String-typed columns. If we refactor the codebase to
// separate the internal and external row interfaces, then internal code can fetch strings via
// a new getUTF8String() method and we'll be able to remove this method.
if (isNullAt(i)) {
return null;
} else if (dataType == StringType) {
return getUTF8String(i);
} else if (dataType == IntegerType) {
return getInt(i);
} else if (dataType == LongType) {
return getLong(i);
} else if (dataType == DoubleType) {
return getDouble(i);
} else if (dataType == FloatType) {
return getFloat(i);
} else {
throw new UnsupportedOperationException();
}
Expand Down Expand Up @@ -319,7 +306,7 @@ public UTF8String getUTF8String(int i) {
final byte[] strBytes = new byte[stringSizeInBytes];
PlatformDependent.copyMemory(
baseObject,
baseOffset + offsetToStringSize + 8, // The +8 is to skip past the size to get the data,
baseOffset + offsetToStringSize + 8, // The `+ 8` is to skip past the size to get the data
strBytes,
PlatformDependent.BYTE_ARRAY_OFFSET,
stringSizeInBytes
Expand All @@ -335,31 +322,26 @@ public String getString(int i) {

@Override
public BigDecimal getDecimal(int i) {
// TODO
throw new UnsupportedOperationException();
}

@Override
public Date getDate(int i) {
// TODO
throw new UnsupportedOperationException();
}

@Override
public <T> Seq<T> getSeq(int i) {
// TODO
throw new UnsupportedOperationException();
}

@Override
public <T> List<T> getList(int i) {
// TODO
throw new UnsupportedOperationException();
}

@Override
public <K, V> Map<K, V> getMap(int i) {
// TODO
throw new UnsupportedOperationException();
}

Expand All @@ -370,19 +352,16 @@ public <T> scala.collection.immutable.Map<String, T> getValuesMap(Seq<String> fi

@Override
public <K, V> java.util.Map<K, V> getJavaMap(int i) {
// TODO
throw new UnsupportedOperationException();
}

@Override
public Row getStruct(int i) {
// TODO
throw new UnsupportedOperationException();
}

@Override
public <T> T getAs(int i) {
// TODO
throw new UnsupportedOperationException();
}

Expand All @@ -398,7 +377,6 @@ public int fieldIndex(String name) {

@Override
public Row copy() {
// TODO
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,88 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.PlatformDependent
import org.apache.spark.unsafe.array.ByteArrayMethods

/** Write a column into an UnsafeRow */
/**
* Converts Rows into UnsafeRow format. This class is NOT thread-safe.
*
* @param fieldTypes the data types of the row's columns.
*/
class UnsafeRowConverter(fieldTypes: Array[DataType]) {

/** Convenience constructor: builds the converter from the data types of the schema's fields. */
def this(schema: StructType) {
this(schema.fields.map(_.dataType))
}

/** Re-used pointer to the unsafe row being written */
private[this] val unsafeRow = new UnsafeRow()

/** Functions for encoding each column */
private[this] val writers: Array[UnsafeColumnWriter[Any]] = {
fieldTypes.map(t => UnsafeColumnWriter.forType(t).asInstanceOf[UnsafeColumnWriter[Any]])
}

/** The size, in bytes, of the fixed-length portion of the row, including the null bitmap */
private[this] val fixedLengthSize: Int =
(8 * fieldTypes.length) + UnsafeRow.calculateBitSetWidthInBytes(fieldTypes.length)

/**
* Compute the amount of space, in bytes, required to encode the given row.
*/
def getSizeRequirement(row: Row): Int = {
// Visit each column in order and accumulate the sizes reported by its writer.
var fieldNumber = 0
var variableLengthFieldSize: Int = 0
while (fieldNumber < writers.length) {
// Null columns are skipped: they add nothing beyond the fixed-length portion.
if (!row.isNullAt(fieldNumber)) {
variableLengthFieldSize += writers(fieldNumber).getSize(row(fieldNumber))
}
fieldNumber += 1
}
// Total size = fixed-length portion (8 bytes per field plus the null bitset) + accumulated
// per-column sizes.
fixedLengthSize + variableLengthFieldSize
}

/**
* Convert the given row into UnsafeRow format.
*
* @param row the row to convert
* @param baseObject the base object of the destination address
* @param baseOffset the base offset of the destination address
* @return the number of bytes written. This should be equal to `getSizeRequirement(row)`.
*/
def writeRow(row: Row, baseObject: Object, baseOffset: Long): Long = {
unsafeRow.pointTo(baseObject, baseOffset, writers.length, null)
var fieldNumber = 0
var appendCursor: Int = fixedLengthSize
while (fieldNumber < writers.length) {
if (row.isNullAt(fieldNumber)) {
unsafeRow.setNullAt(fieldNumber)
// TODO: type-specific null value writing?
} else {
appendCursor += writers(fieldNumber).write(
row(fieldNumber),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty minor, but is there a reason we are boxing here instead of passing the row itself in, allowing a specific accessor to be used for extraction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a really good idea; can't believe I didn't think of that...

Should be an easy change, so I'll do this shortly.

fieldNumber,
unsafeRow,
baseObject,
baseOffset,
appendCursor)
}
fieldNumber += 1
}
appendCursor
}

}

/**
* Function for writing a column into an UnsafeRow.
*/
private abstract class UnsafeColumnWriter[T] {
/**
* Write a value into an UnsafeRow.
*
* @param value the value to write
* @param columnNumber what column to write it to
* @param row a pointer to the unsafe row
* @param baseObject
* @param baseOffset
* @param baseObject the base object of the target row's address
* @param baseOffset the base offset of the target row's address
* @param appendCursor the offset from the start of the unsafe row to the end of the row;
* used for calculating where variable-length data should be written
* @return the number of variable-length bytes written
Expand All @@ -50,6 +122,12 @@ private abstract class UnsafeColumnWriter[T] {
}

private object UnsafeColumnWriter {
private object IntUnsafeColumnWriter extends IntUnsafeColumnWriter
private object LongUnsafeColumnWriter extends LongUnsafeColumnWriter
private object FloatUnsafeColumnWriter extends FloatUnsafeColumnWriter
private object DoubleUnsafeColumnWriter extends DoubleUnsafeColumnWriter
private object StringUnsafeColumnWriter extends StringUnsafeColumnWriter

def forType(dataType: DataType): UnsafeColumnWriter[_] = {
dataType match {
case IntegerType => IntUnsafeColumnWriter
Expand All @@ -63,34 +141,7 @@ private object UnsafeColumnWriter {
}
}

/**
 * Column writer for UTF8String values. The string is stored in the variable-length region of the
 * row as an 8-byte length field followed by the raw bytes.
 */
private class StringUnsafeColumnWriter private() extends UnsafeColumnWriter[UTF8String] {
/**
 * Size, in bytes, that `write` will consume for `value`: 8 bytes for the length field plus the
 * string's byte length as adjusted by `ByteArrayMethods.roundNumberOfBytesToNearestWord`.
 */
def getSize(value: UTF8String): Int = {
// NOTE(review): presumably pads the byte count up to a word boundary -- confirm against
// ByteArrayMethods.roundNumberOfBytesToNearestWord.
val numBytes = value.getBytes.length
8 + ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
}

/**
 * Writes `value` at `appendCursor` bytes past the row's base address and stores that cursor in
 * column `columnNumber`.
 *
 * @return the number of bytes consumed in the variable-length region (same formula as `getSize`)
 */
override def write(
value: UTF8String,
columnNumber: Int,
row: UnsafeRow,
baseObject: Object,
baseOffset: Long,
appendCursor: Int): Int = {
val numBytes = value.getBytes.length
// Length field first: the string's byte count, written as a long at the append cursor.
PlatformDependent.UNSAFE.putLong(baseObject, baseOffset + appendCursor, numBytes)
// String bytes follow immediately after the 8-byte length field.
PlatformDependent.copyMemory(
value.getBytes,
PlatformDependent.BYTE_ARRAY_OFFSET,
baseObject,
baseOffset + appendCursor + 8,
numBytes
)
// Store the cursor in the column itself; presumably readers use it as the offset of the
// length field -- confirm against UnsafeRow.getUTF8String.
row.setLong(columnNumber, appendCursor)
8 + ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
}
}
private object StringUnsafeColumnWriter extends StringUnsafeColumnWriter
// ------------------------------------------------------------------------------------------------

private abstract class PrimitiveUnsafeColumnWriter[T] extends UnsafeColumnWriter[T] {
def getSize(value: T): Int = 0
Expand All @@ -108,7 +159,6 @@ private class IntUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWrite
0
}
}
private object IntUnsafeColumnWriter extends IntUnsafeColumnWriter

private class LongUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter[Long] {
override def write(
Expand All @@ -122,7 +172,6 @@ private class LongUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWrit
0
}
}
private case object LongUnsafeColumnWriter extends LongUnsafeColumnWriter

private class FloatUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter[Float] {
override def write(
Expand All @@ -136,7 +185,6 @@ private class FloatUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWri
0
}
}
private case object FloatUnsafeColumnWriter extends FloatUnsafeColumnWriter

private class DoubleUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter[Double] {
override def write(
Expand All @@ -150,55 +198,29 @@ private class DoubleUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWr
0
}
}
private case object DoubleUnsafeColumnWriter extends DoubleUnsafeColumnWriter

class UnsafeRowConverter(fieldTypes: Array[DataType]) {

def this(schema: StructType) {
this(schema.fields.map(_.dataType))
}

private[this] val unsafeRow = new UnsafeRow()

private[this] val writers: Array[UnsafeColumnWriter[Any]] = {
fieldTypes.map(t => UnsafeColumnWriter.forType(t).asInstanceOf[UnsafeColumnWriter[Any]])
}

private[this] val fixedLengthSize: Int =
(8 * fieldTypes.length) + UnsafeRow.calculateBitSetWidthInBytes(fieldTypes.length)

def getSizeRequirement(row: Row): Int = {
var fieldNumber = 0
var variableLengthFieldSize: Int = 0
while (fieldNumber < writers.length) {
if (!row.isNullAt(fieldNumber)) {
variableLengthFieldSize += writers(fieldNumber).getSize(row(fieldNumber))
}
fieldNumber += 1
}
fixedLengthSize + variableLengthFieldSize
private class StringUnsafeColumnWriter private() extends UnsafeColumnWriter[UTF8String] {
def getSize(value: UTF8String): Int = {
8 + ByteArrayMethods.roundNumberOfBytesToNearestWord(value.getBytes.length)
}

def writeRow(row: Row, baseObject: Object, baseOffset: Long): Long = {
unsafeRow.pointTo(baseObject, baseOffset, writers.length, null)
var fieldNumber = 0
var appendCursor: Int = fixedLengthSize
while (fieldNumber < writers.length) {
if (row.isNullAt(fieldNumber)) {
unsafeRow.setNullAt(fieldNumber)
// TODO: type-specific null value writing?
} else {
appendCursor += writers(fieldNumber).write(
row(fieldNumber),
fieldNumber,
unsafeRow,
baseObject,
baseOffset,
appendCursor)
}
fieldNumber += 1
}
appendCursor
override def write(
value: UTF8String,
columnNumber: Int,
row: UnsafeRow,
baseObject: Object,
baseOffset: Long,
appendCursor: Int): Int = {
val numBytes = value.getBytes.length
PlatformDependent.UNSAFE.putLong(baseObject, baseOffset + appendCursor, numBytes)
PlatformDependent.copyMemory(
value.getBytes,
PlatformDependent.BYTE_ARRAY_OFFSET,
baseObject,
baseOffset + appendCursor + 8,
numBytes
)
row.setLong(columnNumber, appendCursor)
8 + ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
}

}
}
Loading