Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c8810ad
Define an interface for retrieving a list of ByteBuffers.
voidzcy Jul 24, 2020
17d6dbb
Add methods to ReadableBuffer to support returning read bytes via Byt…
voidzcy Jul 24, 2020
83f0448
CompositeReadableBuffer's implementation for reading by returning Byt…
voidzcy Jul 24, 2020
69a1ddf
NettyReadableBuffer's implementation for reading by directly returnin…
voidzcy Jul 24, 2020
88d2b6f
Deframer gives an InputStream if the backing ReadableBuffer supports …
voidzcy Jul 24, 2020
94b5124
Feed a list of ByteBuffers into protobuf if the InputStream implement…
voidzcy Jul 24, 2020
ce8969e
Fix readByteBuffers operation for CompositeReadableBuffer.
voidzcy Aug 4, 2020
a16b6ae
Improve ByteBuffer read for netty.
voidzcy Aug 4, 2020
0cb6ea8
update interface for getting a list of ByteBuffers.
voidzcy Aug 4, 2020
f9aaffc
Improve feeding ByteBuffer to protobuf.
voidzcy Aug 4, 2020
8a0c7a2
Should not close ReadableBuffer before reading the returned ByteBuffe…
voidzcy Aug 4, 2020
8fb4534
Add TODO for remembering current implementation leaks buffer.
voidzcy Aug 4, 2020
58165d6
Replace ByteBufferReadable interface with ManagedBytes, which takes t…
voidzcy Aug 5, 2020
e2b3991
Add api for reading data as ManagedBytes from ReadableBuffer.
voidzcy Aug 5, 2020
8959de5
Implement reading via ManagedBytes for netty.
voidzcy Aug 5, 2020
86414ad
Implement reading via ManagedBytes for composite.
voidzcy Aug 5, 2020
093e435
Change stream to use ManagedBytes for data transfer.
voidzcy Aug 5, 2020
c9c123c
Use new ManagedBufferReadable api to transfer data into protobuf and …
voidzcy Aug 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions api/src/main/java/io/grpc/ByteBufferReadable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.nio.ByteBuffer;
import javax.annotation.Nullable;

/**
* An {@link java.io.InputStream} or alike that supports the operation of reading its content into
* {@link ByteBuffer}s.
*
* <p>Usually used for implementations (directly or indirectly) backed by {@link ByteBuffer}s so
* that read operations avoid making an extra copy by returning {@link ByteBuffer}s sharing content
* with the backing {@link ByteBuffer}s.
*/
public interface ByteBufferReadable {


/**
* Reads up to a total of {@code length} bytes as {@link ByteBuffer}s.
*
* @param length the maximum number of bytes to be read.
* @return {@link ByteBuffer}s that contains the bytes being read or {@code null} if no more
* bytes can be read.
*/
@Nullable
Iterable<ByteBuffer> readByteBuffers(int length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe return Collections.emptyList() instead of null?

}
13 changes: 13 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.grpc.internal;

import java.nio.ByteBuffer;
import java.util.List;

/**
* Abstract base class for {@link ReadableBuffer} implementations.
*/
Expand All @@ -30,6 +33,16 @@ public final int readInt() {
return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
}

@Override
public boolean shouldUseByteBuffer() {
return false;
}

@Override
public List<ByteBuffer> readByteBuffers(int length) {
throw new UnsupportedOperationException();
}

@Override
public boolean hasArray() {
return false;
Expand Down
31 changes: 31 additions & 0 deletions core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
Expand Down Expand Up @@ -59,6 +61,35 @@ public void addBuffer(ReadableBuffer buffer) {
compositeBuffer.close();
}

@Override
public boolean shouldUseByteBuffer() {
for (ReadableBuffer buf : buffers) {
if (!buf.shouldUseByteBuffer()) {
return false;
}
}
return true;
}

@Override
public List<ByteBuffer> readByteBuffers(int length) {
checkReadable(length);
readableBytes -= length;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to decrement this within the loop so that the buffer state remains consistent in case of runtime exception?

Also how about a special case (should be common):

if (buffers.size() == 1) {
  return (readableBytes == 0 ? buffers.poll() : buffers.peek()).readByteBuffers(length);
}


List<ByteBuffer> res = new ArrayList<>();
while (length > 0) {
ReadableBuffer buffer = buffers.peek();
int readLength = length;
if (buffer.readableBytes() <= length) {
readLength = buffer.readableBytes();
buffers.poll();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buffers.remove()?

}
res.addAll(buffer.readByteBuffers(readLength));
length -= readLength;
}
return res;
}

@Override
public int readableBytes() {
return readableBytes;
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;

/**
* Base class for a wrapper around another {@link ReadableBuffer}.
Expand Down Expand Up @@ -81,6 +82,16 @@ public ReadableBuffer readBytes(int length) {
return buf.readBytes(length);
}

@Override
public boolean shouldUseByteBuffer() {
return buf.shouldUseByteBuffer();
}

@Override
public List<ByteBuffer> readByteBuffers(int length) {
return buf.readByteBuffers(length);
}

@Override
public boolean hasArray() {
return buf.hasArray();
Expand Down
17 changes: 17 additions & 0 deletions core/src/main/java/io/grpc/internal/ReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;

/**
* Interface for an abstract byte buffer. Buffers are intended to be a read-only, except for the
Expand Down Expand Up @@ -102,6 +103,22 @@ public interface ReadableBuffer extends Closeable {
*/
ReadableBuffer readBytes(int length);

/**
* Indicates whether or not this buffer supports {@link #readByteBuffers} operation that returns
* buffer's content as {@link ByteBuffer}s without making an extra copy.
*/
boolean shouldUseByteBuffer();

/**
* Reads {@code length} bytes as {@link ByteBuffer}s. This is an optional method, so callers
* should first check {@link #shouldUseByteBuffer}.
*
* @param length the total number of bytes to contain in the returned {@link ByteBuffer}s.
* @throws UnsupportedOperationException the buffer does not support this method
* @throws IndexOutOfBoundsException if required bytes are not readable
*/
List<ByteBuffer> readByteBuffers(int length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just returning null here instead of separate boolean method? This would also be more efficient for composite case since it means not having to iterate over the buffers an additional time.


/**
* Indicates whether or not this buffer exposes a backing array.
*/
Expand Down
30 changes: 28 additions & 2 deletions core/src/main/java/io/grpc/internal/ReadableBuffers.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import static com.google.common.base.Charsets.UTF_8;

import com.google.common.base.Preconditions;
import io.grpc.ByteBufferReadable;
import io.grpc.KnownLength;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import javax.annotation.Nullable;

/**
* Utility methods for creating {@link ReadableBuffer} instances.
Expand Down Expand Up @@ -103,7 +105,11 @@ public static String readAsStringUtf8(ReadableBuffer buffer) {
* @param owner if {@code true}, the returned stream will close the buffer when closed.
*/
public static InputStream openStream(ReadableBuffer buffer, boolean owner) {
return new BufferInputStream(owner ? buffer : ignoreClose(buffer));
if (!owner) {
buffer = ignoreClose(buffer);
}
return buffer.shouldUseByteBuffer()
? new ByteBufferReadableInputStream(buffer) : new BufferInputStream(buffer);
}

/**
Expand Down Expand Up @@ -297,7 +303,7 @@ public int arrayOffset() {
/**
* An {@link InputStream} that is backed by a {@link ReadableBuffer}.
*/
private static final class BufferInputStream extends InputStream implements KnownLength {
private static class BufferInputStream extends InputStream implements KnownLength {
final ReadableBuffer buffer;

public BufferInputStream(ReadableBuffer buffer) {
Expand Down Expand Up @@ -336,5 +342,25 @@ public void close() throws IOException {
}
}

private static final class ByteBufferReadableInputStream extends BufferInputStream
implements ByteBufferReadable {

ByteBufferReadableInputStream(ReadableBuffer buffer) {
super(buffer);
}

@Nullable
@Override
public Iterable<ByteBuffer> readByteBuffers(int length) {
if (buffer.readableBytes() == 0) {
// EOF.
return null;
}

length = Math.min(buffer.readableBytes(), length);
return buffer.readByteBuffers(length);
}
}

private ReadableBuffers() {}
}
17 changes: 17 additions & 0 deletions netty/src/main/java/io/grpc/netty/NettyReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

/**
* A {@link java.nio.Buffer} implementation that is backed by a Netty {@link ByteBuf}. This class
Expand Down Expand Up @@ -79,6 +81,21 @@ public NettyReadableBuffer readBytes(int length) {
return new NettyReadableBuffer(buffer.readRetainedSlice(length));
}

@Override
public boolean shouldUseByteBuffer() {
return buffer.nioBufferCount() > 0;
}

@Override
public List<ByteBuffer> readByteBuffers(int length) {
if (buffer.readableBytes() < length) {
throw new IndexOutOfBoundsException();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return empty list here if length == 0?

List<ByteBuffer> res = Arrays.asList(buffer.nioBuffers(buffer.readerIndex(), length));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here would be preferable to do:

List<ByteBuffer> res = buffer.nioBufferCount() == 1
    ? Collections.singletonList(buffer.nioBuffer(buffer.readableIndex(), length)
    : Arrays.asList(buffer.nioBuffers(buffer.readerIndex(), length));

buffer.skipBytes(length);
return res;
}

@Override
public boolean hasArray() {
return buffer.hasArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import io.grpc.ByteBufferReadable;
import io.grpc.ExperimentalApi;
import io.grpc.KnownLength;
import io.grpc.Metadata;
Expand Down Expand Up @@ -173,7 +174,15 @@ public T parse(InputStream stream) {
try {
if (stream instanceof KnownLength) {
int size = stream.available();
if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) {
if (size <= 0) {
return defaultInstance;
}
// TODO(chengyuanzhang): we may still want to go with the byte array approach for small
// messages.
if (stream instanceof ByteBufferReadable) {
cis = CodedInputStream.newInstance(
((ByteBufferReadable) stream).readByteBuffers(size));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the protobuf code doesn't do this itself so would be better to handle (most common) singleton case separately:

List<ByteBuffer> bufs = ((ByteBufferReadable) stream).readByteBuffers(size);
cis = bufs.size() == 1 ? CodedInputStream.newInstance(bufs.get(0)) : CodedInputStream.newInstance(bufs);

} else if (size < DEFAULT_MAX_MESSAGE_SIZE) {
Reference<byte[]> ref;
// buf should not be used after this method has returned.
byte[] buf;
Expand All @@ -197,8 +206,6 @@ public T parse(InputStream stream) {
throw new RuntimeException("size inaccurate: " + size + " != " + position);
}
cis = CodedInputStream.newInstance(buf, 0, size);
} else if (size == 0) {
return defaultInstance;
}
}
} catch (IOException e) {
Expand Down