-
Notifications
You must be signed in to change notification settings - Fork 4k
protobuf, api, core, netty: zero copy into protobuf #7330
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
9936df1
aa93e1d
454e224
3f47d5e
b3c9974
2ede525
3dca312
6249353
29dce67
22ea6c3
53e347c
27801fe
eb71a68
5d3c657
9fd8d3c
e3afe50
e2fdd07
033270b
0622d51
0e8caee
ba4e91b
69618b2
437857d
1363505
c46eb73
7b4e070
772b3ba
b1c99e5
692076c
10c13b8
f13c165
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,8 +37,8 @@ | |
| import java.lang.ref.Reference; | ||
| import java.lang.ref.WeakReference; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.Iterator; | ||
| import java.util.NoSuchElementException; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
|
|
||
| /** | ||
| * Utility methods for using protobuf with grpc. | ||
|
|
@@ -180,7 +180,15 @@ public T parse(InputStream stream) { | |
| // TODO(chengyuanzhang): we may still want to go with the byte array approach for small | ||
| // messages. | ||
| if (stream instanceof HasByteBuffer && stream.markSupported()) { | ||
| cis = CodedInputStream.newInstance(new KnownLengthByteBufferIterable(stream, size)); | ||
| List<ByteBuffer> buffers = new ArrayList<>(); | ||
| stream.mark(size); | ||
| while (stream.available() != 0) { | ||
| ByteBuffer buffer = ((HasByteBuffer) stream).getByteBuffer(); | ||
| stream.skip(buffer.remaining()); | ||
| buffers.add(buffer); | ||
| } | ||
| stream.reset(); | ||
| cis = CodedInputStream.newInstance(buffers); | ||
| } else if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check should still be done in the new case I think?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have decided to keep "normal" messages (2~4 KB) in the current codepath, copying small things isn't necessarily bad with CPU caches. So we only enable this for messages >= 64 KB (most messages should be below this threshold).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds reasonable, would be interesting to benchmark different message sizes/sparsity to verify the threshold (of course likely system dependent). But what about the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For large messages (larger than DEFAULT_MAX_MESSAGE_SIZE), we would prefer the ByteBuffer approach if it is supported. This shouldn't cause a problem. |
||
| Reference<byte[]> ref; | ||
| // buf should not be used after this method has returned. | ||
|
|
@@ -263,57 +271,4 @@ public T parseBytes(byte[] serialized) { | |
| } | ||
| } | ||
| } | ||
|
|
||
| private static final class KnownLengthByteBufferIterable implements Iterable<ByteBuffer> { | ||
| private final InputStream stream; | ||
| private final int length; | ||
|
|
||
| private KnownLengthByteBufferIterable(InputStream stream, int length) { | ||
| this.stream = stream; | ||
| this.length = length; | ||
| stream.mark(length); | ||
| } | ||
|
|
||
| @Override | ||
| public Iterator<ByteBuffer> iterator() { | ||
| try { | ||
| stream.reset(); | ||
| stream.mark(length); | ||
| return new Iterator<ByteBuffer>() { | ||
| private ByteBuffer buffer; | ||
|
|
||
| @Override | ||
| public boolean hasNext() { | ||
| if (buffer != null) { | ||
| return true; | ||
| } | ||
| buffer = ((HasByteBuffer) stream).getByteBuffer(); | ||
| return buffer != null; | ||
| } | ||
|
|
||
| @Override | ||
| public ByteBuffer next() { | ||
| if (!hasNext()) { | ||
| throw new NoSuchElementException(); | ||
| } | ||
| ByteBuffer res = buffer; | ||
| try { | ||
| stream.skip(buffer.remaining()); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| buffer = null; | ||
| return res; | ||
| } | ||
|
|
||
| @Override | ||
| public void remove() { | ||
| throw new UnsupportedOperationException(); | ||
| } | ||
| }; | ||
| } catch (IOException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the reset here really serves any purpose? IIUC the mark is only done to say "don't release the stuff I'm about to read" but it will all be released when the stream is closed once this method returns regardless.
If the reset stays then to be more "correct" shouldn't
stream.skip(size)also be called afterCodedInputStream.newInstance(buffers)?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks to #7330 (comment).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@voidzcy I was referring to this specific line here, not the existence of the
reset()method. This line could be deleted, right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Deleted.