Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.columnar

import scala.collection.mutable

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -88,7 +90,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
case array: ArrayType => classOf[ArrayColumnAccessor].getName
case t: MapType => classOf[MapColumnAccessor].getName
}
ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;")
ctx.addMutableState(accessorCls, accessorName, "")

val createCode = dt match {
case t if ctx.isPrimitiveType(dt) =>
Expand All @@ -97,7 +99,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
case other =>
s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder),
(${dt.getClass.getName}) columnTypes[$index]);"""
(${dt.getClass.getName}) columnTypes[$index]);"""
}

val extract = s"$accessorName.extractTo(mutableRow, $index);"
Expand All @@ -114,6 +116,42 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
(createCode, extract + patch)
}.unzip

/*
 * 200 = 6000 bytes / 30 bytes (each generated call is assumed to take up to 30 bytes
 * of bytecode). HotSpot does not compile methods whose bytecode size exceeds 8000 bytes,
 * so every generated method should stay below that limit.
 */
val numberOfStatementsThreshold = 200
val (initializerAccessorCalls, extractorCalls) =
if (initializeAccessors.length <= numberOfStatementsThreshold) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could always put them in groups

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we could. But [another PR](https://github.com//pull/7076#issuecomment-122176653) intentionally avoids grouping them when they all fit into a single group.
Which is preferable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping @davies

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either is fine.

(initializeAccessors.mkString("\n"), extractors.mkString("\n"))
} else {
// Splits `statements` into chunks of at most `numberOfStatementsThreshold`, registers one
// private Java method per chunk (named `<prefix><chunkIndex>`) with the codegen context,
// and returns the newline-separated calls to those methods.
//
// `grouped` returns an Iterator and `Iterator.map` is lazy, so the mapped iterator must be
// forced (here with `toList`); otherwise `ctx.addNewFunction` is never invoked and no
// calls are produced.
def addGroupedFunctions(prefix: String, statements: Seq[String]): String = {
  statements.grouped(numberOfStatementsThreshold).zipWithIndex.map { case (body, i) =>
    val funcName = s"$prefix$i"
    val funcCode = s"""
       |private void $funcName() {
       |  ${body.mkString("\n")}
       |}
     """.stripMargin
    ctx.addNewFunction(funcName, funcCode)
    s"$funcName();"
  }.toList.mkString("\n")
}

// Tuple elements are evaluated left to right, so all accessor functions are registered
// before the extractor functions.
(addGroupedFunctions("accessors", initializeAccessors),
  addGroupedFunctions("extractors", extractors))
}

val code = s"""
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -149,8 +187,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
this.nativeOrder = ByteOrder.nativeOrder();
this.buffers = new byte[${columnTypes.length}][];
this.mutableRow = new MutableUnsafeRow(rowWriter);

${initMutableStates(ctx)}
}

public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
Expand All @@ -159,6 +195,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
this.columnIndexes = columnIndexes;
}

${declareAddedFunctions(ctx)}

public boolean hasNext() {
if (currentRow < numRowsInBatch) {
return true;
Expand All @@ -173,7 +211,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
for (int i = 0; i < columnIndexes.length; i ++) {
buffers[i] = batch.buffers()[columnIndexes[i]];
}
${initializeAccessors.mkString("\n")}
${initializerAccessorCalls}

return hasNext();
}
Expand All @@ -182,7 +220,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
currentRow += 1;
bufferHolder.reset();
rowWriter.initialize(bufferHolder, $numFields);
${extractors.mkString("\n")}
${extractorCalls}
unsafeRow.pointTo(bufferHolder.buffer, $numFields, bufferHolder.totalSize());
return unsafeRow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
assert(data.count() === 10)
assert(data.filter($"s" === "3").count() === 1)
}

test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") {
  // There are no explicit assertions: the test passes when code generation completes
  // without throwing for each of the column counts below.
  Seq(3999, 10000).foreach { numColumns =>
    val columnTypes = List.fill(numColumns)(IntegerType)
    GenerateColumnAccessor.generate(columnTypes)
  }
}
}