Skip to content

Commit 1bde06a

Browse files
author
Brandon Li
committed
HADOOP-11757. NFS gateway should shutdown when it can't start UDP or TCP server. Contributed by Brandon Li
(cherry picked from commit 60ce825) (cherry picked from commit 8e2f1a9)
1 parent 8ebbbc6 commit 1bde06a

File tree

5 files changed

+82
-28
lines changed

5 files changed

+82
-28
lines changed

hadoop-common-project/hadoop-common/CHANGES.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,10 @@ Release 2.7.0 - UNRELEASED
715715

716716
HADOOP-11787. OpensslSecureRandom.c pthread_threadid_np usage signature is
717717
wrong on 32-bit Mac. (Kiran Kumar M R via cnauroth)
718-
718+
719+
HADOOP-11757. NFS gateway should shutdown when it can't start UDP or TCP
720+
server (brandonli)
721+
719722
Release 2.6.1 - UNRELEASED
720723

721724
INCOMPATIBLE CHANGES

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,17 @@ private void startUDPServer() {
6060
SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
6161
rpcProgram, 1);
6262
rpcProgram.startDaemons();
63-
udpServer.run();
63+
try {
64+
udpServer.run();
65+
} catch (Throwable e) {
66+
LOG.fatal("Failed to start the UDP server.", e);
67+
if (udpServer.getBoundPort() > 0) {
68+
rpcProgram.unregister(PortmapMapping.TRANSPORT_UDP,
69+
udpServer.getBoundPort());
70+
}
71+
udpServer.shutdown();
72+
terminate(1, e);
73+
}
6474
udpBoundPort = udpServer.getBoundPort();
6575
}
6676

@@ -69,7 +79,17 @@ private void startTCPServer() {
6979
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
7080
rpcProgram, 1);
7181
rpcProgram.startDaemons();
72-
tcpServer.run();
82+
try {
83+
tcpServer.run();
84+
} catch (Throwable e) {
85+
LOG.fatal("Failed to start the TCP server.", e);
86+
if (tcpServer.getBoundPort() > 0) {
87+
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP,
88+
tcpServer.getBoundPort());
89+
}
90+
tcpServer.shutdown();
91+
terminate(1, e);
92+
}
7393
tcpBoundPort = tcpServer.getBoundPort();
7494
}
7595

@@ -83,7 +103,7 @@ public void start(boolean register) {
83103
rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort);
84104
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
85105
} catch (Throwable e) {
86-
LOG.fatal("Failed to start the server. Cause:", e);
106+
LOG.fatal("Failed to register the MOUNT service.", e);
87107
terminate(1, e);
88108
}
89109
}

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
/**
3131
* Nfs server. Supports NFS v3 using {@link RpcProgram}.
32-
* Currently Mountd program is also started inside this class.
3332
* Only TCP server is supported and UDP is not supported.
3433
*/
3534
public abstract class Nfs3Base {
@@ -55,7 +54,7 @@ public void start(boolean register) {
5554
try {
5655
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort);
5756
} catch (Throwable e) {
58-
LOG.fatal("Failed to start the server. Cause:", e);
57+
LOG.fatal("Failed to register the NFSv3 service.", e);
5958
terminate(1, e);
6059
}
6160
}
@@ -65,7 +64,17 @@ private void startTCPServer() {
6564
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
6665
rpcProgram, 0);
6766
rpcProgram.startDaemons();
68-
tcpServer.run();
67+
try {
68+
tcpServer.run();
69+
} catch (Throwable e) {
70+
LOG.fatal("Failed to start the TCP server.", e);
71+
if (tcpServer.getBoundPort() > 0) {
72+
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP,
73+
tcpServer.getBoundPort());
74+
}
75+
tcpServer.shutdown();
76+
terminate(1, e);
77+
}
6978
nfsBoundPort = tcpServer.getBoundPort();
7079
}
7180

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ public class SimpleTcpServer {
3939
protected final int port;
4040
protected int boundPort = -1; // Will be set after server starts
4141
protected final SimpleChannelUpstreamHandler rpcProgram;
42-
42+
private ServerBootstrap server;
43+
private Channel ch;
44+
4345
/** The maximum number of I/O worker threads */
4446
protected final int workerCount;
4547

@@ -53,7 +55,7 @@ public SimpleTcpServer(int port, RpcProgram program, int workercount) {
5355
this.rpcProgram = program;
5456
this.workerCount = workercount;
5557
}
56-
58+
5759
public void run() {
5860
// Configure the Server.
5961
ChannelFactory factory;
@@ -66,9 +68,9 @@ public void run() {
6668
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
6769
workerCount);
6870
}
69-
70-
ServerBootstrap bootstrap = new ServerBootstrap(factory);
71-
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
71+
72+
server = new ServerBootstrap(factory);
73+
server.setPipelineFactory(new ChannelPipelineFactory() {
7274

7375
@Override
7476
public ChannelPipeline getPipeline() throws Exception {
@@ -77,14 +79,14 @@ public ChannelPipeline getPipeline() throws Exception {
7779
RpcUtil.STAGE_RPC_TCP_RESPONSE);
7880
}
7981
});
80-
bootstrap.setOption("child.tcpNoDelay", true);
81-
bootstrap.setOption("child.keepAlive", true);
82-
82+
server.setOption("child.tcpNoDelay", true);
83+
server.setOption("child.keepAlive", true);
84+
8385
// Listen to TCP port
84-
Channel ch = bootstrap.bind(new InetSocketAddress(port));
86+
ch = server.bind(new InetSocketAddress(port));
8587
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
8688
boundPort = socketAddr.getPort();
87-
89+
8890
LOG.info("Started listening to TCP requests at port " + boundPort + " for "
8991
+ rpcProgram + " with workerCount " + workerCount);
9092
}
@@ -93,4 +95,13 @@ public ChannelPipeline getPipeline() throws Exception {
9395
public int getBoundPort() {
9496
return this.boundPort;
9597
}
98+
99+
public void shutdown() {
100+
if (ch != null) {
101+
ch.close().awaitUninterruptibly();
102+
}
103+
if (server != null) {
104+
server.releaseExternalResources();
105+
}
106+
}
96107
}

hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,11 @@ public class SimpleUdpServer {
4141
protected final SimpleChannelUpstreamHandler rpcProgram;
4242
protected final int workerCount;
4343
protected int boundPort = -1; // Will be set after server starts
44+
private ConnectionlessBootstrap server;
45+
private Channel ch;
4446

45-
public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
47+
public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program,
48+
int workerCount) {
4649
this.port = port;
4750
this.rpcProgram = program;
4851
this.workerCount = workerCount;
@@ -53,20 +56,19 @@ public void run() {
5356
DatagramChannelFactory f = new NioDatagramChannelFactory(
5457
Executors.newCachedThreadPool(), workerCount);
5558

56-
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
57-
b.setPipeline(Channels.pipeline(
58-
RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
59-
RpcUtil.STAGE_RPC_UDP_RESPONSE));
59+
server = new ConnectionlessBootstrap(f);
60+
server.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
61+
rpcProgram, RpcUtil.STAGE_RPC_UDP_RESPONSE));
62+
63+
server.setOption("broadcast", "false");
64+
server.setOption("sendBufferSize", SEND_BUFFER_SIZE);
65+
server.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
6066

61-
b.setOption("broadcast", "false");
62-
b.setOption("sendBufferSize", SEND_BUFFER_SIZE);
63-
b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
64-
6567
// Listen to the UDP port
66-
Channel ch = b.bind(new InetSocketAddress(port));
68+
ch = server.bind(new InetSocketAddress(port));
6769
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
6870
boundPort = socketAddr.getPort();
69-
71+
7072
LOG.info("Started listening to UDP requests at port " + boundPort + " for "
7173
+ rpcProgram + " with workerCount " + workerCount);
7274
}
@@ -75,4 +77,13 @@ public void run() {
7577
public int getBoundPort() {
7678
return this.boundPort;
7779
}
80+
81+
public void shutdown() {
82+
if (ch != null) {
83+
ch.close().awaitUninterruptibly();
84+
}
85+
if (server != null) {
86+
server.releaseExternalResources();
87+
}
88+
}
7889
}

0 commit comments

Comments
 (0)