-
Notifications
You must be signed in to change notification settings - Fork 4k
protobuf, api, core, netty: zero copy into protobuf #7330
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
9936df1
aa93e1d
454e224
3f47d5e
b3c9974
2ede525
3dca312
6249353
29dce67
22ea6c3
53e347c
27801fe
eb71a68
5d3c657
9fd8d3c
e3afe50
e2fdd07
033270b
0622d51
0e8caee
ba4e91b
69618b2
437857d
1363505
c46eb73
7b4e070
772b3ba
b1c99e5
692076c
10c13b8
f13c165
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| /* | ||
| * Copyright 2020 The gRPC Authors | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package io.grpc; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** | ||
| * Extension to an {@link java.io.InputStream} whose content can be accessed as {@link | ||
| * ByteBuffer}s. | ||
| * | ||
| * <p>This can be used for optimizing the case for the consumer of a {@link ByteBuffer}-backed | ||
| * input stream supports efficient reading from {@link ByteBuffer}s directly. This turns the reader | ||
| * interface from an {@link java.io.InputStream} to {@link ByteBuffer}s, without copying the | ||
| * content to a byte array and read from it. | ||
| */ | ||
| // TODO(chengyuanzhang): add ExperimentalApi annotation. | ||
| public interface HasByteBuffer { | ||
|
|
||
| /** | ||
| * Gets a {@link ByteBuffer} containing up to {@code length} bytes of the content, or {@code | ||
| * null} if has reached end of the content. | ||
| * @param length the maximum number of bytes to contain in returned {@link ByteBuffer}. | ||
| */ | ||
| @Nullable | ||
| ByteBuffer getByteBuffer(int length); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,8 +20,10 @@ | |
| import java.io.OutputStream; | ||
| import java.nio.Buffer; | ||
| import java.nio.ByteBuffer; | ||
| import java.nio.InvalidMarkException; | ||
| import java.util.ArrayDeque; | ||
| import java.util.Queue; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** | ||
| * A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a | ||
|
|
@@ -34,7 +36,9 @@ | |
| public class CompositeReadableBuffer extends AbstractReadableBuffer { | ||
|
|
||
| private int readableBytes; | ||
| private final Queue<ReadableBuffer> buffers = new ArrayDeque<>(); | ||
| private final Queue<ReadableBuffer> readableBuffers = new ArrayDeque<>(); | ||
| private final Queue<ReadableBuffer> rewindableBuffers = new ArrayDeque<>(); | ||
| private boolean marked; | ||
|
|
||
| /** | ||
| * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is | ||
|
|
@@ -43,16 +47,24 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer { | |
| * this {@code CompositeBuffer}. | ||
| */ | ||
| public void addBuffer(ReadableBuffer buffer) { | ||
| boolean markHead = marked && readableBuffers.isEmpty(); | ||
| enqueueBuffer(buffer); | ||
| if (markHead) { | ||
| readableBuffers.peek().mark(); | ||
| } | ||
| } | ||
|
|
||
| private void enqueueBuffer(ReadableBuffer buffer) { | ||
| if (!(buffer instanceof CompositeReadableBuffer)) { | ||
| buffers.add(buffer); | ||
| readableBuffers.add(buffer); | ||
| readableBytes += buffer.readableBytes(); | ||
| return; | ||
| } | ||
|
|
||
| CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer; | ||
| while (!compositeBuffer.buffers.isEmpty()) { | ||
| ReadableBuffer subBuffer = compositeBuffer.buffers.remove(); | ||
| buffers.add(subBuffer); | ||
| while (!compositeBuffer.readableBuffers.isEmpty()) { | ||
| ReadableBuffer subBuffer = compositeBuffer.readableBuffers.remove(); | ||
| readableBuffers.add(subBuffer); | ||
| } | ||
| readableBytes += compositeBuffer.readableBytes; | ||
| compositeBuffer.readableBytes = 0; | ||
|
|
@@ -138,27 +150,78 @@ public int readInternal(ReadableBuffer buffer, int length) throws IOException { | |
|
|
||
| @Override | ||
| public CompositeReadableBuffer readBytes(int length) { | ||
| checkReadable(length); | ||
| readableBytes -= length; | ||
|
|
||
| CompositeReadableBuffer newBuffer = new CompositeReadableBuffer(); | ||
| while (length > 0) { | ||
| ReadableBuffer buffer = buffers.peek(); | ||
| if (buffer.readableBytes() > length) { | ||
| final CompositeReadableBuffer newBuffer = new CompositeReadableBuffer(); | ||
| execute(new ReadOperation() { | ||
| @Override | ||
| int readInternal(ReadableBuffer buffer, int length) { | ||
| newBuffer.addBuffer(buffer.readBytes(length)); | ||
| length = 0; | ||
| } else { | ||
| newBuffer.addBuffer(buffers.poll()); | ||
| length -= buffer.readableBytes(); | ||
| return 0; | ||
| } | ||
| } | ||
| }, length); | ||
| return newBuffer; | ||
| } | ||
|
|
||
| @Override | ||
| public void mark() { | ||
| while (!rewindableBuffers.isEmpty()) { | ||
| rewindableBuffers.remove().close(); | ||
| } | ||
| marked = true; | ||
| ReadableBuffer buffer = readableBuffers.peek(); | ||
| if (buffer != null) { | ||
| buffer.mark(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void reset() { | ||
| if (!marked) { | ||
| throw new InvalidMarkException(); | ||
| } | ||
| ReadableBuffer buffer; | ||
| if ((buffer = readableBuffers.peek()) != null) { | ||
| int currentRemain = buffer.readableBytes(); | ||
| buffer.reset(); | ||
| readableBytes += (buffer.readableBytes() - currentRemain); | ||
| } | ||
| int size = readableBuffers.size(); | ||
|
||
| while ((buffer = rewindableBuffers.poll()) != null) { | ||
| buffer.reset(); | ||
| readableBuffers.add(buffer); | ||
| readableBytes += buffer.readableBytes(); | ||
| } | ||
| for (int i = 0; i < size; i++) { | ||
| readableBuffers.add(readableBuffers.remove()); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean canUseByteBuffer() { | ||
| for (ReadableBuffer buffer : readableBuffers) { | ||
| if (!buffer.canUseByteBuffer()) { | ||
| return false; | ||
| } | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public ByteBuffer getByteBuffer(int length) { | ||
| if (readableBuffers.isEmpty()) { | ||
| return null; | ||
| } | ||
| ReadableBuffer buffer = readableBuffers.peek(); | ||
| return buffer.getByteBuffer(Math.min(length, buffer.readableBytes())); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| while (!buffers.isEmpty()) { | ||
| buffers.remove().close(); | ||
| while (!readableBuffers.isEmpty()) { | ||
| readableBuffers.remove().close(); | ||
| } | ||
| while (!rewindableBuffers.isEmpty()) { | ||
| rewindableBuffers.remove().close(); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -169,12 +232,12 @@ public void close() { | |
| private void execute(ReadOperation op, int length) { | ||
| checkReadable(length); | ||
|
|
||
| if (!buffers.isEmpty()) { | ||
| if (!readableBuffers.isEmpty()) { | ||
| advanceBufferIfNecessary(); | ||
| } | ||
|
|
||
| for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) { | ||
| ReadableBuffer buffer = buffers.peek(); | ||
| for (; length > 0 && !readableBuffers.isEmpty(); advanceBufferIfNecessary()) { | ||
| ReadableBuffer buffer = readableBuffers.peek(); | ||
| int lengthToCopy = Math.min(length, buffer.readableBytes()); | ||
|
|
||
| // Perform the read operation for this buffer. | ||
|
|
@@ -197,9 +260,17 @@ private void execute(ReadOperation op, int length) { | |
| * If the current buffer is exhausted, removes and closes it. | ||
| */ | ||
| private void advanceBufferIfNecessary() { | ||
| ReadableBuffer buffer = buffers.peek(); | ||
| ReadableBuffer buffer = readableBuffers.peek(); | ||
| if (buffer.readableBytes() == 0) { | ||
| buffers.remove().close(); | ||
| if (marked) { | ||
| rewindableBuffers.add(readableBuffers.remove()); | ||
| ReadableBuffer next = readableBuffers.peek(); | ||
| if (next != null) { | ||
| next.mark(); | ||
| } | ||
| } else { | ||
| readableBuffers.remove().close(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| import java.io.IOException; | ||
| import java.io.OutputStream; | ||
| import java.nio.ByteBuffer; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** | ||
| * Interface for an abstract byte buffer. Buffers are intended to be a read-only, except for the | ||
|
|
@@ -123,6 +124,39 @@ public interface ReadableBuffer extends Closeable { | |
| */ | ||
| int arrayOffset(); | ||
|
|
||
| /** | ||
| * Marks the current position in this buffer. A subsequent call to the {@link #reset} method | ||
| * repositions this stream at the last marked position so that subsequent reads re-read the same | ||
| * bytes. | ||
| */ | ||
| void mark(); | ||
|
|
||
| /** | ||
| * Repositions this buffer to the position at the time {@link #mark} was last called on this | ||
| * buffer. | ||
| */ | ||
| void reset(); | ||
|
|
||
| /** | ||
| * Indicates whether or not {@link #getByteBuffer} operation is supported for this buffer. | ||
| */ | ||
| boolean canUseByteBuffer(); | ||
|
||
|
|
||
| /** | ||
| * Gets a {@link ByteBuffer} that contains up to {@code length} bytes of this buffer's content, | ||
| * or {@code null} if this buffer has been exhausted. The position of this buffer is unchanged | ||
| * after calling this method. The returned buffer's content should not be modified, but the | ||
| * position, limit, and mark may be changed. Operations for changing the position, limit, and | ||
| * mark of the returned {@link ByteBuffer} does not affect the position, limit, and mark of | ||
| * this buffer. {@link ByteBuffer}s returned by this method have independent position, limit | ||
| * and mark. This is an optional method, so callers should first check {@link #canUseByteBuffer}. | ||
| * | ||
| * @param length the maximum number of bytes to contain in returned {@link ByteBuffer}. | ||
| * @throws UnsupportedOperationException the buffer does not support this method. | ||
| */ | ||
| @Nullable | ||
| ByteBuffer getByteBuffer(int length); | ||
|
|
||
| /** | ||
| * Closes this buffer and releases any resources. | ||
| */ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would either expect a
get(void)method or aread(length)method. Providing a length here doesn't seem to provide any value as if you specify a smaller length it just limits what is returned but doesn't change any internal state.read(length)actually changes the "read position."There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds fair. Changed.