@@ -93,7 +93,7 @@ private[spark] class DiskStore(

   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
     put(blockId) { channel =>
-      bytes.writeFully(channel)
+      bytes.writeWithSlice(channel)
     }
   }

@@ -40,6 +40,8 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   require(chunks != null, "chunks must not be null")
   require(chunks.forall(_.position() == 0), "chunks' positions must be 0")

+  private val NIO_BUFFER_LIMIT = 64 * 1024 * 1024 // Chunk size in bytes
Contributor:
Have you tested with different numbers? Is 64 MB the best choice?

Contributor Author:
Actually, I did not test it much; the value depends on our cluster. I did some simple tests and found that 64 MB does not hurt performance compared with the previous logic, and the benefit is that we only need one buffer cache for blocks larger than 64 MB.

Contributor:
Shall we make it configurable? We can add a new entry in org.apache.spark.internal.config and mark it as an internal config first.

Contributor Author:
OK, I will refactor it later. Thanks for your suggestion.
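For reference, a minimal sketch of what such an internal entry in org.apache.spark.internal.config might look like. The key name, default, and the way ChunkedByteBuffer would read it are assumptions for illustration, not code from this PR:

import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

// Hypothetical internal config entry (key name and default are assumed here)
private[spark] val BUFFER_WRITE_CHUNK_SIZE =
  ConfigBuilder("spark.buffer.write.chunkSize")
    .internal()
    .doc("The chunk size in bytes used when writing a ChunkedByteBuffer out to a channel.")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefault(64 * 1024 * 1024)

// ChunkedByteBuffer could then read the entry instead of hard-coding the constant, e.g.
//   private val NIO_BUFFER_LIMIT = SparkEnv.get.conf.get(config.BUFFER_WRITE_CHUNK_SIZE).toInt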


   private[this] var disposed: Boolean = false

   /**
@@ -62,6 +64,19 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
     }
   }

+  /**
+   * Write this buffer to a channel with slice.
+   */
+  def writeWithSlice(channel: WritableByteChannel): Unit = {
Contributor:
Can we use this one to replace writeFully?

+    for (bytes <- getChunks()) {
+      val capacity = bytes.limit()
Contributor:
This line is not used.

+      while (bytes.position() < capacity) {
+        bytes.limit(Math.min(capacity, bytes.position + NIO_BUFFER_LIMIT.toInt))
Member (@kiszk, Aug 23, 2017):
Should we replace Math.min(...) with Math.min(capacity, bytes.position + NIO_BUFFER_LIMIT.toLong)? I am afraid of Int overflow. For example, if capacity = 0x7FFFFFF0 and bytes.position = 0x7FFFFF00, the result of bytes.position + NIO_BUFFER_LIMIT.toInt is negative (it wraps past 0x80000000).
To avoid this overflow, it would be good to compare the values as Long.
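For illustration, plugging the numbers above into a small standalone snippet (not code from the PR) shows the Int addition wrapping negative, which would make Math.min pick an invalid limit, while the Long addition behaves as expected:

// Values taken from the comment above
val capacity = 0x7FFFFFF0                  // 2147483632
val position = 0x7FFFFF00                  // 2147483392
val NIO_BUFFER_LIMIT = 64 * 1024 * 1024    // 0x04000000

val intSum  = position + NIO_BUFFER_LIMIT          // wraps to -2080375040
val longSum = position.toLong + NIO_BUFFER_LIMIT   // 2214592256

Math.min(capacity, intSum)                 // -2080375040: an invalid (negative) limit
Math.min(capacity.toLong, longSum).toInt   // 2147483632: correctly capped at capacity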

Contributor Author:
Good review. I will refactor the code (a sketch of the Long-based comparison follows the diff below).

+        channel.write(bytes)
+      }
+    }
+  }

   /**
    * Wrap this buffer to view it as a Netty ByteBuf.
    */
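Following up on the thread above about using Long for the comparison, here is a sketch of how the sliced write might look after that refactor. This is an illustration of the suggested fix, not necessarily the code that finally landed in the PR; it assumes it lives inside ChunkedByteBuffer next to getChunks() and NIO_BUFFER_LIMIT from the diff above:

// Requires: import java.nio.channels.WritableByteChannel
def writeWithSlice(channel: WritableByteChannel): Unit = {
  for (bytes <- getChunks()) {
    val originalLimit = bytes.limit()
    while (bytes.position() < originalLimit) {
      // Do the addition in Long so position + NIO_BUFFER_LIMIT cannot overflow Int,
      // then cap each write at NIO_BUFFER_LIMIT bytes.
      val writeLimit =
        math.min(originalLimit.toLong, bytes.position().toLong + NIO_BUFFER_LIMIT).toInt
      bytes.limit(writeLimit)
      channel.write(bytes)
    }
  }
}

The same loop could equally be folded into writeFully itself, as suggested earlier in the review, so callers such as DiskStore.putBytes would not need a separate method.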