spark-14290 avoid significant memory copy in netty's transferTo
liyezhang556520 committed Mar 31, 2016
commit 63ca85a5548858b4fe46a4ade062776cb6747cba
@@ -18,6 +18,7 @@
 package org.apache.spark.network.protocol;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import javax.annotation.Nullable;

@@ -43,6 +44,14 @@ class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
   private final long bodyLength;
   private long totalBytesTransferred;
 
+  /**
+   * When the write buffer size is larger than this limit, I/O will be done in chunks of this size.
+   * The size should not be too large, as that wastes memory copies: e.g. if the available network
+   * buffer is smaller than this limit, the data cannot be sent in one single write operation,
+   * while a memory copy of this full size is still made.
+   */
+  private static final int NIO_BUFFER_LIMIT = 512 * 1024;
Contributor Author:

I set this limit to 512K because, in my test, each WritableByteChannel.write() could successfully write about 600KB~1.5MB of data. This size needs to be decided after more tests by someone else.

Member:

Is it possible to know the accurate number? I guess not, because it's OS dependent and may be changed via OS settings.

However, I saw Hadoop uses `private static int NIO_BUFFER_LIMIT = 8*1024; // should not be more than 64KB`.

Contributor:

I'm also a little worried that 512k might be a bit too much. On my machine, /proc/sys/net/core/wmem_default is around 200k, which (I assume) means you'd be needlessly copying about half of the buffer here.

Instead, how about using a more conservative value (like Hadoop's) and looping in copyByteBuf until you either write the whole source buffer or get a short write?
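A minimal sketch of the looping variant suggested here, assuming the same class context as the patch; the method name, the constant, and the 64KB value are illustrative only, not part of any commit:

```java
// Sketch only: chunked write loop with a conservative limit, stopping on a
// short write. Names and the 64KB value are illustrative, not from the patch.
private static final int SMALL_NIO_LIMIT = 64 * 1024;

private int copyByteBufChunked(ByteBuf buf, WritableByteChannel target) throws IOException {
  ByteBuffer buffer = buf.nioBuffer();
  int originalLimit = buffer.limit();
  int totalWritten = 0;
  while (buffer.hasRemaining()) {
    int ioSize = Math.min(buffer.remaining(), SMALL_NIO_LIMIT);
    buffer.limit(buffer.position() + ioSize);
    int written = target.write(buffer);
    buffer.limit(originalLimit);
    totalWritten += written;
    if (written < ioSize) {
      break;  // short write: the socket buffer is full, let the caller retry later
    }
  }
  buf.skipBytes(totalWritten);
  return totalWritten;
}
```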

Member:

I think too small a value will waste a lot of system calls. Our use case is different from Hadoop's: here we may send large messages.

Member:

What if we create a DirectByteBuffer here manually for a big buf (big enough that we get a benefit even if creating a direct buffer is slow) and try to write as much as possible? Then we can avoid the memory copy in IOUtil.write.
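A rough sketch of that idea, under the assumption that the direct buffer is allocated once and reused; the field and method names here are invented for illustration:

```java
// Sketch: stage the heap-backed source in our own direct buffer so that
// WritableByteChannel.write sees a direct buffer and IOUtil.write makes no
// hidden temporary copy. All names and the size are illustrative.
private final ByteBuffer directStaging = ByteBuffer.allocateDirect(512 * 1024);

private int writeViaDirectBuffer(WritableByteChannel target, ByteBuffer src) throws IOException {
  directStaging.clear();
  int ioSize = Math.min(src.remaining(), directStaging.remaining());
  int originalLimit = src.limit();
  src.limit(src.position() + ioSize);
  directStaging.put(src);  // the one explicit copy
  src.limit(originalLimit);
  directStaging.flip();
  int written = target.write(directStaging);
  // put() consumed ioSize bytes from src, but only `written` were sent;
  // rewind src so the caller's accounting (skipBytes) stays correct.
  src.position(src.position() - (ioSize - written));
  return written;
}
```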

Contributor Author:

> Is it possible to know the accurate number? I guess not, because it's OS dependent and may be changed via OS settings.

@zsxwing There might be a way to get the accurate size of the network buffer, but I think it's meaningless to do that: even if we got the accurate number, we cannot guarantee the network send buffer is empty each time we write, which means it's always possible that only part of the data is written, whatever we set NIO_BUFFER_LIMIT to. We can only say that the smaller NIO_BUFFER_LIMIT is, the less redundant copying is done.
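To make the trade-off concrete with purely illustrative numbers: if the send buffer has ~200KB free and we pass a 512KB chunk, IOUtil copies all 512KB into its temporary direct buffer but only ~200KB is actually written, so ~312KB of copying is wasted on that call; with an 8KB chunk, at most 8KB of copying can ever be wasted, but pushing the same 512KB costs up to 64 write() calls instead of one.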

Contributor Author:

> On my machine, /proc/sys/net/core/wmem_default is around 200k, which (I assume) means you'd be needlessly copying about half of the buffer here.

@vanzin, on my machine both wmem_default and wmem_max are also around 200K, but in my test I can successfully write more than 512K per WritableByteChannel.write(); that size should equal the return value of writeFromNativeBuffer, as at http://www.grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/sun/nio/ch/IOUtil.java#65. I don't know why. Can you also run a test?
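One way to reproduce such a measurement is a small standalone probe along these lines (a sketch only; the class name and sizes are arbitrary, and the result will vary with OS socket-buffer settings):

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class SingleWriteProbe {
  public static void main(String[] args) throws Exception {
    ServerSocketChannel server = ServerSocketChannel.open();
    server.bind(new InetSocketAddress("127.0.0.1", 0));

    SocketChannel client = SocketChannel.open(server.getLocalAddress());
    client.configureBlocking(false);  // so write() returns instead of blocking
    server.accept();                  // accept the peer, but never read from it

    // With nobody reading, a single write() accepts roughly as many bytes as
    // the kernel's socket buffers have room for.
    ByteBuffer data = ByteBuffer.allocate(4 * 1024 * 1024);
    int written = client.write(data);
    System.out.println("single write() accepted " + written + " bytes");

    client.close();
    server.close();
  }
}
```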

Contributor Author:

> What if we create a DirectByteBuffer here manually for a big buf (big enough that we get a benefit even if creating a direct buffer is slow) and try to write as much as possible? Then we can avoid the memory copy in IOUtil.write.

@zsxwing, yes, the redundant copy can be avoided if we hand a direct buffer straight to WritableByteChannel.write(), because of the check at http://www.grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/sun/nio/ch/IOUtil.java#50, but I don't know whether that's worthwhile. IOUtil maintains a direct-buffer pool to avoid frequently allocating direct buffers. I think that's why, in my test, the network throughput on the executor side was extremely low the first time I ran

sc.parallelize(Array(1,2,3),3).mapPartitions(a=>Array(new Array[Long](1024 * 1024 * 200)).iterator).reduce((a,b)=> a).length

but much higher when I ran it after first running

sc.parallelize(Array(1,2,3),3).mapPartitions(a=>Array(new Array[Double](1024 * 1024 * 50)).iterator).reduce((a,b)=> a).length

So, if we want to create direct buffers manually in Spark, we had better also maintain a buffer pool, but that would introduce much more complexity and carry the risk of memory leaks.
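For reference, the kind of cache alluded to could look roughly like this: a minimal sketch in the spirit of sun.nio.ch.Util's temporary-buffer pool, with all names invented; a real version would need to bound cached capacity and release buffers to avoid the leak risk mentioned above:

```java
// Sketch of a per-thread direct-buffer cache. Illustrative only: real code
// must cap cached capacity and free buffers when threads die.
private static final ThreadLocal<ByteBuffer> DIRECT_CACHE = new ThreadLocal<ByteBuffer>();

private static ByteBuffer getTemporaryDirectBuffer(int size) {
  ByteBuffer cached = DIRECT_CACHE.get();
  if (cached == null || cached.capacity() < size) {
    cached = ByteBuffer.allocateDirect(size);  // slow path: allocate, then cache
    DIRECT_CACHE.set(cached);
  }
  cached.clear();
  cached.limit(size);
  return cached;
}
```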


   /**
    * Construct a new MessageWithHeader.
    *
@@ -128,8 +137,27 @@ protected void deallocate() {
   }
 
   private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
-    int written = target.write(buf.nioBuffer());
+    ByteBuffer buffer = buf.nioBuffer();
+    int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+      target.write(buffer) : writeNioBuffer(target, buffer);
     buf.skipBytes(written);
     return written;
   }
+
+  private int writeNioBuffer(
+      WritableByteChannel writeCh,
+      ByteBuffer buf) throws IOException {
+    int originalLimit = buf.limit();
+    int ret = 0;
+
+    try {
+      int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+      buf.limit(buf.position() + ioSize);
+      ret = writeCh.write(buf);
+    } finally {
+      buf.limit(originalLimit);
+    }
+
+    return ret;
+  }
}