Skip to content
Closed
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9936df1
Add mark&reset methods and canUseByteBuffer&getByteBuffer methods to …
voidzcy Aug 17, 2020
aa93e1d
Default implementations.
voidzcy Aug 17, 2020
454e224
Wire new methods for forwarder
voidzcy Aug 17, 2020
3f47d5e
Support mark&reset and retrieving content via ByteBuffer for netty.
voidzcy Aug 17, 2020
b3c9974
Implementation for composite.
voidzcy Aug 17, 2020
2ede525
Define interface for accessing readable content via ByteBuffers.
voidzcy Aug 17, 2020
3dca312
Implement mark&reset for simple readable buffers.
voidzcy Aug 17, 2020
6249353
Use HasByteBuffer interface for accesing input stream's backing ByteB…
voidzcy Aug 17, 2020
29dce67
Eliminate the length argument for retrieving the ByteBuffer.
voidzcy Aug 17, 2020
22ea6c3
Do no require netty buffer to be direct from API's perspective.
voidzcy Aug 17, 2020
53e347c
Use Deque operations to avoid unncessary moves.
voidzcy Aug 17, 2020
27801fe
Make a list of ByteBuffers up-front instead of a running iterator.
voidzcy Aug 17, 2020
eb71a68
Add getByteBufferSupported method for HasByteBuffer so that it can be…
voidzcy Aug 17, 2020
5d3c657
It's not necessary to implement getByteBuffer for ByteReadbaleBufferW…
voidzcy Aug 17, 2020
9fd8d3c
Add test coverage for mark&reset and getByteBuffer for generic ByteBu…
voidzcy Aug 17, 2020
e3afe50
Add test coverage for netty's special get NIO bytebuffer operation.
voidzcy Aug 17, 2020
e2fdd07
Skip test for operations not supported by okhttp.
voidzcy Aug 17, 2020
033270b
Add test coverage for BufferInputStream with getByteBuffer operation.
voidzcy Aug 17, 2020
0622d51
Add test using a known-length input stream with getByteBuffer operati…
voidzcy Aug 17, 2020
0e8caee
Modify test method name.
voidzcy Aug 17, 2020
ba4e91b
Add test coverage for mark&reset and getByteBuffer for CompositeReada…
voidzcy Aug 17, 2020
69618b2
Add getByteBuffer support for ByteReadableBufferWrapper.
voidzcy Aug 20, 2020
437857d
Only pull ByteBuffers when message is large.
voidzcy Aug 21, 2020
1363505
Run ByteBuffer codepath only in Java 9+.
voidzcy Aug 28, 2020
c46eb73
Slight improvement for avoiding array creation if not necessary.
voidzcy Aug 28, 2020
7b4e070
Merge branch 'master' of github.com:grpc/grpc-java into impl/zero_cop…
voidzcy Aug 28, 2020
772b3ba
Change ReadableBuffer#canUseByteBuffer to hasByteBuffer.
voidzcy Sep 1, 2020
b1c99e5
Removed unnecessary reset.
voidzcy Sep 1, 2020
692076c
Simplify checking runtime java version.
voidzcy Sep 1, 2020
10c13b8
Add ExperimentalApi annotation.
voidzcy Sep 1, 2020
f13c165
Rename ReadableBuffer#hasByteBuffer to getByteBufferSupported.
voidzcy Sep 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions api/src/main/java/io/grpc/HasByteBuffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.nio.ByteBuffer;
import javax.annotation.Nullable;

/**
* Extension to an {@link java.io.InputStream} whose content can be accessed as {@link
* ByteBuffer}s.
*
* <p>This can be used for optimizing the case for the consumer of a {@link ByteBuffer}-backed
* input stream supports efficient reading from {@link ByteBuffer}s directly. This turns the reader
* interface from an {@link java.io.InputStream} to {@link ByteBuffer}s, without copying the
* content to a byte array and read from it.
*/
// TODO(chengyuanzhang): add ExperimentalApi annotation.
public interface HasByteBuffer {

/**
* Indicates whether or not {@link #getByteBuffer} operation is supported.
*/
boolean getByteBufferSupported();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not give this the same name as the equivalent added ReadableBuffer method (my suggestion would be hasByteBuffer())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't need to match what we use for ReadableBuffer right? We'd probably have a discussion on this API this Thursday and we usually do a vote for the name.

hasByteBuffer 1 vote now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As well as for consistency, having it line up with ReadableBuffer would permit ReadableBuffers to themselves be InputStreams implementing this interface, which I think might allow for some other streamlining later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, for that reason.

My original thought was this HasByteBuffer interface should only have the getByteBuffer method. InputStreams not supporting the operation should just not implement this interface. #7330 (comment) suggests combining two InputStream implementations into one by adding this getByteBufferSupported method. This look ok, but making the method hasByteBuffer does make this API wired. I'd rather change the hasByteBuffer method on ReadableBuffer interface to getByteBufferSupported.


/**
* Gets a {@link ByteBuffer} containing some bytes of the content next to be read, or {@code
* null} if has reached end of the content. The number of bytes contained in the returned buffer
* is implementation specific. Calling this method does not change the position of the input
* stream. The returned buffer's content should not be modified, but the position, limit, and
* mark may be changed. Operations for changing the position, limit, and mark of the returned
* buffer does not affect the position, limit, and mark of this input stream. This is an optional
* method, so callers should first check {@link #getByteBufferSupported}.
*
* @throws UnsupportedOperationException if this operation is not supported.
*/
@Nullable
ByteBuffer getByteBuffer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make the API cleaner how about having this return null for not supported and a (constant) empty ByteBuffer for EOS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having this return null for not supported and a (constant) empty ByteBuffer for EOS

The semantics of returning null for operation not supported isn't strong enough. Most existing Java APIs (e.g.,java.nio.ByteBuffer uses hasArray() and array()) throw an UnsupportedOperationException for unsupported optional APIs. I think the current approach is cleaner.

}
20 changes: 20 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.grpc.internal;

import java.nio.ByteBuffer;

/**
* Abstract base class for {@link ReadableBuffer} implementations.
*/
Expand Down Expand Up @@ -45,6 +47,24 @@ public int arrayOffset() {
throw new UnsupportedOperationException();
}

@Override
public void mark() {}

@Override
public void reset() {
throw new UnsupportedOperationException();
}

@Override
public boolean canUseByteBuffer() {
return false;
}

@Override
public ByteBuffer getByteBuffer() {
throw new UnsupportedOperationException();
}

@Override
public void close() {}

Expand Down
116 changes: 91 additions & 25 deletions core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.io.OutputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Deque;
import javax.annotation.Nullable;

/**
* A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a
Expand All @@ -34,7 +36,9 @@
public class CompositeReadableBuffer extends AbstractReadableBuffer {

private int readableBytes;
private final Queue<ReadableBuffer> buffers = new ArrayDeque<>();
private final Deque<ReadableBuffer> readableBuffers = new ArrayDeque<>();
private final Deque<ReadableBuffer> rewindableBuffers = new ArrayDeque<>();
private boolean marked;

/**
* Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is
Expand All @@ -43,16 +47,24 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer {
* this {@code CompositeBuffer}.
*/
public void addBuffer(ReadableBuffer buffer) {
boolean markHead = marked && readableBuffers.isEmpty();
enqueueBuffer(buffer);
if (markHead) {
readableBuffers.peek().mark();
}
}

private void enqueueBuffer(ReadableBuffer buffer) {
if (!(buffer instanceof CompositeReadableBuffer)) {
buffers.add(buffer);
readableBuffers.add(buffer);
readableBytes += buffer.readableBytes();
return;
}

CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer;
while (!compositeBuffer.buffers.isEmpty()) {
ReadableBuffer subBuffer = compositeBuffer.buffers.remove();
buffers.add(subBuffer);
while (!compositeBuffer.readableBuffers.isEmpty()) {
ReadableBuffer subBuffer = compositeBuffer.readableBuffers.remove();
readableBuffers.add(subBuffer);
}
readableBytes += compositeBuffer.readableBytes;
compositeBuffer.readableBytes = 0;
Expand Down Expand Up @@ -136,27 +148,73 @@ public int readInternal(ReadableBuffer buffer, int length) throws IOException {

@Override
public CompositeReadableBuffer readBytes(int length) {
checkReadable(length);
readableBytes -= length;

CompositeReadableBuffer newBuffer = new CompositeReadableBuffer();
while (length > 0) {
ReadableBuffer buffer = buffers.peek();
if (buffer.readableBytes() > length) {
final CompositeReadableBuffer newBuffer = new CompositeReadableBuffer();
execute(new ReadOperation() {
@Override
int readInternal(ReadableBuffer buffer, int length) {
newBuffer.addBuffer(buffer.readBytes(length));
length = 0;
} else {
newBuffer.addBuffer(buffers.poll());
length -= buffer.readableBytes();
return 0;
}
}
}, length);
return newBuffer;
}

@Override
public void mark() {
while (!rewindableBuffers.isEmpty()) {
rewindableBuffers.remove().close();
}
marked = true;
ReadableBuffer buffer = readableBuffers.peek();
if (buffer != null) {
buffer.mark();
}
}

@Override
public void reset() {
if (!marked) {
throw new InvalidMarkException();
}
ReadableBuffer buffer;
if ((buffer = readableBuffers.peek()) != null) {
int currentRemain = buffer.readableBytes();
buffer.reset();
readableBytes += (buffer.readableBytes() - currentRemain);
}
while ((buffer = rewindableBuffers.pollLast()) != null) {
buffer.reset();
readableBuffers.addFirst(buffer);
readableBytes += buffer.readableBytes();
}
}

@Override
public boolean canUseByteBuffer() {
for (ReadableBuffer buffer : readableBuffers) {
if (!buffer.canUseByteBuffer()) {
return false;
}
}
return true;
}

@Nullable
@Override
public ByteBuffer getByteBuffer() {
if (readableBuffers.isEmpty()) {
return null;
}
return readableBuffers.peek().getByteBuffer();
}

@Override
public void close() {
while (!buffers.isEmpty()) {
buffers.remove().close();
while (!readableBuffers.isEmpty()) {
readableBuffers.remove().close();
}
while (!rewindableBuffers.isEmpty()) {
rewindableBuffers.remove().close();
}
}

Expand All @@ -167,12 +225,12 @@ public void close() {
private void execute(ReadOperation op, int length) {
checkReadable(length);

if (!buffers.isEmpty()) {
if (!readableBuffers.isEmpty()) {
advanceBufferIfNecessary();
}

for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) {
ReadableBuffer buffer = buffers.peek();
for (; length > 0 && !readableBuffers.isEmpty(); advanceBufferIfNecessary()) {
ReadableBuffer buffer = readableBuffers.peek();
int lengthToCopy = Math.min(length, buffer.readableBytes());

// Perform the read operation for this buffer.
Expand All @@ -195,9 +253,17 @@ private void execute(ReadOperation op, int length) {
* If the current buffer is exhausted, removes and closes it.
*/
private void advanceBufferIfNecessary() {
ReadableBuffer buffer = buffers.peek();
ReadableBuffer buffer = readableBuffers.peek();
if (buffer.readableBytes() == 0) {
buffers.remove().close();
if (marked) {
rewindableBuffers.add(readableBuffers.remove());
ReadableBuffer next = readableBuffers.peek();
if (next != null) {
next.mark();
}
} else {
readableBuffers.remove().close();
}
}
}

Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/io/grpc/internal/ForwardingReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;

/**
* Base class for a wrapper around another {@link ReadableBuffer}.
Expand Down Expand Up @@ -96,6 +97,27 @@ public int arrayOffset() {
return buf.arrayOffset();
}

@Override
public void mark() {
buf.mark();
}

@Override
public void reset() {
buf.reset();
}

@Override
public boolean canUseByteBuffer() {
return buf.canUseByteBuffer();
}

@Nullable
@Override
public ByteBuffer getByteBuffer() {
return buf.getByteBuffer();
}

@Override
public void close() {
buf.close();
Expand Down
34 changes: 34 additions & 0 deletions core/src/main/java/io/grpc/internal/ReadableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;

/**
* Interface for an abstract byte buffer. Buffers are intended to be a read-only, except for the
Expand Down Expand Up @@ -123,6 +124,39 @@ public interface ReadableBuffer extends Closeable {
*/
int arrayOffset();

/**
* Marks the current position in this buffer. A subsequent call to the {@link #reset} method
* repositions this stream at the last marked position so that subsequent reads re-read the same
* bytes.
*/
void mark();

/**
* Repositions this buffer to the position at the time {@link #mark} was last called on this
* buffer.
*/
void reset();

/**
* Indicates whether or not {@link #getByteBuffer} operation is supported for this buffer.
*/
boolean canUseByteBuffer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe better name would be hasByteBuffer()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, go as you like. I don't have a strong preference.


/**
* Gets a {@link ByteBuffer} that contains some bytes of the content next to be read, or {@code
* null} if this buffer has been exhausted. The number of bytes contained in the returned buffer
* is implementation specific. The position of this buffer is unchanged after calling this
* method. The returned buffer's content should not be modified, but the position, limit, and
* mark may be changed. Operations for changing the position, limit, and mark of the returned
* buffer does not affect the position, limit, and mark of this buffer. Buffers returned by this
* method have independent position, limit and mark. This is an optional method, so callers
* should first check {@link #canUseByteBuffer}.
*
* @throws UnsupportedOperationException the buffer does not support this method.
*/
@Nullable
ByteBuffer getByteBuffer();

/**
* Closes this buffer and releases any resources.
*/
Expand Down
Loading