Skip to content

Commit 5024c4c

Browse files
ericlrxin
authored and committed
[SPARK-16432] Empty blocks fail to serialize due to assert in ChunkedByteBuffer
## What changes were proposed in this pull request? It's possible to also change the callers to not pass in empty chunks, but it seems cleaner to just allow `ChunkedByteBuffer` to handle empty arrays. cc JoshRosen ## How was this patch tested? Unit tests, also checked that the original reproduction case in #11748 (comment) is resolved. Author: Eric Liang <[email protected]> Closes #14099 from ericl/spark-16432. (cherry picked from commit d8b06f1) Signed-off-by: Reynold Xin <[email protected]>
1 parent 16202ba commit 5024c4c

File tree

2 files changed

+8
-13
lines changed

2 files changed

+8
-13
lines changed

core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,13 @@ import org.apache.spark.storage.StorageUtils
3131
* Read-only byte buffer which is physically stored as multiple chunks rather than a single
3232
* contiguous array.
3333
*
34-
* @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must be non-empty and have
35-
* position == 0. Ownership of these buffers is transferred to the ChunkedByteBuffer,
36-
* so if these buffers may also be used elsewhere then the caller is responsible for
37-
* copying them as needed.
34+
* @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must have position == 0.
35+
* Ownership of these buffers is transferred to the ChunkedByteBuffer, so if these
36+
* buffers may also be used elsewhere then the caller is responsible for copying
37+
* them as needed.
3838
*/
3939
private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
4040
require(chunks != null, "chunks must not be null")
41-
require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
4241
require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
4342

4443
private[this] var disposed: Boolean = false

core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,6 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
3838
emptyChunkedByteBuffer.toInputStream(dispose = true).close()
3939
}
4040

41-
test("chunks must be non-empty") {
42-
intercept[IllegalArgumentException] {
43-
new ChunkedByteBuffer(Array(ByteBuffer.allocate(0)))
44-
}
45-
}
46-
4741
test("getChunks() duplicates chunks") {
4842
val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
4943
chunkedByteBuffer.getChunks().head.position(4)
@@ -63,8 +57,9 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
6357
}
6458

6559
test("toArray()") {
60+
val empty = ByteBuffer.wrap(Array[Byte]())
6661
val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
67-
val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes))
62+
val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes, empty))
6863
assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array())
6964
}
7065

@@ -79,9 +74,10 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
7974
}
8075

8176
test("toInputStream()") {
77+
val empty = ByteBuffer.wrap(Array[Byte]())
8278
val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
8379
val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
84-
val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2))
80+
val chunkedByteBuffer = new ChunkedByteBuffer(Array(empty, bytes1, bytes2))
8581
assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit())
8682

8783
val inputStream = chunkedByteBuffer.toInputStream(dispose = false)

0 commit comments

Comments (0)