From 0ccf992677c5c781ab92249981fa578b60662ef7 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Mon, 19 Feb 2018 17:55:11 +0900 Subject: [PATCH 01/14] initial commit --- .../codegen/BufferHolderSparkSubmitSuite.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala index 85682cf6ea670..1b5dccec40a8e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala @@ -39,8 +39,8 @@ class BufferHolderSparkSubmitSuite val argsForSparkSubmit = Seq( "--class", BufferHolderSparkSubmitSuite.getClass.getName.stripSuffix("$"), "--name", "SPARK-22222", - "--master", "local-cluster[2,1,1024]", - "--driver-memory", "4g", + "--master", "local-cluster[1,1,7168]", + "--driver-memory", "7g", "--conf", "spark.ui.enabled=false", "--conf", "spark.master.rest.enabled=false", "--conf", "spark.driver.extraJavaOptions=-ea", @@ -58,15 +58,20 @@ object BufferHolderSparkSubmitSuite { val holder = new BufferHolder(new UnsafeRow(1000)) holder.reset() + // execute here since reset() updates holder.cursor + val smallBuffer = new Array[Byte](holder.cursor) holder.grow(roundToWord(ARRAY_MAX / 2)) holder.reset() + holder.buffer = smallBuffer // avoid to reuse an allocated large byte array holder.grow(roundToWord(ARRAY_MAX / 2 + 8)) holder.reset() + holder.buffer = smallBuffer // avoid to reuse an allocated large byte array holder.grow(roundToWord(Integer.MAX_VALUE / 2)) holder.reset() + holder.buffer = smallBuffer // avoid to reuse an allocated large byte array holder.grow(roundToWord(Integer.MAX_VALUE)) } From a92e97c5d9f8b4391abb67d927e525605026e0be Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 18 Apr 2018 03:07:11 +0100 Subject: [PATCH 02/14] rebase with master --- .../codegen/BufferHolderSparkSubmitSuite.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala index 1b5dccec40a8e..990ccbb30a990 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala @@ -59,22 +59,28 @@ object BufferHolderSparkSubmitSuite { holder.reset() // execute here since reset() updates holder.cursor - val smallBuffer = new Array[Byte](holder.cursor) + val smallBuffer = new Array[Byte](holder.getCursor) holder.grow(roundToWord(ARRAY_MAX / 2)) holder.reset() - holder.buffer = smallBuffer // avoid to reuse an allocated large byte array + setBuffer(holder, smallBuffer) // avoid to reuse an allocated large byte array holder.grow(roundToWord(ARRAY_MAX / 2 + 8)) holder.reset() - holder.buffer = smallBuffer // avoid to reuse an allocated large byte array + setBuffer(holder, smallBuffer) // avoid to reuse an allocated large byte array holder.grow(roundToWord(Integer.MAX_VALUE / 2)) holder.reset() - holder.buffer = smallBuffer // avoid to reuse an allocated large byte array + setBuffer(holder, smallBuffer) // avoid to reuse an allocated large byte array holder.grow(roundToWord(Integer.MAX_VALUE)) } + def setBuffer(holder: BufferHolder, smallBuffer: Array[Byte]): Unit = { + val field = holder.getClass.getDeclaredField("buffer") + field.setAccessible(true) + field.set(holder, smallBuffer) + } + private def roundToWord(len: Int): Int = { ByteArrayMethods.roundNumberOfBytesToNearestWord(len) } From 981aba5f49d0508c16748a18b5e39ad583176b24 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 20 Apr 2018 19:21:23 +0100 Subject: [PATCH 03/14] address review comment put additional check at BufferHolder.grow() --- .../expressions/codegen/BufferHolder.java | 4 ++++ .../codegen/BufferHolderSparkSubmitSuite.scala | 16 ++-------------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java index 537ef244b7e81..6bd7a5ee0bb10 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java @@ -61,6 +61,10 @@ final class BufferHolder { * Grows the buffer by at least neededSize and points the row to the buffer. */ void grow(int neededSize) { + if (neededSize < 0) { + throw new UnsupportedOperationException( + "Cannot grow BufferHolder by size " + neededSize + " because the size is negative"); + } if (neededSize > ARRAY_MAX - totalSize()) { throw new UnsupportedOperationException( "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala index 990ccbb30a990..1b594ec1c93fe 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala @@ -58,29 +58,17 @@ object BufferHolderSparkSubmitSuite { val holder = new BufferHolder(new UnsafeRow(1000)) holder.reset() - // execute here since reset() updates holder.cursor - val smallBuffer = new Array[Byte](holder.getCursor) + + // while to reuse a buffer may happen, this test checks whether the buffer can be grown holder.grow(roundToWord(ARRAY_MAX / 2)) - holder.reset() - setBuffer(holder, smallBuffer) // avoid to reuse an allocated large byte array holder.grow(roundToWord(ARRAY_MAX / 2 + 8)) - holder.reset() - setBuffer(holder, smallBuffer) // avoid to reuse an allocated large byte array holder.grow(roundToWord(Integer.MAX_VALUE / 2)) - holder.reset() - setBuffer(holder, smallBuffer) // avoid to reuse an allocated large byte array holder.grow(roundToWord(Integer.MAX_VALUE)) } - def setBuffer(holder: BufferHolder, smallBuffer: Array[Byte]): Unit = { - val field = holder.getClass.getDeclaredField("buffer") - field.setAccessible(true) - field.set(holder, smallBuffer) - } - private def roundToWord(len: Int): Int = { ByteArrayMethods.roundNumberOfBytesToNearestWord(len) } From c0fb167232dc07a61785981f59c1af8665bc12a0 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 13 Jun 2018 07:55:32 +0100 Subject: [PATCH 04/14] address review comment --- .../expressions/codegen/BufferHolder.java | 13 ++++++------- .../BufferHolderSparkSubmitSuite.scala | 19 ++++++++++--------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java index 6bd7a5ee0bb10..5b53a3fb214b5 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java @@ -61,16 +61,15 @@ final class BufferHolder { * Grows the buffer by at least neededSize and points the row to the buffer. */ void grow(int neededSize) { - if (neededSize < 0) { + assert neededSize < 0 : + "Cannot grow BufferHolder by size " + neededSize + " because the size is negative"; + int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(neededSize); + if (roundedSize > ARRAY_MAX - totalSize()) { throw new UnsupportedOperationException( - "Cannot grow BufferHolder by size " + neededSize + " because the size is negative"); - } - if (neededSize > ARRAY_MAX - totalSize()) { - throw new UnsupportedOperationException( - "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " + + "Cannot grow BufferHolder by size " + roundedSize + " because the size after growing " + "exceeds size limitation " + ARRAY_MAX); } - final int length = totalSize() + neededSize; + final int length = totalSize() + roundedSize; if (buffer.length < length) { // This will not happen frequently, because the buffer is re-used. int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX; diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala index 1b594ec1c93fe..3442866095862 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala @@ -55,21 +55,22 @@ object BufferHolderSparkSubmitSuite { val ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - val holder = new BufferHolder(new UnsafeRow(1000)) + val unsafeRow = new UnsafeRow(1000) + val holder = new BufferHolder(unsafeRow) holder.reset() // while to reuse a buffer may happen, this test checks whether the buffer can be grown - holder.grow(roundToWord(ARRAY_MAX / 2)) + holder.grow(ARRAY_MAX / 2) + assert(unsafeRow.getSizeInBytes % 8 == 0) - holder.grow(roundToWord(ARRAY_MAX / 2 + 8)) + holder.grow(ARRAY_MAX / 2 + 8) + assert(unsafeRow.getSizeInBytes % 8 == 0) - holder.grow(roundToWord(Integer.MAX_VALUE / 2)) + holder.grow(Integer.MAX_VALUE / 2) + assert(unsafeRow.getSizeInBytes % 8 == 0) - holder.grow(roundToWord(Integer.MAX_VALUE)) - } - - private def roundToWord(len: Int): Int = { - ByteArrayMethods.roundNumberOfBytesToNearestWord(len) + holder.grow(ARRAY_MAX - 8192) + assert(unsafeRow.getSizeInBytes % 8 == 0) } } From e3cd82147db4e484750a7261025611d1b89863bb Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 14 Jun 2018 11:22:23 +0100 Subject: [PATCH 05/14] address review comments --- .../expressions/codegen/BufferHolder.java | 15 +++++++++------ .../codegen/BufferHolderSparkSubmitSuite.scala | 13 ++++++++++++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java index 5b53a3fb214b5..dbc4bb46e9002 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java @@ -35,6 +35,8 @@ final class BufferHolder { private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; + // Buffer is guarantee to be word-aligned. This is because UnsafeRow assumes that each field + // is word-aligned. private byte[] buffer; private int cursor = Platform.BYTE_ARRAY_OFFSET; private final UnsafeRow row; @@ -52,7 +54,8 @@ final class BufferHolder { "too many fields (number of fields: " + row.numFields() + ")"); } this.fixedSize = bitsetWidthInBytes + 8 * row.numFields(); - this.buffer = new byte[fixedSize + initialSize]; + int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(fixedSize + initialSize); + this.buffer = new byte[roundedSize]; this.row = row; this.row.pointTo(buffer, buffer.length); } @@ -63,17 +66,17 @@ final class BufferHolder { void grow(int neededSize) { assert neededSize < 0 : "Cannot grow BufferHolder by size " + neededSize + " because the size is negative"; - int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(neededSize); - if (roundedSize > ARRAY_MAX - totalSize()) { + if (neededSize > ARRAY_MAX - totalSize()) { throw new UnsupportedOperationException( - "Cannot grow BufferHolder by size " + roundedSize + " because the size after growing " + + "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " + "exceeds size limitation " + ARRAY_MAX); } - final int length = totalSize() + roundedSize; + final int length = totalSize() + neededSize; if (buffer.length < length) { // This will not happen frequently, because the buffer is re-used. int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX; - final byte[] tmp = new byte[newLength]; + int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(newLength); + final byte[] tmp = new byte[roundedSize]; Platform.copyMemory( buffer, Platform.BYTE_ARRAY_OFFSET, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala index 3442866095862..5102049c24db1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala @@ -70,7 +70,18 @@ object BufferHolderSparkSubmitSuite { holder.grow(Integer.MAX_VALUE / 2) assert(unsafeRow.getSizeInBytes % 8 == 0) - holder.grow(ARRAY_MAX - 8192) + holder.grow(Integer.MAX_VALUE / 2 + 7) assert(unsafeRow.getSizeInBytes % 8 == 0) + + holder.grow(ARRAY_MAX - holder.totalSize()) + assert(unsafeRow.getSizeInBytes % 8 == 0) + + try { + holder.grow(ARRAY_MAX + 1 - holder.totalSize()) + assert(false) + } catch { + case _: UnsupportedOperationException => assert(true) + case _ => assert(false) + } } } From 4e9aa75e049ae812eabb92ed713029fe9c1dc8b6 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 15 Jun 2018 02:23:12 +0100 Subject: [PATCH 06/14] revert a part of last commit to address build failure --- .../codegen/BufferHolderSparkSubmitSuite.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala index 5102049c24db1..af8053dbb6e88 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala @@ -75,13 +75,5 @@ object BufferHolderSparkSubmitSuite { holder.grow(ARRAY_MAX - holder.totalSize()) assert(unsafeRow.getSizeInBytes % 8 == 0) - - try { - holder.grow(ARRAY_MAX + 1 - holder.totalSize()) - assert(false) - } catch { - case _: UnsupportedOperationException => assert(true) - case _ => assert(false) - } } } From 2ab6ee6259da2a20d0bc534842967825298a8be6 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 15 Jun 2018 03:23:31 +0100 Subject: [PATCH 07/14] revert a part of last commit to address build failure --- .../expressions/codegen/BufferHolderSparkSubmitSuite.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala index af8053dbb6e88..dcd08b0488d79 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala @@ -64,15 +64,12 @@ object BufferHolderSparkSubmitSuite { holder.grow(ARRAY_MAX / 2) assert(unsafeRow.getSizeInBytes % 8 == 0) - holder.grow(ARRAY_MAX / 2 + 8) + holder.grow(ARRAY_MAX / 2 + 7) assert(unsafeRow.getSizeInBytes % 8 == 0) holder.grow(Integer.MAX_VALUE / 2) assert(unsafeRow.getSizeInBytes % 8 == 0) - holder.grow(Integer.MAX_VALUE / 2 + 7) - assert(unsafeRow.getSizeInBytes % 8 == 0) - holder.grow(ARRAY_MAX - holder.totalSize()) assert(unsafeRow.getSizeInBytes % 8 == 0) } From 517a42e63e7704528b1a91ae83abfcadcadc3c80 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 15 Jun 2018 04:11:50 +0100 Subject: [PATCH 08/14] revert recent changes to avoid build error --- .../expressions/codegen/BufferHolder.java | 15 ++++++--------- .../codegen/BufferHolderSparkSubmitSuite.scala | 4 ++-- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java index dbc4bb46e9002..5b53a3fb214b5 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java @@ -35,8 +35,6 @@ final class BufferHolder { private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; - // Buffer is guarantee to be word-aligned. This is because UnsafeRow assumes that each field - // is word-aligned. private byte[] buffer; private int cursor = Platform.BYTE_ARRAY_OFFSET; private final UnsafeRow row; @@ -54,8 +52,7 @@ final class BufferHolder { "too many fields (number of fields: " + row.numFields() + ")"); } this.fixedSize = bitsetWidthInBytes + 8 * row.numFields(); - int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(fixedSize + initialSize); - this.buffer = new byte[roundedSize]; + this.buffer = new byte[fixedSize + initialSize]; this.row = row; this.row.pointTo(buffer, buffer.length); } @@ -66,17 +63,17 @@ final class BufferHolder { void grow(int neededSize) { assert neededSize < 0 : "Cannot grow BufferHolder by size " + neededSize + " because the size is negative"; - if (neededSize > ARRAY_MAX - totalSize()) { + int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(neededSize); + if (roundedSize > ARRAY_MAX - totalSize()) { throw new UnsupportedOperationException( - "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " + + "Cannot grow BufferHolder by size " + roundedSize + " because the size after growing " + "exceeds size limitation " + ARRAY_MAX); } - final int length = totalSize() + neededSize; + final int length = totalSize() + roundedSize; if (buffer.length < length) { // This will not happen frequently, because the buffer is re-used. int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX; - int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(newLength); - final byte[] tmp = new byte[roundedSize]; + final byte[] tmp = new byte[newLength]; Platform.copyMemory( buffer, Platform.BYTE_ARRAY_OFFSET, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala index dcd08b0488d79..3442866095862 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala @@ -64,13 +64,13 @@ object BufferHolderSparkSubmitSuite { holder.grow(ARRAY_MAX / 2) assert(unsafeRow.getSizeInBytes % 8 == 0) - holder.grow(ARRAY_MAX / 2 + 7) + holder.grow(ARRAY_MAX / 2 + 8) assert(unsafeRow.getSizeInBytes % 8 == 0) holder.grow(Integer.MAX_VALUE / 2) assert(unsafeRow.getSizeInBytes % 8 == 0) - holder.grow(ARRAY_MAX - holder.totalSize()) + holder.grow(ARRAY_MAX - 8192) assert(unsafeRow.getSizeInBytes % 8 == 0) } } From 96f8effdddba01eb97d59215fc02f566a15e08fd Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 15 Jun 2018 05:30:46 +0100 Subject: [PATCH 09/14] undo recent reverts remove warning in try-catch --- .../catalyst/expressions/codegen/BufferHolder.java | 14 ++++++++------ .../codegen/BufferHolderSparkSubmitSuite.scala | 12 ++++++++++-- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java index 5b53a3fb214b5..c8ddf246e3aa7 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java @@ -35,6 +35,7 @@ final class BufferHolder { private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH; + // buffer is guarantee to be word-aligned since UnsafeRow assumes each field is word-aligned. private byte[] buffer; private int cursor = Platform.BYTE_ARRAY_OFFSET; private final UnsafeRow row; @@ -52,7 +53,8 @@ final class BufferHolder { "too many fields (number of fields: " + row.numFields() + ")"); } this.fixedSize = bitsetWidthInBytes + 8 * row.numFields(); - this.buffer = new byte[fixedSize + initialSize]; + int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(fixedSize + initialSize); + this.buffer = new byte[roundedSize]; this.row = row; this.row.pointTo(buffer, buffer.length); } @@ -63,17 +65,17 @@ final class BufferHolder { void grow(int neededSize) { assert neededSize < 0 : "Cannot grow BufferHolder by size " + neededSize + " because the size is negative"; - int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(neededSize); - if (roundedSize > ARRAY_MAX - totalSize()) { + if (neededSize > ARRAY_MAX - totalSize()) { throw new UnsupportedOperationException( - "Cannot grow BufferHolder by size " + roundedSize + " because the size after growing " + + "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " + "exceeds size limitation " + ARRAY_MAX); } - final int length = totalSize() + roundedSize; + final int length = totalSize() + neededSize; if (buffer.length < length) { // This will not happen frequently, because the buffer is re-used. int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX; - final byte[] tmp = new byte[newLength]; + int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(newLength); + final byte[] tmp = new byte[roundedSize]; Platform.copyMemory( buffer, Platform.BYTE_ARRAY_OFFSET, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala index 3442866095862..d7dfa87a6536f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala @@ -64,13 +64,21 @@ object BufferHolderSparkSubmitSuite { holder.grow(ARRAY_MAX / 2) assert(unsafeRow.getSizeInBytes % 8 == 0) - holder.grow(ARRAY_MAX / 2 + 8) + holder.grow(ARRAY_MAX / 2 + 7) assert(unsafeRow.getSizeInBytes % 8 == 0) holder.grow(Integer.MAX_VALUE / 2) assert(unsafeRow.getSizeInBytes % 8 == 0) - holder.grow(ARRAY_MAX - 8192) + holder.grow(ARRAY_MAX - holder.totalSize()) assert(unsafeRow.getSizeInBytes % 8 == 0) + + try { + holder.grow(ARRAY_MAX + 1 - holder.totalSize()) + assert(false) + } catch { + case _: UnsupportedOperationException => assert(true) + case _: Throwable => assert(false) + } } } From a134091aad0c3f8e3674f6cd751c2b8d5d83e39e Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 15 Jun 2018 18:34:25 +0100 Subject: [PATCH 10/14] fix test failures in ml --- .../spark/sql/catalyst/expressions/codegen/BufferHolder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java index c8ddf246e3aa7..761624bfe937f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java @@ -63,7 +63,7 @@ final class BufferHolder { * Grows the buffer by at least neededSize and points the row to the buffer. */ void grow(int neededSize) { - assert neededSize < 0 : + assert neededSize >= 0 : "Cannot grow BufferHolder by size " + neededSize + " because the size is negative"; if (neededSize > ARRAY_MAX - totalSize()) { throw new UnsupportedOperationException( From 84940ee55e1d263ae9278afdb91e8d15961b5db4 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 8 Aug 2018 14:11:33 +0100 Subject: [PATCH 11/14] address review comments --- .../expressions/codegen/BufferHolder.java | 7 ++++--- .../BufferHolderSparkSubmitSuite.scala | 20 +++++++++---------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java index 761624bfe937f..455ec012172f6 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java @@ -63,10 +63,11 @@ final class BufferHolder { * Grows the buffer by at least neededSize and points the row to the buffer. */ void grow(int neededSize) { - assert neededSize >= 0 : - "Cannot grow BufferHolder by size " + neededSize + " because the size is negative"; + if (neededSize < 0) + throw new IllegalArgumentException( + "Cannot grow BufferHolder by size " + neededSize + " because the size is negative"); if (neededSize > ARRAY_MAX - totalSize()) { - throw new UnsupportedOperationException( + throw new IllegalArgumentException( "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " + "exceeds size limitation " + ARRAY_MAX); } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala index d7dfa87a6536f..d2862c8f41d1b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen -import org.scalatest.{BeforeAndAfterEach, Matchers} +import org.scalatest.{Assertions, BeforeAndAfterEach, Matchers} import org.apache.spark.{SparkFunSuite, TestUtils} import org.apache.spark.deploy.SparkSubmitSuite @@ -39,8 +39,8 @@ class BufferHolderSparkSubmitSuite val argsForSparkSubmit = Seq( "--class", BufferHolderSparkSubmitSuite.getClass.getName.stripSuffix("$"), "--name", "SPARK-22222", - "--master", "local-cluster[1,1,7168]", - "--driver-memory", "7g", + "--master", "local-cluster[1,1,4096]", + "--driver-memory", "4g", "--conf", "spark.ui.enabled=false", "--conf", "spark.master.rest.enabled=false", "--conf", "spark.driver.extraJavaOptions=-ea", @@ -49,7 +49,7 @@ class BufferHolderSparkSubmitSuite } } -object BufferHolderSparkSubmitSuite { +object BufferHolderSparkSubmitSuite extends Assertions { def main(args: Array[String]): Unit = { @@ -60,6 +60,10 @@ object BufferHolderSparkSubmitSuite { holder.reset() + assert(intercept[IllegalArgumentException] { + holder.grow(-1) + }.getMessage.contains("because the size is negative")) + // while to reuse a buffer may happen, this test checks whether the buffer can be grown holder.grow(ARRAY_MAX / 2) assert(unsafeRow.getSizeInBytes % 8 == 0) @@ -73,12 +77,8 @@ object BufferHolderSparkSubmitSuite { holder.grow(ARRAY_MAX - holder.totalSize()) assert(unsafeRow.getSizeInBytes % 8 == 0) - try { + assert(intercept[IllegalArgumentException] { holder.grow(ARRAY_MAX + 1 - holder.totalSize()) - assert(false) - } catch { - case _: UnsupportedOperationException => assert(true) - case _: Throwable => assert(false) - } + }.getMessage.contains("because the size after growing")) } } From 2dd1b8235671c35593311cdf02b9816d920ec0f2 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 8 Aug 2018 17:11:46 +0100 Subject: [PATCH 12/14] fix test failure at BufferHolderSuite --- .../sql/catalyst/expressions/codegen/BufferHolderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala index c7c386b5b838a..3ae889c256304 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala @@ -31,7 +31,7 @@ class BufferHolderSuite extends SparkFunSuite { val holder = new BufferHolder(new UnsafeRow(1000)) holder.reset() holder.grow(1000) - e = intercept[UnsupportedOperationException] { + e = intercept[IllegalArgumentException] { holder.grow(Integer.MAX_VALUE) } assert(e.getMessage.contains("exceeds size limitation")) From 5e59acafdd05142622f4de0c20132b9965f9cc5a Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 8 Aug 2018 18:05:05 +0100 Subject: [PATCH 13/14] fix build error --- .../expressions/codegen/BufferHolderSuite.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala index 3ae889c256304..4e0f903a030aa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala @@ -23,17 +23,15 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow class BufferHolderSuite extends SparkFunSuite { test("SPARK-16071 Check the size limit to avoid integer overflow") { - var e = intercept[UnsupportedOperationException] { + assert(intercept[UnsupportedOperationException] { new BufferHolder(new UnsafeRow(Int.MaxValue / 8)) - } - assert(e.getMessage.contains("too many fields")) + }.getMessage.contains("too many fields")) val holder = new BufferHolder(new UnsafeRow(1000)) holder.reset() holder.grow(1000) - e = intercept[IllegalArgumentException] { + assert(intercept[IllegalArgumentException] { holder.grow(Integer.MAX_VALUE) - } - assert(e.getMessage.contains("exceeds size limitation")) + }.getMessage.contains("exceeds size limitation")) } } From 81d647793c7491c2f806e78ccae353d652d61d96 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 9 Aug 2018 03:12:55 +0100 Subject: [PATCH 14/14] fix Java style error --- .../spark/sql/catalyst/expressions/codegen/BufferHolder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java index 455ec012172f6..6a52a5b0e0664 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java @@ -63,9 +63,10 @@ final class BufferHolder { * Grows the buffer by at least neededSize and points the row to the buffer. */ void grow(int neededSize) { - if (neededSize < 0) + if (neededSize < 0) { throw new IllegalArgumentException( "Cannot grow BufferHolder by size " + neededSize + " because the size is negative"); + } if (neededSize > ARRAY_MAX - totalSize()) { throw new IllegalArgumentException( "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +