瀏覽代碼

Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1227775 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 13 年之前
父節點
當前提交
3187a3ae05
共有 41 個文件被更改,包括 2086 次插入367 次删除
  1. 14 3
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 7 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
  3. 1 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
  4. 36 22
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
  5. 9 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
  6. 14 10
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
  7. 17 6
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
  8. 2 8
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/Util.java
  9. 2 8
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Servers.java
  10. 271 31
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
  11. 80 23
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java
  12. 365 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCanonicalization.java
  13. 2 2
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java
  14. 11 16
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java
  15. 74 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/NetUtilsTestResolver.java
  16. 263 8
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
  17. 2 6
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java
  18. 215 3
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
  19. 4 1
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  20. 48 18
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
  21. 19 0
      hadoop-mapreduce-project/CHANGES.txt
  22. 3 22
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
  23. 50 50
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
  24. 9 8
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
  25. 117 55
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  26. 20 6
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
  27. 1 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
  28. 69 15
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
  29. 28 11
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
  30. 23 7
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  31. 1 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
  32. 2 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
  33. 91 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
  34. 1 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
  35. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
  36. 11 13
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
  37. 32 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
  38. 161 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java
  39. 8 6
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/lib/TestZKClient.java
  40. 1 1
      hadoop-mapreduce-project/hadoop-yarn/pom.xml
  41. 1 1
      hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java

+ 14 - 3
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -79,11 +79,19 @@ Trunk (unreleased changes)
 
     HADOOP-7899. Generate proto java files as part of the build. (tucu)
 
-    HADOOP-7574. Improve FSShell -stat, add user/group elements (XieXianshan via harsh)
+    HADOOP-7574. Improve FSShell -stat, add user/group elements.
+    (XieXianshan via harsh)
 
-    HADOOP-7348. Change 'addnl' in getmerge util to be a flag '-nl' instead (XieXianshan via harsh)
+    HADOOP-7348. Change 'addnl' in getmerge util to be a flag '-nl' instead.
+    (XieXianshan via harsh)
 
-    HADOOP-7919. Remove the unused hadoop.logfile.* properties from the core-default.xml file. (harsh)
+    HADOOP-7919. Remove the unused hadoop.logfile.* properties from the 
+    core-default.xml file. (harsh)
+
+    HADOOP-7808. Port HADOOP-7510 - Add configurable option to use original 
+    hostname in token instead of IP to allow server IP change. 
+    (Daryn Sharp via suresh)
+  
 
   BUGS
 
@@ -241,6 +249,9 @@ Release 0.23.1 - Unreleased
    HADOOP-7948. Shell scripts created by hadoop-dist/pom.xml to build tar do not 
    properly propagate failure. (cim_michajlomatijkiw via tucu)
 
+   HADOOP-7949. Updated maxIdleTime default in the code to match
+   core-default.xml (eli)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 7 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -51,7 +51,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   /** How often does RPC client send pings to RPC server */
   public static final String  IPC_PING_INTERVAL_KEY = "ipc.ping.interval";
   /** Default value for IPC_PING_INTERVAL_KEY */
-  public static final int     IPC_PING_INTERVAL_DEFAULT = 60000;
+  public static final int     IPC_PING_INTERVAL_DEFAULT = 60000; // 1 min
   /** Enables pings from RPC client to the server */
   public static final String  IPC_CLIENT_PING_KEY = "ipc.client.ping";
   /** Default value of IPC_CLIENT_PING_KEY */
@@ -114,5 +114,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final String 
   HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_USER_MAPPINGS =
       "security.refresh.user.mappings.protocol.acl";
+  
+  public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP =
+      "hadoop.security.token.service.use_ip";
+  public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT =
+      true;
+
 }
 

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -165,7 +165,7 @@ public class CommonConfigurationKeysPublic {
   public static final String  IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY =
     "ipc.client.connection.maxidletime";
   /** Default value for IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY */
-  public static final int     IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT = 10000;
+  public static final int     IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT = 10000; // 10s
   /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
   public static final String  IPC_CLIENT_CONNECT_MAX_RETRIES_KEY =
     "ipc.client.connect.max.retries";

+ 36 - 22
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -186,6 +187,15 @@ public abstract class FileSystem extends Configured implements Closeable {
   /** Returns a URI whose scheme and authority identify this FileSystem.*/
   public abstract URI getUri();
   
+  /**
+   * Resolve the uri's hostname and add the default port if not in the uri
+   * @return URI
+   * @see NetUtils#getCanonicalUri(URI, int)
+   */
+  protected URI getCanonicalUri() {
+    return NetUtils.getCanonicalUri(getUri(), getDefaultPort());
+  }
+  
   /**
    * Get the default port for this file system.
    * @return the default port or 0 if there isn't one
@@ -195,8 +205,13 @@ public abstract class FileSystem extends Configured implements Closeable {
   }
 
   /**
-   * Get a canonical name for this file system.
-   * @return a URI string that uniquely identifies this file system
+   * Get a canonical service name for this file system.  The token cache is
+   * the only user of this value, and uses it to lookup this filesystem's
+   * service tokens.  The token cache will not attempt to acquire tokens if the
+   * service is null.
+   * @return a service string that uniquely identifies this file system, null
+   *         if the filesystem does not implement tokens
+   * @see SecurityUtil#buildDTServiceName(URI, int) 
    */
   public String getCanonicalServiceName() {
     return SecurityUtil.buildDTServiceName(getUri(), getDefaultPort());
@@ -487,32 +502,31 @@ public abstract class FileSystem extends Configured implements Closeable {
    */
   protected void checkPath(Path path) {
     URI uri = path.toUri();
-    if (uri.getScheme() == null)                // fs is relative 
-      return;
-    String thisScheme = this.getUri().getScheme();
     String thatScheme = uri.getScheme();
-    String thisAuthority = this.getUri().getAuthority();
-    String thatAuthority = uri.getAuthority();
+    if (thatScheme == null)                // fs is relative
+      return;
+    URI thisUri = getCanonicalUri();
+    String thisScheme = thisUri.getScheme();
     //authority and scheme are not case sensitive
     if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
-      if (thisAuthority == thatAuthority ||       // & authorities match
-          (thisAuthority != null && 
-           thisAuthority.equalsIgnoreCase(thatAuthority)))
-        return;
-
+      String thisAuthority = thisUri.getAuthority();
+      String thatAuthority = uri.getAuthority();
       if (thatAuthority == null &&                // path's authority is null
           thisAuthority != null) {                // fs has an authority
-        URI defaultUri = getDefaultUri(getConf()); // & is the conf default 
-        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme()) &&
-            thisAuthority.equalsIgnoreCase(defaultUri.getAuthority()))
-          return;
-        try {                                     // or the default fs's uri
-          defaultUri = get(getConf()).getUri();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
+        URI defaultUri = getDefaultUri(getConf());
+        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
+          uri = defaultUri; // schemes match, so use this uri instead
+        } else {
+          uri = null; // can't determine auth of the path
         }
-        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme()) &&
-            thisAuthority.equalsIgnoreCase(defaultUri.getAuthority()))
+      }
+      if (uri != null) {
+        // canonicalize uri before comparing with this fs
+        uri = NetUtils.getCanonicalUri(uri, getDefaultPort());
+        thatAuthority = uri.getAuthority();
+        if (thisAuthority == thatAuthority ||       // authorities match
+            (thisAuthority != null &&
+             thisAuthority.equalsIgnoreCase(thatAuthority)))
           return;
       }
     }

+ 9 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -77,6 +77,15 @@ public class FilterFileSystem extends FileSystem {
     return fs.getUri();
   }
 
+  /**
+   * Returns a qualified URI whose scheme and authority identify this
+   * FileSystem.
+   */
+  @Override
+  protected URI getCanonicalUri() {
+    return fs.getCanonicalUri();
+  }
+  
   /** Make sure that a path specifies a FileSystem. */
   public Path makeQualified(Path path) {
     return fs.makeQualified(path);

+ 14 - 10
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -48,6 +48,7 @@ import org.apache.commons.logging.*;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.RpcPayloadHeader.*;
 import org.apache.hadoop.io.IOUtils;
@@ -88,8 +89,6 @@ public class Client {
   private SocketFactory socketFactory;           // how to create sockets
   private int refCount = 1;
   
-  final static String PING_INTERVAL_NAME = "ipc.ping.interval";
-  final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
   final static int PING_CALL_ID = -1;
   
   /**
@@ -99,7 +98,7 @@ public class Client {
    * @param pingInterval the ping interval
    */
   final public static void setPingInterval(Configuration conf, int pingInterval) {
-    conf.setInt(PING_INTERVAL_NAME, pingInterval);
+    conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, pingInterval);
   }
 
   /**
@@ -110,7 +109,8 @@ public class Client {
    * @return the ping interval
    */
   final static int getPingInterval(Configuration conf) {
-    return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
+    return conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
+        CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
   }
 
   /**
@@ -123,7 +123,7 @@ public class Client {
    * @return the timeout period in milliseconds. -1 if no timeout value is set
    */
   final public static int getTimeout(Configuration conf) {
-    if (!conf.getBoolean("ipc.client.ping", true)) {
+    if (!conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true)) {
       return getPingInterval(conf);
     }
     return -1;
@@ -425,7 +425,7 @@ public class Client {
      */
     private synchronized boolean updateAddress() throws IOException {
       // Do a fresh lookup with the old host name.
-      InetSocketAddress currentAddr =  new InetSocketAddress(
+      InetSocketAddress currentAddr = NetUtils.createSocketAddrForHost(
                                server.getHostName(), server.getPort());
 
       if (!server.equals(currentAddr)) {
@@ -1347,15 +1347,19 @@ public class Client {
         Class<?> protocol, UserGroupInformation ticket, int rpcTimeout,
         Configuration conf) throws IOException {
       String remotePrincipal = getRemotePrincipal(conf, addr, protocol);
-      boolean doPing = conf.getBoolean("ipc.client.ping", true);
+      boolean doPing =
+        conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
       return new ConnectionId(addr, protocol, ticket,
           rpcTimeout, remotePrincipal,
-          conf.getInt("ipc.client.connection.maxidletime", 10000), // 10s
-          conf.getInt("ipc.client.connect.max.retries", 10),
           conf.getInt(
             CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
             CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT),
-          conf.getBoolean("ipc.client.tcpnodelay", false),
+          conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+              CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT),
+          conf.getInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+              CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_DEFAULT),
+          conf.getBoolean(CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
+              CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_DEFAULT),
           doPing, 
           (doPing ? Client.getPingInterval(conf) : 0));
     }

+ 17 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -62,6 +62,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
@@ -378,7 +379,9 @@ public abstract class Server {
                                          //-tion (for idle connections) ran
     private long cleanupInterval = 10000; //the minimum interval between 
                                           //two cleanup runs
-    private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
+    private int backlogLength = conf.getInt(
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
+        CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
     
     public Listener() throws IOException {
       address = new InetSocketAddress(bindAddress, port);
@@ -1712,12 +1715,18 @@ public abstract class Server {
     } else {
       this.readThreads = conf.getInt(
           CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
-          CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);      
+          CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
     }
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
-    this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
-    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
-    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
+    this.maxIdleTime = 2 * conf.getInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
+    this.maxConnectionsToNuke = conf.getInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
+        CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
+    this.thresholdIdleConnections = conf.getInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
+        CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
     this.authorize = 
       conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, 
@@ -1729,7 +1738,9 @@ public abstract class Server {
     this.port = listener.getAddress().getPort();    
     this.rpcMetrics = RpcMetrics.create(this);
     this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
-    this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
+    this.tcpNoDelay = conf.getBoolean(
+        CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
+        CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);
 
     // Create the responder here
     responder = new Responder();

+ 2 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/spi/Util.java

@@ -28,6 +28,7 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.net.NetUtils;
 
 /**
  * Static utility methods
@@ -56,14 +57,7 @@ public class Util {
     else {
       String[] specStrings = specs.split("[ ,]+");
       for (String specString : specStrings) {
-        int colon = specString.indexOf(':');
-        if (colon < 0 || colon == specString.length() - 1) {
-          result.add(new InetSocketAddress(specString, defaultPort));
-        } else {
-          String hostname = specString.substring(0, colon);
-          int port = Integer.parseInt(specString.substring(colon+1));
-          result.add(new InetSocketAddress(hostname, port));
-        }
+        result.add(NetUtils.createSocketAddr(specString, defaultPort));
       }
     }
     return result;

+ 2 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/util/Servers.java

@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.net.NetUtils;
 
 /**
  * Helpers to handle server addresses
@@ -57,14 +58,7 @@ public class Servers {
     else {
       String[] specStrings = specs.split("[ ,]+");
       for (String specString : specStrings) {
-        int colon = specString.indexOf(':');
-        if (colon < 0 || colon == specString.length() - 1) {
-          result.add(new InetSocketAddress(specString, defaultPort));
-        } else {
-          String hostname = specString.substring(0, colon);
-          int port = Integer.parseInt(specString.substring(colon+1));
-          result.add(new InetSocketAddress(hostname, port));
-        }
+        result.add(NetUtils.createSocketAddr(specString, defaultPort));
       }
     }
     return result;

+ 271 - 31
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java

@@ -37,6 +37,7 @@ import java.nio.channels.SocketChannel;
 import java.util.Map.Entry;
 import java.util.regex.Pattern;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.net.SocketFactory;
 
@@ -45,11 +46,17 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
+//this will need to be replaced someday when there is a suitable replacement
+import sun.net.dns.ResolverConfiguration;
+import sun.net.util.IPAddressUtil;
+
 @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
 @InterfaceStability.Unstable
 public class NetUtils {
@@ -65,6 +72,26 @@ public class NetUtils {
   /** Base URL of the Hadoop Wiki: {@value} */
   public static final String HADOOP_WIKI = "http://wiki.apache.org/hadoop/";
 
+  private static HostResolver hostResolver;
+  
+  static {
+    // SecurityUtils requires a more secure host resolver if tokens are
+    // using hostnames
+    setUseQualifiedHostResolver(!SecurityUtil.getTokenServiceUseIp());
+  }
+
+  /**
+   * This method is intended for use only by SecurityUtils!
+   * @param flag where the qualified or standard host resolver is used
+   *             to create socket addresses
+   */
+  @InterfaceAudience.Private
+  public static void setUseQualifiedHostResolver(boolean flag) {
+      hostResolver = flag
+          ? new QualifiedHostResolver()
+          : new StandardHostResolver();
+  }
+  
   /**
    * Get the socket factory for the given class according to its
    * configuration parameter
@@ -178,43 +205,256 @@ public class NetUtils {
       throw new IllegalArgumentException("Target address cannot be null." +
           helpText);
     }
-    int colonIndex = target.indexOf(':');
-    if (colonIndex < 0 && defaultPort == -1) {
-      throw new RuntimeException("Not a host:port pair: " + target +
-          helpText);
+    boolean hasScheme = target.contains("://");    
+    URI uri = null;
+    try {
+      uri = hasScheme ? URI.create(target) : URI.create("dummyscheme://"+target);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(
+          "Does not contain a valid host:port authority: " + target + helpText
+      );
+    }
+
+    String host = uri.getHost();
+    int port = uri.getPort();
+    if (port == -1) {
+      port = defaultPort;
+    }
+    String path = uri.getPath();
+    
+    if ((host == null) || (port < 0) ||
+        (!hasScheme && path != null && !path.isEmpty()))
+    {
+      throw new IllegalArgumentException(
+          "Does not contain a valid host:port authority: " + target + helpText
+      );
+    }
+    return createSocketAddrForHost(host, port);
+  }
+
+  /**
+   * Create a socket address with the given host and port.  The hostname
+   * might be replaced with another host that was set via
+   * {@link #addStaticResolution(String, String)}.  The value of
+   * hadoop.security.token.service.use_ip will determine whether the
+   * standard java host resolver is used, or if the fully qualified resolver
+   * is used.
+   * @param host the hostname or IP use to instantiate the object
+   * @param port the port number
+   * @return InetSocketAddress
+   */
+  public static InetSocketAddress createSocketAddrForHost(String host, int port) {
+    String staticHost = getStaticResolution(host);
+    String resolveHost = (staticHost != null) ? staticHost : host;
+    
+    InetSocketAddress addr;
+    try {
+      InetAddress iaddr = hostResolver.getByName(resolveHost);
+      // if there is a static entry for the host, make the returned
+      // address look like the original given host
+      if (staticHost != null) {
+        iaddr = InetAddress.getByAddress(host, iaddr.getAddress());
+      }
+      addr = new InetSocketAddress(iaddr, port);
+    } catch (UnknownHostException e) {
+      addr = InetSocketAddress.createUnresolved(host, port);
+    }
+    return addr;
+  }
+
+  interface HostResolver {
+    InetAddress getByName(String host) throws UnknownHostException;    
+  }
+  
+  /**
+   * Uses standard java host resolution
+   */
+  static class StandardHostResolver implements HostResolver {
+    public InetAddress getByName(String host) throws UnknownHostException {
+      return InetAddress.getByName(host);
+    }
+  }
+  
+  /**
+   * This an alternate resolver with important properties that the standard
+   * java resolver lacks:
+   * 1) The hostname is fully qualified.  This avoids security issues if not
+   *    all hosts in the cluster do not share the same search domains.  It
+   *    also prevents other hosts from performing unnecessary dns searches.
+   *    In contrast, InetAddress simply returns the host as given.
+   * 2) The InetAddress is instantiated with an exact host and IP to prevent
+   *    further unnecessary lookups.  InetAddress may perform an unnecessary
+   *    reverse lookup for an IP.
+   * 3) A call to getHostName() will always return the qualified hostname, or
+   *    more importantly, the IP if instantiated with an IP.  This avoids
+   *    unnecessary dns timeouts if the host is not resolvable.
+   * 4) Point 3 also ensures that if the host is re-resolved, ex. during a
+   *    connection re-attempt, that a reverse lookup to host and forward
+   *    lookup to IP is not performed since the reverse/forward mappings may
+   *    not always return the same IP.  If the client initiated a connection
+   *    with an IP, then that IP is all that should ever be contacted.
+   *    
+   * NOTE: this resolver is only used if:
+   *       hadoop.security.token.service.use_ip=false 
+   */
+  protected static class QualifiedHostResolver implements HostResolver {
+    @SuppressWarnings("unchecked")
+    private List<String> searchDomains =
+        ResolverConfiguration.open().searchlist();
+    
+    /**
+     * Create an InetAddress with a fully qualified hostname of the given
+     * hostname.  InetAddress does not qualify an incomplete hostname that
+     * is resolved via the domain search list.
+     * {@link InetAddress#getCanonicalHostName()} will fully qualify the
+     * hostname, but it always return the A record whereas the given hostname
+     * may be a CNAME.
+     * 
+     * @param host a hostname or ip address
+     * @return InetAddress with the fully qualified hostname or ip
+     * @throws UnknownHostException if host does not exist
+     */
+    public InetAddress getByName(String host) throws UnknownHostException {
+      InetAddress addr = null;
+
+      if (IPAddressUtil.isIPv4LiteralAddress(host)) {
+        // use ipv4 address as-is
+        byte[] ip = IPAddressUtil.textToNumericFormatV4(host);
+        addr = InetAddress.getByAddress(host, ip);
+      } else if (IPAddressUtil.isIPv6LiteralAddress(host)) {
+        // use ipv6 address as-is
+        byte[] ip = IPAddressUtil.textToNumericFormatV6(host);
+        addr = InetAddress.getByAddress(host, ip);
+      } else if (host.endsWith(".")) {
+        // a rooted host ends with a dot, ex. "host."
+        // rooted hosts never use the search path, so only try an exact lookup
+        addr = getByExactName(host);
+      } else if (host.contains(".")) {
+        // the host contains a dot (domain), ex. "host.domain"
+        // try an exact host lookup, then fallback to search list
+        addr = getByExactName(host);
+        if (addr == null) {
+          addr = getByNameWithSearch(host);
+        }
+      } else {
+        // it's a simple host with no dots, ex. "host"
+        // try the search list, then fallback to exact host
+        InetAddress loopback = InetAddress.getByName(null);
+        if (host.equalsIgnoreCase(loopback.getHostName())) {
+          addr = InetAddress.getByAddress(host, loopback.getAddress());
+        } else {
+          addr = getByNameWithSearch(host);
+          if (addr == null) {
+            addr = getByExactName(host);
+          }
+        }
+      }
+      // unresolvable!
+      if (addr == null) {
+        throw new UnknownHostException(host);
+      }
+      return addr;
     }
-    String hostname;
-    int port = -1;
-    if (!target.contains("/")) {
-      if (colonIndex == -1) {
-        hostname = target;
+
+    InetAddress getByExactName(String host) {
+      InetAddress addr = null;
+      // InetAddress will use the search list unless the host is rooted
+      // with a trailing dot.  The trailing dot will disable any use of the
+      // search path in a lower level resolver.  See RFC 1535.
+      String fqHost = host;
+      if (!fqHost.endsWith(".")) fqHost += ".";
+      try {
+        addr = getInetAddressByName(fqHost);
+        // can't leave the hostname as rooted or other parts of the system
+        // malfunction, ex. kerberos principals are lacking proper host
+        // equivalence for rooted/non-rooted hostnames
+        addr = InetAddress.getByAddress(host, addr.getAddress());
+      } catch (UnknownHostException e) {
+        // ignore, caller will throw if necessary
+      }
+      return addr;
+    }
+
+    InetAddress getByNameWithSearch(String host) {
+      InetAddress addr = null;
+      if (host.endsWith(".")) { // already qualified?
+        addr = getByExactName(host); 
       } else {
-        // must be the old style <host>:<port>
-        hostname = target.substring(0, colonIndex);
-        String portStr = target.substring(colonIndex + 1);
-        try {
-          port = Integer.parseInt(portStr);
-        } catch (NumberFormatException nfe) {
-          throw new IllegalArgumentException(
-              "Can't parse port '" + portStr + "'"
-              + helpText);
+        for (String domain : searchDomains) {
+          String dot = !domain.startsWith(".") ? "." : "";
+          addr = getByExactName(host + dot + domain);
+          if (addr != null) break;
         }
       }
-    } else {
-      // a new uri
-      URI addr = new Path(target).toUri();
-      hostname = addr.getHost();
-      port = addr.getPort();
+      return addr;
     }
 
-    if (port == -1) {
-      port = defaultPort;
+    // implemented as a separate method to facilitate unit testing
+    InetAddress getInetAddressByName(String host) throws UnknownHostException {
+      return InetAddress.getByName(host);
     }
+
+    void setSearchDomains(String ... domains) {
+      searchDomains = Arrays.asList(domains);
+    }
+  }
   
-    if (getStaticResolution(hostname) != null) {
-      hostname = getStaticResolution(hostname);
+  /**
+   * This is for testing only!
+   */
+  @VisibleForTesting
+  static void setHostResolver(HostResolver newResolver) {
+    hostResolver = newResolver;
+  }
+  
+  /**
+   * Resolve the uri's hostname and add the default port if not in the uri
+   * @param uri to resolve
+   * @param defaultPort if none is given
+   * @return URI
+   */
+  public static URI getCanonicalUri(URI uri, int defaultPort) {
+    // skip if there is no authority, ie. "file" scheme or relative uri
+    String host = uri.getHost();
+    if (host == null) {
+      return uri;
+    }
+    String fqHost = canonicalizeHost(host);
+    int port = uri.getPort();
+    // short out if already canonical with a port
+    if (host.equals(fqHost) && port != -1) {
+      return uri;
+    }
+    // reconstruct the uri with the canonical host and port
+    try {
+      uri = new URI(uri.getScheme(), uri.getUserInfo(),
+          fqHost, (port == -1) ? defaultPort : port,
+          uri.getPath(), uri.getQuery(), uri.getFragment());
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(e);
+    }
+    return uri;
+  }  
+
+  // cache the canonicalized hostnames;  the cache currently isn't expired,
+  // but the canonicals will only change if the host's resolver configuration
+  // changes
+  private static final ConcurrentHashMap<String, String> canonicalizedHostCache =
+      new ConcurrentHashMap<String, String>();
+
+  private static String canonicalizeHost(String host) {
+    // check if the host has already been canonicalized
+    String fqHost = canonicalizedHostCache.get(host);
+    if (fqHost == null) {
+      try {
+        fqHost = hostResolver.getByName(host).getHostName();
+        // slight race condition, but won't hurt 
+        canonicalizedHostCache.put(host, fqHost);
+      } catch (UnknownHostException e) {
+        fqHost = host;
+      }
     }
-    return new InetSocketAddress(hostname, port);
+    return fqHost;
   }
 
   /**
@@ -279,8 +519,8 @@ public class NetUtils {
    */
   public static InetSocketAddress getConnectAddress(Server server) {
     InetSocketAddress addr = server.getListenerAddress();
-    if (addr.getAddress().getHostAddress().equals("0.0.0.0")) {
-      addr = new InetSocketAddress("127.0.0.1", addr.getPort());
+    if (addr.getAddress().isAnyLocalAddress()) {
+      addr = createSocketAddrForHost("127.0.0.1", addr.getPort());
     }
     return addr;
   }

+ 80 - 23
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java

@@ -35,6 +35,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -50,6 +51,35 @@ public class SecurityUtil {
   public static final Log LOG = LogFactory.getLog(SecurityUtil.class);
   public static final String HOSTNAME_PATTERN = "_HOST";
 
+  // controls whether buildTokenService will use an ip or host/ip as given
+  // by the user
+  private static boolean useIpForTokenService;
+  
+  static {
+    boolean useIp = new Configuration().getBoolean(
+      CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP,
+      CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT);
+    setTokenServiceUseIp(useIp);
+  }
+  
+  /**
+   * For use only by tests and initialization
+   */
+  @InterfaceAudience.Private
+  static void setTokenServiceUseIp(boolean flag) {
+    useIpForTokenService = flag;
+    NetUtils.setUseQualifiedHostResolver(!flag);
+  }
+  
+  /**
+   * Intended only for temporary use by NetUtils.  Do not use.
+   * @return whether tokens use an IP address
+   */
+  @InterfaceAudience.Private
+  public static boolean getTokenServiceUseIp() {
+    return useIpForTokenService;
+  }
+  
   /**
    * Find the original TGT within the current subject's credentials. Cross-realm
    * TGT's of the form "krbtgt/TWO.COM@ONE.COM" may be present.
@@ -263,29 +293,20 @@ public class SecurityUtil {
   }
 
   /**
-   * create service name for Delegation token ip:port
-   * @param uri
-   * @param defPort
-   * @return "ip:port"
+   * create the service name for a Delegation token
+   * @param uri of the service
+   * @param defPort is used if the uri lacks a port
+   * @return the token service, or null if no authority
+   * @see #buildTokenService(InetSocketAddress)
    */
   public static String buildDTServiceName(URI uri, int defPort) {
-    int port = uri.getPort();
-    if(port == -1) 
-      port = defPort;
-
-    // build the service name string "/ip:port"
-    // for whatever reason using NetUtils.createSocketAddr(target).toString()
-    // returns "localhost/ip:port"
-    StringBuffer sb = new StringBuffer();
-    String host = uri.getHost();
-    if (host != null) {
-      host = NetUtils.normalizeHostName(host);
-    } else {
-      host = "";
+    String authority = uri.getAuthority();
+    if (authority == null) {
+      return null;
     }
-    sb.append(host).append(":").append(port);
-    return sb.toString();
-  }
+    InetSocketAddress addr = NetUtils.createSocketAddr(authority, defPort);
+    return buildTokenService(addr).toString();
+   }
   
   /**
    * Get the host name from the principal name of format <service>/host@realm.
@@ -367,22 +388,58 @@ public class SecurityUtil {
     return null;
   }
 
+  /**
+   * Decode the given token's service field into an InetAddress
+   * @param token from which to obtain the service
+   * @return InetAddress for the service
+   */
+  public static InetSocketAddress getTokenServiceAddr(Token<?> token) {
+    return NetUtils.createSocketAddr(token.getService().toString());
+  }
+
   /**
    * Set the given token's service to the format expected by the RPC client 
    * @param token a delegation token
    * @param addr the socket for the rpc connection
    */
   public static void setTokenService(Token<?> token, InetSocketAddress addr) {
-    token.setService(buildTokenService(addr));
+    Text service = buildTokenService(addr);
+    if (token != null) {
+      token.setService(service);
+      LOG.info("Acquired token "+token);  // Token#toString() prints service
+    } else {
+      LOG.warn("Failed to get token for service "+service);
+    }
   }
   
   /**
    * Construct the service key for a token
    * @param addr InetSocketAddress of remote connection with a token
-   * @return "ip:port"
+   * @return "ip:port" or "host:port" depending on the value of
+   *          hadoop.security.token.service.use_ip
    */
   public static Text buildTokenService(InetSocketAddress addr) {
-    String host = addr.getAddress().getHostAddress();
+    String host = null;
+    if (useIpForTokenService) {
+      if (addr.isUnresolved()) { // host has no ip address
+        throw new IllegalArgumentException(
+            new UnknownHostException(addr.getHostName())
+        );
+      }
+      host = addr.getAddress().getHostAddress();
+    } else {
+      host = addr.getHostName().toLowerCase();
+    }
     return new Text(host + ":" + addr.getPort());
   }
+
+  /**
+   * Construct the service key for a token
+   * @param uri of remote connection with a token
+   * @return "ip:port" or "host:port" depending on the value of
+   *          hadoop.security.token.service.use_ip
+   */
+  public static Text buildTokenService(URI uri) {
+    return buildTokenService(NetUtils.createSocketAddr(uri.getAuthority()));
+  }
 }

+ 365 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCanonicalization.java

@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtilsTestResolver;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Test;
+
+public class TestFileSystemCanonicalization extends TestCase {
+  static String[] authorities = {
+    "myfs://host",
+    "myfs://host.a",
+    "myfs://host.a.b",
+  };
+
+  static String[] ips = {
+    "myfs://127.0.0.1"
+  };
+
+
+  @Test
+  public void testSetupResolver() throws Exception {
+    NetUtilsTestResolver.install();
+  }
+
+  // no ports
+
+  @Test
+  public void testShortAuthority() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://host", "myfs://host.a.b:123");
+    verifyPaths(fs, authorities, -1, true);
+    verifyPaths(fs, authorities, 123, true);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  @Test
+  public void testPartialAuthority() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://host.a", "myfs://host.a.b:123");
+    verifyPaths(fs, authorities, -1, true);
+    verifyPaths(fs, authorities, 123, true);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  @Test
+  public void testFullAuthority() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://host.a.b", "myfs://host.a.b:123");
+    verifyPaths(fs, authorities, -1, true);
+    verifyPaths(fs, authorities, 123, true);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  // with default ports
+  
+  @Test
+  public void testShortAuthorityWithDefaultPort() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://host:123", "myfs://host.a.b:123");
+    verifyPaths(fs, authorities, -1, true);
+    verifyPaths(fs, authorities, 123, true);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  @Test
+  public void testPartialAuthorityWithDefaultPort() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://host.a:123", "myfs://host.a.b:123");
+    verifyPaths(fs, authorities, -1, true);
+    verifyPaths(fs, authorities, 123, true);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  @Test
+  public void testFullAuthorityWithDefaultPort() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://host.a.b:123", "myfs://host.a.b:123");
+    verifyPaths(fs, authorities, -1, true);
+    verifyPaths(fs, authorities, 123, true);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  // with non-standard ports
+  
+  @Test
+  public void testShortAuthorityWithOtherPort() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://host:456", "myfs://host.a.b:456");
+    verifyPaths(fs, authorities, -1, false);
+    verifyPaths(fs, authorities, 123, false);
+    verifyPaths(fs, authorities, 456, true);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  @Test
+  public void testPartialAuthorityWithOtherPort() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://host.a:456", "myfs://host.a.b:456");
+    verifyPaths(fs, authorities, -1, false);
+    verifyPaths(fs, authorities, 123, false);
+    verifyPaths(fs, authorities, 456, true);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  @Test
+  public void testFullAuthorityWithOtherPort() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://host.a.b:456", "myfs://host.a.b:456");
+    verifyPaths(fs, authorities, -1, false);
+    verifyPaths(fs, authorities, 123, false);
+    verifyPaths(fs, authorities, 456, true);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  // ips
+  
+  @Test
+  public void testIpAuthority() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://127.0.0.1", "myfs://127.0.0.1:123");
+    verifyPaths(fs, authorities, -1, false);
+    verifyPaths(fs, authorities, 123, false);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, true);
+    verifyPaths(fs, ips, 123, true);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  @Test
+  public void testIpAuthorityWithDefaultPort() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://127.0.0.1:123", "myfs://127.0.0.1:123");
+    verifyPaths(fs, authorities, -1, false);
+    verifyPaths(fs, authorities, 123, false);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, true);
+    verifyPaths(fs, ips, 123, true);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  @Test
+  public void testIpAuthorityWithOtherPort() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://127.0.0.1:456", "myfs://127.0.0.1:456");
+    verifyPaths(fs, authorities, -1, false);
+    verifyPaths(fs, authorities, 123, false);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, true);
+  }
+
+  // bad stuff
+
+  @Test
+  public void testMismatchedSchemes() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs2://simple", "myfs2://simple:123");
+    verifyPaths(fs, authorities, -1, false);
+    verifyPaths(fs, authorities, 123, false);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  @Test
+  public void testMismatchedHosts() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs://simple", "myfs://simple:123");
+    verifyPaths(fs, authorities, -1, false);
+    verifyPaths(fs, authorities, 123, false);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  @Test
+  public void testNullAuthority() throws Exception {
+    FileSystem fs = getVerifiedFS("myfs:///", "myfs:///");
+    verifyPaths(fs, new String[]{ "myfs://" }, -1, true);
+    verifyPaths(fs, authorities, -1, false);
+    verifyPaths(fs, authorities, 123, false);
+    verifyPaths(fs, authorities, 456, false);
+    verifyPaths(fs, ips, -1, false);
+    verifyPaths(fs, ips, 123, false);
+    verifyPaths(fs, ips, 456, false);
+  }
+
+  @Test
+  public void testAuthorityFromDefaultFS() throws Exception {
+    Configuration config = new Configuration();
+    String defaultFsKey = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
+    
+    FileSystem fs = getVerifiedFS("myfs://host", "myfs://host.a.b:123", config);
+    verifyPaths(fs, new String[]{ "myfs://" }, -1, false);
+
+    config.set(defaultFsKey, "myfs://host");
+    verifyPaths(fs, new String[]{ "myfs://" }, -1, true);
+
+    config.set(defaultFsKey, "myfs2://host");
+    verifyPaths(fs, new String[]{ "myfs://" }, -1, false);
+
+    config.set(defaultFsKey, "myfs://host:123");
+    verifyPaths(fs, new String[]{ "myfs://" }, -1, true);
+
+    config.set(defaultFsKey, "myfs://host:456");
+    verifyPaths(fs, new String[]{ "myfs://" }, -1, false);
+  }
+
+  FileSystem getVerifiedFS(String authority, String canonical) throws Exception {
+    return getVerifiedFS(authority, canonical, new Configuration());
+  }
+
+  // create a fs from the authority, then check its uri against the given uri
+  // and the canonical.  then try to fetch paths using the canonical
+  FileSystem getVerifiedFS(String authority, String canonical, Configuration conf)
+  throws Exception {
+    URI uri = URI.create(authority);
+    URI canonicalUri = URI.create(canonical);
+
+    FileSystem fs = new DummyFileSystem(uri, conf);
+    assertEquals(uri, fs.getUri());
+    assertEquals(canonicalUri, fs.getCanonicalUri());
+    verifyCheckPath(fs, "/file", true);
+    return fs;
+  }  
+  
+  void verifyPaths(FileSystem fs, String[] uris, int port, boolean shouldPass) {
+    for (String uri : uris) {
+      if (port != -1) uri += ":"+port;
+      verifyCheckPath(fs, uri+"/file", shouldPass);
+    }
+  }
+
+  void verifyCheckPath(FileSystem fs, String path, boolean shouldPass) {
+    Path rawPath = new Path(path);
+    Path fqPath = null;
+    Exception e = null;
+    try {
+      fqPath = fs.makeQualified(rawPath);
+    } catch (IllegalArgumentException iae) {
+      e = iae;
+    }
+    if (shouldPass) {
+      assertEquals(null, e);
+      String pathAuthority = rawPath.toUri().getAuthority();
+      if (pathAuthority == null) {
+        pathAuthority = fs.getUri().getAuthority();
+      }
+      assertEquals(pathAuthority, fqPath.toUri().getAuthority());
+    } else {
+      assertNotNull("did not fail", e);
+      assertEquals("Wrong FS: "+rawPath+", expected: "+fs.getUri(),
+          e.getMessage());
+    }
+  }
+    
+  static class DummyFileSystem extends FileSystem {
+    URI uri;
+    static int defaultPort = 123;
+
+    DummyFileSystem(URI uri, Configuration conf) throws IOException {
+      this.uri = uri;
+      setConf(conf);
+    }
+    
+    @Override
+    public URI getUri() {
+      return uri;
+    }
+
+    @Override
+    protected int getDefaultPort() {
+      return defaultPort;
+    }
+    
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      throw new IOException("not supposed to be here");
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, FsPermission permission,
+        boolean overwrite, int bufferSize, short replication, long blockSize,
+        Progressable progress) throws IOException {
+      throw new IOException("not supposed to be here");
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize,
+        Progressable progress) throws IOException {
+      throw new IOException("not supposed to be here");
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+      throw new IOException("not supposed to be here");
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+      throw new IOException("not supposed to be here");
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws IOException {
+      throw new IOException("not supposed to be here");
+    }
+
+    @Override
+    public void setWorkingDirectory(Path new_dir) {
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      return new Path("/");
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+      throw new IOException("not supposed to be here");
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      throw new IOException("not supposed to be here");
+    }
+  }
+}

+ 2 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/MiniRPCBenchmark.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.Token;
@@ -213,8 +214,7 @@ public class MiniRPCBenchmark {
             token = p.getDelegationToken(new Text(RENEWER));
             currentUgi = UserGroupInformation.createUserForTesting(MINI_USER, 
                 GROUP_NAMES);
-            token.setService(new Text(addr.getAddress().getHostAddress() 
-                + ":" + addr.getPort()));
+            SecurityUtil.setTokenService(token, addr);
             currentUgi.addToken(token);
             return p;
           }

+ 11 - 16
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestSaslRPC.java

@@ -40,6 +40,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.net.NetUtils;
@@ -286,10 +287,7 @@ public class TestSaslRPC {
         .getUserName()));
     Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
         sm);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
-    LOG.info("Service IP address for token is " + host);
+    SecurityUtil.setTokenService(token, addr);
     current.addToken(token);
 
     TestSaslProtocol proxy = null;
@@ -311,14 +309,17 @@ public class TestSaslRPC {
   public void testPingInterval() throws Exception {
     Configuration newConf = new Configuration(conf);
     newConf.set(SERVER_PRINCIPAL_KEY, SERVER_PRINCIPAL_1);
-    conf.setInt(Client.PING_INTERVAL_NAME, Client.DEFAULT_PING_INTERVAL);
+    conf.setInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
+        CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
+
     // set doPing to true
-    newConf.setBoolean("ipc.client.ping", true);
+    newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, true);
     ConnectionId remoteId = ConnectionId.getConnectionId(
         new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf);
-    assertEquals(Client.DEFAULT_PING_INTERVAL, remoteId.getPingInterval());
+    assertEquals(CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT,
+        remoteId.getPingInterval());
     // set doPing to false
-    newConf.setBoolean("ipc.client.ping", false);
+    newConf.setBoolean(CommonConfigurationKeys.IPC_CLIENT_PING_KEY, false);
     remoteId = ConnectionId.getConnectionId(
         new InetSocketAddress(0), TestSaslProtocol.class, null, 0, newConf);
     assertEquals(0, remoteId.getPingInterval());
@@ -358,10 +359,7 @@ public class TestSaslRPC {
         .getUserName()));
     Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
         sm);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
-    LOG.info("Service IP address for token is " + host);
+    SecurityUtil.setTokenService(token, addr);
     current.addToken(token);
 
     Configuration newConf = new Configuration(conf);
@@ -448,10 +446,7 @@ public class TestSaslRPC {
         .getUserName()));
     Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
         sm);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
-    LOG.info("Service IP address for token is " + host);
+    SecurityUtil.setTokenService(token, addr);
     current.addToken(token);
 
     current.doAs(new PrivilegedExceptionAction<Object>() {

+ 74 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/NetUtilsTestResolver.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.net;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.net.NetUtils.QualifiedHostResolver;
+
+/**
+ * provides a dummy dns search resolver with a configurable search path
+ * and host mapping
+ */
+public class NetUtilsTestResolver extends QualifiedHostResolver {
+  Map<String, InetAddress> resolvedHosts = new HashMap<String, InetAddress>();
+  List<String> hostSearches = new LinkedList<String>();
+
+  public static NetUtilsTestResolver install() {
+    NetUtilsTestResolver resolver = new NetUtilsTestResolver();
+    resolver.setSearchDomains("a.b", "b", "c");
+    resolver.addResolvedHost("host.a.b.", "1.1.1.1");
+    resolver.addResolvedHost("b-host.b.", "2.2.2.2");
+    resolver.addResolvedHost("simple.", "3.3.3.3");    
+    NetUtils.setHostResolver(resolver);
+    return resolver;
+  }
+
+  public void addResolvedHost(String host, String ip) {
+    InetAddress addr;
+    try {
+      addr = InetAddress.getByName(ip);
+      addr = InetAddress.getByAddress(host, addr.getAddress());
+    } catch (UnknownHostException e) {
+      throw new IllegalArgumentException("not an ip:"+ip);
+    }
+    resolvedHosts.put(host, addr);
+  }
+
+  InetAddress getInetAddressByName(String host) throws UnknownHostException {
+    hostSearches.add(host);
+    if (!resolvedHosts.containsKey(host)) {
+      throw new UnknownHostException(host);
+    }
+    return resolvedHosts.get(host);
+  }
+
+  String[] getHostSearches() {
+    return hostSearches.toArray(new String[0]);
+  }
+
+  void reset() {
+    hostSearches.clear();
+  }
+}

+ 263 - 8
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java

@@ -17,25 +17,29 @@
  */
 package org.apache.hadoop.net;
 
-import junit.framework.AssertionFailedError;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Test;
-
 import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.net.BindException;
+import java.net.ConnectException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
 import java.net.Socket;
-import java.net.ConnectException;
 import java.net.SocketException;
-import java.net.InetSocketAddress;
+import java.net.URI;
 import java.net.UnknownHostException;
 import java.util.Enumeration;
 
+import junit.framework.AssertionFailedError;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 public class TestNetUtils {
 
@@ -248,4 +252,255 @@ public class TestNetUtils {
     }
     return wrapped;
   }
-}
+
+  static NetUtilsTestResolver resolver;
+  static Configuration config;
+  
+  @BeforeClass
+  public static void setupResolver() {
+    resolver = NetUtilsTestResolver.install();
+  }
+  
+  @Before
+  public void resetResolver() {
+    resolver.reset();
+    config = new Configuration();
+  }
+
+  // getByExactName
+  
+  private void verifyGetByExactNameSearch(String host, String ... searches) {
+    assertNull(resolver.getByExactName(host));
+    assertBetterArrayEquals(searches, resolver.getHostSearches());
+  }
+
+  @Test
+  public void testResolverGetByExactNameUnqualified() {
+    verifyGetByExactNameSearch("unknown", "unknown.");
+  }
+
+  @Test
+  public void testResolverGetByExactNameUnqualifiedWithDomain() {
+    verifyGetByExactNameSearch("unknown.domain", "unknown.domain.");
+  }
+
+  @Test
+  public void testResolverGetByExactNameQualified() {
+    verifyGetByExactNameSearch("unknown.", "unknown.");
+  }
+  
+  @Test
+  public void testResolverGetByExactNameQualifiedWithDomain() {
+    verifyGetByExactNameSearch("unknown.domain.", "unknown.domain.");
+  }
+
+  // getByNameWithSearch
+  
+  private void verifyGetByNameWithSearch(String host, String ... searches) {
+    assertNull(resolver.getByNameWithSearch(host));
+    assertBetterArrayEquals(searches, resolver.getHostSearches());
+  }
+
+  @Test
+  public void testResolverGetByNameWithSearchUnqualified() {
+    String host = "unknown";
+    verifyGetByNameWithSearch(host, host+".a.b.", host+".b.", host+".c.");
+  }
+
+  @Test
+  public void testResolverGetByNameWithSearchUnqualifiedWithDomain() {
+    String host = "unknown.domain";
+    verifyGetByNameWithSearch(host, host+".a.b.", host+".b.", host+".c.");
+  }
+
+  @Test
+  public void testResolverGetByNameWithSearchQualified() {
+    String host = "unknown.";
+    verifyGetByNameWithSearch(host, host);
+  }
+
+  @Test
+  public void testResolverGetByNameWithSearchQualifiedWithDomain() {
+    String host = "unknown.domain.";
+    verifyGetByNameWithSearch(host, host);
+  }
+
+  // getByName
+
+  private void verifyGetByName(String host, String ... searches) {
+    InetAddress addr = null;
+    try {
+      addr = resolver.getByName(host);
+    } catch (UnknownHostException e) {} // ignore
+    assertNull(addr);
+    assertBetterArrayEquals(searches, resolver.getHostSearches());
+  }
+  
+  @Test
+  public void testResolverGetByNameQualified() {
+    String host = "unknown.";
+    verifyGetByName(host, host);
+  }
+
+  @Test
+  public void testResolverGetByNameQualifiedWithDomain() {
+    verifyGetByName("unknown.domain.", "unknown.domain.");
+  }
+
+  @Test
+  public void testResolverGetByNameUnqualified() {
+    String host = "unknown";
+    verifyGetByName(host, host+".a.b.", host+".b.", host+".c.", host+".");
+  }
+
+  @Test
+  public void testResolverGetByNameUnqualifiedWithDomain() {
+    String host = "unknown.domain";
+    verifyGetByName(host, host+".", host+".a.b.", host+".b.", host+".c.");
+  }
+  
+  // resolving of hosts
+
+  private InetAddress verifyResolve(String host, String ... searches) {
+    InetAddress addr = null;
+    try {
+      addr = resolver.getByName(host);
+    } catch (UnknownHostException e) {} // ignore
+    assertNotNull(addr);
+    assertBetterArrayEquals(searches, resolver.getHostSearches());
+    return addr;
+  }
+
+  private void
+  verifyInetAddress(InetAddress addr, String host, String ip) {
+    assertNotNull(addr);
+    assertEquals(host, addr.getHostName());
+    assertEquals(ip, addr.getHostAddress());
+  }
+  
+  @Test
+  public void testResolverUnqualified() {
+    String host = "host";
+    InetAddress addr = verifyResolve(host, host+".a.b.");
+    verifyInetAddress(addr, "host.a.b", "1.1.1.1");
+  }
+
+  @Test
+  public void testResolverUnqualifiedWithDomain() {
+    String host = "host.a";
+    InetAddress addr = verifyResolve(host, host+".", host+".a.b.", host+".b.");
+    verifyInetAddress(addr, "host.a.b", "1.1.1.1");
+  }
+
+  @Test
+  public void testResolverUnqualifedFull() {
+    String host = "host.a.b";
+    InetAddress addr = verifyResolve(host, host+".");
+    verifyInetAddress(addr, host, "1.1.1.1");
+  }
+  
+  @Test
+  public void testResolverQualifed() {
+    String host = "host.a.b.";
+    InetAddress addr = verifyResolve(host, host);
+    verifyInetAddress(addr, host, "1.1.1.1");
+  }
+  
+  // localhost
+  
+  @Test
+  public void testResolverLoopback() {
+    String host = "Localhost";
+    InetAddress addr = verifyResolve(host); // no lookup should occur
+    verifyInetAddress(addr, "Localhost", "127.0.0.1");
+  }
+
+  @Test
+  public void testResolverIP() {
+    String host = "1.1.1.1";
+    InetAddress addr = verifyResolve(host); // no lookup should occur for ips
+    verifyInetAddress(addr, host, host);
+  }
+
+  //
+    
+  @Test
+  public void testCanonicalUriWithPort() {
+    URI uri;
+
+    uri = NetUtils.getCanonicalUri(URI.create("scheme://host:123"), 456);
+    assertEquals("scheme://host.a.b:123", uri.toString());
+
+    uri = NetUtils.getCanonicalUri(URI.create("scheme://host:123/"), 456);
+    assertEquals("scheme://host.a.b:123/", uri.toString());
+
+    uri = NetUtils.getCanonicalUri(URI.create("scheme://host:123/path"), 456);
+    assertEquals("scheme://host.a.b:123/path", uri.toString());
+
+    uri = NetUtils.getCanonicalUri(URI.create("scheme://host:123/path?q#frag"), 456);
+    assertEquals("scheme://host.a.b:123/path?q#frag", uri.toString());
+  }
+
+  @Test
+  public void testCanonicalUriWithDefaultPort() {
+    URI uri;
+    
+    uri = NetUtils.getCanonicalUri(URI.create("scheme://host"), 123);
+    assertEquals("scheme://host.a.b:123", uri.toString());
+
+    uri = NetUtils.getCanonicalUri(URI.create("scheme://host/"), 123);
+    assertEquals("scheme://host.a.b:123/", uri.toString());
+
+    uri = NetUtils.getCanonicalUri(URI.create("scheme://host/path"), 123);
+    assertEquals("scheme://host.a.b:123/path", uri.toString());
+
+    uri = NetUtils.getCanonicalUri(URI.create("scheme://host/path?q#frag"), 123);
+    assertEquals("scheme://host.a.b:123/path?q#frag", uri.toString());
+  }
+
+  @Test
+  public void testCanonicalUriWithPath() {
+    URI uri;
+
+    uri = NetUtils.getCanonicalUri(URI.create("path"), 2);
+    assertEquals("path", uri.toString());
+
+    uri = NetUtils.getCanonicalUri(URI.create("/path"), 2);
+    assertEquals("/path", uri.toString());
+  }
+
+  @Test
+  public void testCanonicalUriWithNoAuthority() {
+    URI uri;
+
+    uri = NetUtils.getCanonicalUri(URI.create("scheme:/"), 2);
+    assertEquals("scheme:/", uri.toString());
+
+    uri = NetUtils.getCanonicalUri(URI.create("scheme:/path"), 2);
+    assertEquals("scheme:/path", uri.toString());
+
+    uri = NetUtils.getCanonicalUri(URI.create("scheme:///"), 2);
+    assertEquals("scheme:///", uri.toString());
+
+    uri = NetUtils.getCanonicalUri(URI.create("scheme:///path"), 2);
+    assertEquals("scheme:///path", uri.toString());
+  }
+
+  @Test
+  public void testCanonicalUriWithNoHost() {
+    URI uri = NetUtils.getCanonicalUri(URI.create("scheme://:123/path"), 2);
+    assertEquals("scheme://:123/path", uri.toString());
+  }
+
+  @Test
+  public void testCanonicalUriWithNoPortNoDefaultPort() {
+    URI uri = NetUtils.getCanonicalUri(URI.create("scheme://host/path"), -1);
+    assertEquals("scheme://host.a.b/path", uri.toString());
+  }
+  
+  private <T> void assertBetterArrayEquals(T[] expect, T[]got) {
+    String expectStr = StringUtils.join(expect, ", ");
+    String gotStr = StringUtils.join(got, ", ");
+    assertEquals(expectStr, gotStr);
+  }
+}

+ 2 - 6
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestDoAsEffectiveUser.java

@@ -418,9 +418,7 @@ public class TestDoAsEffectiveUser {
         .getUserName()), new Text("SomeSuperUser"));
     Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
         sm);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
+    SecurityUtil.setTokenService(token, addr);
     UserGroupInformation proxyUserUgi = UserGroupInformation
         .createProxyUserForTesting(PROXY_USER_NAME, current, GROUP_NAMES);
     proxyUserUgi.addToken(token);
@@ -476,9 +474,7 @@ public class TestDoAsEffectiveUser {
         .getUserName()), new Text("SomeSuperUser"));
     Token<TestTokenIdentifier> token = new Token<TestTokenIdentifier>(tokenId,
         sm);
-    Text host = new Text(addr.getAddress().getHostAddress() + ":"
-        + addr.getPort());
-    token.setService(host);
+    SecurityUtil.setTokenService(token, addr);
     current.addToken(token);
     String retVal = current.doAs(new PrivilegedExceptionAction<String>() {
       @Override

+ 215 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java

@@ -16,16 +16,19 @@
  */
 package org.apache.hadoop.security;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
 
 import javax.security.auth.kerberos.KerberosPrincipal;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -121,4 +124,213 @@ public class TestSecurityUtil {
     assertEquals(null,
         SecurityUtil.getHostFromPrincipal("service@realm"));
   }
+
+  @Test
+  public void testBuildDTServiceName() {
+    assertEquals("127.0.0.1:123",
+        SecurityUtil.buildDTServiceName(URI.create("test://LocalHost"), 123)
+    );
+    assertEquals("127.0.0.1:123",
+        SecurityUtil.buildDTServiceName(URI.create("test://LocalHost:123"), 456)
+    );
+    assertEquals("127.0.0.1:123",
+        SecurityUtil.buildDTServiceName(URI.create("test://127.0.0.1"), 123)
+    );
+    assertEquals("127.0.0.1:123",
+        SecurityUtil.buildDTServiceName(URI.create("test://127.0.0.1:123"), 456)
+    );
+  }
+  
+  @Test
+  public void testBuildTokenServiceSockAddr() {
+    assertEquals("127.0.0.1:123",
+        SecurityUtil.buildTokenService(new InetSocketAddress("LocalHost", 123)).toString()
+    );
+    assertEquals("127.0.0.1:123",
+        SecurityUtil.buildTokenService(new InetSocketAddress("127.0.0.1", 123)).toString()
+    );
+    // what goes in, comes out
+    assertEquals("127.0.0.1:123",
+        SecurityUtil.buildTokenService(NetUtils.createSocketAddr("127.0.0.1", 123)).toString()
+    );
+  }
+
+  @Test
+  public void testGoodHostsAndPorts() {
+    InetSocketAddress compare = NetUtils.createSocketAddrForHost("localhost", 123);
+    runGoodCases(compare, "localhost", 123);
+    runGoodCases(compare, "localhost:", 123);
+    runGoodCases(compare, "localhost:123", 456);
+  }
+  
+  void runGoodCases(InetSocketAddress addr, String host, int port) {
+    assertEquals(addr, NetUtils.createSocketAddr(host, port));
+    assertEquals(addr, NetUtils.createSocketAddr("hdfs://"+host, port));
+    assertEquals(addr, NetUtils.createSocketAddr("hdfs://"+host+"/path", port));
+  }
+  
+  @Test
+  public void testBadHostsAndPorts() {
+    runBadCases("", true);
+    runBadCases(":", false);
+    runBadCases("hdfs/", false);
+    runBadCases("hdfs:/", false);
+    runBadCases("hdfs://", true);
+  }
+  
+  void runBadCases(String prefix, boolean validIfPosPort) {
+    runBadPortPermutes(prefix, false);
+    runBadPortPermutes(prefix+"*", false);
+    runBadPortPermutes(prefix+"localhost", validIfPosPort);
+    runBadPortPermutes(prefix+"localhost:-1", false);
+    runBadPortPermutes(prefix+"localhost:-123", false);
+    runBadPortPermutes(prefix+"localhost:xyz", false);
+    runBadPortPermutes(prefix+"localhost/xyz", validIfPosPort);
+    runBadPortPermutes(prefix+"localhost/:123", validIfPosPort);
+    runBadPortPermutes(prefix+":123", false);
+    runBadPortPermutes(prefix+":xyz", false);
+  }
+
+  void runBadPortPermutes(String arg, boolean validIfPosPort) {
+    int ports[] = { -123, -1, 123 };
+    boolean bad = false;
+    try {
+      NetUtils.createSocketAddr(arg);
+    } catch (IllegalArgumentException e) {
+      bad = true;
+    } finally {
+      assertTrue("should be bad: '"+arg+"'", bad);
+    }
+    for (int port : ports) {
+      if (validIfPosPort && port > 0) continue;
+      
+      bad = false;
+      try {
+        NetUtils.createSocketAddr(arg, port);
+      } catch (IllegalArgumentException e) {
+        bad = true;
+      } finally {
+        assertTrue("should be bad: '"+arg+"' (default port:"+port+")", bad);
+      }
+    }
+  }
+
+  // check that the socket addr has:
+  // 1) the InetSocketAddress has the correct hostname, ie. exact host/ip given
+  // 2) the address is resolved, ie. has an ip
+  // 3,4) the socket's InetAddress has the same hostname, and the correct ip
+  // 5) the port is correct
+  private void
+  verifyValues(InetSocketAddress addr, String host, String ip, int port) {
+    assertTrue(!addr.isUnresolved());
+    // don't know what the standard resolver will return for hostname.
+    // should be host for host; host or ip for ip is ambiguous
+    if (!SecurityUtil.getTokenServiceUseIp()) {
+      assertEquals(host, addr.getHostName());
+      assertEquals(host, addr.getAddress().getHostName());
+    }
+    assertEquals(ip, addr.getAddress().getHostAddress());
+    assertEquals(port, addr.getPort());    
+  }
+
+  // check:
+  // 1) buildTokenService honors use_ip setting
+  // 2) setTokenService & getService works
+  // 3) getTokenServiceAddr decodes to the identical socket addr
+  private void
+  verifyTokenService(InetSocketAddress addr, String host, String ip, int port, boolean useIp) {
+    //LOG.info("address:"+addr+" host:"+host+" ip:"+ip+" port:"+port);
+
+    SecurityUtil.setTokenServiceUseIp(useIp);
+    String serviceHost = useIp ? ip : host.toLowerCase();
+    
+    Token token = new Token();
+    Text service = new Text(serviceHost+":"+port);
+    
+    assertEquals(service, SecurityUtil.buildTokenService(addr));
+    SecurityUtil.setTokenService(token, addr);
+    assertEquals(service, token.getService());
+    
+    InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
+    assertNotNull(serviceAddr);
+    verifyValues(serviceAddr, serviceHost, ip, port);
+  }
+
+  // check:
+  // 1) socket addr is created with fields set as expected
+  // 2) token service with ips
+  // 3) token service with the given host or ip
+  private void
+  verifyAddress(InetSocketAddress addr, String host, String ip, int port) {
+    verifyValues(addr, host, ip, port);
+    //LOG.info("test that token service uses ip");
+    verifyTokenService(addr, host, ip, port, true);    
+    //LOG.info("test that token service uses host");
+    verifyTokenService(addr, host, ip, port, false);
+  }
+
+  // check:
+  // 1-4) combinations of host and port
+  // this will construct a socket addr, verify all the fields, build the
+  // service to verify the use_ip setting is honored, set the token service
+  // based on addr and verify the token service is set correctly, decode
+  // the token service and ensure all the fields of the decoded addr match
+  private void verifyServiceAddr(String host, String ip) {
+    InetSocketAddress addr;
+    int port = 123;
+
+    // test host, port tuple
+    //LOG.info("test tuple ("+host+","+port+")");
+    addr = NetUtils.createSocketAddrForHost(host, port);
+    verifyAddress(addr, host, ip, port);
+
+    // test authority with no default port
+    //LOG.info("test authority '"+host+":"+port+"'");
+    addr = NetUtils.createSocketAddr(host+":"+port);
+    verifyAddress(addr, host, ip, port);
+
+    // test authority with a default port, make sure default isn't used
+    //LOG.info("test authority '"+host+":"+port+"' with ignored default port");
+    addr = NetUtils.createSocketAddr(host+":"+port, port+1);
+    verifyAddress(addr, host, ip, port);
+
+    // test host-only authority, using port as default port
+    //LOG.info("test host:"+host+" port:"+port);
+    addr = NetUtils.createSocketAddr(host, port);
+    verifyAddress(addr, host, ip, port);
+  }
+
+  @Test
+  public void testSocketAddrWithName() {
+    String staticHost = "my";
+    NetUtils.addStaticResolution(staticHost, "localhost");
+    verifyServiceAddr("LocalHost", "127.0.0.1");
+  }
+
+  @Test
+  public void testSocketAddrWithIP() {
+    verifyServiceAddr("127.0.0.1", "127.0.0.1");
+  }
+
+  @Test
+  public void testSocketAddrWithNameToStaticName() {
+    String staticHost = "host1";
+    NetUtils.addStaticResolution(staticHost, "localhost");
+    verifyServiceAddr(staticHost, "127.0.0.1");
+  }
+
+  @Test
+  public void testSocketAddrWithNameToStaticIP() {
+    String staticHost = "host3";
+    NetUtils.addStaticResolution(staticHost, "255.255.255.255");
+    verifyServiceAddr(staticHost, "255.255.255.255");
+  }
+
+  // this is a bizarre case, but it's if a test tries to remap an ip address
+  @Test
+  public void testSocketAddrWithIPToStaticIP() {
+    String staticHost = "1.2.3.4";
+    NetUtils.addStaticResolution(staticHost, "255.255.255.255");
+    verifyServiceAddr(staticHost, "255.255.255.255");
+  }
 }

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -258,10 +258,13 @@ Release 0.23.1 - UNRELEASED
     HDFS-2335. DataNodeCluster and NNStorage always pull fresh entropy.
     (Uma Maheswara Rao G via eli)
 
-    HDFS-2574. Remove references to some deprecated properties in conf templates and defaults files. (Joe Crobak via harsh)
+    HDFS-2574. Remove references to some deprecated properties in conf 
+    templates and defaults files. (Joe Crobak via harsh)
 
     HDFS-2722. HttpFs should not be using an int for block size. (harsh)
 
+    HDFS-2710. Add HDFS tests related to HADOOP-7933. (sid via suresh)
+
   OPTIMIZATIONS
 
     HDFS-2130. Switch default checksum to CRC32C. (todd)

+ 48 - 18
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java

@@ -20,44 +20,57 @@ package org.apache.hadoop.fs.viewfs;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.List;
 
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Test;
 
 
 public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
 
   private static MiniDFSCluster cluster;
   private static Path defaultWorkingDirectory;
+  private static Path defaultWorkingDirectory2;
   private static Configuration CONF = new Configuration();
   private static FileSystem fHdfs;
+  private static FileSystem fHdfs2;
+  private FileSystem fsTarget2;
+  Path targetTestRoot2;
   
   @BeforeClass
   public static void clusterSetupAtBegining() throws IOException,
       LoginException, URISyntaxException {
     SupportsBlocks = true;
-    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
+    cluster =
+        new MiniDFSCluster.Builder(CONF).nnTopology(
+                MiniDFSNNTopology.simpleFederatedTopology(2))
+            .numDataNodes(2)
+            .build();
     cluster.waitClusterUp();
-    NameNodeAdapter.getDtSecretManager(cluster.getNamesystem()).startThreads();
-    fHdfs = cluster.getFileSystem();
+    NameNodeAdapter.getDtSecretManager(cluster.getNamesystem(0)).startThreads();
+    NameNodeAdapter.getDtSecretManager(cluster.getNamesystem(1)).startThreads();
+    
+    fHdfs = cluster.getFileSystem(0);
+    fHdfs2 = cluster.getFileSystem(1);
+    
     defaultWorkingDirectory = fHdfs.makeQualified( new Path("/user/" + 
         UserGroupInformation.getCurrentUser().getShortUserName()));
+    defaultWorkingDirectory2 = fHdfs2.makeQualified( new Path("/user/" + 
+        UserGroupInformation.getCurrentUser().getShortUserName()));
+    
     fHdfs.mkdirs(defaultWorkingDirectory);
+    fHdfs2.mkdirs(defaultWorkingDirectory2);
   }
 
       
@@ -70,25 +83,42 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
   public void setUp() throws Exception {
     // create the test root on local_fs
     fsTarget = fHdfs;
+    fsTarget2 = fHdfs2;
+    targetTestRoot2 = FileSystemTestHelper.getAbsoluteTestRootPath(fsTarget2);
     super.setUp();
-    
   }
 
   @After
   public void tearDown() throws Exception {
     super.tearDown();
   }
+
+  @Override
+  void setupMountPoints() {
+    super.setupMountPoints();
+    ConfigUtil.addLink(conf, "/mountOnNn2", new Path(targetTestRoot2,
+        "mountOnNn2").toUri());
+  }
+
+  // Overriden test helper methods - changed values based on hdfs and the
+  // additional mount.
+  @Override
+  int getExpectedDirPaths() {
+    return 7;
+  }
   
-  /*
-   * This overides the default implementation since hdfs does have delegation
-   * tokens.
-   */
   @Override
-  @Test
-  public void testGetDelegationTokens() throws IOException {
-    List<Token<?>> delTokens = 
-        fsView.getDelegationTokens("sanjay");
-    Assert.assertEquals(7, delTokens.size()); 
+  int getExpectedMountPoints() {
+    return 8;
+  }
+
+  @Override
+  int getExpectedDelegationTokenCount() {
+    return 8;
   }
 
+  @Override
+  int getExpectedDelegationTokenCountWithCredentials() {
+    return 2;
+  }
 }

+ 19 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -180,6 +180,8 @@ Release 0.23.1 - Unreleased
 
     MAPREDUCE-3610. Remove use of the 'dfs.block.size' config for default block size fetching. Use FS#getDefaultBlocksize instead. (Sho Shimauchi via harsh)
 
+    MAPREDUCE-3478. Cannot build against ZooKeeper 3.4.0. (Tom White via mahadev)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@@ -191,6 +193,9 @@ Release 0.23.1 - Unreleased
 
     MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv)
 
+    MAPREDUCE-3569. TaskAttemptListener holds a global lock for all
+    task-updates. (Vinod Kumar Vavilapalli via sseth)
+
   BUG FIXES
 
     MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob
@@ -395,6 +400,20 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-1744. DistributedCache creates its own FileSytem instance when 
     adding a file/archive to the path. (Dick King via tucu)
 
+    MAPREDUCE-3529. TokenCache does not cache viewfs credentials correctly
+    (sseth)
+
+    MAPREDUCE-3595. Add missing TestCounters#testCounterValue test from branch
+    1 to 0.23 (Tom White via sseth)
+
+    MAPREDUCE-3566. Fixed MR AM to construct CLC only once across all tasks.
+    (vinodkv via acmurthy) 
+
+    MAPREDUCE-3572. Moved AM event dispatcher to a separate thread for
+    performance reasons. (vinodkv via acmurthy) 
+
+    MAPREDUCE-3615. Fix some ant test failures. (Thomas Graves via sseth)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 3 - 22
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.util.Apps;
 
+@SuppressWarnings("deprecation")
 public class MapReduceChildJVM {
 
   private static String getTaskLogFile(LogName filter) {
@@ -46,7 +47,7 @@ public class MapReduceChildJVM {
           jobConf.get(JobConf.MAPRED_TASK_ENV));
     }
     return jobConf.get(JobConf.MAPRED_REDUCE_TASK_ENV,
-        jobConf.get(jobConf.MAPRED_TASK_ENV));
+        jobConf.get(JobConf.MAPRED_TASK_ENV));
   }
 
   private static String getChildLogLevel(JobConf conf, boolean isMap) {
@@ -68,29 +69,9 @@ public class MapReduceChildJVM {
 
     JobConf conf = task.conf;
 
-    // Shell
-    environment.put(
-        Environment.SHELL.name(), 
-        conf.get(
-            MRJobConfig.MAPRED_ADMIN_USER_SHELL, 
-            MRJobConfig.DEFAULT_SHELL)
-            );
-    
-    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
-    Apps.addToEnvironment(
-        environment, 
-        Environment.LD_LIBRARY_PATH.name(), 
-        Environment.PWD.$());
-
-    // Add the env variables passed by the user & admin
+    // Add the env variables passed by the user
     String mapredChildEnv = getChildEnv(conf, task.isMapTask());
     Apps.setEnvFromInputString(environment, mapredChildEnv);
-    Apps.setEnvFromInputString(
-        environment, 
-        conf.get(
-            MRJobConfig.MAPRED_ADMIN_USER_ENV, 
-            MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
-        );
 
     // Set logging level in the environment.
     // This is so that, if the child forks another "bin/hadoop" (common in

+ 50 - 50
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -19,14 +19,12 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -64,21 +62,22 @@ import org.apache.hadoop.yarn.service.CompositeService;
  * This class HAS to be in this package to access package private 
  * methods/classes.
  */
+@SuppressWarnings({"unchecked" , "deprecation"})
 public class TaskAttemptListenerImpl extends CompositeService 
     implements TaskUmbilicalProtocol, TaskAttemptListener {
 
+  private static final JvmTask TASK_FOR_INVALID_JVM = new JvmTask(null, true);
+
   private static final Log LOG = LogFactory.getLog(TaskAttemptListenerImpl.class);
 
   private AppContext context;
   private Server server;
   protected TaskHeartbeatHandler taskHeartbeatHandler;
   private InetSocketAddress address;
-  private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap = 
-    Collections.synchronizedMap(new HashMap<WrappedJvmID, 
-        org.apache.hadoop.mapred.Task>());
+  private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
+    jvmIDToActiveAttemptMap
+      = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
   private JobTokenSecretManager jobTokenSecretManager = null;
-  private Set<WrappedJvmID> pendingJvms =
-    Collections.synchronizedSet(new HashSet<WrappedJvmID>());
   
   public TaskAttemptListenerImpl(AppContext context,
       JobTokenSecretManager jobTokenSecretManager) {
@@ -123,10 +122,9 @@ public class TaskAttemptListenerImpl extends CompositeService
 
       server.start();
       InetSocketAddress listenerAddress = server.getListenerAddress();
-      this.address =
-          NetUtils.createSocketAddr(listenerAddress.getAddress()
-              .getLocalHost().getCanonicalHostName()
-              + ":" + listenerAddress.getPort());
+      listenerAddress.getAddress();
+      this.address = NetUtils.createSocketAddr(InetAddress.getLocalHost()
+        .getCanonicalHostName() + ":" + listenerAddress.getPort());
     } catch (IOException e) {
       throw new YarnException(e);
     }
@@ -408,57 +406,59 @@ public class TaskAttemptListenerImpl extends CompositeService
 
     WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
         jvmId.getId());
-    synchronized(this) {
-      if(pendingJvms.contains(wJvmID)) {
-        org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID);
-        if (task != null) { //there may be lag in the attempt getting added here
-         LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
-          jvmTask = new JvmTask(task, false);
-
-          //remove the task as it is no more needed and free up the memory
-          //Also we have already told the JVM to process a task, so it is no
-          //longer pending, and further request should ask it to exit.
-          pendingJvms.remove(wJvmID);
-          jvmIDToActiveAttemptMap.remove(wJvmID);
-        }
-      } else {
-        LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
-        jvmTask = new JvmTask(null, true);
-      }
+
+    // Try to look up the task. We remove it directly as we don't give
+    // multiple tasks to a JVM
+    org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap
+        .remove(wJvmID);
+    if (task != null) {
+      LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+      jvmTask = new JvmTask(task, false);
+
+      // remove the task as it is no more needed and free up the memory
+      // Also we have already told the JVM to process a task, so it is no
+      // longer pending, and further request should ask it to exit.
+    } else {
+      LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
+      jvmTask = TASK_FOR_INVALID_JVM;
     }
     return jvmTask;
   }
   
   @Override
-  public synchronized void registerPendingTask(WrappedJvmID jvmID) {
-    //Save this JVM away as one that has not been handled yet
-    pendingJvms.add(jvmID);
+  public void registerPendingTask(
+      org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
+    // Create the mapping so that it is easy to look up
+    // when the jvm comes back to ask for Task.
+
+    // A JVM not present in this map is an illegal task/JVM.
+    jvmIDToActiveAttemptMap.put(jvmID, task);
   }
 
   @Override
   public void registerLaunchedTask(
-      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
-      org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
-    synchronized(this) {
-      //create the mapping so that it is easy to look up
-      //when it comes back to ask for Task.
-      jvmIDToActiveAttemptMap.put(jvmID, task);
-      //This should not need to happen here, but just to be on the safe side
-      if(!pendingJvms.add(jvmID)) {
-        LOG.warn(jvmID+" launched without first being registered");
-      }
-    }
-    //register this attempt
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID) {
+
+    // The task is launched. Register this for expiry-tracking.
+
+    // Timing can cause this to happen after the real JVM launches and gets a
+    // task which is still fine as we will only be tracking for expiry a little
+    // late than usual.
     taskHeartbeatHandler.register(attemptID);
   }
 
   @Override
-  public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
+  public void unregister(
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
       WrappedJvmID jvmID) {
-    //remove the mapping if not already removed
+
+    // Unregistration also comes from the same TaskAttempt which does the
+    // registration. Events are ordered at TaskAttempt, so unregistration will
+    // always come after registration.
+
+    // remove the mapping if not already removed
     jvmIDToActiveAttemptMap.remove(jvmID);
-    //remove the pending if not already removed
-    pendingJvms.remove(jvmID);
+
     //unregister this attempt
     taskHeartbeatHandler.unregister(attemptID);
   }

+ 9 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java

@@ -32,20 +32,21 @@ public interface TaskAttemptListener {
   InetSocketAddress getAddress();
 
   /**
-   * register a JVM with the listener.  This should be called as soon as a 
+   * Register a JVM with the listener.  This should be called as soon as a 
    * JVM ID is assigned to a task attempt, before it has been launched.
+   * @param task the task itself for this JVM.
    * @param jvmID The ID of the JVM .
    */
-  void registerPendingTask(WrappedJvmID jvmID);
+  void registerPendingTask(Task task, WrappedJvmID jvmID);
   
   /**
-   * Register the task and task attempt with the JVM.  This should be called
-   * when the JVM has been launched.
-   * @param attemptID the id of the attempt for this JVM.
-   * @param task the task itself for this JVM.
-   * @param jvmID the id of the JVM handling the task.
+   * Register task attempt. This should be called when the JVM has been
+   * launched.
+   * 
+   * @param attemptID
+   *          the id of the attempt for this JVM.
    */
-  void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
+  void registerLaunchedTask(TaskAttemptId attemptID);
 
   /**
    * Unregister the JVM and the attempt associated with it.  This should be 

+ 117 - 55
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -27,6 +27,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -109,6 +110,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -154,6 +156,8 @@ public abstract class TaskAttemptImpl implements
   private Token<JobTokenIdentifier> jobToken;
   private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
   private static String initialClasspath = null;
+  private static Object commonContainerSpecLock = new Object();
+  private static ContainerLaunchContext commonContainerSpec = null;
   private static final Object classpathLock = new Object();
   private long launchTime;
   private long finishTime;
@@ -497,29 +501,27 @@ public abstract class TaskAttemptImpl implements
 
   /**
    * Create a {@link LocalResource} record with all the given parameters.
-   * TODO: This should pave way for Builder pattern.
    */
-  private static LocalResource createLocalResource(FileSystem fc,
-      RecordFactory recordFactory, Path file, LocalResourceType type,
-      LocalResourceVisibility visibility) throws IOException {
+  private static LocalResource createLocalResource(FileSystem fc, Path file,
+      LocalResourceType type, LocalResourceVisibility visibility)
+      throws IOException {
     FileStatus fstat = fc.getFileStatus(file);
-    LocalResource resource =
-        recordFactory.newRecordInstance(LocalResource.class);
-    resource.setResource(ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
-        .getPath())));
-    resource.setType(type);
-    resource.setVisibility(visibility);
-    resource.setSize(fstat.getLen());
-    resource.setTimestamp(fstat.getModificationTime());
-    return resource;
+    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
+        .getPath()));
+    long resourceSize = fstat.getLen();
+    long resourceModificationTime = fstat.getModificationTime();
+
+    return BuilderUtils.newLocalResource(resourceURL, type, visibility,
+        resourceSize, resourceModificationTime);
   }
 
   /**
    * Lock this on initialClasspath so that there is only one fork in the AM for
-   * getting the initial class-path. TODO: This should go away once we construct
-   * a parent CLC and use it for all the containers.
+   * getting the initial class-path. TODO: We already construct
+   * a parent CLC and use it for all the containers, so this should go away
+   * once the mr-generated-classpath stuff is gone.
    */
-  private String getInitialClasspath() throws IOException {
+  private static String getInitialClasspath() throws IOException {
     synchronized (classpathLock) {
       if (initialClasspathFlag.get()) {
         return initialClasspath;
@@ -534,11 +536,14 @@ public abstract class TaskAttemptImpl implements
 
 
   /**
-   * Create the {@link ContainerLaunchContext} for this attempt.
+   * Create the common {@link ContainerLaunchContext} for all attempts.
    * @param applicationACLs 
    */
-  private ContainerLaunchContext createContainerLaunchContext(
-      Map<ApplicationAccessType, String> applicationACLs) {
+  private static ContainerLaunchContext createCommonContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
+      Token<JobTokenIdentifier> jobToken,
+      final org.apache.hadoop.mapred.JobID oldJobId,
+      Collection<Token<? extends TokenIdentifier>> fsTokens) {
 
     // Application resources
     Map<String, LocalResource> localResources = 
@@ -556,13 +561,13 @@ public abstract class TaskAttemptImpl implements
       FileSystem remoteFS = FileSystem.get(conf);
 
       // //////////// Set up JobJar to be localized properly on the remote NM.
-      if (conf.get(MRJobConfig.JAR) != null) {
-        Path remoteJobJar = (new Path(remoteTask.getConf().get(
-              MRJobConfig.JAR))).makeQualified(remoteFS.getUri(), 
-                                               remoteFS.getWorkingDirectory());
+      String jobJar = conf.get(MRJobConfig.JAR);
+      if (jobJar != null) {
+        Path remoteJobJar = (new Path(jobJar)).makeQualified(remoteFS
+            .getUri(), remoteFS.getWorkingDirectory());
         localResources.put(
             MRJobConfig.JOB_JAR,
-            createLocalResource(remoteFS, recordFactory, remoteJobJar,
+            createLocalResource(remoteFS, remoteJobJar,
                 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
         LOG.info("The job-jar file on the remote FS is "
             + remoteJobJar.toUri().toASCIIString());
@@ -584,7 +589,7 @@ public abstract class TaskAttemptImpl implements
           new Path(remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
       localResources.put(
           MRJobConfig.JOB_CONF_FILE,
-          createLocalResource(remoteFS, recordFactory, remoteJobConfPath,
+          createLocalResource(remoteFS, remoteJobConfPath,
               LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
       LOG.info("The job-conf file on the remote FS is "
           + remoteJobConfPath.toUri().toASCIIString());
@@ -630,19 +635,81 @@ public abstract class TaskAttemptImpl implements
       throw new YarnException(e);
     }
 
-    // Setup environment
-    MapReduceChildJVM.setVMEnv(environment, remoteTask);
+    // Shell
+    environment.put(
+        Environment.SHELL.name(), 
+        conf.get(
+            MRJobConfig.MAPRED_ADMIN_USER_SHELL, 
+            MRJobConfig.DEFAULT_SHELL)
+            );
+
+    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
+    Apps.addToEnvironment(
+        environment, 
+        Environment.LD_LIBRARY_PATH.name(), 
+        Environment.PWD.$());
+
+    // Add the env variables passed by the admin
+    Apps.setEnvFromInputString(
+        environment, 
+        conf.get(
+            MRJobConfig.MAPRED_ADMIN_USER_ENV, 
+            MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
+        );
+
+    // Construct the actual Container
+    // The null fields are per-container and will be constructed for each
+    // container separately.
+    ContainerLaunchContext container = BuilderUtils
+        .newContainerLaunchContext(null, conf
+            .get(MRJobConfig.USER_NAME), null, localResources,
+            environment, null, serviceData, tokens, applicationACLs);
+
+    return container;
+  }
+
+  static ContainerLaunchContext createContainerLaunchContext(
+      Map<ApplicationAccessType, String> applicationACLs,
+      ContainerId containerID, Configuration conf,
+      Token<JobTokenIdentifier> jobToken, Task remoteTask,
+      final org.apache.hadoop.mapred.JobID oldJobId,
+      Resource assignedCapability, WrappedJvmID jvmID,
+      TaskAttemptListener taskAttemptListener,
+      Collection<Token<? extends TokenIdentifier>> fsTokens) {
+
+    synchronized (commonContainerSpecLock) {
+      if (commonContainerSpec == null) {
+        commonContainerSpec = createCommonContainerLaunchContext(
+            applicationACLs, conf, jobToken, oldJobId, fsTokens);
+      }
+    }
+
+    // Fill in the fields needed per-container that are missing in the common
+    // spec.
+
+    // Setup environment by cloning from common env.
+    Map<String, String> env = commonContainerSpec.getEnvironment();
+    Map<String, String> myEnv = new HashMap<String, String>(env.size());
+    myEnv.putAll(env);
+    MapReduceChildJVM.setVMEnv(myEnv, remoteTask);
 
     // Set up the launch command
     List<String> commands = MapReduceChildJVM.getVMCommand(
-        taskAttemptListener.getAddress(), remoteTask,
-        jvmID);
-    
+        taskAttemptListener.getAddress(), remoteTask, jvmID);
+
+    // Duplicate the ByteBuffers for access by multiple containers.
+    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+    for (Entry<String, ByteBuffer> entry : commonContainerSpec
+                .getServiceData().entrySet()) {
+      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+    }
+
     // Construct the actual Container
-    ContainerLaunchContext container = BuilderUtils
-        .newContainerLaunchContext(containerID, conf
-            .get(MRJobConfig.USER_NAME), assignedCapability, localResources,
-            environment, commands, serviceData, tokens, applicationACLs);
+    ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
+        containerID, commonContainerSpec.getUser(), assignedCapability,
+        commonContainerSpec.getLocalResources(), myEnv, commands,
+        myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
+        applicationACLs);
 
     return container;
   }
@@ -1022,7 +1089,7 @@ public abstract class TaskAttemptImpl implements
 
   private static class ContainerAssignedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-    @SuppressWarnings({ "unchecked", "deprecation" })
+    @SuppressWarnings({ "unchecked" })
     @Override
     public void transition(final TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
@@ -1042,24 +1109,21 @@ public abstract class TaskAttemptImpl implements
       taskAttempt.jvmID = new WrappedJvmID(
           taskAttempt.remoteTask.getTaskID().getJobID(), 
           taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
-      taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID);
+      taskAttempt.taskAttemptListener.registerPendingTask(
+          taskAttempt.remoteTask, taskAttempt.jvmID);
       
       //launch the container
       //create the container object to be launched for a given Task attempt
-      taskAttempt.eventHandler.handle(
-          new ContainerRemoteLaunchEvent(taskAttempt.attemptId, 
-              taskAttempt.containerID, 
-              taskAttempt.containerMgrAddress, taskAttempt.containerToken) {
-        @Override
-        public ContainerLaunchContext getContainer() {
-          return taskAttempt.createContainerLaunchContext(cEvent
-              .getApplicationACLs());
-        }
-        @Override
-        public Task getRemoteTask() {  // classic mapred Task, not YARN version
-          return taskAttempt.remoteTask;
-        }
-      });
+      ContainerLaunchContext launchContext = createContainerLaunchContext(
+          cEvent.getApplicationACLs(), taskAttempt.containerID,
+          taskAttempt.conf, taskAttempt.jobToken, taskAttempt.remoteTask,
+          taskAttempt.oldJobId, taskAttempt.assignedCapability,
+          taskAttempt.jvmID, taskAttempt.taskAttemptListener,
+          taskAttempt.fsTokens);
+      taskAttempt.eventHandler.handle(new ContainerRemoteLaunchEvent(
+          taskAttempt.attemptId, taskAttempt.containerID,
+          taskAttempt.containerMgrAddress, taskAttempt.containerToken,
+          launchContext, taskAttempt.remoteTask));
 
       // send event to speculator that our container needs are satisfied
       taskAttempt.eventHandler.handle
@@ -1135,10 +1199,9 @@ public abstract class TaskAttemptImpl implements
       taskAttempt.launchTime = taskAttempt.clock.getTime();
       taskAttempt.shufflePort = event.getShufflePort();
 
-      // register it to TaskAttemptListener so that it start listening
-      // for it
-      taskAttempt.taskAttemptListener.registerLaunchedTask(
-          taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
+      // register it to TaskAttemptListener so that it can start monitoring it.
+      taskAttempt.taskAttemptListener
+        .registerLaunchedTask(taskAttempt.attemptId);
       //TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr =
           NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
@@ -1197,7 +1260,6 @@ public abstract class TaskAttemptImpl implements
     @Override
     public void transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
-      @SuppressWarnings("deprecation")
       TaskAttemptContext taskContext =
         new TaskAttemptContextImpl(taskAttempt.conf,
             TypeConverter.fromYarn(taskAttempt.attemptId));

+ 20 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java

@@ -24,17 +24,31 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
 
-public abstract class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
+public class ContainerRemoteLaunchEvent extends ContainerLauncherEvent {
+
+  private final ContainerLaunchContext container;
+  private final Task task;
 
   public ContainerRemoteLaunchEvent(TaskAttemptId taskAttemptID,
       ContainerId containerID, String containerMgrAddress,
-      ContainerToken containerToken) {
-    super(taskAttemptID, containerID, containerMgrAddress,
-        containerToken,
+      ContainerToken containerToken,
+      ContainerLaunchContext containerLaunchContext, Task remoteTask) {
+    super(taskAttemptID, containerID, containerMgrAddress, containerToken,
         ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH);
+    this.container = containerLaunchContext;
+    this.task = remoteTask;
   }
-  public abstract ContainerLaunchContext getContainer();
 
-  public abstract Task getRemoteTask();
+  public ContainerLaunchContext getContainer() {
+    return this.container;
+  }
 
+  public Task getRemoteTask() {
+    return this.task;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return super.equals(obj);
+  }
 }

+ 1 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -104,10 +104,9 @@ public abstract class RMCommunicator extends AbstractService  {
   @Override
   public void start() {
     scheduler= createSchedulerProxy();
-    //LOG.info("Scheduler is " + scheduler);
     register();
     startAllocatorThread();
-    JobID id = TypeConverter.fromYarn(context.getApplicationID());
+    JobID id = TypeConverter.fromYarn(this.applicationId);
     JobId jobId = TypeConverter.toYarn(id);
     job = context.getJob(jobId);
     super.start();

+ 69 - 15
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java

@@ -30,18 +30,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobCounter;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
-import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -69,7 +68,7 @@ import org.apache.hadoop.yarn.util.RackResolver;
 public class RMContainerAllocator extends RMContainerRequestor
     implements ContainerAllocator {
 
-  private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
+  static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
   
   public static final 
   float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
@@ -77,7 +76,10 @@ public class RMContainerAllocator extends RMContainerRequestor
   private static final Priority PRIORITY_FAST_FAIL_MAP;
   private static final Priority PRIORITY_REDUCE;
   private static final Priority PRIORITY_MAP;
-  
+
+  private Thread eventHandlingThread;
+  private volatile boolean stopEventHandling;
+
   static {
     PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
     PRIORITY_FAST_FAIL_MAP.setPriority(5);
@@ -130,7 +132,10 @@ public class RMContainerAllocator extends RMContainerRequestor
   private float reduceSlowStart = 0;
   private long retryInterval;
   private long retrystartTime;
-  
+
+  BlockingQueue<ContainerAllocatorEvent> eventQueue
+    = new LinkedBlockingQueue<ContainerAllocatorEvent>();
+
   public RMContainerAllocator(ClientService clientService, AppContext context) {
     super(clientService, context);
   }
@@ -155,6 +160,40 @@ public class RMContainerAllocator extends RMContainerRequestor
     retrystartTime = System.currentTimeMillis();
   }
 
+  @Override
+  public void start() {
+    this.eventHandlingThread = new Thread() {
+      @SuppressWarnings("unchecked")
+      @Override
+      public void run() {
+
+        ContainerAllocatorEvent event;
+
+        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+          try {
+            event = RMContainerAllocator.this.eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.error("Returning, interrupted : " + e);
+            return;
+          }
+
+          try {
+            handleEvent(event);
+          } catch (Throwable t) {
+            LOG.error("Error in handling event type " + event.getType()
+                + " to the ContainreAllocator", t);
+            // Kill the AM
+            eventHandler.handle(new JobEvent(getJob().getID(),
+              JobEventType.INTERNAL_ERROR));
+            return;
+          }
+        }
+      }
+    };
+    this.eventHandlingThread.start();
+    super.start();
+  }
+
   @Override
   protected synchronized void heartbeat() throws Exception {
     LOG.info("Before Scheduling: " + getStat());
@@ -181,6 +220,8 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   @Override
   public void stop() {
+    this.stopEventHandling = true;
+    eventHandlingThread.interrupt();
     super.stop();
     LOG.info("Final Stats: " + getStat());
   }
@@ -192,10 +233,27 @@ public class RMContainerAllocator extends RMContainerRequestor
   public void setIsReduceStarted(boolean reduceStarted) {
     this.reduceStarted = reduceStarted; 
   }
-  
-  @SuppressWarnings("unchecked")
+
   @Override
-  public synchronized void handle(ContainerAllocatorEvent event) {
+  public void handle(ContainerAllocatorEvent event) {
+    int qSize = eventQueue.size();
+    if (qSize != 0 && qSize % 1000 == 0) {
+      LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
+    }
+    int remCapacity = eventQueue.remainingCapacity();
+    if (remCapacity < 1000) {
+      LOG.warn("Very low remaining capacity in the event-queue "
+          + "of RMContainerAllocator: " + remCapacity);
+    }
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  @SuppressWarnings({ "unchecked" })
+  protected synchronized void handleEvent(ContainerAllocatorEvent event) {
     LOG.info("Processing the event " + event.toString());
     recalculateReduceSchedule = true;
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
@@ -206,9 +264,7 @@ public class RMContainerAllocator extends RMContainerRequestor
           int minSlotMemSize = getMinContainerCapability().getMemory();
           mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
               * minSlotMemSize;
-          JobID id = TypeConverter.fromYarn(applicationId);
-          JobId jobId = TypeConverter.toYarn(id);
-          eventHandler.handle(new JobHistoryEvent(jobId, 
+          eventHandler.handle(new JobHistoryEvent(getJob().getID(), 
               new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
               mapResourceReqt)));
           LOG.info("mapResourceReqt:"+mapResourceReqt);
@@ -232,9 +288,7 @@ public class RMContainerAllocator extends RMContainerRequestor
           //round off on slotsize
           reduceResourceReqt = (int) Math.ceil((float) 
               reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
-          JobID id = TypeConverter.fromYarn(applicationId);
-          JobId jobId = TypeConverter.toYarn(id);
-          eventHandler.handle(new JobHistoryEvent(jobId, 
+          eventHandler.handle(new JobHistoryEvent(getJob().getID(), 
               new NormalizedResourceEvent(
                   org.apache.hadoop.mapreduce.TaskType.REDUCE,
               reduceResourceReqt)));

+ 28 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -17,8 +17,11 @@
 */
 package org.apache.hadoop.mapred;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 
@@ -68,33 +71,47 @@ public class TestTaskAttemptListenerImpl {
     JVMId id = new JVMId("foo",1, true, 1);
     WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());
 
+    // Verify ask before registration.
     //The JVM ID has not been registered yet so we should kill it.
     JvmContext context = new JvmContext();
     context.jvmId = id; 
     JvmTask result = listener.getTask(context);
     assertNotNull(result);
     assertTrue(result.shouldDie);
-    
-    //Now register the JVM, and see
-    listener.registerPendingTask(wid);
-    result = listener.getTask(context);
-    assertNull(result);
-    
+
+    // Verify ask after registration but before launch
     TaskAttemptId attemptID = mock(TaskAttemptId.class);
     Task task = mock(Task.class);
     //Now put a task with the ID
-    listener.registerLaunchedTask(attemptID, task, wid);
+    listener.registerPendingTask(task, wid);
+    result = listener.getTask(context);
+    assertNotNull(result);
+    assertFalse(result.shouldDie);
+    // Unregister for more testing.
+    listener.unregister(attemptID, wid);
+
+    // Verify ask after registration and launch
+    //Now put a task with the ID
+    listener.registerPendingTask(task, wid);
+    listener.registerLaunchedTask(attemptID);
     verify(hbHandler).register(attemptID);
     result = listener.getTask(context);
     assertNotNull(result);
     assertFalse(result.shouldDie);
-    
+    // Don't unregister yet for more testing.
+
     //Verify that if we call it again a second time we are told to die.
     result = listener.getTask(context);
     assertNotNull(result);
     assertTrue(result.shouldDie);
-    
+
     listener.unregister(attemptID, wid);
+
+    // Verify after unregistration.
+    result = listener.getTask(context);
+    assertNotNull(result);
+    assertTrue(result.shouldDie);
+
     listener.stop();
   }
 }

+ 23 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
@@ -65,7 +66,9 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.Clock;
@@ -88,6 +91,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
  * Mock MRAppMaster. Doesn't start RPC servers.
  * No threads are started except of the event Dispatcher thread.
  */
+@SuppressWarnings("unchecked")
 public class MRApp extends MRAppMaster {
   private static final Log LOG = LogFactory.getLog(MRApp.class);
 
@@ -173,7 +177,8 @@ public class MRApp extends MRAppMaster {
   }
 
   public Job submit(Configuration conf) throws Exception {
-    String user = conf.get(MRJobConfig.USER_NAME, "mapred");
+    String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
+      .getCurrentUser().getShortUserName());
     conf.set(MRJobConfig.USER_NAME, user);
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString());
     conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true);
@@ -187,6 +192,14 @@ public class MRApp extends MRAppMaster {
     start();
     DefaultMetricsSystem.shutdown();
     Job job = getContext().getAllJobs().values().iterator().next();
+
+    // Write job.xml
+    String jobFile = MRApps.getJobFile(conf, user,
+      TypeConverter.fromYarn(job.getID()));
+    LOG.info("Writing job conf to " + jobFile);
+    new File(jobFile).getParentFile().mkdirs();
+    conf.writeXml(new FileOutputStream(jobFile));
+
     return job;
   }
 
@@ -308,16 +321,16 @@ public class MRApp extends MRAppMaster {
     return new TaskAttemptListener(){
       @Override
       public InetSocketAddress getAddress() {
-        return null;
+        return NetUtils.createSocketAddr("localhost:54321");
       }
       @Override
-      public void registerLaunchedTask(TaskAttemptId attemptID, 
-          org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {}
+      public void registerLaunchedTask(TaskAttemptId attemptID) {}
       @Override
       public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) {
       }
       @Override
-      public void registerPendingTask(WrappedJvmID jvmID) {
+      public void registerPendingTask(org.apache.hadoop.mapred.Task task,
+          WrappedJvmID jvmID) {
       }
     };
   }
@@ -337,12 +350,14 @@ public class MRApp extends MRAppMaster {
     return new MockContainerLauncher();
   }
 
-  class MockContainerLauncher implements ContainerLauncher {
+  protected class MockContainerLauncher implements ContainerLauncher {
 
     //We are running locally so set the shuffle port to -1 
     int shufflePort = -1;
 
-    @SuppressWarnings("unchecked")
+    public MockContainerLauncher() {
+    }
+
     @Override
     public void handle(ContainerLauncherEvent event) {
       switch (event.getType()) {
@@ -474,6 +489,7 @@ public class MRApp extends MRAppMaster {
     }
     @Override
     protected void setup(JobImpl job) throws IOException {
+      super.setup(job);
       job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
       job.remoteJobConfFile = new Path("test");
     }

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java

@@ -40,6 +40,7 @@ import org.junit.Test;
 /**
  * Tests the state machine of MR App.
  */
+@SuppressWarnings("unchecked")
 public class TestMRApp {
 
   @Test

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

@@ -1186,12 +1186,12 @@ public class TestRMContainerAllocator {
 
     public void sendRequests(List<ContainerRequestEvent> reqs) {
       for (ContainerRequestEvent req : reqs) {
-        super.handle(req);
+        super.handleEvent(req);
       }
     }
 
     public void sendFailure(ContainerFailedEvent f) {
-      super.handle(f);
+      super.handleEvent(f);
     }
     
     // API to be used by tests

+ 91 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java

@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.junit.Test;
+
+public class TestMapReduceChildJVM {
+
+  private static final Log LOG = LogFactory.getLog(TestMapReduceChildJVM.class);
+
+  @Test
+  public void testCommandLine() throws Exception {
+
+    MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
+    Assert.assertEquals(
+      "[exec $JAVA_HOME/bin/java" +
+      " -Djava.net.preferIPv4Stack=true" +
+      " -Dhadoop.metrics.log.level=WARN" +
+      "  -Xmx200m -Djava.io.tmpdir=$PWD/tmp" +
+      " -Dlog4j.configuration=container-log4j.properties" +
+      " -Dyarn.app.mapreduce.container.log.dir=<LOG_DIR>" +
+      " -Dyarn.app.mapreduce.container.log.filesize=0" +
+      " -Dhadoop.root.logger=INFO,CLA" +
+      " org.apache.hadoop.mapred.YarnChild 127.0.0.1" +
+      " 54321" +
+      " attempt_0_0000_m_000000_0" +
+      " 0" +
+      " 1><LOG_DIR>/stdout" +
+      " 2><LOG_DIR>/stderr ]", app.myCommandLine);
+  }
+
+  private static final class MyMRApp extends MRApp {
+
+    private String myCommandLine;
+
+    public MyMRApp(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      return new MockContainerLauncher() {
+        @Override
+        public void handle(ContainerLauncherEvent event) {
+          if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
+            ContainerRemoteLaunchEvent launchEvent = (ContainerRemoteLaunchEvent) event;
+            ContainerLaunchContext launchContext = launchEvent.getContainer();
+            String cmdString = launchContext.getCommands().toString();
+            LOG.info("launchContext " + cmdString);
+            myCommandLine = cmdString;
+          }
+          super.handle(event);
+        }
+      };
+    }
+  }
+}

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
+@SuppressWarnings("unchecked")
 public class TestTaskAttempt{
 
   @Test

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java

@@ -103,7 +103,7 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
        (TaskAttemptID id, TaskType taskType,
         String status, long finishTime, 
         String hostname, String error) {
-    this(id, taskType, status, finishTime, hostname, -1, null, error, null);
+    this(id, taskType, status, finishTime, hostname, -1, "", error, null);
   }
 
   TaskAttemptUnsuccessfulCompletionEvent() {}

+ 11 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java

@@ -19,9 +19,7 @@
 package org.apache.hadoop.mapreduce.security;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,9 +33,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Master;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -90,7 +86,7 @@ public class TokenCache {
       obtainTokensForNamenodesInternal(fs, credentials, conf);
     }
   }
-  
+
   /**
    * get delegation token for a specific FS
    * @param fs
@@ -99,6 +95,7 @@ public class TokenCache {
    * @param conf
    * @throws IOException
    */
+  @SuppressWarnings("deprecation")
   static void obtainTokensForNamenodesInternal(FileSystem fs, 
       Credentials credentials, Configuration conf) throws IOException {
     String delegTokenRenewer = Master.getMasterPrincipal(conf);
@@ -131,7 +128,8 @@ public class TokenCache {
           return;
         }
       }
-      List<Token<?>> tokens = fs.getDelegationTokens(delegTokenRenewer);
+      List<Token<?>> tokens =
+          fs.getDelegationTokens(delegTokenRenewer, credentials);
       if (tokens != null) {
         for (Token<?> token : tokens) {
           credentials.addToken(token.getService(), token);
@@ -141,13 +139,13 @@ public class TokenCache {
       }
       //Call getDelegationToken as well for now - for FS implementations
       // which may not have implmented getDelegationTokens (hftp)
-      Token<?> token = fs.getDelegationToken(delegTokenRenewer);
-      if (token != null) {
-        Text fsNameText = new Text(fsName);
-        token.setService(fsNameText);
-        credentials.addToken(fsNameText, token);
-        LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName + 
-            ";t.service="+token.getService());
+      if (tokens == null || tokens.size() == 0) {
+        Token<?> token = fs.getDelegationToken(delegTokenRenewer);
+        if (token != null) {
+          credentials.addToken(token.getService(), token);
+          LOG.info("Got dt for " + fs.getUri() + ";uri=" + fsName
+              + ";t.service=" + token.getService());
+        }
       }
     }
   }

+ 32 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java

@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.Random;
 
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
@@ -98,6 +99,37 @@ public class TestCounters {
     }
   }
   
+  /**
+   * Verify counter value works
+   */
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testCounterValue() {
+    Counters counters = new Counters();
+    final int NUMBER_TESTS = 100;
+    final int NUMBER_INC = 10;
+    final Random rand = new Random();
+    for (int i = 0; i < NUMBER_TESTS; i++) {
+      long initValue = rand.nextInt();
+      long expectedValue = initValue;
+      Counter counter = counters.findCounter("foo", "bar");
+      counter.setValue(initValue);
+      assertEquals("Counter value is not initialized correctly",
+                   expectedValue, counter.getValue());
+      for (int j = 0; j < NUMBER_INC; j++) {
+        int incValue = rand.nextInt();
+        counter.increment(incValue);
+        expectedValue += incValue;
+        assertEquals("Counter value is not incremented correctly",
+                     expectedValue, counter.getValue());
+      }
+      expectedValue = rand.nextInt();
+      counter.setValue(expectedValue);
+      assertEquals("Counter value is not set correctly",
+                   expectedValue, counter.getValue());
+    }
+  }
+  
   @SuppressWarnings("deprecation")
   @Test
   public void testLegacyNames() {

+ 161 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/security/TestTokenCache.java

@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Master;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestTokenCache {
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testGetDelegationTokensNotImplemented() throws Exception {
+    Credentials credentials = new Credentials();
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
+    String renewer = Master.getMasterPrincipal(conf);
+
+    FileSystem fs = setupSingleFsWithoutGetDelegationTokens();
+    TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf);
+    assertEquals(1, credentials.getAllTokens().size());
+
+    verify(fs).getDelegationTokens(renewer, credentials);
+    verify(fs).getDelegationToken(renewer);
+  }
+
+  @Test
+  public void testManagedFileSystem() throws Exception {
+    Credentials credentials = new Credentials();
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
+    String renewer = Master.getMasterPrincipal(conf);
+
+    FileSystem singleFs = setupSingleFs();
+    FileSystem multiFs = setupMultiFs(singleFs, renewer, credentials);
+
+    TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf);
+    assertEquals(1, credentials.getAllTokens().size());
+
+    TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf);
+    assertEquals(1, credentials.getAllTokens().size());
+
+    TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf);
+    assertEquals(2, credentials.getAllTokens().size());
+
+    TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf);
+    assertEquals(2, credentials.getAllTokens().size());
+
+    verify(singleFs, times(1)).getDelegationTokens(renewer, credentials);
+    verify(multiFs, times(2)).getDelegationTokens(renewer, credentials);
+    // A call to getDelegationToken would have generated an exception.
+  }
+
+  @SuppressWarnings("deprecation")
+  private FileSystem setupSingleFsWithoutGetDelegationTokens() throws Exception {
+    FileSystem mockFs = mock(FileSystem.class);
+    when(mockFs.getCanonicalServiceName()).thenReturn("singlefs4");
+    when(mockFs.getUri()).thenReturn(new URI("singlefs4:///"));
+
+    final Token<?> mockToken = (Token<?>) mock(Token.class);
+    when(mockToken.getService()).thenReturn(new Text("singlefs4"));
+
+    when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
+        new Answer<Token<?>>() {
+          @Override
+          public Token<?> answer(InvocationOnMock invocation) throws Throwable {
+            return mockToken;
+          }
+        });
+
+    when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class)))
+        .thenReturn(new LinkedList<Token<?>>());
+
+    return mockFs;
+  }
+
+  private FileSystem setupSingleFs() throws Exception {
+    FileSystem mockFs = mock(FileSystem.class);
+    when(mockFs.getCanonicalServiceName()).thenReturn("singlefs1");
+    when(mockFs.getUri()).thenReturn(new URI("singlefs1:///"));
+
+    List<Token<?>> tokens = new LinkedList<Token<?>>();
+    Token<?> mockToken = mock(Token.class);
+    when(mockToken.getService()).thenReturn(new Text("singlefs1"));
+    tokens.add(mockToken);
+
+    when(mockFs.getDelegationTokens(any(String.class))).thenThrow(
+        new RuntimeException(
+            "getDelegationTokens(renewer) should not be called"));
+    when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class)))
+        .thenReturn(tokens);
+
+    return mockFs;
+  }
+
+  private FileSystem setupMultiFs(final FileSystem singleFs,
+      final String renewer, final Credentials credentials) throws Exception {
+    FileSystem mockFs = mock(FileSystem.class);
+    when(mockFs.getCanonicalServiceName()).thenReturn("multifs");
+    when(mockFs.getUri()).thenReturn(new URI("multifs:///"));
+
+    when(mockFs.getDelegationTokens(any(String.class))).thenThrow(
+        new RuntimeException(
+            "getDelegationTokens(renewer) should not be called"));
+    when(mockFs.getDelegationTokens(renewer, credentials)).thenAnswer(
+        new Answer<List<Token<?>>>() {
+
+          @Override
+          public List<Token<?>> answer(InvocationOnMock invocation)
+              throws Throwable {
+            List<Token<?>> newTokens = new LinkedList<Token<?>>();
+            if (credentials.getToken(new Text("singlefs1")) == null) {
+              newTokens.addAll(singleFs.getDelegationTokens(renewer,
+                  credentials));
+            } else {
+              newTokens.add(credentials.getToken(new Text("singlefs1")));
+            }
+            Token<?> mockToken2 = mock(Token.class);
+            when(mockToken2.getService()).thenReturn(new Text("singlefs2"));
+            newTokens.add(mockToken2);
+            return newTokens;
+          }
+        });
+
+    return mockFs;
+  }
+}

+ 8 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/lib/TestZKClient.java

@@ -29,7 +29,7 @@ import java.net.Socket;
 import junit.framework.Assert;
 
 import org.apache.hadoop.yarn.lib.ZKClient;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
@@ -45,7 +45,8 @@ public class TestZKClient  {
 
   protected String hostPort = "127.0.0.1:2000";
   protected int maxCnxns = 0;
-  protected NIOServerCnxn.Factory factory = null;
+  protected NIOServerCnxnFactory factory = null;
+  protected ZooKeeperServer zks;
   protected File tmpDir = null;
 
   public static String send4LetterWord(String host, int port, String cmd)
@@ -144,10 +145,11 @@ public class TestZKClient  {
       BASETEST.mkdirs();
     }
     File dataDir = createTmpDir(BASETEST);
-    ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
+    zks = new ZooKeeperServer(dataDir, dataDir, 3000);
     final int PORT = Integer.parseInt(hostPort.split(":")[1]);
     if (factory == null) {
-      factory = new NIOServerCnxn.Factory(new InetSocketAddress(PORT),maxCnxns);
+      factory = new NIOServerCnxnFactory();
+      factory.configure(new InetSocketAddress(PORT), maxCnxns);
     }
     factory.startup(zks);
     Assert.assertTrue("waiting for server up",
@@ -158,8 +160,8 @@ public class TestZKClient  {
   
   @After
   public void tearDown() throws IOException, InterruptedException {
-    if (factory != null) {
-      ZKDatabase zkDb = factory.getZooKeeperServer().getZKDatabase();
+    if (zks != null) {
+      ZKDatabase zkDb = zks.getZKDatabase();
       factory.shutdown();
       try {
         zkDb.close();

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/pom.xml

@@ -310,7 +310,7 @@
       <dependency>
         <groupId>org.apache.zookeeper</groupId>
         <artifactId>zookeeper</artifactId>
-        <version>3.3.1</version>
+        <version>3.4.2</version>
         <exclusions>
           <exclusion>
             <!-- otherwise seems to drag in junit 3.8.1 via jline -->

+ 1 - 1
hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -3210,7 +3210,7 @@ public class JobInProgress {
             (taskid, 
              taskType, taskStatus.getRunState().toString(),
              finishTime, 
-             taskTrackerHostName, -1, null, diagInfo,
+             taskTrackerHostName, -1, "", diagInfo,
              splits.burst());
     jobHistory.logEvent(tue, taskid.getJobID());