@@ -364,8 +364,8 @@ public void operationComplete(Future<? super Void> future) throws Exception {
}
} else {
logger.error("Failed to send RPC {} to {}", future.cause(),
MDC.of(LogKeys.REQUEST_ID$.MODULE$, requestId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
MDC.of(LogKeys.REQUEST_ID, requestId),
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
channel.close();
try {
String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
[next file]
@@ -193,9 +193,9 @@ public TransportClient createClient(String remoteHost, int remotePort, boolean f
final String resolvMsg = resolvedAddress.isUnresolved() ? "failed" : "succeed";
if (hostResolveTimeMs > 2000) {
logger.warn("DNS resolution {} for {} took {} ms",
MDC.of(LogKeys.STATUS$.MODULE$, resolvMsg),
MDC.of(LogKeys.HOST_PORT$.MODULE$, resolvedAddress),
MDC.of(LogKeys.TIME$.MODULE$, hostResolveTimeMs));
MDC.of(LogKeys.STATUS, resolvMsg),
MDC.of(LogKeys.HOST_PORT, resolvedAddress),
MDC.of(LogKeys.TIME, hostResolveTimeMs));
} else {
logger.trace("DNS resolution {} for {} took {} ms",
resolvMsg, resolvedAddress, hostResolveTimeMs);
@@ -210,7 +210,7 @@ public TransportClient createClient(String remoteHost, int remotePort, boolean f
return cachedClient;
} else {
logger.info("Found inactive connection to {}, creating a new one.",
MDC.of(LogKeys.HOST_PORT$.MODULE$, resolvedAddress));
MDC.of(LogKeys.HOST_PORT, resolvedAddress));
}
}
// If this connection should fast fail when last connection failed in last fast fail time
@@ -314,7 +314,7 @@ public void operationComplete(final Future<Channel> handshakeFuture) {
logger.debug("{} successfully completed TLS handshake to ", address);
} else {
logger.info("failed to complete TLS handshake to {}", handshakeFuture.cause(),
MDC.of(LogKeys.HOST_PORT$.MODULE$, address));
MDC.of(LogKeys.HOST_PORT, address));
cf.channel().close();
}
}
@@ -340,17 +340,17 @@ public void operationComplete(final Future<Channel> handshakeFuture) {
} catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
logger.error("Exception while bootstrapping client after {} ms", e,
MDC.of(LogKeys.BOOTSTRAP_TIME$.MODULE$, bootstrapTimeMs));
MDC.of(LogKeys.BOOTSTRAP_TIME, bootstrapTimeMs));
client.close();
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
long postBootstrap = System.nanoTime();

logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
MDC.of(LogKeys.HOST_PORT$.MODULE$, address),
MDC.of(LogKeys.ELAPSED_TIME$.MODULE$, (postBootstrap - preConnect) / 1000000),
MDC.of(LogKeys.BOOTSTRAP_TIME$.MODULE$, (postBootstrap - preBootstrap) / 1000000));
MDC.of(LogKeys.HOST_PORT, address),
MDC.of(LogKeys.ELAPSED_TIME, (postBootstrap - preConnect) / 1000000),
MDC.of(LogKeys.BOOTSTRAP_TIME, (postBootstrap - preBootstrap) / 1000000));

return client;
}
[next file]
@@ -145,8 +145,8 @@ public void channelInactive() {
if (hasOutstandingRequests()) {
String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {} is closed",
MDC.of(LogKeys.COUNT$.MODULE$, numOutstandingRequests()),
MDC.of(LogKeys.HOST_PORT$.MODULE$, remoteAddress));
MDC.of(LogKeys.COUNT, numOutstandingRequests()),
MDC.of(LogKeys.HOST_PORT, remoteAddress));
failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
}
}
@@ -156,8 +156,8 @@ public void exceptionCaught(Throwable cause) {
if (hasOutstandingRequests()) {
String remoteAddress = getRemoteAddress(channel);
logger.error("Still have {} requests outstanding when connection from {} is closed",
MDC.of(LogKeys.COUNT$.MODULE$, numOutstandingRequests()),
MDC.of(LogKeys.HOST_PORT$.MODULE$, remoteAddress));
MDC.of(LogKeys.COUNT, numOutstandingRequests()),
MDC.of(LogKeys.HOST_PORT, remoteAddress));
failOutstandingRequests(cause);
}
}
@@ -168,8 +168,8 @@ public void handle(ResponseMessage message) throws Exception {
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} since it is not outstanding",
MDC.of(LogKeys.STREAM_CHUNK_ID$.MODULE$, resp.streamChunkId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
MDC.of(LogKeys.STREAM_CHUNK_ID, resp.streamChunkId),
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
resp.body().release();
} else {
outstandingFetches.remove(resp.streamChunkId);
@@ -180,9 +180,9 @@ public void handle(ResponseMessage message) throws Exception {
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
MDC.of(LogKeys.STREAM_CHUNK_ID$.MODULE$, resp.streamChunkId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)),
MDC.of(LogKeys.ERROR$.MODULE$, resp.errorString));
MDC.of(LogKeys.STREAM_CHUNK_ID, resp.streamChunkId),
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)),
MDC.of(LogKeys.ERROR, resp.errorString));
} else {
outstandingFetches.remove(resp.streamChunkId);
listener.onFailure(resp.streamChunkId.chunkIndex(), new ChunkFetchFailureException(
@@ -192,9 +192,9 @@ public void handle(ResponseMessage message) throws Exception {
RpcResponseCallback listener = (RpcResponseCallback) outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
MDC.of(LogKeys.REQUEST_ID$.MODULE$, resp.requestId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)),
MDC.of(LogKeys.RESPONSE_BODY_SIZE$.MODULE$, resp.body().size()));
MDC.of(LogKeys.REQUEST_ID, resp.requestId),
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)),
MDC.of(LogKeys.RESPONSE_BODY_SIZE, resp.body().size()));
resp.body().release();
} else {
outstandingRpcs.remove(resp.requestId);
@@ -208,9 +208,9 @@ public void handle(ResponseMessage message) throws Exception {
BaseResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
MDC.of(LogKeys.REQUEST_ID$.MODULE$, resp.requestId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)),
MDC.of(LogKeys.ERROR$.MODULE$, resp.errorString));
MDC.of(LogKeys.REQUEST_ID, resp.requestId),
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)),
MDC.of(LogKeys.ERROR, resp.errorString));
} else {
outstandingRpcs.remove(resp.requestId);
listener.onFailure(new RuntimeException(resp.errorString));
@@ -222,9 +222,9 @@ public void handle(ResponseMessage message) throws Exception {
if (listener == null) {
logger.warn("Ignoring response for MergedBlockMetaRequest {} from {} ({} bytes) since "
+ "it is not outstanding",
MDC.of(LogKeys.REQUEST_ID$.MODULE$, resp.requestId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)),
MDC.of(LogKeys.RESPONSE_BODY_SIZE$.MODULE$, resp.body().size()));
MDC.of(LogKeys.REQUEST_ID, resp.requestId),
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)),
MDC.of(LogKeys.RESPONSE_BODY_SIZE, resp.body().size()));
} else {
outstandingRpcs.remove(resp.requestId);
listener.onSuccess(resp.getNumChunks(), resp.body());
@@ -269,7 +269,7 @@ public void handle(ResponseMessage message) throws Exception {
}
} else {
logger.warn("Stream failure with unknown callback: {}",
MDC.of(LogKeys.ERROR$.MODULE$, resp.error));
MDC.of(LogKeys.ERROR, resp.error));
}
} else {
throw new IllegalStateException("Unknown response type: " + message.type());
[next file]
@@ -93,7 +93,7 @@ protected boolean doAuthChallenge(
} catch (RuntimeException e) {
if (conf.saslFallback()) {
LOG.warn("Failed to parse new auth challenge, reverting to SASL for client {}.",
MDC.of(LogKeys.HOST_PORT$.MODULE$, channel.remoteAddress()));
MDC.of(LogKeys.HOST_PORT, channel.remoteAddress()));
saslHandler = new SaslRpcHandler(conf, channel, null, secretKeyHolder);
message.position(position);
message.limit(limit);
[next file]
@@ -66,8 +66,8 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro
// Re-encode this message as a failure response.
String error = e.getMessage() != null ? e.getMessage() : "null";
logger.error("Error processing {} for client {}", e,
MDC.of(LogKeys.MESSAGE$.MODULE$, in),
MDC.of(LogKeys.HOST_PORT$.MODULE$, ctx.channel().remoteAddress()));
MDC.of(LogKeys.MESSAGE, in),
MDC.of(LogKeys.HOST_PORT, ctx.channel().remoteAddress()));
encode(ctx, resp.createFailureResponse(error), out);
} else {
throw e;
[next file]
@@ -71,8 +71,8 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object> out) thro
// Re-encode this message as a failure response.
String error = e.getMessage() != null ? e.getMessage() : "null";
logger.error("Error processing {} for client {}", e,
MDC.of(LogKeys.MESSAGE$.MODULE$, in),
MDC.of(LogKeys.HOST_PORT$.MODULE$, ctx.channel().remoteAddress()));
MDC.of(LogKeys.MESSAGE, in),
MDC.of(LogKeys.HOST_PORT, ctx.channel().remoteAddress()));
encode(ctx, resp.createFailureResponse(error), out);
} else {
throw e;
[next file]
@@ -74,7 +74,7 @@ public ChunkFetchRequestHandler(
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warn("Exception in connection from {}", cause,
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(ctx.channel())));
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(ctx.channel())));
ctx.close();
}

@@ -96,8 +96,8 @@ public void processFetchRequest(
long chunksBeingTransferred = streamManager.chunksBeingTransferred();
if (chunksBeingTransferred >= maxChunksBeingTransferred) {
logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
MDC.of(LogKeys.NUM_CHUNKS$.MODULE$, chunksBeingTransferred),
MDC.of(LogKeys.MAX_NUM_CHUNKS$.MODULE$, maxChunksBeingTransferred));
MDC.of(LogKeys.NUM_CHUNKS, chunksBeingTransferred),
MDC.of(LogKeys.MAX_NUM_CHUNKS, maxChunksBeingTransferred));
channel.close();
return;
}
@@ -111,8 +111,8 @@ public void processFetchRequest(
}
} catch (Exception e) {
logger.error("Error opening block {} for request from {}", e,
MDC.of(LogKeys.STREAM_CHUNK_ID$.MODULE$, msg.streamChunkId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
MDC.of(LogKeys.STREAM_CHUNK_ID, msg.streamChunkId),
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
respond(channel, new ChunkFetchFailure(msg.streamChunkId,
Throwables.getStackTraceAsString(e)));
return;
@@ -153,8 +153,8 @@ private ChannelFuture respond(
} else {
logger.error("Error sending result {} to {}; closing connection",
future.cause(),
MDC.of(LogKeys.RESULT$.MODULE$, result),
MDC.of(LogKeys.HOST_PORT$.MODULE$, remoteAddress));
MDC.of(LogKeys.RESULT, result),
MDC.of(LogKeys.HOST_PORT, remoteAddress));
channel.close();
}
});
[next file]
@@ -88,7 +88,7 @@ public TransportClient getClient() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warn("Exception in connection from {}", cause,
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(ctx.channel())));
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(ctx.channel())));
requestHandler.exceptionCaught(cause);
responseHandler.exceptionCaught(cause);
ctx.close();
@@ -168,9 +168,9 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust" +
" spark.{}.io.connectionTimeout if this is wrong.",
MDC.of(LogKeys.HOST_PORT$.MODULE$, address),
MDC.of(LogKeys.TIMEOUT$.MODULE$, requestTimeoutNs / 1000 / 1000),
MDC.of(LogKeys.MODULE_NAME$.MODULE$, transportContext.getConf().getModuleName()));
MDC.of(LogKeys.HOST_PORT, address),
MDC.of(LogKeys.TIMEOUT, requestTimeoutNs / 1000 / 1000),
MDC.of(LogKeys.MODULE_NAME, transportContext.getConf().getModuleName()));
client.timeOut();
ctx.close();
} else if (closeIdleConnections) {
[next file]
@@ -132,8 +132,8 @@ private void processStreamRequest(final StreamRequest req) {
long chunksBeingTransferred = streamManager.chunksBeingTransferred();
if (chunksBeingTransferred >= maxChunksBeingTransferred) {
logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
MDC.of(LogKeys.NUM_CHUNKS$.MODULE$, chunksBeingTransferred),
MDC.of(LogKeys.MAX_NUM_CHUNKS$.MODULE$, maxChunksBeingTransferred));
MDC.of(LogKeys.NUM_CHUNKS, chunksBeingTransferred),
MDC.of(LogKeys.MAX_NUM_CHUNKS, maxChunksBeingTransferred));
channel.close();
return;
}
@@ -143,8 +143,8 @@ private void processStreamRequest(final StreamRequest req) {
buf = streamManager.openStream(req.streamId);
} catch (Exception e) {
logger.error("Error opening stream {} for request from {}", e,
MDC.of(LogKeys.STREAM_ID$.MODULE$, req.streamId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
MDC.of(LogKeys.STREAM_ID, req.streamId),
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
return;
}
@@ -177,8 +177,8 @@ public void onFailure(Throwable e) {
});
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
MDC.of(LogKeys.REQUEST_ID, req.requestId),
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
} finally {
req.body().release();
@@ -264,8 +264,8 @@ public String getID() {
new NioManagedBuffer(blockPushNonFatalFailure.getResponse())));
} else {
logger.error("Error while invoking RpcHandler#receive() on RPC id {} from {}", e,
MDC.of(LogKeys.REQUEST_ID$.MODULE$, req.requestId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
MDC.of(LogKeys.REQUEST_ID, req.requestId),
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
}
// We choose to totally fail the channel, rather than trying to recover as we do in other
@@ -282,7 +282,7 @@ private void processOneWayMessage(OneWayMessage req) {
rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() for one-way message from {}.", e,
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
} finally {
req.body().release();
}
@@ -307,10 +307,10 @@ public void onFailure(Throwable e) {
});
} catch (Exception e) {
logger.error("Error while invoking receiveMergeBlockMetaReq() for appId {} shuffleId {} "
+ "reduceId {} from {}", e, MDC.of(LogKeys.APP_ID$.MODULE$, req.appId),
MDC.of(LogKeys.SHUFFLE_ID$.MODULE$, req.shuffleId),
MDC.of(LogKeys.REDUCE_ID$.MODULE$, req.reduceId),
MDC.of(LogKeys.HOST_PORT$.MODULE$, getRemoteAddress(channel)));
+ "reduceId {} from {}", e, MDC.of(LogKeys.APP_ID, req.appId),
MDC.of(LogKeys.SHUFFLE_ID, req.shuffleId),
MDC.of(LogKeys.REDUCE_ID, req.reduceId),
MDC.of(LogKeys.HOST_PORT, getRemoteAddress(channel)));
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
}
}
@@ -326,8 +326,8 @@ private ChannelFuture respond(Encodable result) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
logger.error("Error sending result {} to {}; closing connection", future.cause(),
MDC.of(LogKeys.RESULT$.MODULE$, result),
MDC.of(LogKeys.HOST_PORT$.MODULE$, remoteAddress));
MDC.of(LogKeys.RESULT, result),
MDC.of(LogKeys.HOST_PORT, remoteAddress));
channel.close();
}
});
[next file]
@@ -50,7 +50,7 @@ public static DB initLevelDB(File dbFile, StoreVersion version, ObjectMapper map
tmpDb = JniDBFactory.factory.open(dbFile, options);
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
logger.info("Creating state database at {}", MDC.of(LogKeys.PATH$.MODULE$, dbFile));
logger.info("Creating state database at {}", MDC.of(LogKeys.PATH, dbFile));
options.createIfMissing(true);
try {
tmpDb = JniDBFactory.factory.open(dbFile, options);
@@ -61,16 +61,16 @@
// the leveldb file seems to be corrupt somehow. Lets just blow it away and create a new
// one, so we can keep processing new apps
logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
"recover state for existing applications", e, MDC.of(LogKeys.PATH$.MODULE$, dbFile));
"recover state for existing applications", e, MDC.of(LogKeys.PATH, dbFile));
if (dbFile.isDirectory()) {
for (File f : dbFile.listFiles()) {
if (!f.delete()) {
logger.warn("error deleting {}", MDC.of(LogKeys.PATH$.MODULE$, f.getPath()));
logger.warn("error deleting {}", MDC.of(LogKeys.PATH, f.getPath()));
}
}
}
if (!dbFile.delete()) {
logger.warn("error deleting {}", MDC.of(LogKeys.PATH$.MODULE$, dbFile.getPath()));
logger.warn("error deleting {}", MDC.of(LogKeys.PATH, dbFile.getPath()));
}
options.createIfMissing(true);
try {