Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Fixed SPARK-1436: in-memory column byte buffer must be able to be acc…
…essed multiple times

Forgot to duplicate the in-memory column byte buffer when creating new ColumnAccessor's, so that when the column byte buffer is accessed multiple times, the position is not reset to 0.
  • Loading branch information
liancheng committed Apr 8, 2014
commit 1d037b83191099da961c247a57ef686cb508c447
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,21 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer)

private[sql] object ColumnAccessor {
def apply(buffer: ByteBuffer): ColumnAccessor = {
val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
// The first 4 bytes in the buffer indicate the column type.
val columnTypeId = buffer.getInt()
val columnTypeId = dup.getInt()

columnTypeId match {
case INT.typeId => new IntColumnAccessor(buffer)
case LONG.typeId => new LongColumnAccessor(buffer)
case FLOAT.typeId => new FloatColumnAccessor(buffer)
case DOUBLE.typeId => new DoubleColumnAccessor(buffer)
case BOOLEAN.typeId => new BooleanColumnAccessor(buffer)
case BYTE.typeId => new ByteColumnAccessor(buffer)
case SHORT.typeId => new ShortColumnAccessor(buffer)
case STRING.typeId => new StringColumnAccessor(buffer)
case BINARY.typeId => new BinaryColumnAccessor(buffer)
case GENERIC.typeId => new GenericColumnAccessor(buffer)
case INT.typeId => new IntColumnAccessor(dup)
case LONG.typeId => new LongColumnAccessor(dup)
case FLOAT.typeId => new FloatColumnAccessor(dup)
case DOUBLE.typeId => new DoubleColumnAccessor(dup)
case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
case BYTE.typeId => new ByteColumnAccessor(dup)
case SHORT.typeId => new ShortColumnAccessor(dup)
case STRING.typeId => new StringColumnAccessor(dup)
case BINARY.typeId => new BinaryColumnAccessor(dup)
case GENERIC.typeId => new GenericColumnAccessor(dup)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,12 @@ class ColumnarQuerySuite extends QueryTest {

checkAnswer(scan, testData.collect().toSeq)
}

test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))

checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)
}
}