@voidzcy
Contributor

@voidzcy voidzcy commented Aug 17, 2020

TODO: add tests and enhance Javadoc.

The argument for the maximum number of bytes to be kept within the marked range is unimplemented, as the underlying buffers do not support it. CompositeReadableBuffer could support it at a coarser granularity, but that does not seem to have much value.

@ejona86 Could you please take a quick look to see whether I am heading in the right direction? I want to make sure this is not going down a dead end.

@voidzcy voidzcy marked this pull request as draft August 17, 2020 02:31
Member

@ejona86 ejona86 left a comment

This approach looks good

public interface HasByteBuffer {

/**
* Gets a {@link ByteBuffer} containing up to {@code length} bytes of the content, or {@code
Member

I would expect either a get(void) method or a read(length) method. Providing a length here doesn't seem to add any value: specifying a smaller length just limits what is returned but doesn't change any internal state, whereas read(length) actually changes the "read position."

Contributor Author

Sounds fair. Changed.

buffer.reset();
readableBytes += (buffer.readableBytes() - currentRemain);
}
int size = readableBuffers.size();
Member

Seems you could use rewindableBuffers.pollLast()/rewindableBuffers.removeLast() along with readableBuffers.addFirst()
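
A minimal sketch of that suggestion, assuming plain java.util.ArrayDeque deques of java.nio.ByteBuffer rather than the PR's actual buffer types. The class RewindSketch and its methods are hypothetical; only the two deque names come from the comment above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a composite buffer with mark/reset support. */
final class RewindSketch {
  // Buffers that still have (or may have) unread bytes, in read order.
  private final Deque<ByteBuffer> readableBuffers = new ArrayDeque<>();
  // Buffers fully consumed since the last mark, oldest first.
  private final Deque<ByteBuffer> rewindableBuffers = new ArrayDeque<>();

  void add(ByteBuffer data) {
    ByteBuffer view = data.slice(); // independent view starting at position 0
    view.mark();                    // reset() on this view returns to its start
    readableBuffers.addLast(view);
  }

  /** Remembers the current read position and forgets older consumed buffers. */
  void mark() {
    rewindableBuffers.clear();
    ByteBuffer head = readableBuffers.peekFirst();
    if (head != null) {
      head.mark();
    }
  }

  /** Reads one byte, parking exhausted buffers so reset() can restore them. */
  int read() {
    ByteBuffer head = readableBuffers.peekFirst();
    while (head != null && !head.hasRemaining()) {
      rewindableBuffers.addLast(readableBuffers.pollFirst());
      head = readableBuffers.peekFirst();
    }
    return head == null ? -1 : (head.get() & 0xFF);
  }

  /** Rewinds to the mark: pollLast() + addFirst() restores the original order. */
  void reset() {
    ByteBuffer head = readableBuffers.peekFirst();
    if (head != null) {
      head.reset();
    }
    ByteBuffer consumed;
    while ((consumed = rewindableBuffers.pollLast()) != null) {
      consumed.reset();
      readableBuffers.addFirst(consumed);
    }
  }
}
```

Taking the most recently consumed buffer first and pushing it onto the front of the readable deque avoids any index arithmetic over the deque size.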

Contributor Author

Fixed, thanks for the suggestion.

if (!owner) {
buffer = ignoreClose(buffer);
}
return buffer.canUseByteBuffer()
Member

One option to simplify this "which interfaces should we support" logic is to not rely on the interface to determine whether it supports getByteBuffer(). We could have another method like boolean getByteBufferSupported(). See InputStream.markSupported(). Basically, that would allow us to always return an instance that implements HasByteBuffer but some of the time the getByteBuffer() method is non-functional.
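
A rough sketch of the idea, not the PR's actual code: the interface below reuses the names HasByteBuffer, getByteBufferSupported() and getByteBuffer() from this thread, while SketchStream, its constructor flag, and the choice to return null at end of stream are assumptions made for illustration.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;

/** Illustrative interface; the method names follow the discussion above. */
interface HasByteBuffer {
  /** Analogous to InputStream.markSupported(). */
  boolean getByteBufferSupported();

  /** Throws UnsupportedOperationException when not supported. */
  ByteBuffer getByteBuffer();
}

/** Hypothetical stream that always implements the interface. */
final class SketchStream extends InputStream implements HasByteBuffer {
  private final ByteBuffer data;
  private final boolean exposeBuffer; // assumption: decided by the caller

  SketchStream(ByteBuffer data, boolean exposeBuffer) {
    this.data = data;
    this.exposeBuffer = exposeBuffer;
  }

  @Override
  public int read() {
    return data.hasRemaining() ? (data.get() & 0xFF) : -1;
  }

  @Override
  public boolean getByteBufferSupported() {
    return exposeBuffer;
  }

  @Override
  public ByteBuffer getByteBuffer() {
    if (!exposeBuffer) {
      throw new UnsupportedOperationException("getByteBuffer");
    }
    // Read-only view of the unread bytes; null once everything is consumed.
    return data.hasRemaining() ? data.asReadOnlyBuffer() : null;
  }
}
```

Callers check getByteBufferSupported() once and fall back to ordinary read() calls otherwise, so a single stream class can cover both cases.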

Contributor Author

Sure, updated.

@Override
public Iterator<ByteBuffer> iterator() {
try {
stream.reset();
Member

This approach would work, but does assume that only one iterator would be used at a time. Since it is basically just the same amount of work to make a list of ByteBuffers up-front and provide the list as an iterator, that seems superior as it does not assume one-iterator-at-a-time behavior.

Contributor Author

Yeah, that is something I failed to consider. I was only thinking that an iterator has the benefit of expanding the content lazily as it goes, which doesn't give much value in this case.

Changed to make a list up-front.
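
To illustrate the up-front approach, here is a small sketch under stated assumptions: the Supplier stands in for whatever hands out the next ByteBuffer (it returns null when exhausted), and BufferListSketch.drain is a hypothetical helper, not code from this PR.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

final class BufferListSketch {
  private BufferListSketch() {}

  /**
   * Collects every buffer once, eagerly. Iterators obtained from the
   * returned list are independent of each other and of the source.
   */
  static List<ByteBuffer> drain(Supplier<ByteBuffer> next) {
    List<ByteBuffer> buffers = new ArrayList<>();
    for (ByteBuffer b = next.get(); b != null; b = next.get()) {
      buffers.add(b);
    }
    return Collections.unmodifiableList(buffers);
  }
}
```

Because the list is materialized before anything iterates over it, several iterators can walk it at once without resetting shared stream state.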

@voidzcy voidzcy marked this pull request as ready for review August 17, 2020 22:46
Contributor

@njhill njhill left a comment

Thanks @voidzcy I really like this improvement :)

I saw there was some iteration on the interface design, so there is probably something I'm missing, but why not just have a List<ByteBuffer> peekByteBuffers(int length) method instead of the mark/reset etc.?

And, similar to netty, how about also adding int byteBufferCount(int length) and ByteBuffer peekByteBuffer(int length) to help minimize allocations? The buffer count method could then also indicate whether the ReadableBuffer/InputStream supports buffer peeking, by returning -1 if not. WDYT?

}
stream.reset();
cis = CodedInputStream.newInstance(buffers);
} else if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) {
Contributor

This check should still be done in the new case I think?

Contributor Author

We have decided to keep "normal" messages (2~4 KB) on the current codepath; copying small things isn't necessarily bad with CPU caches. So we only enable this for messages >= 64 KB (most messages should be below this threshold).

Contributor

Sounds reasonable. It would be interesting to benchmark different message sizes/sparsity to verify the threshold (which of course is likely system dependent).

But what about the size <= DEFAULT_MAX_MESSAGE_SIZE check?

Contributor Author

For large messages (larger than DEFAULT_MAX_MESSAGE_SIZE), we would prefer the ByteBuffer approach if it is supported. This shouldn't cause a problem.


@Override
public ByteBuffer getByteBuffer() {
return buffer.nioBuffers()[0];
Contributor

This approach could be pretty heavy on garbage, especially if the underlying buffer is composite. I'd suggest at least doing buffer.nioBufferCount() == 1 ? buffer.nioBuffer() : buffer.nioBuffers()[0].

Contributor Author

Okay, sounds fair. Improved.

@ejona86
Member

ejona86 commented Aug 18, 2020

why not just have a List<ByteBuffer> peekByteBuffers(int length) method instead of the mark/reset etc

We discussed just a List<ByteBuffer> getByteBuffers() to get all of them. That would be the simplest API. I don't see much need to support peek returning a list of buffers. Normal reads don't have to read all the available data, so I don't see much need to return more than one ByteBuffer unless returning all the buffers. peekByteBuffer() would be a quite fair name though.

The need to iterate over the byte buffers multiple times seems a temporary detail. Protobuf only needs to loop multiple times so it can determine if all the buffers are direct, in which case it takes a faster code path. Using mark/reset allows us to stop using mark/reset in the future and so be able to release consumed ByteBuffers during message parsing.

Mark/reset is also useful to some existing users of gRPC. Some interceptors need to read the message, so with mark/reset they can avoid a copy.
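
As a small illustration of that interceptor use case, the sketch below inspects a message through the standard InputStream mark/reset calls and leaves the stream positioned where it started. PeekSketch and peekSum are hypothetical names, and summing the bytes is only a placeholder for whatever an interceptor would actually compute.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class PeekSketch {
  private PeekSketch() {}

  /** Reads the whole message, then rewinds so the next reader sees it all. */
  static int peekSum(InputStream message) {
    if (!message.markSupported()) {
      throw new IllegalArgumentException("stream does not support mark/reset");
    }
    try {
      // The read limit is only a hint here; see the note on the unimplemented
      // limit argument in the PR description.
      message.mark(Integer.MAX_VALUE);
      int sum = 0;
      for (int b = message.read(); b != -1; b = message.read()) {
        sum += b;
      }
      message.reset();
      return sum;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

No bytes are copied into a side buffer; the same stream is handed on afterwards.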

stream.skip(buffer.remaining());
buffers.add(buffer);
}
stream.reset();
Contributor

I don't think the reset here really serves any purpose? IIUC the mark is only done to say "don't release the stuff I'm about to read" but it will all be released when the stream is closed once this method returns regardless.

If the reset stays then to be more "correct" shouldn't stream.skip(size) also be called after CodedInputStream.newInstance(buffers)?

Contributor Author

Thanks to #7330 (comment).

Contributor

@voidzcy I was referring to this specific line here, not the existence of the reset() method. This line could be deleted, right?

Contributor Author

I see. Deleted.

* @throws UnsupportedOperationException if this operation is not supported.
*/
@Nullable
ByteBuffer getByteBuffer();
Contributor

To make the API cleaner how about having this return null for not supported and a (constant) empty ByteBuffer for EOS?

Contributor Author

having this return null for not supported and a (constant) empty ByteBuffer for EOS

The semantics of returning null for an unsupported operation aren't strong enough. Most existing Java APIs throw an UnsupportedOperationException for unsupported optional operations (e.g., java.nio.ByteBuffer pairs hasArray() with array()). I think the current approach is cleaner.
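
For reference, the java.nio.ByteBuffer precedent looks like this in practice. OptionalOpSketch and its helper are illustrative names; the hasArray() and array() behavior shown is that of the standard library.

```java
import java.nio.ByteBuffer;

final class OptionalOpSketch {
  private OptionalOpSketch() {}

  /** Returns true if array() rejects the call for this buffer. */
  static boolean arrayThrows(ByteBuffer buf) {
    try {
      buf.array();
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    ByteBuffer heap = ByteBuffer.wrap(new byte[4]);
    ByteBuffer direct = ByteBuffer.allocateDirect(4);
    // Heap buffers expose their backing array; direct buffers do not.
    System.out.println(heap.hasArray() + " " + arrayThrows(heap));
    System.out.println(direct.hasArray() + " " + arrayThrows(direct));
  }
}
```

The query method lets callers avoid the exception entirely, while the exception still makes misuse loud instead of surfacing later as a NullPointerException.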

@njhill
Contributor

njhill commented Aug 18, 2020

Thanks @ejona86

We discussed just a List<ByteBuffer> getByteBuffers() to get all of them. That would be the simplest API. I don't see much need to support peek returning a list of buffers. Normal reads don't have to read all the available data, so I don't see much need to return more than one ByteBuffer unless returning all the buffers. Mark/reset is also useful to some existing users of gRPC. Some interceptors need to read the message, so with mark/reset they can avoid a copy.

Right but peekByteBuffers(length) would give exactly what's needed for this particular usage and avoid the need for mark/reset (which is kind of cumbersome imo). It would also entail less overhead w.r.t. intermediate allocations, particularly if the underlying netty ByteBuf is composite.

Mark/reset is also useful to some existing users of gRPC. Some interceptors need to read the message, so with mark/reset they can avoid a copy.

Ah, that makes sense. An alternative that comes to mind would be to have some kind of clone() method on the InputStream with the understanding that the copy must be closed after use, and then have a simple ref count to determine when the buffers can be released. This would also mean that the InputStream/ByteBuffers could be used beyond the immediate scope of onMessage() to eliminate data copies in some async use cases (e.g. data streamed directly somewhere else). I know there was prior debate about such ownership transfer, but this change already opens things up, if not to potential leaks then to the possibility of corruption if some app held on to a ByteBuffer. It would also only be possible via "advanced" usage anyhow, since it implies a custom marshaller. WDYT?
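
A very rough sketch of what such a ref-counted handle could look like. Everything here is hypothetical (SharedMessage, retainCopy, the release callback); it only illustrates the counting scheme described above, not any gRPC API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical handle: the shared resource is released when all handles close. */
final class SharedMessage implements AutoCloseable {
  private final AtomicInteger refs;   // shared between all copies
  private final Runnable release;     // frees the underlying buffers
  private final AtomicBoolean closed = new AtomicBoolean();

  SharedMessage(Runnable release) {
    this(new AtomicInteger(1), release);
  }

  private SharedMessage(AtomicInteger refs, Runnable release) {
    this.refs = refs;
    this.release = release;
  }

  /** Creates another handle that must itself be closed after use. */
  SharedMessage retainCopy() {
    if (closed.get()) {
      throw new IllegalStateException("already closed");
    }
    refs.incrementAndGet();
    return new SharedMessage(refs, release);
  }

  @Override
  public void close() {
    // Closing the same handle twice is a no-op; the last handle releases.
    if (closed.compareAndSet(false, true) && refs.decrementAndGet() == 0) {
      release.run();
    }
  }
}
```

A handle that is never closed keeps the buffers alive indefinitely, which is the leak risk mentioned in the comment.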

But otherwise I guess mark/reset already exist in the interface so might be preferable for that reason.

@njhill
Contributor

njhill commented Aug 26, 2020

@ejona86 thanks for the detailed response and for humouring me in general :) I don't mean any of it as a criticism of past decisions, this only just occurred to me really.

I guess on further reflection what I really feel is that ReadableBuffer-like types could be more appropriate than InputStream, particularly if ownership transfer is later considered where it may be that the chunk of data as a whole is going to be used for something else (something like flatbuffers comes to mind). This is also why I felt accessing the list of backing ByteBuffers in one go, rather than having to pull them out one by one to populate an indeterminately-sized list, might be nice.

I accept your points though and probably the only meaningful difference would be the incremental release behaviour.

This might now be off-topic but I had a thought about a simple way to support ownership transfer - how about a "ParseTakesOwnership" marker interface on the Marshaller? If implemented then it becomes mandatory rather than optional for parse to close the InputStream.

I'm also curious about outbound ownership transfer. What if a message is written which has some backing resource that requires releasing? This could be done when the parsed InputStream is closed but IIUC messages might be serialized multiple times in some cases like retries (or am I wrong about that)?

But let's say you receive a 100 MB message

Wouldn't it be better to send this kind of thing as a stream of smaller messages? Otherwise, the example you gave would be a strong case i.m.o. for figuring out a protobuf ownership transfer option so that the message could be parsed with "aliasing" enabled i.e. full zero copy. You won't be reducing mem requirement if you are using netty pooled buffers since these will be separate from the proto-allocated byte[]s.

@ejona86
Member

ejona86 commented Aug 26, 2020

I guess on further reflection what I really feel is that ReadableBuffer-like types could be more appropriate than InputStream

I don't really disagree. But that's also a different approach. I think our current approach is more "here's an API that you can use to integrate with existing Java code." If we exposed a buffer directly then it's not normally directly useful; you have to convert it to byte[] or ByteBuffer or InputStream to pass to existing code. Our current API is really just ways to convert the data to some other form. But the fact that it would be nicer to expose ReadableBuffer I think points more to "Java's buffer APIs are sucky," which isn't really a revelation.

There's also the problem that we didn't/don't want to expose such a "wide" API as ReadableBuffer. I really don't want to get into the buffer business!

This might now be off-topic but I had a thought about a simple way to support ownership transfer...

That's a neat idea, but yeah, a bit off-topic. I've moved that discussion to #1054 where we have been talking about ways to address ownership transfer.

Wouldn't it be better to send this kind of thing as a stream of smaller messages?

Generally. But that's obviously more complex and adds no benefit if the entire message has to be processed at once.

for figuring out a protobuf ownership transfer option so that the message could be parsed with "aliasing" enabled i.e. full zero copy.

Yes, although the user will have to "do something" to enable that optimization since they'll need to manage a lifetime in some fashion; it can't be free. Also, alias can be harmful if you aren't dealing with large bytes fields, so even without lifecycle issues it can be a bad idea to enable it by default. So there's still value in having reduced copies when possible without user interaction. I'll also point out that full zero copy is possible with the API proposed in this PR, but only if the message is fully consumed within the onNext() call. I recognize that is a serious limitation, but there's a lot more nuance to the specific use-cases involved.

You won't be reducing mem requirement if you are using netty pooled buffers since these will be separate from the proto-allocated byte[]s.

Yes, Netty may not free them immediately, but Netty could reuse them for other RPCs immediately.

njhill added a commit to njhill/grpc-java that referenced this pull request Aug 27, 2020
Outbound equivalent of grpc#7330.

Protobuf doesn't support multiple ByteBuffers in this direction but I don't think that matters much since the outbound buffers are typically allocated/sized to fit the messages.
@voidzcy voidzcy force-pushed the impl/zero_copy_into_protobuf_2 branch from eee4f0f to 7b4e070 Compare August 28, 2020 22:55
@voidzcy voidzcy requested a review from ejona86 August 28, 2020 23:00
@voidzcy
Contributor Author

voidzcy commented Aug 28, 2020

Thanks for reviewing, all comments addressed. PTAL.

/**
* Indicates whether or not {@link #getByteBuffer} operation is supported for this buffer.
*/
boolean canUseByteBuffer();
Contributor

nit: maybe a better name would be hasByteBuffer()?

Contributor Author

Sure, whichever you prefer. I don't have a strong preference.

*/
@VisibleForTesting
static final boolean IS_JAVA9_OR_HIGHER =
!"1.7".equals(JAVA_VERSION) && !"1.8".equals(JAVA_VERSION);
Contributor

How about something like

static {
  boolean isJava9 = true;
  try {
    Class.forName("java.lang.StackWalker");
  } catch (ClassNotFoundException cnfe) {
    isJava9 = false;
  }
  IS_JAVA9_OR_HIGHER = isJava9;
}

Contributor Author

Okay, that seems simpler and I guess it should work equally well. Changed.
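
For completeness, a self-contained variant of the suggested check. JavaVersionSketch and the detect helper are hypothetical names; the probe relies only on the fact that java.lang.StackWalker was added in Java 9.

```java
final class JavaVersionSketch {
  private JavaVersionSketch() {}

  /** True when running on Java 9 or later, detected by probing for a Java 9 class. */
  static final boolean IS_JAVA9_OR_HIGHER = detect("java.lang.StackWalker");

  /** Returns whether the named class can be loaded in this runtime. */
  static boolean detect(String className) {
    try {
      Class.forName(className);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }
}
```

Probing for a class avoids parsing version strings, whose format changed between "1.8" and "9".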

/**
* Indicates whether or not {@link #getByteBuffer} operation is supported.
*/
boolean getByteBufferSupported();
Contributor

Why not give this the same name as the equivalent added ReadableBuffer method (my suggestion would be hasByteBuffer())?

Contributor Author

It doesn't need to match what we use for ReadableBuffer, right? We'll probably discuss this API this Thursday, and we usually vote on the name.

hasByteBuffer has 1 vote now.

Contributor

As well as for consistency, having it line up with ReadableBuffer would permit ReadableBuffers to themselves be InputStreams implementing this interface, which I think might allow for some other streamlining later.

Contributor Author

Maybe, for that reason.

My original thought was that this HasByteBuffer interface should only have the getByteBuffer method, and InputStreams not supporting the operation should just not implement the interface. #7330 (comment) suggests combining the two InputStream implementations into one by adding this getByteBufferSupported method. That looks OK, but naming the method hasByteBuffer does make this API weird. I'd rather rename the hasByteBuffer method on the ReadableBuffer interface to getByteBufferSupported.

@voidzcy
Contributor Author

voidzcy commented Sep 2, 2020

@ejona86 All comments addressed, PTAL.

@ejona86
Member

ejona86 commented Sep 24, 2020

So this is in limbo because Java doesn't really let protobuf avoid the initialization. There is a JDK enhancement request, but it probably will not be resolved soon enough to be helpful to users within the next year or two (even after a fix, it will take time to roll out to users).

So that leaves this as a not-clear-winner. We did see a benchmark show better performance, in terms of CPU time. Since we aren't needing to allocate a large contiguous byte[], then maybe that by itself provides benefits. I do question that, or rather, I think there's some cases where HTTP/2 frame fragmentation will reduce the benefit. Although I'd also like to see other optimizations increasing the frame size as appropriate for performance.

We can try running the TransportBenchmark with the 'gc' profiler and see if it shows clear benefit. Or maybe some other benchmark. But without any further evidence, I think we should close this PR.

@voidzcy
Contributor Author

voidzcy commented Sep 25, 2020

So that leaves this as a not-clear-winner. We did see a benchmark show better performance, in terms of CPU time. Since we aren't needing to allocate a large contiguous byte[], then maybe that by itself provides benefits. I do question that, or rather, I think there's some cases where HTTP/2 frame fragmentation will reduce the benefit. Although I'd also like to see other optimizations increasing the frame size as appropriate for performance.

The problem with this change, absent array-initialization avoidance in protobuf's ByteBuffer codepath, is that the improvement it provides in gRPC is outweighed by the cost of unnecessary initialization in protobuf (note that the array initialization in protobuf's byte array codepath is eliminated by the JDK's new byte[] + System.arraycopy() optimization).

Closing this PR now, as the change doesn't seem to improve overall performance until the JDK enhancement request is resolved.

@voidzcy voidzcy closed this Sep 25, 2020
veblush added a commit to veblush/grpc-java that referenced this pull request Feb 10, 2021
@voidzcy voidzcy reopened this Apr 19, 2021
@voidzcy
Contributor Author

voidzcy commented Apr 21, 2021

Closing in favor of #8102.

@voidzcy voidzcy closed this Apr 21, 2021
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Jul 20, 2021

3 participants