Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ final class BufferHolder {

private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;

// buffer is guarantee to be word-aligned since UnsafeRow assumes each field is word-aligned.
private byte[] buffer;
private int cursor = Platform.BYTE_ARRAY_OFFSET;
private final UnsafeRow row;
Expand All @@ -52,7 +53,8 @@ final class BufferHolder {
"too many fields (number of fields: " + row.numFields() + ")");
}
this.fixedSize = bitsetWidthInBytes + 8 * row.numFields();
this.buffer = new byte[fixedSize + initialSize];
int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(fixedSize + initialSize);
this.buffer = new byte[roundedSize];
this.row = row;
this.row.pointTo(buffer, buffer.length);
}
Expand All @@ -61,16 +63,21 @@ final class BufferHolder {
* Grows the buffer by at least neededSize and points the row to the buffer.
*/
void grow(int neededSize) {
if (neededSize < 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use assert here. When neededSize is negative, there must be an overflow.

throw new IllegalArgumentException(
"Cannot grow BufferHolder by size " + neededSize + " because the size is negative");
}
if (neededSize > ARRAY_MAX - totalSize()) {
throw new UnsupportedOperationException(
throw new IllegalArgumentException(
"Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
"exceeds size limitation " + ARRAY_MAX);
}
final int length = totalSize() + neededSize;
if (buffer.length < length) {
// This will not happen frequently, because the buffer is re-used.
int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
final byte[] tmp = new byte[newLength];
int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(newLength);
final byte[] tmp = new byte[roundedSize];
Platform.copyMemory(
buffer,
Platform.BYTE_ARRAY_OFFSET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.expressions.codegen

import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.scalatest.{Assertions, BeforeAndAfterEach, Matchers}

import org.apache.spark.{SparkFunSuite, TestUtils}
import org.apache.spark.deploy.SparkSubmitSuite
Expand All @@ -39,7 +39,7 @@ class BufferHolderSparkSubmitSuite
val argsForSparkSubmit = Seq(
"--class", BufferHolderSparkSubmitSuite.getClass.getName.stripSuffix("$"),
"--name", "SPARK-22222",
"--master", "local-cluster[2,1,1024]",
"--master", "local-cluster[1,1,4096]",
"--driver-memory", "4g",
"--conf", "spark.ui.enabled=false",
"--conf", "spark.master.rest.enabled=false",
Expand All @@ -49,28 +49,36 @@ class BufferHolderSparkSubmitSuite
}
}

object BufferHolderSparkSubmitSuite {
object BufferHolderSparkSubmitSuite extends Assertions {

def main(args: Array[String]): Unit = {

val ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH

val holder = new BufferHolder(new UnsafeRow(1000))
val unsafeRow = new UnsafeRow(1000)
val holder = new BufferHolder(unsafeRow)

holder.reset()
holder.grow(roundToWord(ARRAY_MAX / 2))

holder.reset()
holder.grow(roundToWord(ARRAY_MAX / 2 + 8))
assert(intercept[IllegalArgumentException] {
holder.grow(-1)
}.getMessage.contains("because the size is negative"))

holder.reset()
holder.grow(roundToWord(Integer.MAX_VALUE / 2))
// while to reuse a buffer may happen, this test checks whether the buffer can be grown
holder.grow(ARRAY_MAX / 2)
assert(unsafeRow.getSizeInBytes % 8 == 0)

holder.reset()
holder.grow(roundToWord(Integer.MAX_VALUE))
}
holder.grow(ARRAY_MAX / 2 + 7)
assert(unsafeRow.getSizeInBytes % 8 == 0)

holder.grow(Integer.MAX_VALUE / 2)
assert(unsafeRow.getSizeInBytes % 8 == 0)

holder.grow(ARRAY_MAX - holder.totalSize())
assert(unsafeRow.getSizeInBytes % 8 == 0)

private def roundToWord(len: Int): Int = {
ByteArrayMethods.roundNumberOfBytesToNearestWord(len)
assert(intercept[IllegalArgumentException] {
holder.grow(ARRAY_MAX + 1 - holder.totalSize())
}.getMessage.contains("because the size after growing"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
class BufferHolderSuite extends SparkFunSuite {

test("SPARK-16071 Check the size limit to avoid integer overflow") {
var e = intercept[UnsupportedOperationException] {
assert(intercept[UnsupportedOperationException] {
new BufferHolder(new UnsafeRow(Int.MaxValue / 8))
}
assert(e.getMessage.contains("too many fields"))
}.getMessage.contains("too many fields"))

val holder = new BufferHolder(new UnsafeRow(1000))
holder.reset()
holder.grow(1000)
e = intercept[UnsupportedOperationException] {
assert(intercept[IllegalArgumentException] {
holder.grow(Integer.MAX_VALUE)
}
assert(e.getMessage.contains("exceeds size limitation"))
}.getMessage.contains("exceeds size limitation"))
}
}