Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,16 @@ public ByteBuffer nioByteBuffer() throws IOException {
return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
}
} catch (IOException e) {
String errorMessage = "Error in reading " + this;
try {
if (channel != null) {
long size = channel.size();
throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just thrown and then ignored. I assigned it to errorMessage so that we can see it in the error.

e);
errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
}
} catch (IOException ignored) {
// ignore
}
throw new IOException("Error in opening " + this, e);
throw new IOException(errorMessage, e);
} finally {
JavaUtils.closeQuietly(channel);
}
Expand All @@ -95,26 +95,24 @@ public ByteBuffer nioByteBuffer() throws IOException {
@Override
public InputStream createInputStream() throws IOException {
FileInputStream is = null;
boolean shouldClose = true;
try {
is = new FileInputStream(file);
ByteStreams.skipFully(is, offset);
return new LimitedInputStream(is, length);
InputStream r = new LimitedInputStream(is, length);
shouldClose = false;
return r;
} catch (IOException e) {
try {
if (is != null) {
long size = file.length();
throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

e);
}
} catch (IOException ignored) {
// ignore
} finally {
String errorMessage = "Error in reading " + this;
if (is != null) {
long size = file.length();
errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
}
throw new IOException(errorMessage, e);
} finally {
if (shouldClose) {
JavaUtils.closeQuietly(is);
}
throw new IOException("Error in opening " + this, e);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(is);
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@ public TransportServer(
this.appRpcHandler = appRpcHandler;
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

boolean shouldClose = true;
try {
init(hostToBind, portToBind);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(this);
throw e;
shouldClose = false;
} finally {
if (shouldClose) {
JavaUtils.closeQuietly(this);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,43 +42,59 @@ private[spark] class SocketAuthHelper(conf: SparkConf) {
* Read the auth secret from the socket and compare to the expected value. Write the reply back
* to the socket.
*
* If authentication fails, this method will close the socket.
* If authentication fails or error is thrown, this method will close the socket.
*
* @param s The client socket.
* @throws IllegalArgumentException If authentication fails.
*/
def authClient(s: Socket): Unit = {
// Set the socket timeout while checking the auth secret. Reset it before returning.
val currentTimeout = s.getSoTimeout()
var shouldClose = true
try {
s.setSoTimeout(10000)
val clientSecret = readUtf8(s)
if (secret == clientSecret) {
writeUtf8("ok", s)
} else {
writeUtf8("err", s)
JavaUtils.closeQuietly(s)
// Set the socket timeout while checking the auth secret. Reset it before returning.
val currentTimeout = s.getSoTimeout()
try {
s.setSoTimeout(10000)
val clientSecret = readUtf8(s)
if (secret == clientSecret) {
writeUtf8("ok", s)
shouldClose = false
} else {
writeUtf8("err", s)
throw new IllegalArgumentException("Authentication failed.")
}
} finally {
s.setSoTimeout(currentTimeout)
}
} finally {
s.setSoTimeout(currentTimeout)
if (shouldClose) {
JavaUtils.closeQuietly(s)
}
}
}

/**
* Authenticate with a server by writing the auth secret and checking the server's reply.
*
* If authentication fails, this method will close the socket.
* If authentication fails or error is thrown, this method will close the socket.
*
* @param s The socket connected to the server.
* @throws IllegalArgumentException If authentication fails.
*/
def authToServer(s: Socket): Unit = {
writeUtf8(secret, s)
var shouldClose = true
try {
writeUtf8(secret, s)

val reply = readUtf8(s)
if (reply != "ok") {
JavaUtils.closeQuietly(s)
throw new IllegalArgumentException("Authentication failed.")
val reply = readUtf8(s)
if (reply != "ok") {
throw new IllegalArgumentException("Authentication failed.")
} else {
shouldClose = false
}
} finally {
if (shouldClose) {
JavaUtils.closeQuietly(s)
}
}
}

Expand Down