Skip to content

Commit 737d0d9

Browse files
committed
POC: outbound protobuf zero-copy
Outbound equivalent of #7330. Protobuf doesn't support multiple ByteBuffers in this direction but I don't think that matters much since the outbound buffers are typically allocated/sized to fit the messages.
1 parent e6ab167 commit 737d0d9

File tree

5 files changed

+148
-16
lines changed

5 files changed

+148
-16
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2020 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc;
18+
19+
import java.nio.ByteBuffer;
20+
21+
/**
22+
* Extension to an {@link java.io.OutputStream} or alike by adding methods that
23+
* allow writing directly to an underlying {@link ByteBuffer}
24+
*/
25+
public interface ByteBufferBacked {
26+
27+
/**
28+
* If available, returns a {@link ByteBuffer} backing this writable
29+
* object whose position corresponds to this object's current
30+
* writing position and with at least {@code size} remaining bytes.
31+
*
32+
* @param size minimum required size
33+
* @return null if not supported or writable buffer of insufficient size
34+
*/
35+
ByteBuffer getWritableBuffer(int size);
36+
37+
/**
38+
* This must be called to notify that data has been written to the
39+
* buffer previously returned from {@link #getWritableBuffer(int)},
40+
* prior to calling any other methods.
41+
*
42+
* @param written number of bytes written
43+
*/
44+
void bufferBytesWritten(int written);
45+
}

core/src/main/java/io/grpc/internal/MessageFramer.java

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static java.lang.Math.min;
2323

2424
import com.google.common.io.ByteStreams;
25+
import io.grpc.ByteBufferBacked;
2526
import io.grpc.Codec;
2627
import io.grpc.Compressor;
2728
import io.grpc.Drainable;
@@ -277,17 +278,12 @@ private static int writeToOutputStream(InputStream message, OutputStream outputS
277278
}
278279
}
279280

280-
private void writeRaw(byte[] b, int off, int len) {
281+
// package-private to avoid synthetic access from OutputStreamAdapter
282+
void writeRaw(byte[] b, int off, int len) {
281283
while (len > 0) {
282-
if (buffer != null && buffer.writableBytes() == 0) {
283-
commitToSink(false, false);
284-
}
285-
if (buffer == null) {
286-
// Request a buffer allocation using the message length as a hint.
287-
buffer = bufferAllocator.allocate(len);
288-
}
289-
int toWrite = min(len, buffer.writableBytes());
290-
buffer.write(b, off, toWrite);
284+
WritableBuffer buf = getWritableBuffer(len);
285+
int toWrite = min(len, buf.writableBytes());
286+
buf.write(b, off, toWrite);
291287
off += toWrite;
292288
len -= toWrite;
293289
}
@@ -303,6 +299,17 @@ public void flush() {
303299
}
304300
}
305301

302+
WritableBuffer getWritableBuffer(int len) {
303+
if (buffer != null && buffer.writableBytes() == 0) {
304+
commitToSink(false, false);
305+
}
306+
if (buffer == null) {
307+
// Request a buffer allocation using the message length as a hint.
308+
buffer = bufferAllocator.allocate(len);
309+
}
310+
return buffer;
311+
}
312+
306313
/**
307314
* Indicates whether or not this framer has been closed via a call to either
308315
* {@link #close()} or {@link #dispose()}.
@@ -360,7 +367,7 @@ private void verifyNotClosed() {
360367
}
361368

362369
/** OutputStream whose write()s are passed to the framer. */
363-
private class OutputStreamAdapter extends OutputStream {
370+
private class OutputStreamAdapter extends OutputStream implements ByteBufferBacked {
364371
/**
365372
* This is slow, don't call it. If you care about write overhead, use a BufferedOutputStream.
366373
* Better yet, you can use your own single byte buffer and call
@@ -376,13 +383,29 @@ public void write(int b) {
376383
public void write(byte[] b, int off, int len) {
377384
writeRaw(b, off, len);
378385
}
386+
387+
@Override
388+
public ByteBuffer getWritableBuffer(int size) {
389+
if (size > 0) {
390+
WritableBuffer buf = MessageFramer.this.getWritableBuffer(size);
391+
if (buf instanceof ByteBufferBacked) {
392+
return ((ByteBufferBacked) buf).getWritableBuffer(size);
393+
}
394+
}
395+
return null;
396+
}
397+
398+
@Override
399+
public void bufferBytesWritten(int size) {
400+
MessageFramer.bufferBytesWritten(buffer, size);
401+
}
379402
}
380403

381404
/**
382405
* Produce a collection of {@link WritableBuffer} instances from the data written to an
383406
* {@link OutputStream}.
384407
*/
385-
private final class BufferChainOutputStream extends OutputStream {
408+
private final class BufferChainOutputStream extends OutputStream implements ByteBufferBacked {
386409
private final List<WritableBuffer> bufferList = new ArrayList<>();
387410
private WritableBuffer current;
388411

@@ -403,7 +426,7 @@ public void write(int b) throws IOException {
403426

404427
@Override
405428
public void write(byte[] b, int off, int len) {
406-
if (current == null) {
429+
if (current == null && len > 0) {
407430
// Request len bytes initially from the allocator, it may give us more.
408431
current = bufferAllocator.allocate(len);
409432
bufferList.add(current);
@@ -431,5 +454,35 @@ private int readableBytes() {
431454
}
432455
return readable;
433456
}
457+
458+
@Override
459+
public ByteBuffer getWritableBuffer(int size) {
460+
if (size > 0) {
461+
if (current == null || current.writableBytes() == 0) {
462+
bufferList.add(current = bufferAllocator.allocate(size));
463+
}
464+
if (current instanceof ByteBufferBacked) {
465+
return ((ByteBufferBacked) current).getWritableBuffer(size);
466+
}
467+
}
468+
return null;
469+
}
470+
471+
@Override
472+
public void bufferBytesWritten(int size) {
473+
MessageFramer.bufferBytesWritten(current, size);
474+
}
475+
}
476+
477+
static void bufferBytesWritten(WritableBuffer buffer, int size) {
478+
try {
479+
((ByteBufferBacked) buffer).bufferBytesWritten(size);
480+
return;
481+
} catch (ClassCastException cce) {
482+
// fall-through
483+
} catch (NullPointerException npe) {
484+
// fall-through
485+
}
486+
throw new IllegalStateException();
434487
}
435488
}

netty/src/main/java/io/grpc/netty/NettyWritableBuffer.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,16 @@
1616

1717
package io.grpc.netty;
1818

19+
import io.grpc.ByteBufferBacked;
1920
import io.grpc.internal.WritableBuffer;
2021
import io.netty.buffer.ByteBuf;
2122

23+
import java.nio.ByteBuffer;
24+
2225
/**
2326
* The {@link WritableBuffer} used by the Netty transport.
2427
*/
25-
class NettyWritableBuffer implements WritableBuffer {
28+
class NettyWritableBuffer implements WritableBuffer, ByteBufferBacked {
2629

2730
private final ByteBuf bytebuf;
2831

@@ -50,6 +53,26 @@ public int readableBytes() {
5053
return bytebuf.readableBytes();
5154
}
5255

56+
@Override
57+
public ByteBuffer getWritableBuffer(int size) {
58+
if (bytebuf.writableBytes() >= size && size > 0) {
59+
try {
60+
return bytebuf.internalNioBuffer(bytebuf.writerIndex(), size);
61+
} catch (UnsupportedOperationException uoe) {
62+
// fall-through
63+
}
64+
}
65+
return null;
66+
}
67+
68+
@Override
69+
public void bufferBytesWritten(int size) {
70+
if (size < 0 || size > bytebuf.writableBytes()) {
71+
throw new IllegalStateException();
72+
}
73+
bytebuf.writerIndex(bytebuf.writerIndex() + size);
74+
}
75+
5376
@Override
5477
public void release() {
5578
bytebuf.release();

protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
import com.google.protobuf.CodedOutputStream;
2020
import com.google.protobuf.MessageLite;
2121
import com.google.protobuf.Parser;
22+
import io.grpc.ByteBufferBacked;
2223
import io.grpc.Drainable;
2324
import io.grpc.KnownLength;
2425
import java.io.ByteArrayInputStream;
2526
import java.io.IOException;
2627
import java.io.InputStream;
2728
import java.io.OutputStream;
29+
import java.nio.ByteBuffer;
2830
import javax.annotation.Nullable;
2931

3032
/**
@@ -49,7 +51,16 @@ public int drainTo(OutputStream target) throws IOException {
4951
int written;
5052
if (message != null) {
5153
written = message.getSerializedSize();
52-
message.writeTo(target);
54+
ByteBuffer buffer;
55+
if (target instanceof ByteBufferBacked
56+
&& (buffer = ((ByteBufferBacked) target).getWritableBuffer(written)) != null) {
57+
CodedOutputStream coded = CodedOutputStream.newInstance(buffer);
58+
message.writeTo(coded);
59+
coded.flush();
60+
((ByteBufferBacked) target).bufferBytesWritten(written);
61+
} else {
62+
message.writeTo(target);
63+
}
5364
message = null;
5465
} else if (partial != null) {
5566
written = (int) ProtoLiteUtils.copy(partial, target);

protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public T parse(InputStream stream) {
160160
if (protoStream.parser() == parser) {
161161
try {
162162
@SuppressWarnings("unchecked")
163-
T message = (T) ((ProtoInputStream) stream).message();
163+
T message = (T) protoStream.message();
164164
return message;
165165
} catch (IllegalStateException ignored) {
166166
// Stream must have been read from, which is a strange state. Since the point of this

0 commit comments

Comments
 (0)