Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c8810ad
Define an interface for retrieving a list of ByteBuffers.
voidzcy Jul 24, 2020
17d6dbb
Add methods to ReadableBuffer to support returning read bytes via Byt…
voidzcy Jul 24, 2020
83f0448
CompositeReadableBuffer's implementation for reading by returning Byt…
voidzcy Jul 24, 2020
69a1ddf
NettyReadableBuffer's implementation for reading by directly returnin…
voidzcy Jul 24, 2020
88d2b6f
Deframer gives an InputStream if the backing ReadableBuffer supports …
voidzcy Jul 24, 2020
94b5124
Feed a list of ByteBuffers into protobuf if the InputStream implement…
voidzcy Jul 24, 2020
ce8969e
Fix readByteBuffers operation for CompositeReadableBuffer.
voidzcy Aug 4, 2020
a16b6ae
Improve ByteBuffer read for netty.
voidzcy Aug 4, 2020
0cb6ea8
update interface for getting a list of ByteBuffers.
voidzcy Aug 4, 2020
f9aaffc
Improve feeding ByteBuffer to protobuf.
voidzcy Aug 4, 2020
8a0c7a2
Should not close ReadableBuffer before reading the returned ByteBuffe…
voidzcy Aug 4, 2020
8fb4534
Add TODO for remembering current implementation leaks buffer.
voidzcy Aug 4, 2020
58165d6
Replace ByteBufferReadable interface with ManagedBytes, which takes t…
voidzcy Aug 5, 2020
e2b3991
Add api for reading data as ManagedBytes from ReadableBuffer.
voidzcy Aug 5, 2020
8959de5
Implement reading via ManagedBytes for netty.
voidzcy Aug 5, 2020
86414ad
Implement reading via ManagedBytes for composite.
voidzcy Aug 5, 2020
093e435
Change stream to use ManagedBytes for data transfer.
voidzcy Aug 5, 2020
c9c123c
Use new ManagedBufferReadable api to transfer data into protobuf and …
voidzcy Aug 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions api/src/main/java/io/grpc/ManagedBytes.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.nio.ByteBuffer;
import java.util.List;
import javax.annotation.Nullable;

/**
* A logical representation of bytes with the responsibility of managing the resource lifecycle
* used to hold the data. Usually used for transferring data by directly hand over the ownership
* of the backing resources.
*/
public abstract class ManagedBytes {

/**
* Returns the data in a list of {@link ByteBuffer}s.
*/
public abstract List<ByteBuffer> asByteBuffers();

/**
* Release the usage of the data. The underlying resource used to hold the data may be released
* and no more access should be attempted.
*/
public void release() {
// no-op
}

/**
* A readable data source that supports the operation of reading its content into
* {@link ManagedBytes}.
*/
public interface ManagedBytesReadable {

/**
* Reads up to a total of {@code length} bytes as {@link ManagedBytes}.
*
* @param length the maximum number of bytes to be read.
* @return {@link ManagedBytes} that contains the bytes being read or {@code null}
* if no more bytes can be read.
*/
@Nullable
ManagedBytes readManagedBytes(int length);
}
}
12 changes: 12 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.grpc.internal;

import io.grpc.ManagedBytes;

/**
* Abstract base class for {@link ReadableBuffer} implementations.
*/
Expand All @@ -30,6 +32,16 @@ public final int readInt() {
return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
}

@Override
public boolean shouldUseManagedBytes() {
return false;
}

@Override
public ManagedBytes readManagedBytes(int length) {
throw new UnsupportedOperationException();
}

@Override
public boolean hasArray() {
return false;
Expand Down
73 changes: 73 additions & 0 deletions core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@

package io.grpc.internal;

import com.google.common.base.Preconditions;
import io.grpc.ManagedBytes;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
Expand Down Expand Up @@ -59,6 +63,75 @@ public void addBuffer(ReadableBuffer buffer) {
compositeBuffer.close();
}

@Override
public boolean shouldUseManagedBytes() {
for (ReadableBuffer buf : buffers) {
if (!buf.shouldUseManagedBytes()) {
return false;
}
}
return true;
}

@Override
public ManagedBytes readManagedBytes(int length) {
checkReadable(length);
readableBytes -= length;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to decrement this within the loop so that the buffer state remains consistent in case of runtime exception?

Also how about a special case (should be common):

if (buffers.size() == 1) {
  return (readableBytes == 0 ? buffers.poll() : buffers.peek()).readByteBuffers(length);
}


CompositeManagedBytes res = new CompositeManagedBytes();
while (length > 0) {
ReadableBuffer buffer = buffers.peek();
if (buffer.readableBytes() > length) {
res.addSharedBytes(buffer.readManagedBytes(length));
length = 0;
} else {
res.addBuffer(buffers.poll());
length -= buffer.readableBytes();
}
}
return res;
}

private static final class CompositeManagedBytes extends ManagedBytes {
private final List<ManagedBytes> managedBytesList = new ArrayList<>();

private void addBuffer(final ReadableBuffer buffer) {
Preconditions.checkArgument(
buffer.shouldUseManagedBytes(), "buffer does not support shared bytes");
managedBytesList.add(new ManagedBytes() {
@Override
public List<ByteBuffer> asByteBuffers() {
return buffer.readManagedBytes(buffer.readableBytes()).asByteBuffers();
}

@Override
public void release() {
buffer.close();
}
});
}

private void addSharedBytes(ManagedBytes managedBytes) {
managedBytesList.add(managedBytes);
}

@Override
public List<ByteBuffer> asByteBuffers() {
List<ByteBuffer> res = new ArrayList<>();
for (ManagedBytes managedBytes : managedBytesList) {
res.addAll(managedBytes.asByteBuffers());
}
return res;
}

@Override
public void release() {
for (ManagedBytes managedBytes : managedBytesList) {
managedBytes.release();
}
}
}

@Override
public int readableBytes() {
return readableBytes;
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import io.grpc.ManagedBytes;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -81,6 +82,16 @@ public ReadableBuffer readBytes(int length) {
return buf.readBytes(length);
}

@Override
public boolean shouldUseManagedBytes() {
return buf.shouldUseManagedBytes();
}

@Override
public ManagedBytes readManagedBytes(int length) {
return buf.readManagedBytes(length);
}

@Override
public boolean hasArray() {
return buf.hasArray();
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/java/io/grpc/internal/ReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.grpc.internal;

import io.grpc.ManagedBytes;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -102,6 +103,23 @@ public interface ReadableBuffer extends Closeable {
*/
ReadableBuffer readBytes(int length);

/**
* Indicates whether or not this buffer supports {@link #readManagedBytes} operation that returns
* buffer's content as {@link ManagedBytes}.
*/
boolean shouldUseManagedBytes();

/**
* Reads {@code length} bytes as {@link ManagedBytes}. This is an optional method, so callers
* should first check {@link #shouldUseManagedBytes}. Closing this buffer too early may
* result in the returned {@link ManagedBytes} no longer readable.
*
* @param length the total number of bytes to contain in the returned {@link ManagedBytes}s.
* @throws UnsupportedOperationException the buffer does not support this method
* @throws IndexOutOfBoundsException if required bytes are not readable
*/
ManagedBytes readManagedBytes(int length);

/**
* Indicates whether or not this buffer exposes a backing array.
*/
Expand Down
34 changes: 32 additions & 2 deletions core/src/main/java/io/grpc/internal/ReadableBuffers.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import com.google.common.base.Preconditions;
import io.grpc.KnownLength;
import io.grpc.ManagedBytes;
import io.grpc.ManagedBytes.ManagedBytesReadable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import javax.annotation.Nullable;

/**
* Utility methods for creating {@link ReadableBuffer} instances.
Expand Down Expand Up @@ -103,7 +106,11 @@ public static String readAsStringUtf8(ReadableBuffer buffer) {
* @param owner if {@code true}, the returned stream will close the buffer when closed.
*/
public static InputStream openStream(ReadableBuffer buffer, boolean owner) {
return new BufferInputStream(owner ? buffer : ignoreClose(buffer));
if (!owner) {
buffer = ignoreClose(buffer);
}
return buffer.shouldUseManagedBytes()
? new ManagedBytesInputStream(buffer) : new BufferInputStream(buffer);
}

/**
Expand Down Expand Up @@ -297,7 +304,7 @@ public int arrayOffset() {
/**
* An {@link InputStream} that is backed by a {@link ReadableBuffer}.
*/
private static final class BufferInputStream extends InputStream implements KnownLength {
private static class BufferInputStream extends InputStream implements KnownLength {
final ReadableBuffer buffer;

public BufferInputStream(ReadableBuffer buffer) {
Expand Down Expand Up @@ -336,5 +343,28 @@ public void close() throws IOException {
}
}

/**
* A {@link BufferInputStream} that supports data transfer via {@link ManagedBytes}.
*/
private static final class ManagedBytesInputStream extends BufferInputStream
implements ManagedBytesReadable {

ManagedBytesInputStream(ReadableBuffer buffer) {
super(buffer);
}

@Nullable
@Override
public ManagedBytes readManagedBytes(int length) {
if (buffer.readableBytes() == 0) {
// EOF.
return null;
}

length = Math.min(buffer.readableBytes(), length);
return buffer.readManagedBytes(length);
}
}

private ReadableBuffers() {}
}
23 changes: 23 additions & 0 deletions netty/src/main/java/io/grpc/netty/NettyReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package io.grpc.netty;

import com.google.common.base.Preconditions;
import io.grpc.ManagedBytes;
import io.grpc.internal.AbstractReadableBuffer;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

/**
* A {@link java.nio.Buffer} implementation that is backed by a Netty {@link ByteBuf}. This class
Expand Down Expand Up @@ -79,6 +82,26 @@ public NettyReadableBuffer readBytes(int length) {
return new NettyReadableBuffer(buffer.readRetainedSlice(length));
}

@Override
public boolean shouldUseManagedBytes() {
return buffer.nioBufferCount() > 0;
}

@Override
public ManagedBytes readManagedBytes(int length) {
if (buffer.readableBytes() < length) {
throw new IndexOutOfBoundsException();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return empty list here if length == 0?

final List<ByteBuffer> res = Arrays.asList(buffer.nioBuffers(buffer.readerIndex(), length));
buffer.skipBytes(length);
return new ManagedBytes() {
@Override
public List<ByteBuffer> asByteBuffers() {
return res;
}
};
}

@Override
public boolean hasArray() {
return buffer.hasArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.protobuf.Parser;
import io.grpc.ExperimentalApi;
import io.grpc.KnownLength;
import io.grpc.ManagedBytes;
import io.grpc.ManagedBytes.ManagedBytesReadable;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.MethodDescriptor.PrototypeMarshaller;
Expand Down Expand Up @@ -170,10 +172,19 @@ public T parse(InputStream stream) {
}
}
CodedInputStream cis = null;
ManagedBytes managedBytes = null;
try {
if (stream instanceof KnownLength) {
int size = stream.available();
if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) {
if (size <= 0) {
return defaultInstance;
}
// TODO(chengyuanzhang): we may still want to go with the byte array approach for small
// messages.
if (stream instanceof ManagedBytesReadable) {
managedBytes = ((ManagedBytesReadable) stream).readManagedBytes(size);
cis = CodedInputStream.newInstance(managedBytes.asByteBuffers());
} else if (size < DEFAULT_MAX_MESSAGE_SIZE) {
Reference<byte[]> ref;
// buf should not be used after this method has returned.
byte[] buf;
Expand All @@ -197,8 +208,6 @@ public T parse(InputStream stream) {
throw new RuntimeException("size inaccurate: " + size + " != " + position);
}
cis = CodedInputStream.newInstance(buf, 0, size);
} else if (size == 0) {
return defaultInstance;
}
}
} catch (IOException e) {
Expand All @@ -216,6 +225,10 @@ public T parse(InputStream stream) {
} catch (InvalidProtocolBufferException ipbe) {
throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence")
.withCause(ipbe).asRuntimeException();
} finally {
if (managedBytes != null) {
managedBytes.release();
}
}
}

Expand Down