Bläddra i källkod

HADOOP-11757. NFS gateway should shutdown when it can't start UDP or TCP server. Contributed by Brandon Li

(cherry picked from commit 60ce825a71850fe0622d551159e8d66f32448bb5)
(cherry picked from commit 8e2f1a93e462ed15084c2dfa010086d0112f89bd)
Brandon Li 10 år sedan
förälder
incheckning
1bde06aca2

+ 4 - 1
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -715,7 +715,10 @@ Release 2.7.0 - UNRELEASED
 
     HADOOP-11787. OpensslSecureRandom.c pthread_threadid_np usage signature is
     wrong on 32-bit Mac. (Kiran Kumar M R via cnauroth)
-
+    
+    HADOOP-11757. NFS gateway should shutdown when it can't start UDP or TCP
+    server (brandonli)
+    
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 23 - 3
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java

@@ -60,7 +60,17 @@ abstract public class MountdBase {
     SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
         rpcProgram, 1);
     rpcProgram.startDaemons();
-    udpServer.run();
+    try {
+      udpServer.run();
+    } catch (Throwable e) {
+      LOG.fatal("Failed to start the UDP server.", e);
+      if (udpServer.getBoundPort() > 0) {
+        rpcProgram.unregister(PortmapMapping.TRANSPORT_UDP,
+            udpServer.getBoundPort());
+      }
+      udpServer.shutdown();
+      terminate(1, e);
+    }
     udpBoundPort = udpServer.getBoundPort();
   }
 
@@ -69,7 +79,17 @@ abstract public class MountdBase {
     SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
         rpcProgram, 1);
     rpcProgram.startDaemons();
-    tcpServer.run();
+    try {
+      tcpServer.run();
+    } catch (Throwable e) {
+      LOG.fatal("Failed to start the TCP server.", e);
+      if (tcpServer.getBoundPort() > 0) {
+        rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP,
+            tcpServer.getBoundPort());
+      }
+      tcpServer.shutdown();
+      terminate(1, e);
+    }
     tcpBoundPort = tcpServer.getBoundPort();
   }
 
@@ -83,7 +103,7 @@ abstract public class MountdBase {
         rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort);
         rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
       } catch (Throwable e) {
-        LOG.fatal("Failed to start the server. Cause:", e);
+        LOG.fatal("Failed to register the MOUNT service.", e);
         terminate(1, e);
       }
     }

+ 12 - 3
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java

@@ -29,7 +29,6 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
 
 /**
  * Nfs server. Supports NFS v3 using {@link RpcProgram}.
- * Currently Mountd program is also started inside this class.
  * Only TCP server is supported and UDP is not supported.
  */
 public abstract class Nfs3Base {
@@ -55,7 +54,7 @@ public abstract class Nfs3Base {
       try {
         rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort);
       } catch (Throwable e) {
-        LOG.fatal("Failed to start the server. Cause:", e);
+        LOG.fatal("Failed to register the NFSv3 service.", e);
         terminate(1, e);
       }
     }
@@ -65,7 +64,17 @@ public abstract class Nfs3Base {
     SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
         rpcProgram, 0);
     rpcProgram.startDaemons();
-    tcpServer.run();
+    try {
+      tcpServer.run();
+    } catch (Throwable e) {
+      LOG.fatal("Failed to start the TCP server.", e);
+      if (tcpServer.getBoundPort() > 0) {
+        rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP,
+            tcpServer.getBoundPort());
+      }
+      tcpServer.shutdown();
+      terminate(1, e);
+    }
     nfsBoundPort = tcpServer.getBoundPort();
   }
 

+ 21 - 10
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleTcpServer.java

@@ -39,7 +39,9 @@ public class SimpleTcpServer {
   protected final int port;
   protected int boundPort = -1; // Will be set after server starts
   protected final SimpleChannelUpstreamHandler rpcProgram;
-  
+  private ServerBootstrap server;
+  private Channel ch;
+
   /** The maximum number of I/O worker threads */
   protected final int workerCount;
 
@@ -53,7 +55,7 @@ public class SimpleTcpServer {
     this.rpcProgram = program;
     this.workerCount = workercount;
   }
-  
+
   public void run() {
     // Configure the Server.
     ChannelFactory factory;
@@ -66,9 +68,9 @@ public class SimpleTcpServer {
           Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
           workerCount);
     }
-    
-    ServerBootstrap bootstrap = new ServerBootstrap(factory);
-    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+
+    server = new ServerBootstrap(factory);
+    server.setPipelineFactory(new ChannelPipelineFactory() {
 
       @Override
       public ChannelPipeline getPipeline() throws Exception {
@@ -77,14 +79,14 @@ public class SimpleTcpServer {
             RpcUtil.STAGE_RPC_TCP_RESPONSE);
       }
     });
-    bootstrap.setOption("child.tcpNoDelay", true);
-    bootstrap.setOption("child.keepAlive", true);
-    
+    server.setOption("child.tcpNoDelay", true);
+    server.setOption("child.keepAlive", true);
+
     // Listen to TCP port
-    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+    ch = server.bind(new InetSocketAddress(port));
     InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
     boundPort = socketAddr.getPort();
-    
+
     LOG.info("Started listening to TCP requests at port " + boundPort + " for "
         + rpcProgram + " with workerCount " + workerCount);
   }
@@ -93,4 +95,13 @@ public class SimpleTcpServer {
   public int getBoundPort() {
     return this.boundPort;
   }
+
+  public void shutdown() {
+    if (ch != null) {
+      ch.close().awaitUninterruptibly();
+    }
+    if (server != null) {
+      server.releaseExternalResources();
+    }
+  }
 }

+ 22 - 11
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/SimpleUdpServer.java

@@ -41,8 +41,11 @@ public class SimpleUdpServer {
   protected final SimpleChannelUpstreamHandler rpcProgram;
   protected final int workerCount;
   protected int boundPort = -1; // Will be set after server starts
+  private ConnectionlessBootstrap server;
+  private Channel ch;
 
-  public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
+  public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program,
+      int workerCount) {
     this.port = port;
     this.rpcProgram = program;
     this.workerCount = workerCount;
@@ -53,20 +56,19 @@ public class SimpleUdpServer {
     DatagramChannelFactory f = new NioDatagramChannelFactory(
         Executors.newCachedThreadPool(), workerCount);
 
-    ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
-    b.setPipeline(Channels.pipeline(
-            RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
-            RpcUtil.STAGE_RPC_UDP_RESPONSE));
+    server = new ConnectionlessBootstrap(f);
+    server.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
+        rpcProgram, RpcUtil.STAGE_RPC_UDP_RESPONSE));
+
+    server.setOption("broadcast", "false");
+    server.setOption("sendBufferSize", SEND_BUFFER_SIZE);
+    server.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
 
-    b.setOption("broadcast", "false");
-    b.setOption("sendBufferSize", SEND_BUFFER_SIZE);
-    b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
-    
     // Listen to the UDP port
-    Channel ch = b.bind(new InetSocketAddress(port));
+    ch = server.bind(new InetSocketAddress(port));
     InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
     boundPort = socketAddr.getPort();
-    
+
     LOG.info("Started listening to UDP requests at port " + boundPort + " for "
         + rpcProgram + " with workerCount " + workerCount);
   }
@@ -75,4 +77,13 @@ public class SimpleUdpServer {
   public int getBoundPort() {
     return this.boundPort;
   }
+
+  public void shutdown() {
+    if (ch != null) {
+      ch.close().awaitUninterruptibly();
+    }
+    if (server != null) {
+      server.releaseExternalResources();
+    }
+  }
 }