Fix no ending loop
caneGuy committed Aug 24, 2017
commit 72aef679b498bb042ecb9ffa8df62ed41e1f519d
@@ -70,7 +70,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
  def writeWithSlice(channel: WritableByteChannel): Unit = {
Contributor: Can we use this one to replace writeFully?
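
A minimal sketch (not part of this commit) of one shape such a replacement could take, assuming the chunk's original limit is restored after each write so that remaining() keeps tracking the unwritten tail and the loop terminates; NIO_BUFFER_LIMIT is the constant already referenced below:

  def writeFully(channel: WritableByteChannel): Unit = {
    for (bytes <- getChunks()) {
      val originalLimit = bytes.limit()
      while (bytes.hasRemaining) {
        // Cap each write so large on-heap chunks reach the channel in bounded slices.
        val ioSize = Math.min(bytes.remaining(), NIO_BUFFER_LIMIT.toInt)
        bytes.limit(bytes.position() + ioSize)
        channel.write(bytes)
        // Restore the full limit before the next iteration.
        bytes.limit(originalLimit)
      }
    }
  }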

    for (bytes <- getChunks()) {
      val capacity = bytes.limit()
Contributor: This line is not used.

-      while (bytes.position() < capacity) {
+      while (bytes.position() < capacity && bytes.remaining() > 0) {
Contributor: Why do we need bytes.position() < capacity? Isn't bytes.remaining() > 0 enough?

Contributor (Author): Fixed. Sorry for that.

        val ioSize = Math.min(bytes.remaining(), NIO_BUFFER_LIMIT.toInt)
Contributor: Why toInt here?

        bytes.limit(bytes.position + ioSize)
        channel.write(bytes)
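
For context on the while-condition thread above: with java.nio.ByteBuffer, remaining() is limit minus position, so shrinking the limit inside the loop without restoring it drives remaining() to 0 while position() is still below the chunk's original capacity, and a guard of only position() < capacity then keeps requesting 0-byte writes forever. A tiny standalone illustration, with made-up sizes:

  import java.nio.ByteBuffer

  val buf = ByteBuffer.allocate(10)         // position = 0, limit = capacity = 10
  val perWriteCap = 4                       // hypothetical per-write cap
  buf.limit(buf.position() + perWriteCap)   // limit shrinks to 4
  buf.position(buf.limit())                 // pretend the channel consumed everything up to the limit
  // Now buf.remaining() == 0 but buf.position() == 4 < 10, so a loop guarded only by
  // position() < capacity would spin with ioSize == 0; guarding on remaining() > 0
  // (or restoring the original limit each iteration) breaks out of that.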
@@ -56,6 +56,23 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
    assert(chunkedByteBuffer.getChunks().head.position() === 0)
  }

test("benchmark") {
val buffer100 = ByteBuffer.allocate(1024 * 1024 * 100)
val buffer30 = ByteBuffer.allocate(1024 * 1024 * 30)
val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(5)(buffer100))
var starTime = System.currentTimeMillis()
for (i <- 1 to 10) {
chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
}
// scalastyle:off
System.out.println(System.currentTimeMillis() - starTime)
starTime = System.currentTimeMillis()
for (i <- 1 to 10) {
chunkedByteBuffer.writeWithSlice(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
}
System.out.println(System.currentTimeMillis() - starTime)
}
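
If the timing comparison is kept, a warm-up pass plus System.nanoTime would make the two measurements less sensitive to JIT and class-loading noise; a rough sketch (the helper name and shape are illustrative, not from this PR):

  // Hypothetical helper: run the block once to warm up, then time `iterations` runs in milliseconds.
  def timeMillis(iterations: Int)(block: => Unit): Long = {
    block
    val start = System.nanoTime()
    (1 to iterations).foreach(_ => block)
    (System.nanoTime() - start) / 1000000
  }

  // e.g. println(timeMillis(10)(chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))))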

test("toArray()") {
val empty = ByteBuffer.wrap(Array.empty[Byte])
val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))