1
0
Переглянути джерело

HDFS-6439. NFS should not reject NFS requests to the NULL procedure whether port monitoring is enabled or not. Contributed by Brandon Li

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603622 13f79535-47bb-0310-9956-ffa450edef68
Brandon Li 11 роки тому
батько
коміт
2ecab65e3e

+ 31 - 39
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java

@@ -17,12 +17,9 @@
  */
  */
 package org.apache.hadoop.nfs.nfs3;
 package org.apache.hadoop.nfs.nfs3;
 
 
-import java.net.InetAddress;
-
 import org.apache.hadoop.nfs.nfs3.response.NFS3Response;
 import org.apache.hadoop.nfs.nfs3.response.NFS3Response;
+import org.apache.hadoop.oncrpc.RpcInfo;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.XDR;
-import org.apache.hadoop.oncrpc.security.SecurityHandler;
-import org.jboss.netty.channel.Channel;
 
 
 /**
 /**
  * RPC procedures as defined in RFC 1813.
  * RPC procedures as defined in RFC 1813.
@@ -33,70 +30,65 @@ public interface Nfs3Interface {
   public NFS3Response nullProcedure();
   public NFS3Response nullProcedure();
 
 
   /** GETATTR: Get file attributes */
   /** GETATTR: Get file attributes */
-  public NFS3Response getattr(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response getattr(XDR xdr, RpcInfo info);
 
 
   /** SETATTR: Set file attributes */
   /** SETATTR: Set file attributes */
-  public NFS3Response setattr(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response setattr(XDR xdr, RpcInfo info);
 
 
   /** LOOKUP: Lookup filename */
   /** LOOKUP: Lookup filename */
-  public NFS3Response lookup(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response lookup(XDR xdr, RpcInfo info);
 
 
   /** ACCESS: Check access permission */
   /** ACCESS: Check access permission */
-  public NFS3Response access(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response access(XDR xdr, RpcInfo info);
+
+    /** READLINK: Read from symbolic link */
+  public NFS3Response readlink(XDR xdr, RpcInfo info);
 
 
   /** READ: Read from file */
   /** READ: Read from file */
-  public NFS3Response read(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response read(XDR xdr, RpcInfo info);
 
 
   /** WRITE: Write to file */
   /** WRITE: Write to file */
-  public NFS3Response write(XDR xdr, Channel channel, int xid,
-      SecurityHandler securityHandler, InetAddress client);
+  public NFS3Response write(XDR xdr, RpcInfo info);
 
 
   /** CREATE: Create a file */
   /** CREATE: Create a file */
-  public NFS3Response create(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response create(XDR xdr, RpcInfo info);
 
 
   /** MKDIR: Create a directory */
   /** MKDIR: Create a directory */
-  public NFS3Response mkdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response mkdir(XDR xdr, RpcInfo info);
+
+  /** SYMLINK: Create a symbolic link */
+  public NFS3Response symlink(XDR xdr, RpcInfo info);
+
+  /** MKNOD: Create a special device */
+  public NFS3Response mknod(XDR xdr, RpcInfo info);
 
 
   /** REMOVE: Remove a file */
   /** REMOVE: Remove a file */
-  public NFS3Response remove(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response remove(XDR xdr, RpcInfo info);
 
 
   /** RMDIR: Remove a directory */
   /** RMDIR: Remove a directory */
-  public NFS3Response rmdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response rmdir(XDR xdr, RpcInfo info);
 
 
   /** RENAME: Rename a file or directory */
   /** RENAME: Rename a file or directory */
-  public NFS3Response rename(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response rename(XDR xdr, RpcInfo info);
 
 
-  /** SYMLINK: Create a symbolic link */
-  public NFS3Response symlink(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  /** LINK: create link to an object */
+  public NFS3Response link(XDR xdr, RpcInfo info);
 
 
   /** READDIR: Read From directory */
   /** READDIR: Read From directory */
-  public NFS3Response readdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response readdir(XDR xdr, RpcInfo info);
 
 
+  /** READDIRPLUS: Extended read from directory */
+  public NFS3Response readdirplus(XDR xdr, RpcInfo info);
+  
   /** FSSTAT: Get dynamic file system information */
   /** FSSTAT: Get dynamic file system information */
-  public NFS3Response fsstat(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response fsstat(XDR xdr, RpcInfo info);
 
 
   /** FSINFO: Get static file system information */
   /** FSINFO: Get static file system information */
-  public NFS3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response fsinfo(XDR xdr, RpcInfo info);
 
 
   /** PATHCONF: Retrieve POSIX information */
   /** PATHCONF: Retrieve POSIX information */
-  public NFS3Response pathconf(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response pathconf(XDR xdr, RpcInfo info);
 
 
   /** COMMIT: Commit cached data on a server to stable storage */
   /** COMMIT: Commit cached data on a server to stable storage */
-  public NFS3Response commit(XDR xdr, Channel channel, int xid,
-      SecurityHandler securityHandler, InetAddress client);
+  public NFS3Response commit(XDR xdr, RpcInfo info);
 }
 }

+ 25 - 27
hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java

@@ -48,7 +48,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
   private final int progNumber;
   private final int progNumber;
   private final int lowProgVersion;
   private final int lowProgVersion;
   private final int highProgVersion;
   private final int highProgVersion;
-  private final boolean allowInsecurePorts;
+  protected final boolean allowInsecurePorts;
   
   
   /**
   /**
    * If not null, this will be used as the socket to use to connect to the
    * If not null, this will be used as the socket to use to connect to the
@@ -146,31 +146,6 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
     RpcCall call = (RpcCall) info.header();
     RpcCall call = (RpcCall) info.header();
     
     
     SocketAddress remoteAddress = info.remoteAddress();
     SocketAddress remoteAddress = info.remoteAddress();
-    if (!allowInsecurePorts) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Will not allow connections from unprivileged ports. " +
-            "Checking for valid client port...");
-      }
-      if (remoteAddress instanceof InetSocketAddress) {
-        InetSocketAddress inetRemoteAddress = (InetSocketAddress) remoteAddress;
-        if (inetRemoteAddress.getPort() > 1023) {
-          LOG.warn("Connection attempted from '" + inetRemoteAddress + "' "
-              + "which is an unprivileged port. Rejecting connection.");
-          sendRejectedReply(call, remoteAddress, ctx);
-          return;
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Accepting connection from '" + remoteAddress + "'");
-          }
-        }
-      } else {
-        LOG.warn("Could not determine remote port of socket address '" +
-            remoteAddress + "'. Rejecting connection.");
-        sendRejectedReply(call, remoteAddress, ctx);
-        return;
-      }
-    }
-    
     if (LOG.isTraceEnabled()) {
     if (LOG.isTraceEnabled()) {
       LOG.trace(program + " procedure #" + call.getProcedure());
       LOG.trace(program + " procedure #" + call.getProcedure());
     }
     }
@@ -191,6 +166,29 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
     handleInternal(ctx, info);
     handleInternal(ctx, info);
   }
   }
   
   
+  public boolean doPortMonitoring(SocketAddress remoteAddress) {
+    if (!allowInsecurePorts) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Will not allow connections from unprivileged ports. "
+            + "Checking for valid client port...");
+      }
+
+      if (remoteAddress instanceof InetSocketAddress) {
+        InetSocketAddress inetRemoteAddress = (InetSocketAddress) remoteAddress;
+        if (inetRemoteAddress.getPort() > 1023) {
+          LOG.warn("Connection attempted from '" + inetRemoteAddress + "' "
+              + "which is an unprivileged port. Rejecting connection.");
+          return false;
+        }
+      } else {
+        LOG.warn("Could not determine remote port of socket address '"
+            + remoteAddress + "'. Rejecting connection.");
+        return false;
+      }
+    }
+    return true;
+  }
+  
   private void sendAcceptedReply(RpcCall call, SocketAddress remoteAddress,
   private void sendAcceptedReply(RpcCall call, SocketAddress remoteAddress,
       AcceptState acceptState, ChannelHandlerContext ctx) {
       AcceptState acceptState, ChannelHandlerContext ctx) {
     RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
     RpcAcceptedReply reply = RpcAcceptedReply.getInstance(call.getXid(),
@@ -208,7 +206,7 @@ public abstract class RpcProgram extends SimpleChannelUpstreamHandler {
     RpcUtil.sendRpcResponse(ctx, rsp);
     RpcUtil.sendRpcResponse(ctx, rsp);
   }
   }
   
   
-  private static void sendRejectedReply(RpcCall call,
+  protected static void sendRejectedReply(RpcCall call,
       SocketAddress remoteAddress, ChannelHandlerContext ctx) {
       SocketAddress remoteAddress, ChannelHandlerContext ctx) {
     XDR out = new XDR();
     XDR out = new XDR();
     RpcDeniedReply reply = new RpcDeniedReply(call.getXid(),
     RpcDeniedReply reply = new RpcDeniedReply(call.getXid(),

+ 26 - 0
hadoop-common-project/hadoop-nfs/src/test/java/org/apache/hadoop/oncrpc/TestFrameDecoder.java

@@ -66,6 +66,18 @@ public class TestFrameDecoder {
 
 
     @Override
     @Override
     protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
     protected void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
+      // This is just like what's done in RpcProgramMountd#handleInternal and
+      // RpcProgramNfs3#handleInternal.
+      RpcCall rpcCall = (RpcCall) info.header();
+      final int procedure = rpcCall.getProcedure();
+      if (procedure != 0) {
+        boolean portMonitorSuccess = doPortMonitoring(info.remoteAddress());
+        if (!portMonitorSuccess) {
+          sendRejectedReply(rpcCall, info.remoteAddress(), ctx);
+          return;
+        }
+      }
+      
       resultSize = info.data().readableBytes();
       resultSize = info.data().readableBytes();
       RpcAcceptedReply reply = RpcAcceptedReply.getAcceptInstance(1234,
       RpcAcceptedReply reply = RpcAcceptedReply.getAcceptInstance(1234,
           new VerifierNone());
           new VerifierNone());
@@ -190,6 +202,20 @@ public class TestFrameDecoder {
 
 
     // Verify the server rejected the request.
     // Verify the server rejected the request.
     assertEquals(0, resultSize);
     assertEquals(0, resultSize);
+    
+    // Ensure that the NULL procedure does in fact succeed.
+    xdrOut = new XDR();
+    createPortmapXDRheader(xdrOut, 0);
+    int headerSize = xdrOut.size();
+    buffer = new byte[bufsize];
+    xdrOut.writeFixedOpaque(buffer);
+    int requestSize = xdrOut.size() - headerSize;
+    
+    // Send the request to the server
+    testRequest(xdrOut, serverPort);
+
+    // Verify the server did not reject the request.
+    assertEquals(requestSize, resultSize);
   }
   }
   
   
   private static int startRpcServer(boolean allowInsecurePorts) {
   private static int startRpcServer(boolean allowInsecurePorts) {

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java

@@ -51,7 +51,8 @@ public class NfsConfigKeys {
   public static final String DFS_NFS_KEYTAB_FILE_KEY = "nfs.keytab.file";
   public static final String DFS_NFS_KEYTAB_FILE_KEY = "nfs.keytab.file";
   public static final String DFS_NFS_KERBEROS_PRINCIPAL_KEY = "nfs.kerberos.principal";
   public static final String DFS_NFS_KERBEROS_PRINCIPAL_KEY = "nfs.kerberos.principal";
   public static final String DFS_NFS_REGISTRATION_PORT_KEY = "nfs.registration.port";
   public static final String DFS_NFS_REGISTRATION_PORT_KEY = "nfs.registration.port";
-  public static final int    DFS_NFS_REGISTRATION_PORT_DEFAULT = 40; // Currently unassigned.
-  public static final String  DFS_NFS_ALLOW_INSECURE_PORTS_KEY = "nfs.allow.insecure.ports";
-  public static final boolean DFS_NFS_ALLOW_INSECURE_PORTS_DEFAULT = true;
+  public static final int DFS_NFS_REGISTRATION_PORT_DEFAULT = 40; // Currently unassigned.
+  public static final String DFS_NFS_PORT_MONITORING_DISABLED_KEY = "nfs.port.monitoring.disabled";
+  public static final boolean DFS_NFS_PORT_MONITORING_DISABLED_DEFAULT = true;
+
 }
 }

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfiguration.java

@@ -49,6 +49,8 @@ public class NfsConfiguration extends HdfsConfiguration {
         new DeprecationDelta("dfs.nfs3.stream.timeout",
         new DeprecationDelta("dfs.nfs3.stream.timeout",
             NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_KEY),
             NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_KEY),
         new DeprecationDelta("dfs.nfs3.export.point",
         new DeprecationDelta("dfs.nfs3.export.point",
-            NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY) });
+            NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY),
+        new DeprecationDelta("nfs.allow.insecure.ports",
+            NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_KEY) });
   }
   }
 }
 }

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java

@@ -194,7 +194,13 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
     if (mntproc == MNTPROC.NULL) {
     if (mntproc == MNTPROC.NULL) {
       out = nullOp(out, xid, client);
       out = nullOp(out, xid, client);
     } else if (mntproc == MNTPROC.MNT) {
     } else if (mntproc == MNTPROC.MNT) {
-      out = mnt(xdr, out, xid, client);
+      // Only do port monitoring for MNT
+      if (!doPortMonitoring(info.remoteAddress())) {
+        out = MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_ACCES, out,
+            xid, null);
+      } else {
+        out = mnt(xdr, out, xid, client);
+      }
     } else if (mntproc == MNTPROC.DUMP) {
     } else if (mntproc == MNTPROC.DUMP) {
       out = dump(out, xid, client);
       out = dump(out, xid, client);
     } else if (mntproc == MNTPROC.UMNT) {      
     } else if (mntproc == MNTPROC.UMNT) {      

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java

@@ -61,8 +61,8 @@ public class Nfs3 extends Nfs3Base {
     StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
     StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
     NfsConfiguration conf = new NfsConfiguration();
     NfsConfiguration conf = new NfsConfiguration();
     boolean allowInsecurePorts = conf.getBoolean(
     boolean allowInsecurePorts = conf.getBoolean(
-        NfsConfigKeys.DFS_NFS_ALLOW_INSECURE_PORTS_KEY,
-        NfsConfigKeys.DFS_NFS_ALLOW_INSECURE_PORTS_DEFAULT);
+        NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_KEY,
+        NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_DEFAULT);
     final Nfs3 nfsServer = new Nfs3(conf, registrationSocket,
     final Nfs3 nfsServer = new Nfs3(conf, registrationSocket,
         allowInsecurePorts);
         allowInsecurePorts);
     nfsServer.startServiceInternal(true);
     nfsServer.startServiceInternal(true);

+ 145 - 89
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.DatagramSocket;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.util.EnumSet;
 import java.util.EnumSet;
 
 
@@ -230,15 +231,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public GETATTR3Response getattr(XDR xdr, RpcInfo info) {
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
     GETATTR3Response response = new GETATTR3Response(Nfs3Status.NFS3_OK);
     
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
       return response;
     }
     }
     
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -322,9 +323,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public SETATTR3Response setattr(XDR xdr, RpcInfo info) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -370,7 +371,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       }
       }
       
       
       // check the write access privilege
       // check the write access privilege
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
         return new SETATTR3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             preOpWcc, preOpAttr));
             preOpWcc, preOpAttr));
       }
       }
@@ -398,15 +399,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public LOOKUP3Response lookup(XDR xdr, RpcInfo info) {
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
     LOOKUP3Response response = new LOOKUP3Response(Nfs3Status.NFS3_OK);
     
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
       return response;
     }
     }
     
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -460,15 +461,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
   
   
   @Override
   @Override
-  public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public ACCESS3Response access(XDR xdr, RpcInfo info) {
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
     ACCESS3Response response = new ACCESS3Response(Nfs3Status.NFS3_OK);
     
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
       return response;
     }
     }
     
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -519,15 +520,16 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     }
     }
   }
   }
 
 
-  public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  @Override
+  public READLINK3Response readlink(XDR xdr, RpcInfo info) {
     READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
     READLINK3Response response = new READLINK3Response(Nfs3Status.NFS3_OK);
 
 
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
       return response;
     }
     }
 
 
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -591,12 +593,19 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public READ3Response read(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public READ3Response read(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return read(xdr, securityHandler, remoteAddress);
+  }
+  
+  @VisibleForTesting
+  READ3Response read(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
     final String userName = securityHandler.getUser();
     final String userName = securityHandler.getUser();
     
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
       return response;
     }
     }
@@ -715,8 +724,17 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public WRITE3Response write(XDR xdr, Channel channel, int xid,
-      SecurityHandler securityHandler, InetAddress client) {
+  public WRITE3Response write(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    RpcCall rpcCall = (RpcCall) info.header();
+    int xid = rpcCall.getXid();
+    SocketAddress remoteAddress = info.remoteAddress();
+    return write(xdr, info.channel(), xid, securityHandler, remoteAddress);
+  }
+  
+  @VisibleForTesting
+  WRITE3Response write(XDR xdr, Channel channel, int xid,
+      SecurityHandler securityHandler, SocketAddress remoteAddress) {
     WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
     WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
 
 
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
@@ -758,7 +776,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
         return new WRITE3Response(Nfs3Status.NFS3ERR_STALE);
         return new WRITE3Response(Nfs3Status.NFS3ERR_STALE);
       }
       }
       
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new WRITE3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
         return new WRITE3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr), 0, stableHow,
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr), 0, stableHow,
             Nfs3Constant.WRITE_COMMIT_VERF);
             Nfs3Constant.WRITE_COMMIT_VERF);
@@ -791,8 +809,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public CREATE3Response create(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return create(xdr, securityHandler, remoteAddress);
+  }
+  
+  @VisibleForTesting
+  CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
@@ -838,7 +863,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
         return new CREATE3Response(Nfs3Status.NFS3ERR_STALE);
         return new CREATE3Response(Nfs3Status.NFS3ERR_STALE);
       }
       }
       
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) {
         return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, null,
         return new CREATE3Response(Nfs3Status.NFS3ERR_ACCES, null,
             preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
             preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
                 preOpDirAttr));
                 preOpDirAttr));
@@ -922,9 +947,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public MKDIR3Response mkdir(XDR xdr, RpcInfo info) {
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -960,7 +985,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
         return new MKDIR3Response(Nfs3Status.NFS3ERR_STALE);
         return new MKDIR3Response(Nfs3Status.NFS3ERR_STALE);
       }
       }
 
 
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new MKDIR3Response(Nfs3Status.NFS3ERR_ACCES, null, preOpDirAttr,
         return new MKDIR3Response(Nfs3Status.NFS3ERR_ACCES, null, preOpDirAttr,
             new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr));
             new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), preOpDirAttr));
       }
       }
@@ -1012,15 +1037,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     }
     }
   }
   }
 
 
-  public READDIR3Response mknod(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  @Override
+  public READDIR3Response mknod(XDR xdr, RpcInfo info) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
   }
   
   
   @Override
   @Override
-  public REMOVE3Response remove(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
+  public REMOVE3Response remove(XDR xdr, RpcInfo info) {
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1093,9 +1118,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public RMDIR3Response rmdir(XDR xdr, RpcInfo info) {
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1129,7 +1154,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       
       
       WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
       WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           preOpDirAttr);
           preOpDirAttr);
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, errWcc); 
         return new RMDIR3Response(Nfs3Status.NFS3ERR_ACCES, errWcc); 
       }
       }
 
 
@@ -1175,9 +1200,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public RENAME3Response rename(XDR xdr, RpcInfo info) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1221,7 +1246,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
         return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
         return new RENAME3Response(Nfs3Status.NFS3ERR_STALE);
       }
       }
       
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         WccData fromWcc = new WccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
         WccData fromWcc = new WccData(Nfs3Utils.getWccAttr(fromPreOpAttr),
             fromPreOpAttr);
             fromPreOpAttr);
         WccData toWcc = new WccData(Nfs3Utils.getWccAttr(toPreOpAttr),
         WccData toWcc = new WccData(Nfs3Utils.getWccAttr(toPreOpAttr),
@@ -1263,15 +1288,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public SYMLINK3Response symlink(XDR xdr, RpcInfo info) {
     SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
     SYMLINK3Response response = new SYMLINK3Response(Nfs3Status.NFS3_OK);
 
 
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
       return response;
     }
     }
 
 
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1322,8 +1347,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     }
     }
   }
   }
 
 
-  public READDIR3Response link(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  @Override
+  public READDIR3Response link(XDR xdr, RpcInfo info) {
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
     return new READDIR3Response(Nfs3Status.NFS3ERR_NOTSUPP);
   }
   }
 
 
@@ -1351,11 +1376,16 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
   
   
   @Override
   @Override
+  public READDIR3Response readdir(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return readdir(xdr, securityHandler, remoteAddress);
+  }
   public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
   public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+      SocketAddress remoteAddress) {
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
     READDIR3Response response = new READDIR3Response(Nfs3Status.NFS3_OK);
     
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
       return response;
     }
     }
@@ -1491,9 +1521,17 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
         dirStatus.getModificationTime(), dirList);
         dirStatus.getModificationTime(), dirList);
   }
   }
 
 
-  public READDIRPLUS3Response readdirplus(XDR xdr,
-      SecurityHandler securityHandler, InetAddress client) {
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+  @Override
+  public READDIRPLUS3Response readdirplus(XDR xdr, RpcInfo info) {
+    SecurityHandler securityHandler = getSecurityHandler(info);
+    SocketAddress remoteAddress = info.remoteAddress();
+    return readdirplus(xdr, securityHandler, remoteAddress);
+  }
+
+  @VisibleForTesting
+  READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler,
+      SocketAddress remoteAddress) {
+    if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_ONLY)) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
     }
     }
     
     
@@ -1643,15 +1681,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
   
   
   @Override
   @Override
-  public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public FSSTAT3Response fsstat(XDR xdr, RpcInfo info) {
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
     FSSTAT3Response response = new FSSTAT3Response(Nfs3Status.NFS3_OK);
     
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
       return response;
     }
     }
     
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1711,15 +1749,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public FSINFO3Response fsinfo(XDR xdr, RpcInfo info) {
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
     FSINFO3Response response = new FSINFO3Response(Nfs3Status.NFS3_OK);
     
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
       return response;
     }
     }
     
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1769,15 +1807,15 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public PATHCONF3Response pathconf(XDR xdr, RpcInfo info) {
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
     PATHCONF3Response response = new PATHCONF3Response(Nfs3Status.NFS3_OK);
     
     
-    if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
+    if (!checkAccessPrivilege(info, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
       return response;
     }
     }
     
     
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1816,9 +1854,11 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
   }
 
 
   @Override
   @Override
-  public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
-      SecurityHandler securityHandler, InetAddress client) {
+  public COMMIT3Response commit(XDR xdr, RpcInfo info) {
+    //Channel channel, int xid,
+    //    SecurityHandler securityHandler, InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
+    SecurityHandler securityHandler = getSecurityHandler(info);
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
@@ -1849,7 +1889,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
         return new COMMIT3Response(Nfs3Status.NFS3ERR_STALE);
         return new COMMIT3Response(Nfs3Status.NFS3ERR_STALE);
       }
       }
       
       
-      if (!checkAccessPrivilege(client, AccessPrivilege.READ_WRITE)) {
+      if (!checkAccessPrivilege(info, AccessPrivilege.READ_WRITE)) {
         return new COMMIT3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
         return new COMMIT3Response(Nfs3Status.NFS3ERR_ACCES, new WccData(
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr),
             Nfs3Utils.getWccAttr(preOpAttr), preOpAttr),
             Nfs3Constant.WRITE_COMMIT_VERF);
             Nfs3Constant.WRITE_COMMIT_VERF);
@@ -1859,8 +1899,10 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
           : (request.getOffset() + request.getCount());
           : (request.getOffset() + request.getCount());
       
       
       // Insert commit as an async request
       // Insert commit as an async request
-      writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
-          preOpAttr);
+      RpcCall rpcCall = (RpcCall) info.header();
+      int xid = rpcCall.getXid();
+      writeManager.handleCommit(dfsClient, handle, commitOffset,
+          info.channel(), xid, preOpAttr);
       return null;
       return null;
     } catch (IOException e) {
     } catch (IOException e) {
       LOG.warn("Exception ", e);
       LOG.warn("Exception ", e);
@@ -1885,11 +1927,16 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return null;
       return null;
     }
     }
   }
   }
+
+  private SecurityHandler getSecurityHandler(RpcInfo info) {
+    RpcCall rpcCall = (RpcCall) info.header();
+    return getSecurityHandler(rpcCall.getCredential(), rpcCall.getVerifier());
+  }
   
   
   @Override
   @Override
   public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
   public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
     RpcCall rpcCall = (RpcCall) info.header();
     RpcCall rpcCall = (RpcCall) info.header();
-    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());
+    final NFSPROC3 nfsproc3 = NFSPROC3.fromValue(rpcCall.getProcedure());    
     int xid = rpcCall.getXid();
     int xid = rpcCall.getXid();
     byte[] data = new byte[info.data().readableBytes()];
     byte[] data = new byte[info.data().readableBytes()];
     info.data().readBytes(data);
     info.data().readBytes(data);
@@ -1897,9 +1944,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     XDR out = new XDR();
     XDR out = new XDR();
     InetAddress client = ((InetSocketAddress) info.remoteAddress())
     InetAddress client = ((InetSocketAddress) info.remoteAddress())
         .getAddress();
         .getAddress();
-    Channel channel = info.channel();
-
     Credentials credentials = rpcCall.getCredential();
     Credentials credentials = rpcCall.getCredential();
+    
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     // Ignore auth only for NFSPROC3_NULL, especially for Linux clients.
     if (nfsproc3 != NFSPROC3.NULL) {
     if (nfsproc3 != NFSPROC3.NULL) {
       if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
       if (credentials.getFlavor() != AuthFlavor.AUTH_SYS
@@ -1937,27 +1983,24 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       }
       }
     }
     }
     
     
-    SecurityHandler securityHandler = getSecurityHandler(credentials,
-        rpcCall.getVerifier());
-    
     NFS3Response response = null;
     NFS3Response response = null;
     if (nfsproc3 == NFSPROC3.NULL) {
     if (nfsproc3 == NFSPROC3.NULL) {
       response = nullProcedure();
       response = nullProcedure();
     } else if (nfsproc3 == NFSPROC3.GETATTR) {
     } else if (nfsproc3 == NFSPROC3.GETATTR) {
-      response = getattr(xdr, securityHandler, client);
+      response = getattr(xdr, info);
     } else if (nfsproc3 == NFSPROC3.SETATTR) {
     } else if (nfsproc3 == NFSPROC3.SETATTR) {
-      response = setattr(xdr, securityHandler, client);
+      response = setattr(xdr, info);
     } else if (nfsproc3 == NFSPROC3.LOOKUP) {
     } else if (nfsproc3 == NFSPROC3.LOOKUP) {
-      response = lookup(xdr, securityHandler, client);
+      response = lookup(xdr, info);
     } else if (nfsproc3 == NFSPROC3.ACCESS) {
     } else if (nfsproc3 == NFSPROC3.ACCESS) {
-      response = access(xdr, securityHandler, client);
+      response = access(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READLINK) {
     } else if (nfsproc3 == NFSPROC3.READLINK) {
-      response = readlink(xdr, securityHandler, client);
+      response = readlink(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READ) {
     } else if (nfsproc3 == NFSPROC3.READ) {
       if (LOG.isDebugEnabled()) {
       if (LOG.isDebugEnabled()) {
           LOG.debug(Nfs3Utils.READ_RPC_START + xid);
           LOG.debug(Nfs3Utils.READ_RPC_START + xid);
       }    
       }    
-      response = read(xdr, securityHandler, client);
+      response = read(xdr, info);
       if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
       if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
         LOG.debug(Nfs3Utils.READ_RPC_END + xid);
         LOG.debug(Nfs3Utils.READ_RPC_END + xid);
       }
       }
@@ -1965,36 +2008,36 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       if (LOG.isDebugEnabled()) {
       if (LOG.isDebugEnabled()) {
           LOG.debug(Nfs3Utils.WRITE_RPC_START + xid);
           LOG.debug(Nfs3Utils.WRITE_RPC_START + xid);
       }
       }
-      response = write(xdr, channel, xid, securityHandler, client);
+      response = write(xdr, info);
       // Write end debug trace is in Nfs3Utils.writeChannel
       // Write end debug trace is in Nfs3Utils.writeChannel
     } else if (nfsproc3 == NFSPROC3.CREATE) {
     } else if (nfsproc3 == NFSPROC3.CREATE) {
-      response = create(xdr, securityHandler, client);
+      response = create(xdr, info);
     } else if (nfsproc3 == NFSPROC3.MKDIR) {      
     } else if (nfsproc3 == NFSPROC3.MKDIR) {      
-      response = mkdir(xdr, securityHandler, client);
+      response = mkdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.SYMLINK) {
     } else if (nfsproc3 == NFSPROC3.SYMLINK) {
-      response = symlink(xdr, securityHandler, client);
+      response = symlink(xdr, info);
     } else if (nfsproc3 == NFSPROC3.MKNOD) {
     } else if (nfsproc3 == NFSPROC3.MKNOD) {
-      response = mknod(xdr, securityHandler, client);
+      response = mknod(xdr, info);
     } else if (nfsproc3 == NFSPROC3.REMOVE) {
     } else if (nfsproc3 == NFSPROC3.REMOVE) {
-      response = remove(xdr, securityHandler, client);
+      response = remove(xdr, info);
     } else if (nfsproc3 == NFSPROC3.RMDIR) {
     } else if (nfsproc3 == NFSPROC3.RMDIR) {
-      response = rmdir(xdr, securityHandler, client);
+      response = rmdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.RENAME) {
     } else if (nfsproc3 == NFSPROC3.RENAME) {
-      response = rename(xdr, securityHandler, client);
+      response = rename(xdr, info);
     } else if (nfsproc3 == NFSPROC3.LINK) {
     } else if (nfsproc3 == NFSPROC3.LINK) {
-      response = link(xdr, securityHandler, client);
+      response = link(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READDIR) {
     } else if (nfsproc3 == NFSPROC3.READDIR) {
-      response = readdir(xdr, securityHandler, client);
+      response = readdir(xdr, info);
     } else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
     } else if (nfsproc3 == NFSPROC3.READDIRPLUS) {
-      response = readdirplus(xdr, securityHandler, client);
+      response = readdirplus(xdr, info);
     } else if (nfsproc3 == NFSPROC3.FSSTAT) {
     } else if (nfsproc3 == NFSPROC3.FSSTAT) {
-      response = fsstat(xdr, securityHandler, client);
+      response = fsstat(xdr, info);
     } else if (nfsproc3 == NFSPROC3.FSINFO) {
     } else if (nfsproc3 == NFSPROC3.FSINFO) {
-      response = fsinfo(xdr, securityHandler, client);
+      response = fsinfo(xdr, info);
     } else if (nfsproc3 == NFSPROC3.PATHCONF) {
     } else if (nfsproc3 == NFSPROC3.PATHCONF) {
-      response = pathconf(xdr, securityHandler, client);
+      response = pathconf(xdr,info);
     } else if (nfsproc3 == NFSPROC3.COMMIT) {
     } else if (nfsproc3 == NFSPROC3.COMMIT) {
-      response = commit(xdr, channel, xid, securityHandler, client);
+      response = commit(xdr, info);
     } else {
     } else {
       // Invalid procedure
       // Invalid procedure
       RpcAcceptedReply.getInstance(xid,
       RpcAcceptedReply.getInstance(xid,
@@ -2027,8 +2070,21 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     return nfsproc3 == null || nfsproc3.isIdempotent();
     return nfsproc3 == null || nfsproc3.isIdempotent();
   }
   }
   
   
-  private boolean checkAccessPrivilege(final InetAddress client,
+  private boolean checkAccessPrivilege(RpcInfo info,
+      final AccessPrivilege expected) {
+    SocketAddress remoteAddress = info.remoteAddress();
+    return checkAccessPrivilege(remoteAddress, expected);
+  }
+
+  private boolean checkAccessPrivilege(SocketAddress remoteAddress,
       final AccessPrivilege expected) {
       final AccessPrivilege expected) {
+    // Port monitoring
+    if (!doPortMonitoring(remoteAddress)) {
+      return false;
+    }
+    
+    // Check export table
+    InetAddress client = ((InetSocketAddress) remoteAddress).getAddress();
     AccessPrivilege access = exports.getAccessPrivilege(client);
     AccessPrivilege access = exports.getAccessPrivilege(client);
     if (access == AccessPrivilege.NONE) {
     if (access == AccessPrivilege.NONE) {
       return false;
       return false;

+ 16 - 8
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestReaddir.java → hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java

@@ -16,12 +16,14 @@
  * limitations under the License.
  * limitations under the License.
  */
  */
 
 
-package org.apache.hadoop.hdfs.nfs;
+package org.apache.hadoop.hdfs.nfs.nfs3;
 
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertTrue;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.List;
 import java.util.List;
 
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
@@ -38,10 +40,15 @@ import org.apache.hadoop.nfs.nfs3.response.READDIR3Response;
 import org.apache.hadoop.nfs.nfs3.response.READDIR3Response.Entry3;
 import org.apache.hadoop.nfs.nfs3.response.READDIR3Response.Entry3;
 import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response;
 import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response;
 import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response.EntryPlus3;
 import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response.EntryPlus3;
+import org.apache.hadoop.oncrpc.RpcInfo;
+import org.apache.hadoop.oncrpc.RpcMessage;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.SecurityHandler;
 import org.apache.hadoop.oncrpc.security.SecurityHandler;
 import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
 import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
 import org.junit.AfterClass;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.BeforeClass;
@@ -122,7 +129,7 @@ public class TestReaddir {
     xdr_req.writeInt(100); // count
     xdr_req.writeInt(100); // count
 
 
     READDIR3Response response = nfsd.readdir(xdr_req.asReadOnlyWrap(),
     READDIR3Response response = nfsd.readdir(xdr_req.asReadOnlyWrap(),
-        securityHandler, InetAddress.getLocalHost());
+        securityHandler, new InetSocketAddress("localhost", 1234));
     List<Entry3> dirents = response.getDirList().getEntries();
     List<Entry3> dirents = response.getDirList().getEntries();
     assertTrue(dirents.size() == 5); // inculding dot, dotdot
     assertTrue(dirents.size() == 5); // inculding dot, dotdot
 
 
@@ -139,7 +146,7 @@ public class TestReaddir {
     xdr_req.writeInt(100); // count
     xdr_req.writeInt(100); // count
 
 
     response = nfsd.readdir(xdr_req.asReadOnlyWrap(), securityHandler,
     response = nfsd.readdir(xdr_req.asReadOnlyWrap(), securityHandler,
-        InetAddress.getLocalHost());
+        new InetSocketAddress("localhost", 1234));
     dirents = response.getDirList().getEntries();
     dirents = response.getDirList().getEntries();
     assertTrue(dirents.size() == 1);
     assertTrue(dirents.size() == 1);
     Entry3 entry = dirents.get(0);
     Entry3 entry = dirents.get(0);
@@ -149,7 +156,7 @@ public class TestReaddir {
     hdfs.delete(new Path(testdir + "/f2"), false);
     hdfs.delete(new Path(testdir + "/f2"), false);
 
 
     response = nfsd.readdir(xdr_req.asReadOnlyWrap(), securityHandler,
     response = nfsd.readdir(xdr_req.asReadOnlyWrap(), securityHandler,
-        InetAddress.getLocalHost());
+        new InetSocketAddress("localhost", 1234));
     dirents = response.getDirList().getEntries();
     dirents = response.getDirList().getEntries();
     assertTrue(dirents.size() == 2); // No dot, dotdot
     assertTrue(dirents.size() == 2); // No dot, dotdot
   }
   }
@@ -170,8 +177,9 @@ public class TestReaddir {
     xdr_req.writeInt(100); // dirCount
     xdr_req.writeInt(100); // dirCount
     xdr_req.writeInt(1000); // maxCount
     xdr_req.writeInt(1000); // maxCount
 
 
-    READDIRPLUS3Response responsePlus = nfsd.readdirplus(
-        xdr_req.asReadOnlyWrap(), securityHandler, InetAddress.getLocalHost());
+    READDIRPLUS3Response responsePlus = nfsd.readdirplus(xdr_req
+        .asReadOnlyWrap(), securityHandler, new InetSocketAddress("localhost",
+        1234));
     List<EntryPlus3> direntPlus = responsePlus.getDirListPlus().getEntries();
     List<EntryPlus3> direntPlus = responsePlus.getDirListPlus().getEntries();
     assertTrue(direntPlus.size() == 5); // including dot, dotdot
     assertTrue(direntPlus.size() == 5); // including dot, dotdot
 
 
@@ -189,7 +197,7 @@ public class TestReaddir {
     xdr_req.writeInt(1000); // maxCount
     xdr_req.writeInt(1000); // maxCount
 
 
     responsePlus = nfsd.readdirplus(xdr_req.asReadOnlyWrap(), securityHandler,
     responsePlus = nfsd.readdirplus(xdr_req.asReadOnlyWrap(), securityHandler,
-        InetAddress.getLocalHost());
+        new InetSocketAddress("localhost", 1234));
     direntPlus = responsePlus.getDirListPlus().getEntries();
     direntPlus = responsePlus.getDirListPlus().getEntries();
     assertTrue(direntPlus.size() == 1);
     assertTrue(direntPlus.size() == 1);
     EntryPlus3 entryPlus = direntPlus.get(0);
     EntryPlus3 entryPlus = direntPlus.get(0);
@@ -199,7 +207,7 @@ public class TestReaddir {
     hdfs.delete(new Path(testdir + "/f2"), false);
     hdfs.delete(new Path(testdir + "/f2"), false);
 
 
     responsePlus = nfsd.readdirplus(xdr_req.asReadOnlyWrap(), securityHandler,
     responsePlus = nfsd.readdirplus(xdr_req.asReadOnlyWrap(), securityHandler,
-        InetAddress.getLocalHost());
+        new InetSocketAddress("localhost", 1234));
     direntPlus = responsePlus.getDirListPlus().getEntries();
     direntPlus = responsePlus.getDirListPlus().getEntries();
     assertTrue(direntPlus.size() == 2); // No dot, dotdot
     assertTrue(direntPlus.size() == 2); // No dot, dotdot
   }
   }

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java

@@ -22,7 +22,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.fail;
 
 
 import java.io.IOException;
 import java.io.IOException;
-import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentNavigableMap;
@@ -318,7 +318,7 @@ public class TestWrites {
       XDR createXdr = new XDR();
       XDR createXdr = new XDR();
       createReq.serialize(createXdr);
       createReq.serialize(createXdr);
       CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
       CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
       FileHandle handle = createRsp.getObjHandle();
       FileHandle handle = createRsp.getObjHandle();
 
 
       // Test DATA_SYNC
       // Test DATA_SYNC
@@ -331,7 +331,7 @@ public class TestWrites {
       XDR writeXdr = new XDR();
       XDR writeXdr = new XDR();
       writeReq.serialize(writeXdr);
       writeReq.serialize(writeXdr);
       nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
       nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler,
-          InetAddress.getLocalHost());
+          new InetSocketAddress("localhost", 1234));
 
 
       waitWrite(nfsd, handle, 60000);
       waitWrite(nfsd, handle, 60000);
 
 
@@ -340,7 +340,7 @@ public class TestWrites {
       XDR readXdr = new XDR();
       XDR readXdr = new XDR();
       readReq.serialize(readXdr);
       readReq.serialize(readXdr);
       READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
       READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
 
 
       assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
       assertTrue(Arrays.equals(buffer, readRsp.getData().array()));
 
 
@@ -352,7 +352,7 @@ public class TestWrites {
       XDR createXdr2 = new XDR();
       XDR createXdr2 = new XDR();
       createReq2.serialize(createXdr2);
       createReq2.serialize(createXdr2);
       CREATE3Response createRsp2 = nfsd.create(createXdr2.asReadOnlyWrap(),
       CREATE3Response createRsp2 = nfsd.create(createXdr2.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
       FileHandle handle2 = createRsp2.getObjHandle();
       FileHandle handle2 = createRsp2.getObjHandle();
 
 
       WRITE3Request writeReq2 = new WRITE3Request(handle2, 0, 10,
       WRITE3Request writeReq2 = new WRITE3Request(handle2, 0, 10,
@@ -360,7 +360,7 @@ public class TestWrites {
       XDR writeXdr2 = new XDR();
       XDR writeXdr2 = new XDR();
       writeReq2.serialize(writeXdr2);
       writeReq2.serialize(writeXdr2);
       nfsd.write(writeXdr2.asReadOnlyWrap(), null, 1, securityHandler,
       nfsd.write(writeXdr2.asReadOnlyWrap(), null, 1, securityHandler,
-          InetAddress.getLocalHost());
+          new InetSocketAddress("localhost", 1234));
 
 
       waitWrite(nfsd, handle2, 60000);
       waitWrite(nfsd, handle2, 60000);
 
 
@@ -369,7 +369,7 @@ public class TestWrites {
       XDR readXdr2 = new XDR();
       XDR readXdr2 = new XDR();
       readReq2.serialize(readXdr2);
       readReq2.serialize(readXdr2);
       READ3Response readRsp2 = nfsd.read(readXdr2.asReadOnlyWrap(),
       READ3Response readRsp2 = nfsd.read(readXdr2.asReadOnlyWrap(),
-          securityHandler, InetAddress.getLocalHost());
+          securityHandler, new InetSocketAddress("localhost", 1234));
 
 
       assertTrue(Arrays.equals(buffer, readRsp2.getData().array()));
       assertTrue(Arrays.equals(buffer, readRsp2.getData().array()));
       // FILE_SYNC should sync the file size
       // FILE_SYNC should sync the file size

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -659,6 +659,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6551. Rename with OVERWRITE option may throw NPE when the target
     HDFS-6551. Rename with OVERWRITE option may throw NPE when the target
     file/directory is a reference INode. (jing9)
     file/directory is a reference INode. (jing9)
 
 
+    HDFS-6439. NFS should not reject NFS requests to the NULL procedure whether
+    port monitoring is enabled or not. (brandonli)
+
   BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
   BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
 
 
     HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)
     HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HdfsNfsGateway.apt.vm

@@ -322,6 +322,22 @@ HDFS NFS Gateway
   Then the users can access HDFS as part of the local file system except that, 
   Then the users can access HDFS as part of the local file system except that, 
   hard link and random write are not supported yet.
   hard link and random write are not supported yet.
 
 
+* {Allow mounts from unprivileged clients}
+
+  In environments where root access on client machines is not generally
+  available, some measure of security can be obtained by ensuring that only NFS
+  clients originating from privileged ports can connect to the NFS server. This
+  feature is referred to as "port monitoring." This feature is not enabled by default
+  in the HDFS NFS Gateway, but can be optionally enabled by setting the
+  following config in hdfs-site.xml on the NFS Gateway machine:
+
+-------------------------------------------------------------------
+<property>
+  <name>nfs.port.monitoring.disabled</name>
+  <value>false</value>
+</property>
+-------------------------------------------------------------------
+
 * {User authentication and mapping}
 * {User authentication and mapping}
 
 
   NFS gateway in this release uses AUTH_UNIX style authentication. When the user on NFS client
   NFS gateway in this release uses AUTH_UNIX style authentication. When the user on NFS client