Browse Source

HDFS-5649. Merging change 1556405 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1556409 13f79535-47bb-0310-9956-ffa450edef68
Brandon Li 11 years ago
parent
commit
531605ef28

+ 17 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.oncrpc.RpcProgram;
 import org.apache.hadoop.oncrpc.SimpleTcpServer;
 import org.apache.hadoop.oncrpc.SimpleUdpServer;
 import org.apache.hadoop.portmap.PortmapMapping;
+import org.apache.hadoop.util.ShutdownHookManager;
 
 /**
  * Main class for starting mountd daemon. This daemon implements the NFS
@@ -71,8 +72,24 @@ abstract public class MountdBase {
     startUDPServer();
     startTCPServer();
     if (register) {
+      ShutdownHookManager.get().addShutdownHook(new Unregister(),
+          SHUTDOWN_HOOK_PRIORITY);
       rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort);
       rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
     }
   }
+  
+  /**
+   * Priority of the mountd shutdown hook.
+   */
+  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
+
+  private class Unregister implements Runnable {
+    @Override
+    public synchronized void run() {
+      rpcProgram.unregister(PortmapMapping.TRANSPORT_UDP, udpBoundPort);
+      rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
+    }
+  }
+  
 }

+ 15 - 0
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.oncrpc.RpcProgram;
 import org.apache.hadoop.oncrpc.SimpleTcpServer;
 import org.apache.hadoop.portmap.PortmapMapping;
+import org.apache.hadoop.util.ShutdownHookManager;
 
 /**
  * Nfs server. Supports NFS v3 using {@link RpcProgram}.
@@ -50,6 +51,8 @@ public abstract class Nfs3Base {
     startTCPServer(); // Start TCP server
     
     if (register) {
+      ShutdownHookManager.get().addShutdownHook(new Unregister(),
+          SHUTDOWN_HOOK_PRIORITY);
       rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort);
     }
   }
@@ -61,4 +64,16 @@ public abstract class Nfs3Base {
     tcpServer.run();
     nfsBoundPort = tcpServer.getBoundPort();
   }
+  
+  /**
+   * Priority of the nfsd shutdown hook.
+   */
+  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
+
+  private class Unregister implements Runnable {
+    @Override
+    public synchronized void run() {
+      rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, nfsBoundPort);
+    }
+  }
 }

+ 23 - 5
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java

@@ -78,23 +78,41 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
     for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
       PortmapMapping mapEntry = new PortmapMapping(progNumber, vers, transport,
           port);
-      register(mapEntry);
+      register(mapEntry, true);
+    }
+  }
+  
+  /**
+   * Unregister this program with the local portmapper.
+   */
+  public void unregister(int transport, int boundPort) {
+    if (boundPort != port) {
+      LOG.info("The bound port is " + boundPort
+          + ", different with configured port " + port);
+      port = boundPort;
+    }
+    // Unregister all the program versions with portmapper for a given transport
+    for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
+      PortmapMapping mapEntry = new PortmapMapping(progNumber, vers, transport,
+          port);
+      register(mapEntry, false);
     }
   }
   
   /**
    * Register the program with Portmap or Rpcbind
    */
-  protected void register(PortmapMapping mapEntry) {
-    XDR mappingRequest = PortmapRequest.create(mapEntry);
+  protected void register(PortmapMapping mapEntry, boolean set) {
+    XDR mappingRequest = PortmapRequest.create(mapEntry, set);
     SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
         mappingRequest);
     try {
       registrationClient.run();
     } catch (IOException e) {
-      LOG.error("Registration failure with " + host + ":" + port
+      String request = set ? "Registration" : "Unregistration";
+      LOG.error(request + " failure with " + host + ":" + port
           + ", portmap entry: " + mapEntry);
-      throw new RuntimeException("Registration failure");
+      throw new RuntimeException(request + " failure");
     }
   }
 

+ 5 - 4
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java

@@ -31,13 +31,14 @@ public class PortmapRequest {
     return PortmapMapping.deserialize(xdr);
   }
 
-  public static XDR create(PortmapMapping mapping) {
+  public static XDR create(PortmapMapping mapping, boolean set) {
     XDR request = new XDR();
+    int procedure = set ? RpcProgramPortmap.PMAPPROC_SET
+        : RpcProgramPortmap.PMAPPROC_UNSET;
     RpcCall call = RpcCall.getInstance(
         RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)),
-        RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION,
-        RpcProgramPortmap.PMAPPROC_SET, new CredentialsNone(),
-        new VerifierNone());
+        RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION, procedure,
+        new CredentialsNone(), new VerifierNone());
     call.write(request);
     return mapping.serialize(request);
   }

+ 51 - 1
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java

@@ -19,6 +19,10 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -29,7 +33,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ShutdownHookManager;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -94,7 +100,7 @@ class DFSClientCache {
   DFSClientCache(Configuration config) {
     this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
   }
-
+  
   DFSClientCache(Configuration config, int clientCache) {
     this.config = config;
     this.clientCache = CacheBuilder.newBuilder()
@@ -107,8 +113,52 @@ class DFSClientCache {
         .expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS)
         .removalListener(inputStreamRemovalListener())
         .build(inputStreamLoader());
+    
+    ShutdownHookManager.get().addShutdownHook(new CacheFinalizer(),
+        SHUTDOWN_HOOK_PRIORITY);
+  }
+
+  /**
+   * Priority of the FileSystem shutdown hook.
+   */
+  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
+  
+  private class CacheFinalizer implements Runnable {
+    @Override
+    public synchronized void run() {
+      try {
+        closeAll(true);
+      } catch (IOException e) {
+        LOG.info("DFSClientCache.closeAll() threw an exception:\n", e);
+      }
+    }
   }
+  
+  /**
+   * Close all DFSClient instances in the Cache.
+   * @param onlyAutomatic only close those that are marked for automatic closing
+   */
+  synchronized void closeAll(boolean onlyAutomatic) throws IOException {
+    List<IOException> exceptions = new ArrayList<IOException>();
 
+    ConcurrentMap<String, DFSClient> map = clientCache.asMap();
+
+    for (Entry<String, DFSClient> item : map.entrySet()) {
+      final DFSClient client = item.getValue();
+      if (client != null) {
+        try {
+          client.close();
+        } catch (IOException ioe) {
+          exceptions.add(ioe);
+        }
+      }
+    }
+
+    if (!exceptions.isEmpty()) {
+      throw MultipleIOException.createIOException(exceptions);
+    }
+  }
+  
   private CacheLoader<String, DFSClient> clientLoader() {
     return new CacheLoader<String, DFSClient>() {
       @Override

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -582,6 +582,9 @@ Release 2.3.0 - UNRELEASED
 
     HDFS-5671. Fix socket leak in DFSInputStream#getBlockReader. (JamesLi via umamahesh) 
 
+    HDFS-5649. Unregister NFS and Mount service when NFS gateway is shutting down.
+    (brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES