Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,24 @@

import io.netty.buffer.ByteBuf;

import org.apache.spark.network.protocol.Message;
import org.apache.spark.network.server.MessageHandler;
import org.apache.spark.network.util.TransportFrameDecoder;

/**
* An interceptor that is registered with the frame decoder to feed stream data to a
* callback.
*/
class StreamInterceptor implements TransportFrameDecoder.Interceptor {
public class StreamInterceptor<T extends Message> implements TransportFrameDecoder.Interceptor {

private final TransportResponseHandler handler;
private final MessageHandler<T> handler;
private final String streamId;
private final long byteCount;
private final StreamCallback callback;
private long bytesRead;

StreamInterceptor(
TransportResponseHandler handler,
public StreamInterceptor(
MessageHandler<T> handler,
String streamId,
long byteCount,
StreamCallback callback) {
Expand All @@ -50,16 +52,22 @@ class StreamInterceptor implements TransportFrameDecoder.Interceptor {

@Override
public void exceptionCaught(Throwable cause) throws Exception {
handler.deactivateStream();
deactivateStream();
callback.onFailure(streamId, cause);
}

@Override
public void channelInactive() throws Exception {
handler.deactivateStream();
deactivateStream();
callback.onFailure(streamId, new ClosedChannelException());
}

private void deactivateStream() {
if (handler instanceof TransportResponseHandler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we need to do this for TransportRequestHandler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the only purpose of TransportResponseHandler.deactivateStream() is to include the stream request in the count for numOutstandingRequests (its not doing any critical cleanup). I will include a comment here explaining that.

((TransportResponseHandler) handler).deactivateStream();
}
}

@Override
public boolean handle(ByteBuf buf) throws Exception {
int toRead = (int) Math.min(buf.readableBytes(), byteCount - bytesRead);
Expand All @@ -72,10 +80,10 @@ public boolean handle(ByteBuf buf) throws Exception {
RuntimeException re = new IllegalStateException(String.format(
"Read too many bytes? Expected %d, but read %d.", byteCount, bytesRead));
callback.onFailure(streamId, re);
handler.deactivateStream();
deactivateStream();
throw re;
} else if (bytesRead == byteCount) {
handler.deactivateStream();
deactivateStream();
callback.onComplete(streamId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.protocol.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.protocol.ChunkFetchRequest;
import org.apache.spark.network.protocol.OneWayMessage;
import org.apache.spark.network.protocol.RpcRequest;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.protocol.StreamRequest;

import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;

/**
Expand Down Expand Up @@ -244,6 +242,54 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
return requestId;
}

/**
* Send data to the remote end as a stream. This differs from stream() in that this is a request
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you're in the "2 spaces after period camp", but that's 3.

* to *send* data to the remote end, not to receive it from the remote.
*
* @param meta meta data associated with the stream, which will be read completely on the
* receiving end before the stream itself.
* @param data this will be streamed to the remote end to allow for transferring large amounts
* of data without reading into memory.
* @param callback handles the reply -- onSuccess will only be called when both message and data
* are received successfully.
*/
public long uploadStream(
ManagedBuffer meta,
ManagedBuffer data,
RpcResponseCallback callback) {
long startTime = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like it should be easy to move this to StdChannelListener's constructor. Looks pretty similar in all methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't do that the originally as I figured you wanted the startTime to be before writeAndFlush, but I can work around that too.

if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}

long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Math.abs(UUID.randomUUID().getLeastSignificantBits()); is repeated twice. Move it to a separate new method .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

handler.addRpcRequest(requestId, callback);

channel.writeAndFlush(new UploadStream(requestId, meta, data))
.addListener(future -> {
if (future.isSuccess()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First reaction is that it's about the right time to refactor this into a helper method... all instances in this class look quite similar.

long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
logger.trace("Sending request {} to {} took {} ms", requestId,
getRemoteAddress(channel), timeTaken);
}
} else {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
getRemoteAddress(channel), future.cause());
logger.error(errorMsg, future.cause());
handler.removeRpcRequest(requestId);
channel.close();
try {
callback.onFailure(new IOException(errorMsg, future.cause()));
} catch (Exception e) {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
});

return requestId;
}

/**
* Synchronously sends an opaque message to the RpcHandler on the server-side, waiting for up to
* a specified timeout for a response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.sasl.SaslRpcHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamData;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.util.TransportConf;

Expand Down Expand Up @@ -80,9 +81,13 @@ class AuthRpcHandler extends RpcHandler {
}

@Override
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
public void receive(
TransportClient client,
ByteBuffer message,
StreamData streamData,
RpcResponseCallback callback) {
if (doDelegate) {
delegate.receive(client, message, callback);
delegate.receive(client, message, streamData, callback);
return;
}

Expand All @@ -100,7 +105,7 @@ public void receive(TransportClient client, ByteBuffer message, RpcResponseCallb
delegate = new SaslRpcHandler(conf, channel, delegate, secretKeyHolder);
message.position(position);
message.limit(limit);
delegate.receive(client, message, callback);
delegate.receive(client, message, streamData, callback);
doDelegate = true;
} else {
LOG.debug("Unexpected challenge message from client {}, closing channel.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ enum Type implements Encodable {
ChunkFetchRequest(0), ChunkFetchSuccess(1), ChunkFetchFailure(2),
RpcRequest(3), RpcResponse(4), RpcFailure(5),
StreamRequest(6), StreamResponse(7), StreamFailure(8),
OneWayMessage(9), User(-1);
OneWayMessage(9), UploadStream(10), User(-1);

private final byte id;

Expand Down Expand Up @@ -65,6 +65,7 @@ public static Type decode(ByteBuf buf) {
case 7: return StreamResponse;
case 8: return StreamFailure;
case 9: return OneWayMessage;
case 10: return UploadStream;
case -1: throw new IllegalArgumentException("User type messages cannot be decoded.");
default: throw new IllegalArgumentException("Unknown message type: " + id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ private Message decode(Message.Type msgType, ByteBuf in) {
case StreamFailure:
return StreamFailure.decode(in);

case UploadStream:
return UploadStream.decode(in);

default:
throw new IllegalArgumentException("Unexpected message type: " + msgType);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.protocol;

import java.io.IOException;
import java.nio.ByteBuffer;

import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NettyManagedBuffer;

/**
* An RPC with data that is sent outside of the frame, so it can be read in a stream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as a stream?

*/
public final class UploadStream extends AbstractMessage implements RequestMessage {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to merge UploadStream and RpcRequest into a class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps, but do you think that is really that useful? the handling of them is different (both in the network layer and the outer RpcHandler). And other things being equal, I'm biased to fewer changes to existing code paths.

/** Used to link an RPC request with its response. */
public final long requestId;
public final ManagedBuffer meta;
public final long bodyByteCount;

public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) {
super(body, false); // body is *not* included in the frame
this.requestId = requestId;
this.meta = meta;
bodyByteCount = body.size();
}

// this version is called when decoding the bytes on the receiving end. The body is handled
// separately.
private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) {
super(null, false);
this.requestId = requestId;
this.meta = meta;
this.bodyByteCount = bodyByteCount;
}

@Override
public Type type() { return Type.UploadStream; }

@Override
public int encodedLength() {
// the requestId, meta size, meta and bodyByteCount (body is not included)
return 8 + 4 + ((int) meta.size()) + 8;
}

@Override
public void encode(ByteBuf buf) {
buf.writeLong(requestId);
try {
ByteBuffer metaBuf = meta.nioByteBuffer();
buf.writeInt(metaBuf.remaining());
buf.writeBytes(metaBuf);
} catch (IOException io) {
throw new RuntimeException(io);
}
buf.writeLong(bodyByteCount);
}

public static UploadStream decode(ByteBuf buf) {
long requestId = buf.readLong();
int metaSize = buf.readInt();
ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize));
long bodyByteCount = buf.readLong();
// This is called by the frame decoder, so the data is still null. We need a StreamInterceptor
// to read the data.
return new UploadStream(requestId, meta, bodyByteCount);
}

@Override
public int hashCode() {
return Objects.hashCode(requestId, body());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The equals() and hashCode() implementations of this UploadStream class appear to differ slightly: the equals() method only checks equality of the requestIds, whereas this hashCode is checking both the requestId and the body(). I'm not sure what a ManagedBuffer's hashCode() is: the hashCode() might not depend on the buffer contents, in which case this could lead to false hashCode mismatches for equal requests. Should we use just requestId here instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good point. Admittedly I just copied this from StreamResponse without thinking about it too much -- that class exhibits the same issue. I'll remove body from both.

(In practice, we're not using sticking them in hashmaps now so there wouldn't be any bugs in behavior because of this.)

}

@Override
public boolean equals(Object other) {
if (other instanceof UploadStream) {
UploadStream o = (UploadStream) other;
return requestId == o.requestId && super.equals(o);
}
return false;
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("requestId", requestId)
.add("body", body())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question here about whether body() is useful in this context: will this actually end up printing buffer contents, which are potentially huge? Or will it do something reasonable and print only the buffer type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be honest, this was also just parroted from other classes -- looking now at implementations of ManagedBuffer, if they have a toString() it does something reasonable.

Is that actually useful for debugging? maybe not, don't think I ever actually looked at this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not actually sure. I wonder if this is a latent problem in the old code waiting to happen in case we turn on trace logging. We can probably investigate that separately, but just wanted to note it since it seemed a little dodgy.

.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamData;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
Expand Down Expand Up @@ -76,10 +77,14 @@ public SaslRpcHandler(
}

@Override
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
public void receive(
TransportClient client,
ByteBuffer message,
StreamData streamData,
RpcResponseCallback callback) {
if (isComplete) {
// Authentication complete, delegate to base handler.
delegate.receive(client, message, callback);
delegate.receive(client, message, streamData, callback);
return;
}
if (saslServer == null || !saslServer.isComplete()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ public NoOpRpcHandler() {
}

@Override
public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
public void receive(
TransportClient client,
ByteBuffer message,
StreamData streamData,
RpcResponseCallback callback) {
throw new UnsupportedOperationException("Cannot handle messages");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,24 @@ public abstract class RpcHandler {
*
* This method will not be called in parallel for a single TransportClient (i.e., channel).
*
* The rpc *might* included a data stream in <code>streamData</code>(eg. for uploading a large
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space before (

* amount of data which should not be buffered in memory here). Any errors while handling the
* streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps naive question: what are the implications of this? Is this referring to a scenario where we've multiplexed multiple asynchronous requests / responses over a single network connection? I think I understand why the failure mode is as stated (we're worried about leaving non-consumed leftover data in the channel) but I just wanted to ask about the implications of failing other in-flight RPCs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pretty good question actually :)

I will take a closer look at this myself but I believe this connection is shared by other tasks running on the same executor which are trying to talk to the same destination. So that might mean another task which is replicating to the same destination, or reading data from that same remote executor. those don't have specific retry behavior for connection closed -- that might result in the data just not getting replicated, fetching data from elsewhere, or the task getting retried.

I think this is actually OK -- the existing code could cause an OOM on the remote end anyway, which obviously would fail a lot more. This failure behavior seems reasonable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to think through whether we'll risk introducing any weird new failure modes (or increasing the occurrence of existing-but-improbable failure modes). For example, causing in-flight RPCs to fail could surface latent RPC timeout issues: if we have a timeout which is way too long and we drop in-flight responses on the floor without sending back negative ACKs then we could see (finite but potentially long) hangs.

On the other hand, this pathway is used for executor <-> executor transfers and generally not executor <-> driver transfers, so my understanding is that failures in this channel generally won't impact control RPCs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you bring up a good point here. I was thinking about how the places we might have an error occur:

  1. while reading the stream data (ie. StreamCallback.onData). In the intended use case, this is basically just opening a file and writing bytes to it.

  2. post-processing the complete data (StreamCallback.onComplete). This is doing the whole BlockManager.put, which can be rather complex.

Failures in (1) are unlikely and are difficult to recover; failures in (2) are more likely, but the channel should be totally fine. I've updated the code, comments, and test to make sure things are OK for (2). 6c086c5

though your points are still valid for (1), though I think we can live with it.

* If stream data is not null, you *must* call <code>streamData.registerStreamCallback</code>
* before this method returns.
*
* @param client A channel client which enables the handler to make requests back to the sender
* of this RPC. This will always be the exact same object for a particular channel.
* @param message The serialized bytes of the RPC.
* @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if a separate callback for these streams wouldn't be better. It would at the very least avoid having to change all the existing handlers.

But it would also make it clearer what the contract is. For example, the callback could return the stream callback to be registered.

It also doesn't seem like StreamData itself has a lot of useful information other than the registration method, so it could be replaced with parameters in the new callback, avoiding having to expose that type to RPC handlers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done this refactoring, and I agree it made the change significantly simpler.

* otherwise it is null.
* @param callback Callback which should be invoked exactly once upon success or failure of the
* RPC.
*/
public abstract void receive(
TransportClient client,
ByteBuffer message,
StreamData streamData,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moving discussion from here: #21451 (comment)

@witgo suggested the message could be moved inside streamData -- any particular reason to do that? It would work fine to do it that way as well, though I don't see any advantage. I guess I'm in favor of keeping it this way.

RpcResponseCallback callback);

/**
Expand All @@ -57,15 +66,15 @@ public abstract void receive(

/**
* Receives an RPC message that does not expect a reply. The default implementation will
* call "{@link #receive(TransportClient, ByteBuffer, RpcResponseCallback)}" and log a warning if
* any of the callback methods are called.
* call "{@link #receive(TransportClient, ByteBuffer, StreamData, RpcResponseCallback)}" and log a
* warning if any of the callback methods are called.
*
* @param client A channel client which enables the handler to make requests back to the sender
* of this RPC. This will always be the exact same object for a particular channel.
* @param message The serialized bytes of the RPC.
*/
public void receive(TransportClient client, ByteBuffer message) {
receive(client, message, ONE_WAY_CALLBACK);
receive(client, message, null, ONE_WAY_CALLBACK);
}

/**
Expand Down
Loading