ソースを参照

Merge r1406007 through r1406326 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1406337 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 12 年 前
コミット
92c440c115
44 ファイル変更1140 行追加252 行削除
  1. 11 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 15 4
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Test.java
  3. 11 13
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
  4. 2 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
  5. 4 8
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java
  6. 21 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
  7. 42 11
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
  8. 1 4
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java
  9. 19 6
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
  10. 123 94
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
  11. 15 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/SecurityUtilTestHelper.java
  12. 3 5
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
  13. 52 4
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
  14. 74 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUGIWithExternalKdc.java
  15. 1 3
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUGIWithSecurityOn.java
  16. 12 1
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java
  17. 2 2
      hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
  18. 10 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  19. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
  20. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
  21. 3 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  22. 4 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  23. 5 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  24. 23 14
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java
  25. 6 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  26. 19 8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  27. 52 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
  28. 2 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
  29. 3 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
  30. 1 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
  31. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
  32. 6 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
  33. 37 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
  34. 100 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java
  35. 42 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestHdfsProtoUtil.java
  36. 117 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStartSecureDataNode.java
  37. 129 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecureNameNodeWithExternalKdc.java
  38. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  39. 1 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java
  40. 19 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedPartitioner.java
  41. 3 0
      hadoop-yarn-project/CHANGES.txt
  42. 0 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
  43. 88 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
  44. 54 25
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java

+ 11 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -129,6 +129,9 @@ Trunk (Unreleased)
     HADOOP-8776. Provide an option in test-patch that can enable/disable
     compiling native code. (Chris Nauroth via suresh)
 
+    HADOOP-9004. Allow security unit tests to use external KDC. (Stephen Chu
+    via suresh)
+
   BUG FIXES
 
     HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.
@@ -333,6 +336,12 @@ Release 2.0.3-alpha - Unreleased
     HADOOP-8985. Add namespace declarations in .proto files for languages 
     other than java. (Binglin Chan via suresh)
 
+    HADOOP-9009. Add SecurityUtil methods to get/set authentication method
+    (daryn via bobby)
+
+    HADOOP-9010. Map UGI authenticationMethod to RPC authMethod (daryn via
+    bobby)
+
   OPTIMIZATIONS
 
     HADOOP-8866. SampleQuantiles#query is O(N^2) instead of O(N). (Andrew Wang
@@ -389,6 +398,8 @@ Release 2.0.3-alpha - Unreleased
     HADOOP-8713. TestRPCCompatibility fails intermittently with JDK7
     (Trevor Robinson via tgraves)
 
+    HADOOP-9012. IPC Client sends wrong connection context (daryn via bobby)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

+ 15 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Test.java

@@ -37,16 +37,21 @@ class Test extends FsCommand {
   }
 
   public static final String NAME = "test";
-  public static final String USAGE = "-[ezd] <path>";
+  public static final String USAGE = "-[defsz] <path>";
   public static final String DESCRIPTION =
-    "If file exists, has zero length, is a directory\n" +
-    "then return 0, else return 1.";
+    "Answer various questions about <path>, with result via exit status.\n" +
+    "  -d  return 0 if <path> is a directory.\n" +
+    "  -e  return 0 if <path> exists.\n" +
+    "  -f  return 0 if <path> is a file.\n" +
+    "  -s  return 0 if file <path> is greater than zero bytes in size.\n" +
+    "  -z  return 0 if file <path> is zero bytes in size.\n" +
+    "else, return 1.";
 
   private char flag;
   
   @Override
   protected void processOptions(LinkedList<String> args) {
-    CommandFormat cf = new CommandFormat(1, 1, "e", "d", "z");
+    CommandFormat cf = new CommandFormat(1, 1, "e", "d", "f", "s", "z");
     cf.parse(args);
     
     String[] opts = cf.getOpts().toArray(new String[0]);
@@ -71,6 +76,12 @@ class Test extends FsCommand {
       case 'd':
         test = item.stat.isDirectory();
         break;
+      case 'f':
+        test = item.stat.isFile();
+        break;
+      case 's':
+        test = (item.stat.getLen() > 0);
+        break;
       case 'z':
         test = (item.stat.getLen() == 0);
         break;

+ 11 - 13
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -69,6 +69,7 @@ import org.apache.hadoop.security.SaslRpcClient;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenInfo;
@@ -222,7 +223,6 @@ public class Client {
   private class Connection extends Thread {
     private InetSocketAddress server;             // server ip:port
     private String serverPrincipal;  // server's krb5 principal name
-    private IpcConnectionContextProto connectionContext;   // connection context
     private final ConnectionId remoteId;                // connection id
     private AuthMethod authMethod; // authentication method
     private Token<? extends TokenIdentifier> token;
@@ -295,16 +295,14 @@ public class Client {
       }
       
       if (token != null) {
-        authMethod = AuthMethod.DIGEST;
+        authMethod = AuthenticationMethod.TOKEN.getAuthMethod();
       } else if (UserGroupInformation.isSecurityEnabled()) {
+        // eventually just use the ticket's authMethod
         authMethod = AuthMethod.KERBEROS;
       } else {
         authMethod = AuthMethod.SIMPLE;
       }
       
-      connectionContext = ProtoUtil.makeIpcConnectionContext(
-          RPC.getProtocolName(protocol), ticket, authMethod);
-      
       if (LOG.isDebugEnabled())
         LOG.debug("Use " + authMethod + " authentication for protocol "
             + protocol.getSimpleName());
@@ -605,11 +603,6 @@ public class Client {
             } else {
               // fall back to simple auth because server told us so.
               authMethod = AuthMethod.SIMPLE;
-              // remake the connectionContext             
-              connectionContext = ProtoUtil.makeIpcConnectionContext(
-                  connectionContext.getProtocol(), 
-                  ProtoUtil.getUgi(connectionContext.getUserInfo()),
-                  authMethod);
             }
           }
         
@@ -620,7 +613,7 @@ public class Client {
             this.in = new DataInputStream(new BufferedInputStream(inStream));
           }
           this.out = new DataOutputStream(new BufferedOutputStream(outStream));
-          writeConnectionContext();
+          writeConnectionContext(remoteId, authMethod);
 
           // update last activity time
           touch();
@@ -742,10 +735,15 @@ public class Client {
     /* Write the connection context header for each connection
      * Out is not synchronized because only the first thread does this.
      */
-    private void writeConnectionContext() throws IOException {
+    private void writeConnectionContext(ConnectionId remoteId,
+                                        AuthMethod authMethod)
+                                            throws IOException {
       // Write out the ConnectionHeader
       DataOutputBuffer buf = new DataOutputBuffer();
-      connectionContext.writeTo(buf);
+      ProtoUtil.makeIpcConnectionContext(
+          RPC.getProtocolName(remoteId.getProtocol()),
+          remoteId.getTicket(),
+          authMethod).writeTo(buf);
       
       // Write out the payload length
       int bufLen = buf.getLength();

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -1526,11 +1526,11 @@ public abstract class Server {
       if (!useSasl) {
         user = protocolUser;
         if (user != null) {
-          user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
+          user.setAuthenticationMethod(AuthMethod.SIMPLE);
         }
       } else {
         // user is authenticated
-        user.setAuthenticationMethod(authMethod.authenticationMethod);
+        user.setAuthenticationMethod(authMethod);
         //Now we check if this is a proxy user case. If the protocol user is
         //different from the 'user', it is a proxy user scenario. However, 
         //this is not allowed if user authenticated with DIGEST.

+ 4 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcServer.java

@@ -42,7 +42,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -137,20 +136,17 @@ public class SaslRpcServer {
   /** Authentication method */
   @InterfaceStability.Evolving
   public static enum AuthMethod {
-    SIMPLE((byte) 80, "", AuthenticationMethod.SIMPLE),
-    KERBEROS((byte) 81, "GSSAPI", AuthenticationMethod.KERBEROS),
-    DIGEST((byte) 82, "DIGEST-MD5", AuthenticationMethod.TOKEN);
+    SIMPLE((byte) 80, ""),
+    KERBEROS((byte) 81, "GSSAPI"),
+    DIGEST((byte) 82, "DIGEST-MD5");
 
     /** The code for this method. */
     public final byte code;
     public final String mechanismName;
-    public final AuthenticationMethod authenticationMethod;
 
-    private AuthMethod(byte code, String mechanismName, 
-                       AuthenticationMethod authMethod) {
+    private AuthMethod(byte code, String mechanismName) { 
       this.code = code;
       this.mechanismName = mechanismName;
-      this.authenticationMethod = authMethod;
     }
 
     private static final int FIRST_CODE = values()[0].code;

+ 21 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java

@@ -16,6 +16,8 @@
  */
 package org.apache.hadoop.security;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -44,6 +46,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -665,4 +668,22 @@ public class SecurityUtil {
     }
   }
 
+  public static AuthenticationMethod getAuthenticationMethod(Configuration conf) {
+    String value = conf.get(HADOOP_SECURITY_AUTHENTICATION, "simple");
+    try {
+      return Enum.valueOf(AuthenticationMethod.class, value.toUpperCase());
+    } catch (IllegalArgumentException iae) {
+      throw new IllegalArgumentException("Invalid attribute value for " +
+          HADOOP_SECURITY_AUTHENTICATION + " of " + value);
+    }
+  }
+
+  public static void setAuthenticationMethod(
+      AuthenticationMethod authenticationMethod, Configuration conf) {
+    if (authenticationMethod == null) {
+      authenticationMethod = AuthenticationMethod.SIMPLE;
+    }
+    conf.set(HADOOP_SECURITY_AUTHENTICATION,
+             authenticationMethod.toString().toLowerCase());
+  }
 }

+ 42 - 11
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.apache.hadoop.security.token.Token;
@@ -236,15 +237,15 @@ public class UserGroupInformation {
    * @param conf the configuration to use
    */
   private static synchronized void initUGI(Configuration conf) {
-    String value = conf.get(HADOOP_SECURITY_AUTHENTICATION);
-    if (value == null || "simple".equals(value)) {
+    AuthenticationMethod auth = SecurityUtil.getAuthenticationMethod(conf);
+    if (auth == AuthenticationMethod.SIMPLE) {
       useKerberos = false;
-    } else if ("kerberos".equals(value)) {
+    } else if (auth == AuthenticationMethod.KERBEROS) {
       useKerberos = true;
     } else {
       throw new IllegalArgumentException("Invalid attribute value for " +
                                          HADOOP_SECURITY_AUTHENTICATION + 
-                                         " of " + value);
+                                         " of " + auth);
     }
     try {
         kerberosMinSecondsBeforeRelogin = 1000L * conf.getLong(
@@ -1019,13 +1020,34 @@ public class UserGroupInformation {
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
   public static enum AuthenticationMethod {
-    SIMPLE,
-    KERBEROS,
-    TOKEN,
-    CERTIFICATE,
-    KERBEROS_SSL,
-    PROXY;
-  }
+    // currently we support only one auth per method, but eventually a 
+    // subtype is needed to differentiate, ex. if digest is token or ldap
+    SIMPLE(AuthMethod.SIMPLE),
+    KERBEROS(AuthMethod.KERBEROS),
+    TOKEN(AuthMethod.DIGEST),
+    CERTIFICATE(null),
+    KERBEROS_SSL(null),
+    PROXY(null);
+    
+    private final AuthMethod authMethod;
+    private AuthenticationMethod(AuthMethod authMethod) {
+      this.authMethod = authMethod;
+    }
+    
+    public AuthMethod getAuthMethod() {
+      return authMethod;
+    }
+    
+    public static AuthenticationMethod valueOf(AuthMethod authMethod) {
+      for (AuthenticationMethod value : values()) {
+        if (value.getAuthMethod() == authMethod) {
+          return value;
+        }
+      }
+      throw new IllegalArgumentException(
+          "no authentication method for " + authMethod);
+    }
+  };
 
   /**
    * Create a proxy user using username of the effective user and the ugi of the
@@ -1290,6 +1312,15 @@ public class UserGroupInformation {
     user.setAuthenticationMethod(authMethod);
   }
 
+  /**
+   * Sets the authentication method in the subject
+   * 
+   * @param authMethod
+   */
+  public void setAuthenticationMethod(AuthMethod authMethod) {
+    user.setAuthenticationMethod(AuthenticationMethod.valueOf(authMethod));
+  }
+
   /**
    * Get the authentication method from the subject
    * 

+ 1 - 4
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java

@@ -30,7 +30,6 @@ import junit.framework.Assert;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.KerberosInfo;
@@ -380,9 +379,7 @@ public class MiniRPCBenchmark {
       elapsedTime = mb.runMiniBenchmarkWithDelegationToken(
                               conf, count, KEYTAB_FILE_KEY, USER_NAME_KEY);
     } else {
-      String auth = 
-        conf.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, 
-                        "simple");
+      String auth = SecurityUtil.getAuthenticationMethod(conf).toString();
       System.out.println(
           "Running MiniRPCBenchmark with " + auth + " authentication.");
       elapsedTime = mb.runMiniBenchmark(

+ 19 - 6
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -55,13 +55,16 @@ import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.MockitoUtil;
+import org.junit.Before;
 import org.junit.Test;
 
 import com.google.protobuf.DescriptorProtos;
@@ -75,11 +78,14 @@ public class TestRPC {
   public static final Log LOG =
     LogFactory.getLog(TestRPC.class);
   
-  private static Configuration conf = new Configuration();
+  private static Configuration conf;
   
-  static {
+  @Before
+  public void setupConf() {
+    conf = new Configuration();
     conf.setClass("rpc.engine." + StoppedProtocol.class.getName(),
         StoppedRpcEngine.class, RpcEngine.class);
+    UserGroupInformation.setConfiguration(conf);
   }
 
   int datasize = 1024*100;
@@ -676,11 +682,17 @@ public class TestRPC {
   
   @Test
   public void testErrorMsgForInsecureClient() throws Exception {
-    final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
+    Configuration serverConf = new Configuration(conf);
+    SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS,
+                                         serverConf);
+    UserGroupInformation.setConfiguration(serverConf);
+    
+    final Server server = new RPC.Builder(serverConf).setProtocol(TestProtocol.class)
         .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
         .setNumHandlers(5).setVerbose(true).build();
-    server.enableSecurity();
     server.start();
+
+    UserGroupInformation.setConfiguration(conf);
     boolean succeeded = false;
     final InetSocketAddress addr = NetUtils.getConnectAddress(server);
     TestProtocol proxy = null;
@@ -702,17 +714,18 @@ public class TestRPC {
 
     conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
 
-    final Server multiServer = new RPC.Builder(conf)
+    UserGroupInformation.setConfiguration(serverConf);
+    final Server multiServer = new RPC.Builder(serverConf)
         .setProtocol(TestProtocol.class).setInstance(new TestImpl())
         .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
         .build();
-    multiServer.enableSecurity();
     multiServer.start();
     succeeded = false;
     final InetSocketAddress mulitServerAddr =
                       NetUtils.getConnectAddress(multiServer);
     proxy = null;
     try {
+      UserGroupInformation.setConfiguration(conf);
       proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
           TestProtocol.versionID, mulitServerAddr, conf);
       proxy.echo("");

+ 123 - 94
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java

@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.ipc;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.*;
 import static org.junit.Assert.*;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -28,6 +29,7 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import javax.security.sasl.Sasl;
 
@@ -41,7 +43,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SaslInputStream;
 import org.apache.hadoop.security.SaslRpcClient;
@@ -58,7 +59,7 @@ import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.log4j.Level;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 /** Unit tests for using Sasl over RPC. */
@@ -75,10 +76,11 @@ public class TestSaslRPC {
   static final String SERVER_PRINCIPAL_2 = "p2/foo@BAR";
   
   private static Configuration conf;
-  @BeforeClass
-  public static void setup() {
+
+  @Before
+  public void setup() {
     conf = new Configuration();
-    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
     UserGroupInformation.setConfiguration(conf);
   }
 
@@ -186,6 +188,7 @@ public class TestSaslRPC {
   @TokenInfo(TestTokenSelector.class)
   public interface TestSaslProtocol extends TestRPC.TestProtocol {
     public AuthenticationMethod getAuthMethod() throws IOException;
+    public String getAuthUser() throws IOException;
   }
   
   public static class TestSaslImpl extends TestRPC.TestImpl implements
@@ -194,6 +197,10 @@ public class TestSaslRPC {
     public AuthenticationMethod getAuthMethod() throws IOException {
       return UserGroupInformation.getCurrentUser().getAuthenticationMethod();
     }
+    @Override
+    public String getAuthUser() throws IOException {
+      return UserGroupInformation.getCurrentUser().getUserName();
+    }
   }
 
   public static class CustomSecurityInfo extends SecurityInfo {
@@ -260,10 +267,10 @@ public class TestSaslRPC {
 
   @Test
   public void testSecureToInsecureRpc() throws Exception {
+    SecurityUtil.setAuthenticationMethod(AuthenticationMethod.SIMPLE, conf);
     Server server = new RPC.Builder(conf).setProtocol(TestSaslProtocol.class)
         .setInstance(new TestSaslImpl()).setBindAddress(ADDRESS).setPort(0)
         .setNumHandlers(5).setVerbose(true).build();
-    server.disableSecurity();
     TestTokenSecretManager sm = new TestTokenSecretManager();
     doDigestRpc(server, sm);
   }
@@ -345,7 +352,7 @@ public class TestSaslRPC {
           new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf);
       assertEquals(SERVER_PRINCIPAL_1, remoteId.getServerPrincipal());
       // this following test needs security to be off
-      newConf.set(HADOOP_SECURITY_AUTHENTICATION, "simple");
+      SecurityUtil.setAuthenticationMethod(SIMPLE, newConf);
       UserGroupInformation.setConfiguration(newConf);
       remoteId = ConnectionId.getConnectionId(new InetSocketAddress(0),
           TestSaslProtocol.class, null, 0, newConf);
@@ -448,127 +455,135 @@ public class TestSaslRPC {
     System.out.println("Test is successful.");
   }
 
-  // insecure -> insecure
+  private static Pattern BadToken =
+      Pattern.compile(".*DIGEST-MD5: digest response format violation.*");
+  private static Pattern KrbFailed =
+      Pattern.compile(".*Failed on local exception:.* " +
+                      "Failed to specify server's Kerberos principal name.*");
+  private static Pattern Denied = 
+      Pattern.compile(".*Authorization .* is enabled .*");
+  
+  /*
+   *  simple server
+   */
   @Test
-  public void testInsecureClientInsecureServer() throws Exception {
-    assertEquals(AuthenticationMethod.SIMPLE,
-                 getAuthMethod(false, false, false));
+  public void testSimpleServer() throws Exception {
+    assertAuthEquals(SIMPLE,    getAuthMethod(SIMPLE,   SIMPLE));
+    // SASL methods are reverted to SIMPLE, but test setup fails
+    assertAuthEquals(KrbFailed, getAuthMethod(KERBEROS, SIMPLE));
   }
 
   @Test
-  public void testInsecureClientInsecureServerWithToken() throws Exception {
-    assertEquals(AuthenticationMethod.TOKEN,
-                 getAuthMethod(false, false, true));
+  public void testSimpleServerWithTokens() throws Exception {
+    // Tokens are ignored because client is reverted to simple
+    assertAuthEquals(SIMPLE, getAuthMethod(SIMPLE,   SIMPLE, true));
+    assertAuthEquals(SIMPLE, getAuthMethod(KERBEROS, SIMPLE, true));
   }
-
-  // insecure -> secure
+    
   @Test
-  public void testInsecureClientSecureServer() throws Exception {
-    RemoteException e = null;
-    try {
-      getAuthMethod(false, true, false);
-    } catch (RemoteException re) {
-      e = re;
-    }
-    assertNotNull(e);
-    assertEquals(AccessControlException.class.getName(), e.getClassName());
+  public void testSimpleServerWithInvalidTokens() throws Exception {
+    // Tokens are ignored because client is reverted to simple
+    assertAuthEquals(SIMPLE, getAuthMethod(SIMPLE,   SIMPLE, false));
+    assertAuthEquals(SIMPLE, getAuthMethod(KERBEROS, SIMPLE, false));
   }
-
+  
+  /*
+   * kerberos server
+   */
   @Test
-  public void testInsecureClientSecureServerWithToken() throws Exception {
-    assertEquals(AuthenticationMethod.TOKEN,
-                 getAuthMethod(false, true, true));
+  public void testKerberosServer() throws Exception {
+    assertAuthEquals(Denied,    getAuthMethod(SIMPLE,   KERBEROS));
+    assertAuthEquals(KrbFailed, getAuthMethod(KERBEROS, KERBEROS));    
   }
 
-  // secure -> secure
   @Test
-  public void testSecureClientSecureServer() throws Exception {
-    /* Should be this when multiple secure auths are supported and we can
-     * dummy one out:
-     *     assertEquals(AuthenticationMethod.SECURE_AUTH_METHOD,
-     *                  getAuthMethod(true, true, false));
-     */
-    try {
-      getAuthMethod(true, true, false);
-    } catch (IOException ioe) {
-      // can't actually test kerberos w/o kerberos...
-      String expectedError = "Failed to specify server's Kerberos principal";
-      String actualError = ioe.getMessage();
-      assertTrue("["+actualError+"] doesn't start with ["+expectedError+"]",
-          actualError.contains(expectedError));
-    }
+  public void testKerberosServerWithTokens() throws Exception {
+    // can use tokens regardless of auth
+    assertAuthEquals(TOKEN, getAuthMethod(SIMPLE,   KERBEROS, true));
+    assertAuthEquals(TOKEN, getAuthMethod(KERBEROS, KERBEROS, true));
   }
 
   @Test
-  public void testSecureClientSecureServerWithToken() throws Exception {
-    assertEquals(AuthenticationMethod.TOKEN,
-                 getAuthMethod(true, true, true));
+  public void testKerberosServerWithInvalidTokens() throws Exception {
+    assertAuthEquals(BadToken, getAuthMethod(SIMPLE,   KERBEROS, false));
+    assertAuthEquals(BadToken, getAuthMethod(KERBEROS, KERBEROS, false));
   }
 
-  // secure -> insecure
-  @Test
-  public void testSecureClientInsecureServerWithToken() throws Exception {
-    assertEquals(AuthenticationMethod.TOKEN,
-                 getAuthMethod(true, false, true));
-  }
 
-  @Test
-  public void testSecureClientInsecureServer() throws Exception {
-    /* Should be this when multiple secure auths are supported and we can
-     * dummy one out:
-     *     assertEquals(AuthenticationMethod.SIMPLE
-     *                  getAuthMethod(true, false, false));
-     */
+  // test helpers
+
+  private String getAuthMethod(
+      final AuthenticationMethod clientAuth,
+      final AuthenticationMethod serverAuth) throws Exception {
     try {
-      getAuthMethod(true, false, false);
-    } catch (IOException ioe) {
-      // can't actually test kerberos w/o kerberos...
-      String expectedError = "Failed to specify server's Kerberos principal";
-      String actualError = ioe.getMessage();
-      assertTrue("["+actualError+"] doesn't start with ["+expectedError+"]",
-          actualError.contains(expectedError));
+      return internalGetAuthMethod(clientAuth, serverAuth, false, false);
+    } catch (Exception e) {
+      return e.toString();
     }
   }
 
-
-  private AuthenticationMethod getAuthMethod(final boolean isSecureClient,
-                                             final boolean isSecureServer,
-                                             final boolean useToken
-                                             
-      ) throws Exception {
+  private String getAuthMethod(
+      final AuthenticationMethod clientAuth,
+      final AuthenticationMethod serverAuth,
+      final boolean useValidToken) throws Exception {
+    try {
+      return internalGetAuthMethod(clientAuth, serverAuth, true, useValidToken);
+    } catch (Exception e) {
+      return e.toString();
+    }
+  }
+  
+  private String internalGetAuthMethod(
+      final AuthenticationMethod clientAuth,
+      final AuthenticationMethod serverAuth,
+      final boolean useToken,
+      final boolean useValidToken) throws Exception {
+    
+    Configuration serverConf = new Configuration(conf);
+    SecurityUtil.setAuthenticationMethod(serverAuth, serverConf);
+    UserGroupInformation.setConfiguration(serverConf);
+    
     TestTokenSecretManager sm = new TestTokenSecretManager();
-    Server server = new RPC.Builder(conf).setProtocol(TestSaslProtocol.class)
+    Server server = new RPC.Builder(serverConf).setProtocol(TestSaslProtocol.class)
         .setInstance(new TestSaslImpl()).setBindAddress(ADDRESS).setPort(0)
-        .setNumHandlers(5).setVerbose(true).setSecretManager(sm).build();      
-    if (isSecureServer) {
-      server.enableSecurity();
-    } else {
-      server.disableSecurity();
-    }
+        .setNumHandlers(5).setVerbose(true)
+        .setSecretManager((serverAuth != SIMPLE) ? sm : null)
+        .build();      
     server.start();
 
-    final UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    final UserGroupInformation clientUgi =
+        UserGroupInformation.createRemoteUser(
+            UserGroupInformation.getCurrentUser().getUserName()+"-CLIENT");
     final InetSocketAddress addr = NetUtils.getConnectAddress(server);
     if (useToken) {
       TestTokenIdentifier tokenId = new TestTokenIdentifier(
-          new Text(current.getUserName()));
-      Token<TestTokenIdentifier> token =
-          new Token<TestTokenIdentifier>(tokenId, sm);
+          new Text(clientUgi.getUserName()));
+      Token<TestTokenIdentifier> token = useValidToken
+          ? new Token<TestTokenIdentifier>(tokenId, sm)
+          : new Token<TestTokenIdentifier>(
+              tokenId.getBytes(), "bad-password!".getBytes(),
+              tokenId.getKind(), null);
+      
       SecurityUtil.setTokenService(token, addr);
-      current.addToken(token);
+      clientUgi.addToken(token);
     }
 
-    conf.set(HADOOP_SECURITY_AUTHENTICATION, isSecureClient ? "kerberos" : "simple");
-    UserGroupInformation.setConfiguration(conf);
+    final Configuration clientConf = new Configuration(conf);
+    SecurityUtil.setAuthenticationMethod(clientAuth, clientConf);
+    UserGroupInformation.setConfiguration(clientConf);
+    
     try {
-      return current.doAs(new PrivilegedExceptionAction<AuthenticationMethod>() {
+      return clientUgi.doAs(new PrivilegedExceptionAction<String>() {
         @Override
-        public AuthenticationMethod run() throws IOException {
+        public String run() throws IOException {
           TestSaslProtocol proxy = null;
           try {
             proxy = (TestSaslProtocol) RPC.getProxy(TestSaslProtocol.class,
-                TestSaslProtocol.versionID, addr, conf);
-            return proxy.getAuthMethod();
+                TestSaslProtocol.versionID, addr, clientConf);
+            
+            // make sure the other side thinks we are who we said we are!!!
+            assertEquals(clientUgi.getUserName(), proxy.getAuthUser());
+            return proxy.getAuthMethod().toString();
           } finally {
             if (proxy != null) {
               RPC.stopProxy(proxy);
@@ -580,7 +595,22 @@ public class TestSaslRPC {
       server.stop();
     }
   }
+
+  private static void assertAuthEquals(AuthenticationMethod expect,
+      String actual) {
+    assertEquals(expect.toString(), actual);
+  }
   
+  private static void assertAuthEquals(Pattern expect,
+      String actual) {
+    // this allows us to see the regexp and the value it didn't match
+    if (!expect.matcher(actual).matches()) {
+      assertEquals(expect, actual); // it failed
+    } else {
+      assertTrue(true); // it matched
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     System.out.println("Testing Kerberos authentication over RPC");
     if (args.length != 2) {
@@ -593,5 +623,4 @@ public class TestSaslRPC {
     String keytab = args[1];
     testKerberosRpc(principal, keytab);
   }
-
 }

+ 15 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/SecurityUtilTestHelper.java

@@ -27,4 +27,19 @@ public class SecurityUtilTestHelper {
   public static void setTokenServiceUseIp(boolean flag) {
     SecurityUtil.setTokenServiceUseIp(flag);
   }
+
+  /**
+   * Return true if externalKdc=true and the location of the krb5.conf
+   * file has been specified, and false otherwise.
+   */
+  public static boolean isExternalKdcRunning() {
+    String externalKdc = System.getProperty("externalKdc");
+    String krb5Conf = System.getProperty("java.security.krb5.conf");
+    if(externalKdc == null || !externalKdc.equals("true") ||
+       krb5Conf == null) {
+      return false;
+    }
+    return true;
+  }
+
 }

+ 3 - 5
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java

@@ -28,13 +28,13 @@ import java.util.Enumeration;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
@@ -416,8 +416,7 @@ public class TestDoAsEffectiveUser {
   public void testProxyWithToken() throws Exception {
     final Configuration conf = new Configuration(masterConf);
     TestTokenSecretManager sm = new TestTokenSecretManager();
-    conf
-        .set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
     UserGroupInformation.setConfiguration(conf);
     final Server server = new RPC.Builder(conf).setProtocol(TestProtocol.class)
         .setInstance(new TestImpl()).setBindAddress(ADDRESS).setPort(0)
@@ -471,8 +470,7 @@ public class TestDoAsEffectiveUser {
   public void testTokenBySuperUser() throws Exception {
     TestTokenSecretManager sm = new TestTokenSecretManager();
     final Configuration newConf = new Configuration(masterConf);
-    newConf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
-        "kerberos");
+    SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, newConf);
     UserGroupInformation.setConfiguration(newConf);
     final Server server = new RPC.Builder(newConf)
         .setProtocol(TestProtocol.class).setInstance(new TestImpl())

+ 52 - 4
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java

@@ -16,6 +16,8 @@
  */
 package org.apache.hadoop.security;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.*;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
@@ -29,10 +31,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestSecurityUtil {
+  @BeforeClass
+  public static void unsetKerberosRealm() {
+    // prevent failures if kinit-ed or on os x with no realm
+    System.setProperty("java.security.krb5.kdc", "");
+    System.setProperty("java.security.krb5.realm", "NONE");    
+  }
+
   @Test
   public void isOriginalTGTReturnsCorrectValues() {
     assertTrue(SecurityUtil.isTGSPrincipal
@@ -111,9 +122,7 @@ public class TestSecurityUtil {
   @Test
   public void testStartsWithIncorrectSettings() throws IOException {
     Configuration conf = new Configuration();
-    conf.set(
-        org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
-        "kerberos");
+    SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
     String keyTabKey="key";
     conf.set(keyTabKey, "");
     UserGroupInformation.setConfiguration(conf);
@@ -256,7 +265,7 @@ public class TestSecurityUtil {
     SecurityUtil.setTokenServiceUseIp(useIp);
     String serviceHost = useIp ? ip : host.toLowerCase();
     
-    Token token = new Token();
+    Token<?> token = new Token<TokenIdentifier>();
     Text service = new Text(serviceHost+":"+port);
     
     assertEquals(service, SecurityUtil.buildTokenService(addr));
@@ -345,4 +354,43 @@ public class TestSecurityUtil {
     NetUtils.addStaticResolution(staticHost, "255.255.255.255");
     verifyServiceAddr(staticHost, "255.255.255.255");
   }
+  
+  @Test
+  public void testGetAuthenticationMethod() {
+    Configuration conf = new Configuration();
+    // default is simple
+    conf.unset(HADOOP_SECURITY_AUTHENTICATION);
+    assertEquals(SIMPLE, SecurityUtil.getAuthenticationMethod(conf));
+    // simple
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "simple");
+    assertEquals(SIMPLE, SecurityUtil.getAuthenticationMethod(conf));
+    // kerberos
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    assertEquals(KERBEROS, SecurityUtil.getAuthenticationMethod(conf));
+    // bad value
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kaboom");
+    String error = null;
+    try {
+      SecurityUtil.getAuthenticationMethod(conf);
+    } catch (Exception e) {
+      error = e.toString();
+    }
+    assertEquals("java.lang.IllegalArgumentException: " +
+                 "Invalid attribute value for " +
+                 HADOOP_SECURITY_AUTHENTICATION + " of kaboom", error);
+  }
+  
+  @Test
+  public void testSetAuthenticationMethod() {
+    Configuration conf = new Configuration();
+    // default
+    SecurityUtil.setAuthenticationMethod(null, conf);
+    assertEquals("simple", conf.get(HADOOP_SECURITY_AUTHENTICATION));
+    // simple
+    SecurityUtil.setAuthenticationMethod(SIMPLE, conf);
+    assertEquals("simple", conf.get(HADOOP_SECURITY_AUTHENTICATION));
+    // kerberos
+    SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
+    assertEquals("kerberos", conf.get(HADOOP_SECURITY_AUTHENTICATION));
+  }
 }

+ 74 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUGIWithExternalKdc.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.security;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import static org.apache.hadoop.security.SecurityUtilTestHelper.isExternalKdcRunning;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests kerberos keytab login using a user-specified external KDC
+ *
+ * To run, users must specify the following system properties:
+ *   externalKdc=true
+ *   java.security.krb5.conf
+ *   user.principal
+ *   user.keytab
+ */
+public class TestUGIWithExternalKdc {
+
+  @Before
+  public void testExternalKdcRunning() {
+    Assume.assumeTrue(isExternalKdcRunning());
+  }
+
+  @Test
+  public void testLogin() throws IOException {
+    String userPrincipal = System.getProperty("user.principal");
+    String userKeyTab = System.getProperty("user.keytab");
+    Assert.assertNotNull("User principal was not specified", userPrincipal);
+    Assert.assertNotNull("User keytab was not specified", userKeyTab);
+
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+
+    UserGroupInformation ugi = UserGroupInformation
+        .loginUserFromKeytabAndReturnUGI(userPrincipal, userKeyTab);
+
+    Assert.assertEquals(AuthenticationMethod.KERBEROS,
+        ugi.getAuthenticationMethod());
+    
+    try {
+      UserGroupInformation
+      .loginUserFromKeytabAndReturnUGI("bogus@EXAMPLE.COM", userKeyTab);
+      Assert.fail("Login should have failed");
+    } catch (Exception ex) {
+      ex.printStackTrace();
+    }
+  }
+
+}

+ 1 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUGIWithSecurityOn.java

@@ -21,7 +21,6 @@ import java.io.IOException;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.junit.Assume;
 import org.junit.Before;
@@ -49,8 +48,7 @@ public class TestUGIWithSecurityOn {
     String user1keyTabFilepath = System.getProperty("kdc.resource.dir") 
         + "/keytabs/user1.keytab";
     Configuration conf = new Configuration();
-    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, 
-        "kerberos");
+    SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
     UserGroupInformation.setConfiguration(conf);
     
     UserGroupInformation ugiNn = UserGroupInformation

+ 12 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestUserGroupInformation.java

@@ -305,7 +305,6 @@ public class TestUserGroupInformation {
     assertSame(secret, ugi.getCredentials().getSecretKey(secretKey));
   }
 
-  @SuppressWarnings("unchecked") // from Mockito mocks
   @Test
   public <T extends TokenIdentifier> void testGetCredsNotSame()
       throws Exception {
@@ -429,6 +428,18 @@ public class TestUserGroupInformation {
     assertEquals(2, otherSet.size());
   }
 
+  @Test
+  public void testTestAuthMethod() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    // verify the reverse mappings works
+    for (AuthenticationMethod am : AuthenticationMethod.values()) {
+      if (am.getAuthMethod() != null) {
+        ugi.setAuthenticationMethod(am.getAuthMethod());
+        assertEquals(am, ugi.getAuthenticationMethod());
+      }
+    }
+  }
+  
   @Test
   public void testUGIAuthMethod() throws Exception {
     final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

+ 2 - 2
hadoop-common-project/hadoop-common/src/test/resources/testConf.xml

@@ -591,11 +591,11 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-test -\[ezd\] &lt;path&gt;:\s+If file exists, has zero length, is a directory( )*</expected-output>
+          <expected-output>^-test -\[defsz\] &lt;path&gt;:\sAnswer various questions about &lt;path&gt;, with result via exit status.</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*then return 0, else return 1.( )*</expected-output>
+          <expected-output>^( |\t)*else, return 1.( )*</expected-output>
         </comparator>
       </comparators>
     </test>

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -154,6 +154,9 @@ Trunk (Unreleased)
     HDFS-4151. Change the methods in FSDirectory to pass INodesInPath instead
     of INode[] as a parameter. (szetszwo)
 
+    HDFS-4152. Add a new class BlocksMapUpdateInfo for the parameter in
+    INode.collectSubtreeBlocksAndClear(..). (Jing Zhao via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -451,6 +454,9 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4143. Change blocks to private in INodeFile and renames isLink() to
     isSymlink() in INode. (szetszwo)
 
+    HDFS-4046. Rename ChecksumTypeProto enum NULL since it is illegal in
+    C/C++. (Binglin Chang via suresh)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -548,6 +554,8 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4132. When libwebhdfs is not enabled, nativeMiniDfsClient frees
     uninitialized memory (Colin Patrick McCabe via todd)
 
+    HDFS-1331. dfs -test should work like /bin/test (Andy Isaacson via daryn)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES
@@ -1935,6 +1943,8 @@ Release 0.23.5 - UNRELEASED
 
   OPTIMIZATIONS
 
+    HDFS-4075. Reduce recommissioning overhead (Kihwal Lee via daryn)
+
   BUG FIXES
 
     HDFS-3829. TestHftpURLTimeouts fails intermittently with JDK7  (Trevor

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java

@@ -157,11 +157,11 @@ public abstract class HdfsProtoUtil {
   }
 
   public static DataChecksum.Type fromProto(HdfsProtos.ChecksumTypeProto type) {
-    return DataChecksum.Type.valueOf(type.name());
+    return DataChecksum.Type.valueOf(type.getNumber());
   }
 
   public static HdfsProtos.ChecksumTypeProto toProto(DataChecksum.Type type) {
-    return HdfsProtos.ChecksumTypeProto.valueOf(type.name());
+    return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
   }
 
   public static InputStream vintPrefixed(final InputStream input)

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java

@@ -52,7 +52,7 @@ public abstract class DataTransferProtoUtil {
   }
 
   public static ChecksumProto toProto(DataChecksum checksum) {
-    ChecksumTypeProto type = ChecksumTypeProto.valueOf(checksum.getChecksumType().name());
+    ChecksumTypeProto type = HdfsProtoUtil.toProto(checksum.getChecksumType());
     if (type == null) {
       throw new IllegalArgumentException(
           "Can't convert checksum to protobuf: " + checksum);
@@ -68,7 +68,7 @@ public abstract class DataTransferProtoUtil {
     if (proto == null) return null;
 
     int bytesPerChecksum = proto.getBytesPerChecksum();
-    DataChecksum.Type type = DataChecksum.Type.valueOf(proto.getType().name());
+    DataChecksum.Type type = HdfsProtoUtil.fromProto(proto.getType());
     
     return DataChecksum.newDataChecksum(type, bytesPerChecksum);
   }

+ 3 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
@@ -67,7 +68,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ContentSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CorruptFileBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
@@ -129,7 +129,6 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.ByteString;
@@ -961,7 +960,7 @@ public class PBHelper {
         fs.getFileBufferSize(),
         fs.getEncryptDataTransfer(),
         fs.getTrashInterval(),
-        DataChecksum.Type.valueOf(fs.getChecksumType().name()));
+        HdfsProtoUtil.fromProto(fs.getChecksumType()));
   }
   
   public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@@ -974,7 +973,7 @@ public class PBHelper {
       .setFileBufferSize(fs.getFileBufferSize())
       .setEncryptDataTransfer(fs.getEncryptDataTransfer())
       .setTrashInterval(fs.getTrashInterval())
-      .setChecksumType(ChecksumTypeProto.valueOf(fs.getChecksumType().name()))
+      .setChecksumType(HdfsProtoUtil.toProto(fs.getChecksumType()))
       .build();
   }
   

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -2696,6 +2696,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
   void processOverReplicatedBlocksOnReCommission(
       final DatanodeDescriptor srcNode) {
     final Iterator<? extends Block> it = srcNode.getBlockIterator();
+    int numOverReplicated = 0;
     while(it.hasNext()) {
       final Block block = it.next();
       BlockCollection bc = blocksMap.getBlockCollection(block);
@@ -2705,8 +2706,11 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       if (numCurrentReplica > expectedReplication) {
         // over-replicated block 
         processOverReplicatedBlock(block, expectedReplication, null, null);
+        numOverReplicated++;
       }
     }
+    LOG.info("Invalidated " + numOverReplicated + " over-replicated blocks on " +
+        srcNode + " during recommissioning");
   }
 
   /**

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -608,7 +608,11 @@ public class DatanodeManager {
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
       LOG.info("Stop Decommissioning " + node);
       heartbeatManager.stopDecommission(node);
-      blockManager.processOverReplicatedBlocksOnReCommission(node);
+      // Over-replicated blocks will be detected and processed when 
+      // the dead node comes back and send in its full block report.
+      if (node.isAlive) {
+        blockManager.processOverReplicatedBlocksOnReCommission(node);
+      }
     }
   }
 

+ 23 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java

@@ -38,6 +38,8 @@ import org.mortbay.jetty.security.SslSocketConnector;
 
 import javax.net.ssl.SSLServerSocketFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Utility class to start a datanode in a secure cluster, first obtaining 
  * privileged resources before main startup and handing them to the datanode.
@@ -73,6 +75,25 @@ public class SecureDataNodeStarter implements Daemon {
     // Stash command-line arguments for regular datanode
     args = context.getArguments();
     
+    sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+    resources = getSecureResources(sslFactory, conf);
+  }
+
+  @Override
+  public void start() throws Exception {
+    System.err.println("Starting regular datanode initialization");
+    DataNode.secureMain(args, resources);
+  }
+  
+  @Override public void destroy() {
+    sslFactory.destroy();
+  }
+
+  @Override public void stop() throws Exception { /* Nothing to do */ }
+
+  @VisibleForTesting
+  public static SecureResources getSecureResources(final SSLFactory sslFactory,
+                                  Configuration conf) throws Exception {
     // Obtain secure port for data streaming to datanode
     InetSocketAddress streamingAddr  = DataNode.getStreamingAddr(conf);
     int socketWriteTimeout = conf.getInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
@@ -85,13 +106,12 @@ public class SecureDataNodeStarter implements Daemon {
     // Check that we got the port we need
     if (ss.getLocalPort() != streamingAddr.getPort()) {
       throw new RuntimeException("Unable to bind on specified streaming port in secure " +
-      		"context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
+          "context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort());
     }
 
     // Obtain secure listener for web server
     Connector listener;
     if (HttpConfig.isSecure()) {
-      sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
       try {
         sslFactory.init();
       } catch (GeneralSecurityException ex) {
@@ -126,18 +146,7 @@ public class SecureDataNodeStarter implements Daemon {
     }
     System.err.println("Opened streaming server at " + streamingAddr);
     System.err.println("Opened info server at " + infoSocAddr);
-    resources = new SecureResources(ss, listener);
+    return new SecureResources(ss, listener);
   }
 
-  @Override
-  public void start() throws Exception {
-    System.err.println("Starting regular datanode initialization");
-    DataNode.secureMain(args, resources);
-  }
-  
-  @Override public void destroy() {
-    sslFactory.destroy();
-  }
-
-  @Override public void stop() throws Exception { /* Nothing to do */ }
 }

+ 6 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.util.ByteArray;
@@ -758,7 +759,7 @@ public class FSDirectory implements Closeable {
         if (removedDst != null) {
           INode rmdst = removedDst;
           removedDst = null;
-          List<Block> collectedBlocks = new ArrayList<Block>();
+          BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
           filesDeleted = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
           getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
         }
@@ -997,7 +998,7 @@ public class FSDirectory implements Closeable {
    * @param collectedBlocks Blocks under the deleted directory
    * @return true on successful deletion; else false
    */
-  boolean delete(String src, List<Block>collectedBlocks) 
+  boolean delete(String src, BlocksMapUpdateInfo collectedBlocks) 
     throws IOException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
@@ -1084,7 +1085,7 @@ public class FSDirectory implements Closeable {
   void unprotectedDelete(String src, long mtime) 
     throws UnresolvedLinkException {
     assert hasWriteLock();
-    List<Block> collectedBlocks = new ArrayList<Block>();
+    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
     int filesRemoved = 0;
 
     final INodesInPath inodesInPath = rootDir.getExistingPathINodes(
@@ -1108,8 +1109,8 @@ public class FSDirectory implements Closeable {
    * @param mtime the time the inode is removed
    * @return the number of inodes deleted; 0 if no inodes are deleted.
    */ 
-  int unprotectedDelete(INodesInPath inodesInPath, List<Block> collectedBlocks, 
-      long mtime) {
+  int unprotectedDelete(INodesInPath inodesInPath,
+      BlocksMapUpdateInfo collectedBlocks, long mtime) {
     assert hasWriteLock();
 
     final INode[] inodes = inodesInPath.getINodes();

+ 19 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -159,6 +159,7 @@ import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory.INodesInPath;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
@@ -2676,7 +2677,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       boolean enforcePermission)
       throws AccessControlException, SafeModeException, UnresolvedLinkException,
              IOException {
-    ArrayList<Block> collectedBlocks = new ArrayList<Block>();
+    BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
 
     writeLock();
     try {
@@ -2707,21 +2708,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return true;
   }
 
-  /** 
+  /**
    * From the given list, incrementally remove the blocks from blockManager
    * Writelock is dropped and reacquired every BLOCK_DELETION_INCREMENT to
    * ensure that other waiters on the lock can get in. See HDFS-2938
+   * 
+   * @param blocks
+   *          An instance of {@link BlocksMapUpdateInfo} which contains a list
+   *          of blocks that need to be removed from blocksMap
    */
-  private void removeBlocks(List<Block> blocks) {
+  private void removeBlocks(BlocksMapUpdateInfo blocks) {
     int start = 0;
     int end = 0;
-    while (start < blocks.size()) {
+    List<Block> toDeleteList = blocks.getToDeleteList();
+    while (start < toDeleteList.size()) {
       end = BLOCK_DELETION_INCREMENT + start;
-      end = end > blocks.size() ? blocks.size() : end;
+      end = end > toDeleteList.size() ? toDeleteList.size() : end;
       writeLock();
       try {
         for (int i = start; i < end; i++) {
-          blockManager.removeBlock(blocks.get(i));
+          blockManager.removeBlock(toDeleteList.get(i));
         }
       } finally {
         writeUnlock();
@@ -2730,7 +2736,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
   
-  void removePathAndBlocks(String src, List<Block> blocks) {
+  /**
+   * Remove leases and blocks related to a given path
+   * @param src The given path
+   * @param blocks Containing the list of blocks to be deleted from blocksMap
+   */
+  void removePathAndBlocks(String src, BlocksMapUpdateInfo blocks) {
     assert hasWriteLock();
     leaseManager.removeLeaseWithPrefixPath(src);
     if (blocks == null) {
@@ -2743,7 +2754,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     boolean trackBlockCounts = isSafeModeTrackingBlocks();
     int numRemovedComplete = 0, numRemovedSafe = 0;
 
-    for (Block b : blocks) {
+    for (Block b : blocks.getToDeleteList()) {
       if (trackBlockCounts) {
         BlockInfo bi = blockManager.getStoredBlock(b);
         if (bi.isComplete()) {

+ 52 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -188,11 +188,15 @@ public abstract class INode implements Comparable<byte[]> {
   }
 
   /**
-   * Collect all the blocks in all children of this INode.
-   * Count and return the number of files in the sub tree.
-   * Also clears references since this INode is deleted.
+   * Collect all the blocks in all children of this INode. Count and return the
+   * number of files in the sub tree. Also clears references since this INode is
+   * deleted.
+   * 
+   * @param info
+   *          Containing all the blocks collected from the children of this
+   *          INode. These blocks later should be removed from the blocksMap.
    */
-  abstract int collectSubtreeBlocksAndClear(List<Block> v);
+  abstract int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info);
 
   /** Compute {@link ContentSummary}. */
   public final ContentSummary computeContentSummary() {
@@ -493,4 +497,48 @@ public abstract class INode implements Comparable<byte[]> {
     out.print(s.substring(s.lastIndexOf(getClass().getSimpleName())));
     out.println(")");
   }
+  
+  /**
+   * Information used for updating the blocksMap when deleting files.
+   */
+  public static class BlocksMapUpdateInfo {
+    /**
+     * The list of blocks that need to be removed from blocksMap
+     */
+    private List<Block> toDeleteList;
+    
+    public BlocksMapUpdateInfo(List<Block> toDeleteList) {
+      this.toDeleteList = toDeleteList == null ? new ArrayList<Block>()
+          : toDeleteList;
+    }
+    
+    public BlocksMapUpdateInfo() {
+      toDeleteList = new ArrayList<Block>();
+    }
+    
+    /**
+     * @return The list of blocks that need to be removed from blocksMap
+     */
+    public List<Block> getToDeleteList() {
+      return toDeleteList;
+    }
+    
+    /**
+     * Add a to-be-deleted block into the
+     * {@link BlocksMapUpdateInfo#toDeleteList}
+     * @param toDelete the to-be-deleted block
+     */
+    public void addDeleteBlock(Block toDelete) {
+      if (toDelete != null) {
+        toDeleteList.add(toDelete);
+      }
+    }
+    
+    /**
+     * Clear {@link BlocksMapUpdateInfo#toDeleteList}
+     */
+    public void clear() {
+      toDeleteList.clear();
+    }
+  }
 }

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -28,7 +28,6 @@ import java.util.List;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
@@ -473,13 +472,13 @@ public class INodeDirectory extends INode {
   }
 
   @Override
-  int collectSubtreeBlocksAndClear(List<Block> v) {
+  int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
     int total = 1;
     if (children == null) {
       return total;
     }
     for (INode child : children) {
-      total += child.collectSubtreeBlocksAndClear(v);
+      total += child.collectSubtreeBlocksAndClear(info);
     }
     parent = null;
     children = null;

+ 3 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -162,11 +161,11 @@ public class INodeFile extends INode implements BlockCollection {
   }
 
   @Override
-  protected int collectSubtreeBlocksAndClear(List<Block> v) {
+  protected int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
     parent = null;
-    if(blocks != null && v != null) {
+    if(blocks != null && info != null) {
       for (BlockInfo blk : blocks) {
-        v.add(blk);
+        info.addDeleteBlock(blk);
         blk.setBlockCollection(null);
       }
     }

+ 1 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java

@@ -17,12 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.util.List;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
 
 /**
  * An INode representing a symbolic link.
@@ -72,7 +69,7 @@ public class INodeSymlink extends INode {
   }
   
   @Override
-  int collectSubtreeBlocksAndClear(List<Block> v) {
+  int collectSubtreeBlocksAndClear(BlocksMapUpdateInfo info) {
     return 1;
   }
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto

@@ -181,5 +181,5 @@ message OpBlockChecksumResponseProto {
   required uint32 bytesPerCrc = 1;
   required uint64 crcPerBlock = 2;
   required bytes md5 = 3;
-  optional ChecksumTypeProto crcType = 4 [default = CRC32];
+  optional ChecksumTypeProto crcType = 4 [default = CHECKSUM_CRC32];
 }

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto

@@ -181,11 +181,13 @@ message HdfsFileStatusProto {
 
 /**
  * Checksum algorithms/types used in HDFS
+ * Make sure this enum's integer values match enum values' id properties defined
+ * in org.apache.hadoop.util.DataChecksum.Type
  */
 enum ChecksumTypeProto {
-  NULL = 0;
-  CRC32 = 1;
-  CRC32C = 2;
+  CHECKSUM_NULL = 0;
+  CHECKSUM_CRC32 = 1;
+  CHECKSUM_CRC32C = 2;
 }
 
 /**
@@ -199,7 +201,7 @@ message FsServerDefaultsProto {
   required uint32 fileBufferSize = 5;
   optional bool encryptDataTransfer = 6 [default = false];
   optional uint64 trashInterval = 7 [default = 0];
-  optional ChecksumTypeProto checksumType = 8 [default = CRC32];
+  optional ChecksumTypeProto checksumType = 8 [default = CHECKSUM_CRC32];
 }
 
 

+ 37 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -81,6 +81,8 @@ import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
+import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -95,6 +97,7 @@ import org.apache.hadoop.net.StaticMapping;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -145,6 +148,7 @@ public class MiniDFSCluster {
     private boolean setupHostsFile = false;
     private MiniDFSNNTopology nnTopology = null;
     private boolean checkExitOnShutdown = true;
+    private boolean checkDataNodeAddrConfig = false;
     private boolean checkDataNodeHostConfig = false;
     
     public Builder(Configuration conf) {
@@ -263,6 +267,14 @@ public class MiniDFSCluster {
       return this;
     }
 
+    /**
+     * Default: false
+     */
+    public Builder checkDataNodeAddrConfig(boolean val) {
+      this.checkDataNodeAddrConfig = val;
+      return this;
+    }
+
     /**
      * Default: false
      */
@@ -336,6 +348,7 @@ public class MiniDFSCluster {
                        builder.setupHostsFile,
                        builder.nnTopology,
                        builder.checkExitOnShutdown,
+                       builder.checkDataNodeAddrConfig,
                        builder.checkDataNodeHostConfig);
   }
   
@@ -343,11 +356,14 @@ public class MiniDFSCluster {
     DataNode datanode;
     Configuration conf;
     String[] dnArgs;
+    SecureResources secureResources;
 
-    DataNodeProperties(DataNode node, Configuration conf, String[] args) {
+    DataNodeProperties(DataNode node, Configuration conf, String[] args,
+                       SecureResources secureResources) {
       this.datanode = node;
       this.conf = conf;
       this.dnArgs = args;
+      this.secureResources = secureResources;
     }
   }
 
@@ -573,7 +589,7 @@ public class MiniDFSCluster {
         manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
         operation, racks, hosts,
         simulatedCapacities, null, true, false,
-        MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false);
+        MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false);
   }
 
   private void initMiniDFSCluster(
@@ -584,6 +600,7 @@ public class MiniDFSCluster {
       String[] hosts, long[] simulatedCapacities, String clusterId,
       boolean waitSafeMode, boolean setupHostsFile,
       MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
+      boolean checkDataNodeAddrConfig,
       boolean checkDataNodeHostConfig)
   throws IOException {
     ExitUtil.disableSystemExit();
@@ -647,7 +664,7 @@ public class MiniDFSCluster {
 
     // Start the DataNodes
     startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
-        hosts, simulatedCapacities, setupHostsFile, false, checkDataNodeHostConfig);
+        hosts, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, checkDataNodeHostConfig);
     waitClusterUp();
     //make sure ProxyUsers uses the latest conf
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@@ -1161,7 +1178,18 @@ public class MiniDFSCluster {
       if (hosts != null) {
         NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
       }
-      DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
+
+      SecureResources secureResources = null;
+      if (UserGroupInformation.isSecurityEnabled()) {
+        SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, dnConf);
+        try {
+          secureResources = SecureDataNodeStarter.getSecureResources(sslFactory, dnConf);
+        } catch (Exception ex) {
+          ex.printStackTrace();
+        }
+      }
+      DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf,
+                                                 secureResources);
       if(dn == null)
         throw new IOException("Cannot start DataNode in "
             + dnConf.get(DFS_DATANODE_DATA_DIR_KEY));
@@ -1176,7 +1204,7 @@ public class MiniDFSCluster {
                                   racks[i-curDatanodesNum]);
       }
       dn.runDatanodeDaemon();
-      dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
+      dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources));
     }
     curDatanodesNum += numDataNodes;
     this.numDataNodes += numDataNodes;
@@ -1607,14 +1635,16 @@ public class MiniDFSCluster {
       boolean keepPort) throws IOException {
     Configuration conf = dnprop.conf;
     String[] args = dnprop.dnArgs;
+    SecureResources secureResources = dnprop.secureResources;
     Configuration newconf = new HdfsConfiguration(conf); // save cloned config
     if (keepPort) {
       InetSocketAddress addr = dnprop.datanode.getXferAddress();
       conf.set(DFS_DATANODE_ADDRESS_KEY, 
           addr.getAddress().getHostAddress() + ":" + addr.getPort());
     }
-    dataNodes.add(new DataNodeProperties(DataNode.createDataNode(args, conf),
-        newconf, args));
+    dataNodes.add(new DataNodeProperties(
+        DataNode.createDataNode(args, conf, secureResources),
+        newconf, args, secureResources));
     numDataNodes++;
     return true;
   }

+ 100 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java

@@ -1243,7 +1243,106 @@ public class TestDFSShell {
         }
         assertEquals(0, val);
       }
-        
+
+      // Verify -test -f negative case (missing file)
+      {
+        String[] args = new String[3];
+        args[0] = "-test";
+        args[1] = "-f";
+        args[2] = "/test/mkdirs/noFileHere";
+        int val = -1;
+        try {
+          val = shell.run(args);
+        } catch (Exception e) {
+          System.err.println("Exception raised from DFSShell.run " +
+                             e.getLocalizedMessage());
+        }
+        assertEquals(1, val);
+      }
+
+      // Verify -test -f negative case (directory rather than file)
+      {
+        String[] args = new String[3];
+        args[0] = "-test";
+        args[1] = "-f";
+        args[2] = "/test/mkdirs";
+        int val = -1;
+        try {
+          val = shell.run(args);
+        } catch (Exception e) {
+          System.err.println("Exception raised from DFSShell.run " +
+                             e.getLocalizedMessage());
+        }
+        assertEquals(1, val);
+      }
+
+      // Verify -test -f positive case
+      {
+        writeFile(fileSys, myFile);
+        assertTrue(fileSys.exists(myFile));
+
+        String[] args = new String[3];
+        args[0] = "-test";
+        args[1] = "-f";
+        args[2] = myFile.toString();
+        int val = -1;
+        try {
+          val = shell.run(args);
+        } catch (Exception e) {
+          System.err.println("Exception raised from DFSShell.run " +
+                             e.getLocalizedMessage());
+        }
+        assertEquals(0, val);
+      }
+
+      // Verify -test -s negative case (missing file)
+      {
+        String[] args = new String[3];
+        args[0] = "-test";
+        args[1] = "-s";
+        args[2] = "/test/mkdirs/noFileHere";
+        int val = -1;
+        try {
+          val = shell.run(args);
+        } catch (Exception e) {
+          System.err.println("Exception raised from DFSShell.run " +
+                             e.getLocalizedMessage());
+        }
+        assertEquals(1, val);
+      }
+
+      // Verify -test -s negative case (zero length file)
+      {
+        String[] args = new String[3];
+        args[0] = "-test";
+        args[1] = "-s";
+        args[2] = "/test/mkdirs/isFileHere";
+        int val = -1;
+        try {
+          val = shell.run(args);
+        } catch (Exception e) {
+          System.err.println("Exception raised from DFSShell.run " +
+                             e.getLocalizedMessage());
+        }
+        assertEquals(1, val);
+      }
+
+      // Verify -test -s positive case (nonzero length file)
+      {
+        String[] args = new String[3];
+        args[0] = "-test";
+        args[1] = "-s";
+        args[2] = myFile.toString();
+        int val = -1;
+        try {
+          val = shell.run(args);
+        } catch (Exception e) {
+          System.err.println("Exception raised from DFSShell.run " +
+                             e.getLocalizedMessage());
+        }
+        assertEquals(0, val);
+      }
+
     } finally {
       try {
         fileSys.close();

+ 42 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestHdfsProtoUtil.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.util.DataChecksum;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHdfsProtoUtil {
+  @Test
+  public void testChecksumTypeProto() {
+    assertEquals(DataChecksum.Type.NULL,
+        HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL));
+    assertEquals(DataChecksum.Type.CRC32,
+        HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32));
+    assertEquals(DataChecksum.Type.CRC32C,
+        HdfsProtoUtil.fromProto(HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C));
+    assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.NULL),
+        HdfsProtos.ChecksumTypeProto.CHECKSUM_NULL);
+    assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.CRC32),
+        HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32);
+    assertEquals(HdfsProtoUtil.toProto(DataChecksum.Type.CRC32C),
+        HdfsProtos.ChecksumTypeProto.CHECKSUM_CRC32C);
+  }
+}

+ 117 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStartSecureDataNode.java

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import static org.apache.hadoop.security.SecurityUtilTestHelper.isExternalKdcRunning;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test starts a 1 NameNode 1 DataNode MiniDFSCluster with
+ * kerberos authentication enabled using user-specified KDC,
+ * principals, and keytabs.
+ *
+ * A secure DataNode has to be started by root, so this test needs to
+ * be run by root.
+ *
+ * To run, users must specify the following system properties:
+ *   externalKdc=true
+ *   java.security.krb5.conf
+ *   dfs.namenode.kerberos.principal
+ *   dfs.namenode.kerberos.internal.spnego.principal
+ *   dfs.namenode.keytab.file
+ *   dfs.datanode.kerberos.principal
+ *   dfs.datanode.keytab.file
+ */
+public class TestStartSecureDataNode {
+  final static private int NUM_OF_DATANODES = 1;
+
+  @Before
+  public void testExternalKdcRunning() {
+    // Tests are skipped if external KDC is not running.
+    Assume.assumeTrue(isExternalKdcRunning());
+  }
+
+  @Test
+  public void testSecureNameNode() throws IOException, InterruptedException {
+    MiniDFSCluster cluster = null;
+    try {
+      String nnPrincipal =
+        System.getProperty("dfs.namenode.kerberos.principal");
+      String nnSpnegoPrincipal =
+        System.getProperty("dfs.namenode.kerberos.internal.spnego.principal");
+      String nnKeyTab = System.getProperty("dfs.namenode.keytab.file");
+      assertNotNull("NameNode principal was not specified", nnPrincipal);
+      assertNotNull("NameNode SPNEGO principal was not specified",
+                    nnSpnegoPrincipal);
+      assertNotNull("NameNode keytab was not specified", nnKeyTab);
+
+      String dnPrincipal = System.getProperty("dfs.datanode.kerberos.principal");
+      String dnKeyTab = System.getProperty("dfs.datanode.keytab.file");
+      assertNotNull("DataNode principal was not specified", dnPrincipal);
+      assertNotNull("DataNode keytab was not specified", dnKeyTab);
+
+      Configuration conf = new HdfsConfiguration();
+      conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+      conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, nnPrincipal);
+      conf.set(DFSConfigKeys.DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY,
+        nnSpnegoPrincipal);
+      conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, nnKeyTab);
+      conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, dnPrincipal);
+      conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, dnKeyTab);
+      // Secure DataNode requires using ports lower than 1024.
+      conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "127.0.0.1:1004");
+      conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "127.0.0.1:1006");
+      conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, "700");
+
+      cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_OF_DATANODES)
+        .checkDataNodeAddrConfig(true)
+        .build();
+      cluster.waitActive();
+      assertTrue(cluster.isDataNodeUp());
+
+    } catch (Exception ex) {
+      ex.printStackTrace();
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}

+ 129 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecureNameNodeWithExternalKdc.java

@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import static org.apache.hadoop.security.SecurityUtilTestHelper.isExternalKdcRunning;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test brings up a MiniDFSCluster with 1 NameNode and 0
+ * DataNodes with kerberos authentication enabled using user-specified
+ * KDC, principals, and keytabs.
+ *
+ * To run, users must specify the following system properties:
+ *   externalKdc=true
+ *   java.security.krb5.conf
+ *   dfs.namenode.kerberos.principal
+ *   dfs.namenode.kerberos.internal.spnego.principal
+ *   dfs.namenode.keytab.file
+ *   user.principal (do not specify superuser!)
+ *   user.keytab
+ */
+public class TestSecureNameNodeWithExternalKdc {
+  final static private int NUM_OF_DATANODES = 0;
+
+  @Before
+  public void testExternalKdcRunning() {
+    // Tests are skipped if external KDC is not running.
+    Assume.assumeTrue(isExternalKdcRunning());
+  }
+
+  @Test
+  public void testSecureNameNode() throws IOException, InterruptedException {
+    MiniDFSCluster cluster = null;
+    try {
+      String nnPrincipal =
+        System.getProperty("dfs.namenode.kerberos.principal");
+      String nnSpnegoPrincipal =
+        System.getProperty("dfs.namenode.kerberos.internal.spnego.principal");
+      String nnKeyTab = System.getProperty("dfs.namenode.keytab.file");
+      assertNotNull("NameNode principal was not specified", nnPrincipal);
+      assertNotNull("NameNode SPNEGO principal was not specified",
+        nnSpnegoPrincipal);
+      assertNotNull("NameNode keytab was not specified", nnKeyTab);
+
+      Configuration conf = new HdfsConfiguration();
+      conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
+          "kerberos");
+      conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, nnPrincipal);
+      conf.set(DFSConfigKeys.DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY,
+          nnSpnegoPrincipal);
+      conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, nnKeyTab);
+
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES)
+          .build();
+      final MiniDFSCluster clusterRef = cluster;
+      cluster.waitActive();
+      FileSystem fsForCurrentUser = cluster.getFileSystem();
+      fsForCurrentUser.mkdirs(new Path("/tmp"));
+      fsForCurrentUser.setPermission(new Path("/tmp"), new FsPermission(
+          (short) 511));
+
+      // The user specified should not be a superuser
+      String userPrincipal = System.getProperty("user.principal");
+      String userKeyTab = System.getProperty("user.keytab");
+      assertNotNull("User principal was not specified", userPrincipal);
+      assertNotNull("User keytab was not specified", userKeyTab);
+
+      UserGroupInformation ugi = UserGroupInformation
+          .loginUserFromKeytabAndReturnUGI(userPrincipal, userKeyTab);
+      FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+        @Override
+        public FileSystem run() throws Exception {
+          return clusterRef.getFileSystem();
+        }
+      });
+      try {
+        Path p = new Path("/users");
+        fs.mkdirs(p);
+        fail("User must not be allowed to write in /");
+      } catch (IOException expected) {
+      }
+
+      Path p = new Path("/tmp/alpha");
+      fs.mkdirs(p);
+      assertNotNull(fs.listStatus(p));
+      assertEquals(AuthenticationMethod.KERBEROS,
+          ugi.getAuthenticationMethod());
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -631,6 +631,9 @@ Release 0.23.5 - UNRELEASED
 
     MAPREDUCE-4763 repair test TestUmbilicalProtocolWithJobToken (Ivan A.
     Veselovsky via bobby)
+
+    MAPREDUCE-4771. KeyFieldBasedPartitioner not partitioning properly when
+    configured (jlowe via bobby)
  
 Release 0.23.4 - UNRELEASED
 

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java

@@ -63,6 +63,7 @@ public class KeyFieldBasedPartitioner<K2, V2> extends Partitioner<K2, V2>
 
   public void setConf(Configuration conf) {
     this.conf = conf;
+    keyFieldHelper = new KeyFieldHelper();
     String keyFieldSeparator = 
       conf.get(MRJobConfig.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "\t");
     keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);

+ 19 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedPartitioner.java

@@ -17,17 +17,18 @@
  */
 package org.apache.hadoop.mapred.lib;
 
+import static org.junit.Assert.assertEquals;
+
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
-
-import junit.framework.TestCase;
+import org.junit.Test;
 
-public class TestKeyFieldBasedPartitioner extends TestCase {
+public class TestKeyFieldBasedPartitioner {
 
   /**
    * Test is key-field-based partitioned works with empty key.
    */
+  @Test
   public void testEmptyKey() throws Exception {
     KeyFieldBasedPartitioner<Text, Text> kfbp = 
       new KeyFieldBasedPartitioner<Text, Text>();
@@ -37,4 +38,18 @@ public class TestKeyFieldBasedPartitioner extends TestCase {
     assertEquals("Empty key should map to 0th partition", 
                  0, kfbp.getPartition(new Text(), new Text(), 10));
   }
+
+  @Test
+  public void testMultiConfigure() {
+    KeyFieldBasedPartitioner<Text, Text> kfbp =
+      new KeyFieldBasedPartitioner<Text, Text>();
+    JobConf conf = new JobConf();
+    conf.set(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS, "-k1,1");
+    kfbp.setConf(conf);
+    Text key = new Text("foo\tbar");
+    Text val = new Text("val");
+    int partNum = kfbp.getPartition(key, val, 4096);
+    kfbp.configure(conf);
+    assertEquals(partNum, kfbp.getPartition(key,val, 4096));
+  }
 }

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -199,6 +199,9 @@ Release 0.23.5 - UNRELEASED
     YARN-189. Fixed a deadlock between RM's ApplicationMasterService and the
     dispatcher. (Thomas Graves via vinodkv)
 
+    YARN-202. Log Aggregation generates a storm of fsync() for namenode 
+    (Kihwal Lee via bobby)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java

@@ -263,7 +263,6 @@ public class AggregatedLogFormat {
       out = this.writer.prepareAppendValue(-1);
       logValue.write(out);
       out.close();
-      this.fsDataOStream.hflush();
     }
 
     public void closeWriter() {

+ 88 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -90,6 +91,85 @@ public class FSDownload implements Callable<Path> {
     }
   }
 
+  /**
+   * Returns a boolean to denote whether a cache file is visible to all(public)
+   * or not
+   * @param conf
+   * @param uri
+   * @return true if the path in the uri is visible to all, false otherwise
+   * @throws IOException
+   */
+  private static boolean isPublic(FileSystem fs, Path current) throws IOException {
+    current = fs.makeQualified(current);
+    //the leaf level file should be readable by others
+    if (!checkPublicPermsForAll(fs, current, FsAction.READ_EXECUTE, FsAction.READ)) {
+      return false;
+    }
+    return ancestorsHaveExecutePermissions(fs, current.getParent());
+  }
+
+  private static boolean checkPublicPermsForAll(FileSystem fs, Path current, 
+      FsAction dir, FsAction file) 
+    throws IOException {
+    return checkPublicPermsForAll(fs, fs.getFileStatus(current), dir, file);
+  }
+    
+  private static boolean checkPublicPermsForAll(FileSystem fs, 
+        FileStatus status, FsAction dir, FsAction file) 
+    throws IOException {
+    FsPermission perms = status.getPermission();
+    FsAction otherAction = perms.getOtherAction();
+    if (status.isDirectory()) {
+      if (!otherAction.implies(dir)) {
+        return false;
+      }
+      
+      for (FileStatus child : fs.listStatus(status.getPath())) {
+        if(!checkPublicPermsForAll(fs, child, dir, file)) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return (otherAction.implies(file));
+  }
+
+  /**
+   * Returns true if all ancestors of the specified path have the 'execute'
+   * permission set for all users (i.e. that other users can traverse
+   * the directory heirarchy to the given path)
+   */
+  private static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path)
+    throws IOException {
+    Path current = path;
+    while (current != null) {
+      //the subdirs in the path should have execute permissions for others
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
+        return false;
+      }
+      current = current.getParent();
+    }
+    return true;
+  }
+
+  /**
+   * Checks for a given path whether the Other permissions on it 
+   * imply the permission in the passed FsAction
+   * @param fs
+   * @param path
+   * @param action
+   * @return true if the path in the uri is visible to all, false otherwise
+   * @throws IOException
+   */
+  private static boolean checkPermissionOfOther(FileSystem fs, Path path,
+      FsAction action) throws IOException {
+    FileStatus status = fs.getFileStatus(path);
+    FsPermission perms = status.getPermission();
+    FsAction otherAction = perms.getOtherAction();
+    return otherAction.implies(action);
+  }
+
+  
   private Path copy(Path sCopy, Path dstdir) throws IOException {
     FileSystem sourceFs = sCopy.getFileSystem(conf);
     Path dCopy = new Path(dstdir, sCopy.getName() + ".tmp");
@@ -99,7 +179,14 @@ public class FSDownload implements Callable<Path> {
           " changed on src filesystem (expected " + resource.getTimestamp() +
           ", was " + sStat.getModificationTime());
     }
-
+    if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
+      if (!isPublic(sourceFs, sCopy)) {
+        throw new IOException("Resource " + sCopy +
+            " is not publicly accessable and as such cannot be part of the" +
+            " public cache.");
+      }
+    }
+    
     sourceFs.copyToLocalFile(sCopy, dCopy);
     return dCopy;
   }

+ 54 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java

@@ -113,6 +113,54 @@ public class TestFSDownload {
     return ret;
   }
   
+  @Test
+  public void testDownloadBadPublic() throws IOException, URISyntaxException,
+      InterruptedException {
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    FileContext files = FileContext.getLocalFSFileContext(conf);
+    final Path basedir = files.makeQualified(new Path("target",
+      TestFSDownload.class.getSimpleName()));
+    files.mkdir(basedir, null, true);
+    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
+    
+    Map<LocalResource, LocalResourceVisibility> rsrcVis =
+        new HashMap<LocalResource, LocalResourceVisibility>();
+
+    Random rand = new Random();
+    long sharedSeed = rand.nextLong();
+    rand.setSeed(sharedSeed);
+    System.out.println("SEED: " + sharedSeed);
+
+    Map<LocalResource,Future<Path>> pending =
+      new HashMap<LocalResource,Future<Path>>();
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    LocalDirAllocator dirs =
+      new LocalDirAllocator(TestFSDownload.class.getName());
+    int size = 512;
+    LocalResourceVisibility vis = LocalResourceVisibility.PUBLIC;
+    Path path = new Path(basedir, "test-file");
+    LocalResource rsrc = createFile(files, path, size, rand, vis);
+    rsrcVis.put(rsrc, vis);
+    Path destPath = dirs.getLocalPathForWrite(
+        basedir.toString(), size, conf);
+    FSDownload fsd =
+      new FSDownload(files, UserGroupInformation.getCurrentUser(), conf,
+          destPath, rsrc, new Random(sharedSeed));
+    pending.put(rsrc, exec.submit(fsd));
+
+    try {
+      for (Map.Entry<LocalResource,Future<Path>> p : pending.entrySet()) {
+        p.getValue().get();
+        Assert.fail("We localized a file that is not public.");
+      }
+    } catch (ExecutionException e) {
+      Assert.assertTrue(e.getCause() instanceof IOException);
+    } finally {
+      exec.shutdown();
+    }
+  }
+  
   @Test
   public void testDownload() throws IOException, URISyntaxException,
       InterruptedException {
@@ -140,14 +188,9 @@ public class TestFSDownload {
     int[] sizes = new int[10];
     for (int i = 0; i < 10; ++i) {
       sizes[i] = rand.nextInt(512) + 512;
-      LocalResourceVisibility vis = LocalResourceVisibility.PUBLIC;
-      switch (i%3) {
-      case 1:
-        vis = LocalResourceVisibility.PRIVATE;
-        break;
-      case 2:
+      LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+      if (i%2 == 1) {
         vis = LocalResourceVisibility.APPLICATION;
-        break;       
       }
       Path p = new Path(basedir, "" + i);
       LocalResource rsrc = createFile(files, p, sizes[i], rand, vis);
@@ -176,17 +219,8 @@ public class TestFSDownload {
         System.out.println("File permission " + perm + 
             " for rsrc vis " + p.getKey().getVisibility().name());
         assert(rsrcVis.containsKey(p.getKey()));
-        switch (rsrcVis.get(p.getKey())) {
-        case PUBLIC:
-          Assert.assertTrue("Public file should be 555",
-              perm.toShort() == FSDownload.PUBLIC_FILE_PERMS.toShort());
-          break;
-        case PRIVATE:
-        case APPLICATION:
-          Assert.assertTrue("Private file should be 500",
-              perm.toShort() == FSDownload.PRIVATE_FILE_PERMS.toShort());          
-          break;
-        }
+        Assert.assertTrue("Private file should be 500",
+            perm.toShort() == FSDownload.PRIVATE_FILE_PERMS.toShort());
       }
     } catch (ExecutionException e) {
       throw new IOException("Failed exec", e);
@@ -250,14 +284,9 @@ public class TestFSDownload {
     LocalDirAllocator dirs =
       new LocalDirAllocator(TestFSDownload.class.getName());
     for (int i = 0; i < 5; ++i) {
-      LocalResourceVisibility vis = LocalResourceVisibility.PUBLIC;
-      switch (rand.nextInt()%3) {
-      case 1:
-        vis = LocalResourceVisibility.PRIVATE;
-        break;
-      case 2:
+      LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
+      if (i%2 == 1) {
         vis = LocalResourceVisibility.APPLICATION;
-        break;       
       }
 
       Path p = new Path(basedir, "dir" + i + ".jar");