-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-21527][CORE] Use buffer limit in order to use JAVA NIO Util's buffercache #18730
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
7cbadc5
fc91f96
bab91db
72aef67
4789772
aeabe1d
ca77b51
e84a6d7
d708142
33a2796
ab384d4
b669351
717f886
9d3004d
e48f44f
fc184aa
f1d67a3
14ca824
a02575e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ import java.nio.channels.WritableByteChannel | |
| import com.google.common.primitives.UnsignedBytes | ||
| import io.netty.buffer.{ByteBuf, Unpooled} | ||
|
|
||
| import org.apache.spark.internal.config | ||
| import org.apache.spark.network.util.ByteArrayWritableChannel | ||
| import org.apache.spark.storage.StorageUtils | ||
|
|
||
|
|
@@ -40,6 +41,9 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { | |
| require(chunks != null, "chunks must not be null") | ||
| require(chunks.forall(_.position() == 0), "chunks' positions must be 0") | ||
|
|
||
| // Chunk size in bytes | ||
| private val NIO_BUFFER_LIMIT = SparkEnv.get.conf.get(config.BUFFER_WRITE_CHUNK_SIZE) | ||
|
||
|
|
||
| private[this] var disposed: Boolean = false | ||
|
|
||
| /** | ||
|
|
@@ -56,7 +60,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) { | |
| */ | ||
| def writeFully(channel: WritableByteChannel): Unit = { | ||
| for (bytes <- getChunks()) { | ||
| while (bytes.remaining > 0) { | ||
| val capacity = bytes.limit() | ||
|
||
| while (bytes.remaining() > 0) { | ||
| val ioSize = Math.min(bytes.remaining(), NIO_BUFFER_LIMIT) | ||
| bytes.limit(bytes.position + ioSize) | ||
| channel.write(bytes) | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The chunk size during writing out the bytes of ChunkedByteBuffer