浏览代码

Merge r1326020 through r1327257 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3092@1327258 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 13 年之前
父节点
当前提交
7964dcf40c
共有 66 个文件被更改,包括 1154 次插入311 次删除
  1. 19 3
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 1 1
      hadoop-common-project/hadoop-common/src/main/bin/start-all.sh
  3. 76 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
  4. 9 4
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
  5. 22 8
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
  6. 20 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
  7. 42 4
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
  8. 12 7
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
  9. 1 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/VersionUtil.java
  10. 67 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java
  11. 2 1
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
  12. 118 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java
  13. 30 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/SecurityUtilTestHelper.java
  14. 0 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
  15. 1 1
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestVersionUtil.java
  16. 13 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  17. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-balancer.sh
  18. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-balancer.sh
  19. 5 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
  20. 10 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  21. 1 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  22. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
  23. 0 15
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
  24. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  25. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  26. 4 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
  27. 22 11
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSHAAdmin.java
  28. 22 8
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
  29. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
  30. 16 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
  31. 12 0
      hadoop-mapreduce-project/CHANGES.txt
  32. 6 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/AMRMProtocolPBClientImpl.java
  33. 6 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  34. 26 4
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java
  35. 0 78
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenSecretManager.java
  36. 8 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  37. 5 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerSecurityInfo.java
  38. 23 18
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  39. 3 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
  40. 10 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  41. 37 17
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  42. 2 5
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
  43. 4 8
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java
  44. 3 3
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  45. 11 7
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  46. 3 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  47. 155 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/ApplicationTokenSecretManager.java
  48. 5 6
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  49. 17 25
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java
  50. 8 9
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
  51. 3 6
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java
  52. 1 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
  53. 1 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java
  54. 1 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
  55. 1 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java
  56. 1 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
  57. 7 7
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
  58. 14 5
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
  59. 9 3
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
  60. 2 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
  61. 234 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestApplicationTokens.java
  62. 1 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
  63. 12 13
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
  64. 1 1
      hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java
  65. 1 1
      hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
  66. 1 1
      hadoop-project/pom.xml

+ 19 - 3
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -61,6 +61,8 @@ Trunk (unreleased changes)
     HADOOP-8147. test-patch should run tests with -fn to avoid masking test
     failures (Robert Evans via tgraves)
 
+    HADOOP-8117. Upgrade test build to Surefire 2.12 (todd)
+
   BUG FIXES
 
     HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.
@@ -208,9 +210,6 @@ Release 2.0.0 - UNRELEASED
     HADOOP-7358. Improve log levels when exceptions caught in RPC handler
     (Todd Lipcon via shv)
 
-    HADOOP-8108. Move method getHostPortString() from NameNode to NetUtils.
-    (Brandon Li via jitendra)
-
     HADOOP-7557 Make IPC header be extensible (sanjay radia)
 
     HADOOP-7806. Support binding to sub-interfaces (eli)
@@ -261,6 +260,9 @@ Release 2.0.0 - UNRELEASED
     HADOOP-8086. KerberosName silently sets defaultRealm to "" if the 
     Kerberos config is not found, it should log a WARN (tucu)
 
+    HADOOP-8280. Move VersionUtil/TestVersionUtil and GenericTestUtils from
+    HDFS into Common. (Ahmed Radwan via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -353,6 +355,9 @@ Release 2.0.0 - UNRELEASED
     properly if no local node and first node is local rack node.
     (Junping Du)
 
+    HADOOP-8282. start-all.sh refers incorrectly start-dfs.sh
+    existence for starting start-yarn.sh. (Devaraj K via eli)
+
   BREAKDOWN OF HADOOP-7454 SUBTASKS
 
     HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)
@@ -413,6 +418,9 @@ Release 0.23.3 - UNRELEASED
 
   IMPROVEMENTS
 
+    HADOOP-8108. Move method getHostPortString() from NameNode to NetUtils.
+    (Brandon Li via jitendra)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -435,6 +443,14 @@ Release 0.23.3 - UNRELEASED
     HADOOP-7510. Tokens should use original hostname provided instead of ip
     (Daryn Sharp via bobby)
 
+    HADOOP-8283. Allow tests to control token service value (Daryn Sharp via
+    bobby)
+
+    HADOOP-8286. Simplify getting a socket address from conf (Daryn Sharp via
+    bobby)
+
+    HADOOP-8227. Allow RPC to limit ephemeral port range. (bobby)
+
 Release 0.23.2 - UNRELEASED 
 
   INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/bin/start-all.sh

@@ -33,6 +33,6 @@ if [ -f "${HADOOP_HDFS_HOME}"/sbin/start-dfs.sh ]; then
 fi
 
 # start yarn daemons if yarn is present
-if [ -f "${YARN_HOME}"/sbin/start-dfs.sh ]; then
+if [ -f "${YARN_HOME}"/sbin/start-yarn.sh ]; then
   "${YARN_HOME}"/sbin/start-yarn.sh --config $HADOOP_CONF_DIR
 fi

+ 76 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java

@@ -30,6 +30,7 @@ import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Reader;
 import java.io.Writer;
+import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -68,6 +69,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.codehaus.jackson.JsonFactory;
@@ -962,11 +964,57 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
    * bound may be omitted meaning all values up to or over. So the string 
    * above means 2, 3, 5, and 7, 8, 9, ...
    */
-  public static class IntegerRanges {
+  public static class IntegerRanges implements Iterable<Integer>{
     private static class Range {
       int start;
       int end;
     }
+    
+    private static class RangeNumberIterator implements Iterator<Integer> {
+      Iterator<Range> internal;
+      int at;
+      int end;
+
+      public RangeNumberIterator(List<Range> ranges) {
+        if (ranges != null) {
+          internal = ranges.iterator();
+        }
+        at = -1;
+        end = -2;
+      }
+      
+      @Override
+      public boolean hasNext() {
+        if (at <= end) {
+          return true;
+        } else if (internal != null){
+          return internal.hasNext();
+        }
+        return false;
+      }
+
+      @Override
+      public Integer next() {
+        if (at <= end) {
+          at++;
+          return at - 1;
+        } else if (internal != null){
+          Range found = internal.next();
+          if (found != null) {
+            at = found.start;
+            end = found.end;
+            at++;
+            return at - 1;
+          }
+        }
+        return null;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
 
     List<Range> ranges = new ArrayList<Range>();
     
@@ -1025,6 +1073,13 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
       return false;
     }
     
+    /**
+     * @return true if there are no values in this range, else false.
+     */
+    public boolean isEmpty() {
+      return ranges == null || ranges.isEmpty();
+    }
+    
     @Override
     public String toString() {
       StringBuilder result = new StringBuilder();
@@ -1041,6 +1096,12 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
       }
       return result.toString();
     }
+
+    @Override
+    public Iterator<Integer> iterator() {
+      return new RangeNumberIterator(ranges);
+    }
+    
   }
 
   /**
@@ -1162,6 +1223,20 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
     set(name, StringUtils.arrayToString(values));
   }
 
+  /**
+   * Get the socket address for <code>name</code> property as a
+   * <code>InetSocketAddress</code>.
+   * @param name property name.
+   * @param defaultAddress the default value
+   * @param defaultPort the default port
+   * @return InetSocketAddress
+   */
+  public InetSocketAddress getSocketAddr(
+      String name, String defaultAddress, int defaultPort) {
+    final String address = get(name, defaultAddress);
+    return NetUtils.createSocketAddr(address, defaultPort, name);
+  }
+  
   /**
    * Load a class by name.
    * 

+ 9 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -319,10 +319,12 @@ public class ProtobufRpcEngine implements RpcEngine {
   public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
       String bindAddress, int port, int numHandlers, int numReaders,
       int queueSizePerHandler, boolean verbose, Configuration conf,
-      SecretManager<? extends TokenIdentifier> secretManager)
+      SecretManager<? extends TokenIdentifier> secretManager,
+      String portRangeConfig)
       throws IOException {
     return new Server(protocol, protocolImpl, conf, bindAddress, port,
-        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
+        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
+        portRangeConfig);
   }
   
   public static class Server extends RPC.Server {
@@ -336,15 +338,18 @@ public class ProtobufRpcEngine implements RpcEngine {
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
+     * @param portRangeConfig A config parameter that can be used to restrict
+     * the range of ports used when port is 0 (an ephemeral port)
      */
     public Server(Class<?> protocolClass, Object protocolImpl,
         Configuration conf, String bindAddress, int port, int numHandlers,
         int numReaders, int queueSizePerHandler, boolean verbose,
-        SecretManager<? extends TokenIdentifier> secretManager)
+        SecretManager<? extends TokenIdentifier> secretManager, 
+        String portRangeConfig)
         throws IOException {
       super(bindAddress, port, null, numHandlers,
           numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
-              .getClass().getName()), secretManager);
+              .getClass().getName()), secretManager, portRangeConfig);
       this.verbose = verbose;  
       registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
           protocolImpl);

+ 22 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

@@ -654,7 +654,8 @@ public class RPC {
                                  final boolean verbose, Configuration conf) 
     throws IOException {
     return getServer(instance.getClass(),         // use impl class for protocol
-                     instance, bindAddress, port, numHandlers, false, conf, null);
+                     instance, bindAddress, port, numHandlers, false, conf, null,
+                     null);
   }
 
   /** Construct a server for a protocol implementation instance. */
@@ -662,7 +663,8 @@ public class RPC {
                                  Object instance, String bindAddress,
                                  int port, Configuration conf) 
     throws IOException {
-    return getServer(protocol, instance, bindAddress, port, 1, false, conf, null);
+    return getServer(protocol, instance, bindAddress, port, 1, false, conf, null,
+        null);
   }
 
   /** Construct a server for a protocol implementation instance.
@@ -676,7 +678,7 @@ public class RPC {
     throws IOException {
     
     return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
-                 conf, null);
+                 conf, null, null);
   }
   
   /** Construct a server for a protocol implementation instance. */
@@ -686,10 +688,20 @@ public class RPC {
                                  boolean verbose, Configuration conf,
                                  SecretManager<? extends TokenIdentifier> secretManager) 
     throws IOException {
-    
+    return getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
+        conf, secretManager, null);
+  }
+  
+  public static Server getServer(Class<?> protocol,
+      Object instance, String bindAddress, int port,
+      int numHandlers,
+      boolean verbose, Configuration conf,
+      SecretManager<? extends TokenIdentifier> secretManager,
+      String portRangeConfig) 
+  throws IOException {
     return getProtocolEngine(protocol, conf)
       .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1,
-                 verbose, conf, secretManager);
+                 verbose, conf, secretManager, portRangeConfig);
   }
 
   /** Construct a server for a protocol implementation instance. */
@@ -704,7 +716,8 @@ public class RPC {
     
     return getProtocolEngine(protocol, conf)
       .getServer(protocol, instance, bindAddress, port, numHandlers,
-                 numReaders, queueSizePerHandler, verbose, conf, secretManager);
+                 numReaders, queueSizePerHandler, verbose, conf, secretManager,
+                 null);
   }
 
   /** An RPC Server. */
@@ -855,9 +868,10 @@ public class RPC {
                      Class<? extends Writable> paramClass, int handlerCount,
                      int numReaders, int queueSizePerHandler,
                      Configuration conf, String serverName, 
-                     SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
+                     SecretManager<? extends TokenIdentifier> secretManager,
+                     String portRangeConfig) throws IOException {
       super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
-            conf, serverName, secretManager);
+            conf, serverName, secretManager, portRangeConfig);
       initProtocolMetaInfo(conf);
     }
     

+ 20 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java

@@ -47,12 +47,30 @@ public interface RpcEngine {
                 UserGroupInformation ticket, Configuration conf)
     throws IOException, InterruptedException;
 
-  /** Construct a server for a protocol implementation instance. */
+  /** 
+   * Construct a server for a protocol implementation instance.
+   * 
+   * @param protocol the class of protocol to use
+   * @param instance the instance of protocol whose methods will be called
+   * @param conf the configuration to use
+   * @param bindAddress the address to bind on to listen for connection
+   * @param port the port to listen for connections on
+   * @param numHandlers the number of method handler threads to run
+   * @param numReaders the number of reader threads to run
+   * @param queueSizePerHandler the size of the queue per hander thread
+   * @param verbose whether each call should be logged
+   * @param secretManager The secret manager to use to validate incoming requests.
+   * @param portRangeConfig A config parameter that can be used to restrict
+   *        the range of ports used when port is 0 (an ephemeral port)
+   * @return The Server instance
+   * @throws IOException on any error
+   */
   RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
                        int port, int numHandlers, int numReaders,
                        int queueSizePerHandler, boolean verbose,
                        Configuration conf, 
-                       SecretManager<? extends TokenIdentifier> secretManager
+                       SecretManager<? extends TokenIdentifier> secretManager,
+                       String portRangeConfig
                        ) throws IOException;
 
   /**

+ 42 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -63,6 +63,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.BytesWritable;
@@ -291,6 +292,7 @@ public abstract class Server {
   protected RpcDetailedMetrics rpcDetailedMetrics;
   
   private Configuration conf;
+  private String portRangeConfig = null;
   private SecretManager<TokenIdentifier> secretManager;
   private ServiceAuthorizationManager serviceAuthorizationManager = new ServiceAuthorizationManager();
 
@@ -323,8 +325,33 @@ public abstract class Server {
    */
   public static void bind(ServerSocket socket, InetSocketAddress address, 
                           int backlog) throws IOException {
+    bind(socket, address, backlog, null, null);
+  }
+
+  public static void bind(ServerSocket socket, InetSocketAddress address, 
+      int backlog, Configuration conf, String rangeConf) throws IOException {
     try {
-      socket.bind(address, backlog);
+      IntegerRanges range = null;
+      if (rangeConf != null) {
+        range = conf.getRange(rangeConf, "");
+      }
+      if (range == null || range.isEmpty() || (address.getPort() != 0)) {
+        socket.bind(address, backlog);
+      } else {
+        for (Integer port : range) {
+          if (socket.isBound()) break;
+          try {
+            InetSocketAddress temp = new InetSocketAddress(address.getAddress(),
+                port);
+            socket.bind(temp, backlog);
+          } catch(BindException e) {
+            //Ignored
+          }
+        }
+        if (!socket.isBound()) {
+          throw new BindException("Could not find a free port in "+range);
+        }
+      }
     } catch (SocketException e) {
       throw NetUtils.wrapException(null,
           0,
@@ -424,7 +451,7 @@ public abstract class Server {
       acceptChannel.configureBlocking(false);
 
       // Bind the server socket to the local host and port
-      bind(acceptChannel.socket(), address, backlogLength);
+      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
       // create a selector;
       selector= Selector.open();
@@ -1725,7 +1752,16 @@ public abstract class Server {
     throws IOException 
   {
     this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer
-        .toString(port), null);
+        .toString(port), null, null);
+  }
+  
+  protected Server(String bindAddress, int port,
+      Class<? extends Writable> rpcRequestClass, int handlerCount,
+      int numReaders, int queueSizePerHandler, Configuration conf,
+      String serverName, SecretManager<? extends TokenIdentifier> secretManager)
+    throws IOException {
+    this(bindAddress, port, rpcRequestClass, handlerCount, numReaders, 
+        queueSizePerHandler, conf, serverName, secretManager, null);
   }
   
   /** 
@@ -1745,10 +1781,12 @@ public abstract class Server {
   protected Server(String bindAddress, int port,
       Class<? extends Writable> rpcRequestClass, int handlerCount,
       int numReaders, int queueSizePerHandler, Configuration conf,
-      String serverName, SecretManager<? extends TokenIdentifier> secretManager)
+      String serverName, SecretManager<? extends TokenIdentifier> secretManager,
+      String portRangeConfig)
     throws IOException {
     this.bindAddress = bindAddress;
     this.conf = conf;
+    this.portRangeConfig = portRangeConfig;
     this.port = port;
     this.rpcRequestClass = rpcRequestClass; 
     this.handlerCount = handlerCount;

+ 12 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java

@@ -299,16 +299,19 @@ public class WritableRpcEngine implements RpcEngine {
     }
   }
 
-  /** Construct a server for a protocol implementation instance listening on a
+  /* Construct a server for a protocol implementation instance listening on a
    * port and address. */
+  @Override
   public RPC.Server getServer(Class<?> protocolClass,
                       Object protocolImpl, String bindAddress, int port,
                       int numHandlers, int numReaders, int queueSizePerHandler,
                       boolean verbose, Configuration conf,
-                      SecretManager<? extends TokenIdentifier> secretManager) 
+                      SecretManager<? extends TokenIdentifier> secretManager,
+                      String portRangeConfig) 
     throws IOException {
     return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
-        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager);
+        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
+        portRangeConfig);
   }
 
 
@@ -341,7 +344,7 @@ public class WritableRpcEngine implements RpcEngine {
         Configuration conf, String bindAddress, int port) 
       throws IOException {
       this(protocolClass, protocolImpl, conf,  bindAddress, port, 1, -1, -1,
-          false, null);
+          false, null, null);
     }
     
     /** 
@@ -363,7 +366,7 @@ public class WritableRpcEngine implements RpcEngine {
             throws IOException {
        this(null, protocolImpl,  conf,  bindAddress,   port,
                    numHandlers,  numReaders,  queueSizePerHandler,  verbose, 
-                   secretManager);
+                   secretManager, null);
    
     }
     
@@ -381,11 +384,13 @@ public class WritableRpcEngine implements RpcEngine {
     public Server(Class<?> protocolClass, Object protocolImpl,
         Configuration conf, String bindAddress,  int port,
         int numHandlers, int numReaders, int queueSizePerHandler, 
-        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) 
+        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
+        String portRangeConfig) 
         throws IOException {
       super(bindAddress, port, null, numHandlers, numReaders,
           queueSizePerHandler, conf,
-          classNameBase(protocolImpl.getClass().getName()), secretManager);
+          classNameBase(protocolImpl.getClass().getName()), secretManager,
+          portRangeConfig);
 
       this.verbose = verbose;
       

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/VersionUtil.java → hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/VersionUtil.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.util;
+package org.apache.hadoop.util;
 
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

+ 67 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestConfiguration.java

@@ -23,18 +23,24 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import junit.framework.TestCase;
 import static org.junit.Assert.assertArrayEquals;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
 import org.codehaus.jackson.map.ObjectMapper; 
 
 public class TestConfiguration extends TestCase {
@@ -360,6 +366,35 @@ public class TestConfiguration extends TestCase {
     assertEquals(true, range.isIncluded(34));
     assertEquals(true, range.isIncluded(100000000));
   }
+  
+  public void testGetRangeIterator() throws Exception {
+    Configuration config = new Configuration(false);
+    IntegerRanges ranges = config.getRange("Test", "");
+    assertFalse("Empty range has values", ranges.iterator().hasNext());
+    ranges = config.getRange("Test", "5");
+    Set<Integer> expected = new HashSet<Integer>(Arrays.asList(5));
+    Set<Integer> found = new HashSet<Integer>();
+    for(Integer i: ranges) {
+      found.add(i);
+    }
+    assertEquals(expected, found);
+
+    ranges = config.getRange("Test", "5-10,13-14");
+    expected = new HashSet<Integer>(Arrays.asList(5,6,7,8,9,10,13,14));
+    found = new HashSet<Integer>();
+    for(Integer i: ranges) {
+      found.add(i);
+    }
+    assertEquals(expected, found);
+    
+    ranges = config.getRange("Test", "8-12, 5- 7");
+    expected = new HashSet<Integer>(Arrays.asList(5,6,7,8,9,10,11,12));
+    found = new HashSet<Integer>();
+    for(Integer i: ranges) {
+      found.add(i);
+    }
+    assertEquals(expected, found);
+  }
 
   public void testHexValues() throws IOException{
     out=new BufferedWriter(new FileWriter(CONFIG));
@@ -604,6 +639,38 @@ public class TestConfiguration extends TestCase {
                  conf.getPattern("test.pattern3", defaultPattern).pattern());
   }
 
+  public void testSocketAddress() throws IOException {
+    Configuration conf = new Configuration();
+    final String defaultAddr = "host:1";
+    final int defaultPort = 2;
+    InetSocketAddress addr = null;
+    
+    addr = conf.getSocketAddr("myAddress", defaultAddr, defaultPort);
+    assertEquals(defaultAddr, NetUtils.getHostPortString(addr));
+    
+    conf.set("myAddress", "host2");
+    addr = conf.getSocketAddr("myAddress", defaultAddr, defaultPort);
+    assertEquals("host2:"+defaultPort, NetUtils.getHostPortString(addr));
+    
+    conf.set("myAddress", "host2:3");
+    addr = conf.getSocketAddr("myAddress", defaultAddr, defaultPort);
+    assertEquals("host2:3", NetUtils.getHostPortString(addr));
+    
+    boolean threwException = false;
+    conf.set("myAddress", "bad:-port");
+    try {
+      addr = conf.getSocketAddr("myAddress", defaultAddr, defaultPort);
+    } catch (IllegalArgumentException iae) {
+      threwException = true;
+      assertEquals("Does not contain a valid host:port authority: " +
+                   "bad:-port (configuration property 'myAddress')",
+                   iae.getMessage());
+      
+    } finally {
+      assertTrue(threwException);
+    }
+  }
+
   public void testReload() throws IOException {
     out=new BufferedWriter(new FileWriter(CONFIG));
     startConfig();

+ 2 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -260,7 +260,8 @@ public class TestRPC {
     public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol,
         Object instance, String bindAddress, int port, int numHandlers,
         int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf,
-        SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
+        SecretManager<? extends TokenIdentifier> secretManager, 
+        String portRangeConfig) throws IOException {
       return null;
     }
 

+ 118 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestServer.java

@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import static org.junit.Assert.*;
+
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+/**
+ * This is intended to be a set of unit tests for the 
+ * org.apache.hadoop.ipc.Server class.
+ */
+public class TestServer {
+
+  @Test
+  public void testBind() throws Exception {
+    Configuration conf = new Configuration();
+    ServerSocket socket = new ServerSocket();
+    InetSocketAddress address = new InetSocketAddress("0.0.0.0",0);
+    socket.bind(address);
+    try {
+      int min = socket.getLocalPort();
+      int max = min + 100;
+      conf.set("TestRange", min+"-"+max);
+      
+
+      ServerSocket socket2 = new ServerSocket();
+      InetSocketAddress address2 = new InetSocketAddress("0.0.0.0", 0);
+      Server.bind(socket2, address2, 10, conf, "TestRange");
+      try {
+        assertTrue(socket2.isBound());
+        assertTrue(socket2.getLocalPort() > min);
+        assertTrue(socket2.getLocalPort() <= max);
+      } finally {
+        socket2.close();
+      }
+    } finally {
+      socket.close();
+    }
+  }
+  
+  @Test
+  public void testBindSimple() throws Exception {
+    ServerSocket socket = new ServerSocket();
+    InetSocketAddress address = new InetSocketAddress("0.0.0.0",0);
+    Server.bind(socket, address, 10);
+    try {
+      assertTrue(socket.isBound());
+    } finally {
+      socket.close();
+    }
+  }
+
+  @Test
+  public void testEmptyConfig() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("TestRange", "");
+
+
+    ServerSocket socket = new ServerSocket();
+    InetSocketAddress address = new InetSocketAddress("0.0.0.0", 0);
+    try {
+      Server.bind(socket, address, 10, conf, "TestRange");
+      assertTrue(socket.isBound());
+    } finally {
+      socket.close();
+    }
+  }
+  
+  
+  @Test
+  public void testBindError() throws Exception {
+    Configuration conf = new Configuration();
+    ServerSocket socket = new ServerSocket();
+    InetSocketAddress address = new InetSocketAddress("0.0.0.0",0);
+    socket.bind(address);
+    try {
+      int min = socket.getLocalPort();
+      conf.set("TestRange", min+"-"+min);
+      
+
+      ServerSocket socket2 = new ServerSocket();
+      InetSocketAddress address2 = new InetSocketAddress("0.0.0.0", 0);
+      boolean caught = false;
+      try {
+        Server.bind(socket2, address2, 10, conf, "TestRange");
+      } catch (BindException e) {
+        caught = true;
+      } finally {
+        socket2.close();
+      }
+      assertTrue("Failed to catch the expected bind exception",caught);
+    } finally {
+      socket.close();
+    }
+  }
+}

+ 30 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/SecurityUtilTestHelper.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.security;
+
+/** helper utils for tests */
+public class SecurityUtilTestHelper {
+  
+  /**
+   * Allow tests to change the resolver used for tokens
+   * @param flag boolean for whether token services use ips or hosts
+   */
+  public static void setTokenServiceUseIp(boolean flag) {
+    SecurityUtil.setTokenServiceUseIp(flag);
+  }
+}

+ 0 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java → hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java


+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestVersionUtil.java → hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestVersionUtil.java

@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.util;
+package org.apache.hadoop.util;
 
 import static org.junit.Assert.*;
 

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -377,6 +377,11 @@ Release 2.0.0 - UNRELEASED
     HDFS-3259. NameNode#initializeSharedEdits should populate shared edits dir
     with edit log segments. (atm)
 
+    HDFS-2708. Stats for the # of blocks per DN. (atm)
+
+    HDFS-3279. Move the FSEditLog constructor with @VisibleForTesting to
+    TestEditLog.  (Arpit Gupta via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
@@ -514,6 +519,14 @@ Release 2.0.0 - UNRELEASED
 
     HDFS-2765. TestNameEditsConfigs is incorrectly swallowing IOE. (atm)
 
+    HDFS-3268. FileContext API mishandles token service and incompatible with
+    HA (Daryn Sharp via todd)
+
+    HDFS-3284. bootstrapStandby fails in secure cluster (todd)
+
+    HDFS-3165. HDFS Balancer scripts are refering to wrong path of
+    hadoop-daemon.sh (Amith D K via eli)
+
   BREAKDOWN OF HDFS-1623 SUBTASKS
 
     HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-balancer.sh

@@ -24,4 +24,4 @@ HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
 
 # Start balancer daemon.
 
-"$HADOOP_PREFIX"/bin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script "$bin"/hdfs start balancer $@
+"$HADOOP_PREFIX"/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script "$bin"/hdfs start balancer $@

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-balancer.sh

@@ -25,4 +25,4 @@ HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
 # Stop balancer daemon.
 # Run this on the machine where the balancer is running
 
-"$HADOOP_PREFIX"/bin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script "$bin"/hdfs stop balancer
+"$HADOOP_PREFIX"/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script "$bin"/hdfs stop balancer

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.fs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -391,11 +390,15 @@ public class Hdfs extends AbstractFileSystem {
     return new Path(dfs.getLinkTarget(getUriPath(p)));
   }
   
+  @Override
+  public String getCanonicalServiceName() {
+    return dfs.getCanonicalServiceName();
+  }
+
   @Override //AbstractFileSystem
   public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
     Token<DelegationTokenIdentifier> result = dfs
         .getDelegationToken(renewer == null ? null : new Text(renewer));
-    result.setService(new Text(this.getCanonicalServiceName()));
     List<Token<?>> tokenList = new ArrayList<Token<?>>();
     tokenList.add(result);
     return tokenList;

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -637,6 +637,16 @@ public class DFSClient implements java.io.Closeable {
     return serverDefaults;
   }
   
+  /**
+   * Get a canonical token service name for this client's tokens.  Null should
+   * be returned if the client is not using tokens.
+   * @return the token service for the client
+   */
+  @InterfaceAudience.LimitedPrivate( { "HDFS" }) 
+  public String getCanonicalServiceName() {
+    return (dtService != null) ? dtService.toString() : null;
+  }
+  
   /**
    * @see ClientProtocol#getDelegationToken(Text)
    */

+ 1 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -848,12 +848,7 @@ public class DistributedFileSystem extends FileSystem {
    */
   @Override
   public String getCanonicalServiceName() {
-    URI uri = getUri();
-    if (HAUtil.isLogicalUri(getConf(), uri)) {
-      return HAUtil.buildTokenServiceForLogicalUri(uri).toString();
-    } else {
-      return super.getCanonicalServiceName();
-    }
+    return dfs.getCanonicalServiceName();
   }
 
   /**

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -48,11 +48,11 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
-import org.apache.hadoop.hdfs.util.VersionUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.VersionUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;

+ 0 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -178,20 +177,6 @@ public class FSEditLog  {
     }
   };
 
-  /**
-   * Construct FSEditLog with default configuration, taking editDirs from NNStorage
-   * 
-   * @param storage Storage object used by namenode
-   */
-  @VisibleForTesting
-  FSEditLog(NNStorage storage) throws IOException {
-    Configuration conf = new Configuration();
-    // Make sure the edits dirs are set in the provided configuration object.
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
-        StringUtils.join(storage.getEditsDirectories(), ","));
-    init(conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
-  }
-
   /**
    * Constructor for FSEditLog. Underlying journals are constructed, but 
    * no streams are opened until open() is called.

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -5051,6 +5051,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       innerinfo.put("adminState", node.getAdminState().toString());
       innerinfo.put("nonDfsUsedSpace", node.getNonDfsUsed());
       innerinfo.put("capacity", node.getCapacity());
+      innerinfo.put("numBlocks", node.numBlocks());
       info.put(node.getHostName(), innerinfo);
     }
     return JSON.toString(info);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -108,7 +108,6 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.util.VersionUtil;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -124,6 +123,7 @@ import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.VersionUtil;
 
 import com.google.protobuf.BlockingService;
 

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
 import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
@@ -144,8 +145,8 @@ public class BootstrapStandby implements Tool, Configurable {
   
   private HAServiceProtocol createHAProtocolProxy()
       throws IOException {
-    return new NNHAServiceTarget(new HdfsConfiguration(conf),
-        nsId, otherNNId).getProxy(conf, 15000);
+    return new NNHAServiceTarget(new HdfsConfiguration(conf), nsId, otherNNId)
+        .getProxy(conf, 15000);
   }
 
   private int doRun() throws IOException {
@@ -334,7 +335,7 @@ public class BootstrapStandby implements Tool, Configurable {
 
   @Override
   public void setConf(Configuration conf) {
-    this.conf = conf;
+    this.conf = DFSHAAdmin.addSecurityConfiguration(conf);
   }
 
   @Override

+ 22 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSHAAdmin.java

@@ -46,21 +46,32 @@ public class DFSHAAdmin extends HAAdmin {
   @Override
   public void setConf(Configuration conf) {
     if (conf != null) {
-      // Make a copy so we don't mutate it. Also use an HdfsConfiguration to
-      // force loading of hdfs-site.xml.
-      conf = new HdfsConfiguration(conf);
-      String nameNodePrincipal = conf.get(
-          DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "");
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Using NN principal: " + nameNodePrincipal);
-      }
-
-      conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
-          nameNodePrincipal);
+      conf = addSecurityConfiguration(conf);
     }
     super.setConf(conf);
   }
 
+  /**
+   * Add the requisite security principal settings to the given Configuration,
+   * returning a copy.
+   * @param conf the original config
+   * @return a copy with the security settings added
+   */
+  public static Configuration addSecurityConfiguration(Configuration conf) {
+    // Make a copy so we don't mutate it. Also use an HdfsConfiguration to
+    // force loading of hdfs-site.xml.
+    conf = new HdfsConfiguration(conf);
+    String nameNodePrincipal = conf.get(
+        DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using NN principal: " + nameNodePrincipal);
+    }
+
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
+        nameNodePrincipal);
+    return conf;
+  }
+
   /**
    * Try to map the given namenode ID to its service address.
    */

+ 22 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

@@ -141,6 +141,20 @@ public class TestEditLog extends TestCase {
       }
     }
   }
+  
+  /**
+   * Construct FSEditLog with default configuration, taking editDirs from NNStorage
+   * 
+   * @param storage Storage object used by namenode
+   */
+  private static FSEditLog getFSEditLog(NNStorage storage) throws IOException {
+    Configuration conf = new Configuration();
+    // Make sure the edits dirs are set in the provided configuration object.
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        StringUtils.join(",", storage.getEditsDirectories()));
+    FSEditLog log = new FSEditLog(conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
+    return log;
+  }
 
   /**
    * Test case for an empty edit log from a prior version of Hadoop.
@@ -863,7 +877,7 @@ public class TestEditLog extends TestCase {
     storage = mockStorageWithEdits(
         "[1,100]|[101,200]|[201,]",
         "[1,100]|[101,200]|[201,]");
-    log = new FSEditLog(storage);
+    log = getFSEditLog(storage);
     log.initJournalsForWrite();
     assertEquals("[[1,100], [101,200]]",
         log.getEditLogManifest(1).toString());
@@ -875,7 +889,7 @@ public class TestEditLog extends TestCase {
     storage = mockStorageWithEdits(
         "[1,100]|[101,200]",
         "[1,100]|[201,300]|[301,400]"); // nothing starting at 101
-    log = new FSEditLog(storage);
+    log = getFSEditLog(storage);
     log.initJournalsForWrite();
     assertEquals("[[1,100], [101,200], [201,300], [301,400]]",
         log.getEditLogManifest(1).toString());
@@ -885,7 +899,7 @@ public class TestEditLog extends TestCase {
     storage = mockStorageWithEdits(
         "[1,100]|[301,400]", // gap from 101 to 300
         "[301,400]|[401,500]");
-    log = new FSEditLog(storage);
+    log = getFSEditLog(storage);
     log.initJournalsForWrite();
     assertEquals("[[301,400], [401,500]]",
         log.getEditLogManifest(1).toString());
@@ -895,7 +909,7 @@ public class TestEditLog extends TestCase {
     storage = mockStorageWithEdits(
         "[1,100]|[101,150]", // short log at 101
         "[1,50]|[101,200]"); // short log at 1
-    log = new FSEditLog(storage);
+    log = getFSEditLog(storage);
     log.initJournalsForWrite();
     assertEquals("[[1,100], [101,200]]",
         log.getEditLogManifest(1).toString());
@@ -908,7 +922,7 @@ public class TestEditLog extends TestCase {
     storage = mockStorageWithEdits(
         "[1,100]|[101,]", 
         "[1,100]|[101,200]"); 
-    log = new FSEditLog(storage);
+    log = getFSEditLog(storage);
     log.initJournalsForWrite();
     assertEquals("[[1,100], [101,200]]",
         log.getEditLogManifest(1).toString());
@@ -998,7 +1012,7 @@ public class TestEditLog extends TestCase {
                                       Collections.<URI>emptyList(),
                                       editUris);
     storage.format(new NamespaceInfo());
-    FSEditLog editlog = new FSEditLog(storage);    
+    FSEditLog editlog = getFSEditLog(storage);    
     // open the edit log and add two transactions
     // logGenerationStamp is used, simply because it doesn't 
     // require complex arguments.
@@ -1080,7 +1094,7 @@ public class TestEditLog extends TestCase {
                                    new AbortSpec(9, 0),
                                    new AbortSpec(10, 1));
     long totaltxnread = 0;
-    FSEditLog editlog = new FSEditLog(storage);
+    FSEditLog editlog = getFSEditLog(storage);
     editlog.initJournalsForWrite();
     long startTxId = 1;
     Iterable<EditLogInputStream> editStreams = editlog.selectInputStreams(startTxId, 
@@ -1130,7 +1144,7 @@ public class TestEditLog extends TestCase {
     assertEquals(1, files.length);
     assertTrue(files[0].delete());
     
-    FSEditLog editlog = new FSEditLog(storage);
+    FSEditLog editlog = getFSEditLog(storage);
     editlog.initJournalsForWrite();
     long startTxId = 1;
     try {

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java

@@ -101,6 +101,8 @@ public class TestNameNodeMXBean {
         assertTrue(((Long)liveNode.get("nonDfsUsedSpace")) > 0);
         assertTrue(liveNode.containsKey("capacity"));
         assertTrue(((Long)liveNode.get("capacity")) > 0);
+        assertTrue(liveNode.containsKey("numBlocks"));
+        assertTrue(((Long)liveNode.get("numBlocks")) == 0);
       }
       Assert.assertEquals(fsn.getLiveNodes(), alivenodeinfo);
       // get attribute deadnodeinfo

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java

@@ -30,6 +30,7 @@ import java.util.Collection;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -223,6 +224,21 @@ public class TestDelegationTokensWithHA {
     token.cancel(dfs.getConf());
   }
   
+  @Test
+  public void testHdfsGetCanonicalServiceName() throws Exception {
+    Configuration conf = dfs.getConf();
+    URI haUri = HATestUtil.getLogicalUri(cluster);
+    AbstractFileSystem afs =  AbstractFileSystem.createFileSystem(haUri, conf);    
+    String haService = HAUtil.buildTokenServiceForLogicalUri(haUri).toString();
+    assertEquals(haService, afs.getCanonicalServiceName());
+    Token<?> token = afs.getDelegationTokens(
+        UserGroupInformation.getCurrentUser().getShortUserName()).get(0);
+    assertEquals(haService, token.getService().toString());
+    // make sure the logical uri is handled correctly
+    token.renew(conf);
+    token.cancel(conf);
+  }
+  
   enum TokenTestAction {
     RENEW, CANCEL;
   }

+ 12 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -241,6 +241,9 @@ Release 2.0.0 - UNRELEASED
     MAPREDUCE-4147. YARN should not have a compile-time dependency on HDFS.
     (tomwhite)
 
+    MAPREDUCE-4008. ResourceManager throws MetricsException on start up 
+    saying QueueMetrics MBean already exists (Devaraj K via tgraves)
+
 Release 0.23.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -255,6 +258,10 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4059. The history server should have a separate pluggable 
     storage/query interface. (Robert Evans via tgraves)
 
+    MAPREDUCE-3942. Randomize master key generation for
+    ApplicationTokenSecretManager and roll it every so often. (Vinod Kumar
+    Vavilapalli via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -338,6 +345,11 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4144. Fix a NPE in the ResourceManager when handling node
     updates. (Jason Lowe via sseth)
 
+    MAPREDUCE-4156. ant build fails compiling JobInProgress (tgraves)
+
+    MAPREDUCE-4160. some mrv1 ant tests fail with timeout - due to 4156 
+    (tgraves)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 6 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/AMRMProtocolPBClientImpl.java

@@ -56,7 +56,12 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol {
         AMRMProtocolPB.class, clientVersion, addr, conf);
   }
   
-  
+  public void close() {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+  }
+
   @Override
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnRemoteException {

+ 6 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -246,6 +246,12 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_RM_METRICS_RUNTIME_BUCKETS = 
     "60,300,1440";
 
+  public static final String RM_APP_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS = RM_PREFIX
+      + "application-tokens.master-key-rolling-interval-secs";
+
+  public static final long DEFAULT_RM_APP_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS =
+      24 * 60 * 60;
+
   ////////////////////////////////
   // Node Manager Configs
   ////////////////////////////////

+ 26 - 4
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java

@@ -23,34 +23,55 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
+/**
+ * ApplicationTokenIdentifier is the TokenIdentifier to be used by
+ * ApplicationMasters to authenticate to the ResourceManager.
+ */
 public class ApplicationTokenIdentifier extends TokenIdentifier {
 
   public static final Text KIND_NAME = new Text("YARN_APPLICATION_TOKEN");
 
-  private String applicationAttemptId;
+  private ApplicationAttemptId applicationAttemptId;
 
   public ApplicationTokenIdentifier() {
   }
 
   public ApplicationTokenIdentifier(ApplicationAttemptId appAttemptId) {
     this();
-    this.applicationAttemptId = appAttemptId.toString();
+    this.applicationAttemptId = appAttemptId;
+  }
+
+  @Private
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return this.applicationAttemptId;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    Text.writeString(out, this.applicationAttemptId);
+    ApplicationId appId = this.applicationAttemptId.getApplicationId();
+    out.writeLong(appId.getClusterTimestamp());
+    out.writeInt(appId.getId());
+    out.writeInt(this.applicationAttemptId.getAttemptId());
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    this.applicationAttemptId = Text.readString(in);
+    long clusterTimeStamp = in.readLong();
+    int appId = in.readInt();
+    int attemptId = in.readInt();
+    ApplicationId applicationId =
+        BuilderUtils.newApplicationId(clusterTimeStamp, appId);
+    this.applicationAttemptId =
+        BuilderUtils.newApplicationAttemptId(applicationId, attemptId);
   }
 
   @Override
@@ -68,6 +89,7 @@ public class ApplicationTokenIdentifier extends TokenIdentifier {
         .toString());
   }
 
+  // TODO: Needed?
   @InterfaceAudience.Private
   public static class Renewer extends Token.TrivialRenewer {
     @Override

+ 0 - 78
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenSecretManager.java

@@ -1,78 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.yarn.security;
-
-import javax.crypto.SecretKey;
-
-import org.apache.hadoop.security.token.SecretManager;
-
-public class ApplicationTokenSecretManager extends
-    SecretManager<ApplicationTokenIdentifier> {
-
-  // TODO: mark as final
-  private SecretKey masterKey; // For now only one masterKey, for ever.
-
-  // TODO: add expiry for masterKey
-  // TODO: add logic to handle with multiple masterKeys, only one being used for
-  // creating new tokens at any time.
-  // TODO: Make he masterKey more secure, non-transferrable etc.
-
-  /**
-   * Default constructor
-   */
-  public ApplicationTokenSecretManager() {
-    this.masterKey = generateSecret();
-  }
-
-  // TODO: this should go away.
-  public void setMasterKey(SecretKey mk) {
-    this.masterKey = mk;
-  }
-
-  // TODO: this should go away.
-  public SecretKey getMasterKey() {
-    return masterKey;
-  }
-
-  /**
-   * Convert the byte[] to a secret key
-   * @param key the byte[] to create the secret key from
-   * @return the secret key
-   */
-  public static SecretKey createSecretKey(byte[] key) {
-    return SecretManager.createSecretKey(key);
-  }
-
-  @Override
-  public byte[] createPassword(ApplicationTokenIdentifier identifier) {
-    return createPassword(identifier.getBytes(), masterKey);
-  }
-
-  @Override
-  public byte[] retrievePassword(ApplicationTokenIdentifier identifier)
-      throws SecretManager.InvalidToken {
-    return createPassword(identifier.getBytes(), masterKey);
-  }
-
-  @Override
-  public ApplicationTokenIdentifier createIdentifier() {
-    return new ApplicationTokenIdentifier();
-  }
-
-}

+ 8 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -215,6 +215,14 @@
     <value>30000</value>
   </property>
 
+  <property>
+    <description>Interval for the roll over for the master key used to generate
+        application tokens
+    </description>
+    <name>yarn.resourcemanager.application-tokens.master-key-rolling-interval-secs</name>
+    <value>86400</value>
+  </property>
+
   <!-- Node Manager Configs -->
   <property>
     <description>address of node manager IPC.</description>

+ 5 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerSecurityInfo.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sec
 
 import java.lang.annotation.Annotation;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SecurityInfo;
@@ -30,6 +32,8 @@ import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocolPB;
 
 public class LocalizerSecurityInfo extends SecurityInfo {
 
+  private static final Log LOG = LogFactory.getLog(LocalizerSecurityInfo.class);
+
   @Override
   public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
     return null;
@@ -51,7 +55,7 @@ public class LocalizerSecurityInfo extends SecurityInfo {
       @Override
       public Class<? extends TokenSelector<? extends TokenIdentifier>>
           value() {
-        System.err.print("=========== Using localizerTokenSecurityInfo");
+        LOG.debug("Using localizerTokenSecurityInfo");
         return LocalizerTokenSelector.class;
       }
     };

+ 23 - 18
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeState;
@@ -72,14 +71,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicy
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
+@SuppressWarnings("unchecked")
 @Private
 public class ApplicationMasterService extends AbstractService implements
     AMRMProtocol {
   private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
   private final AMLivelinessMonitor amLivelinessMonitor;
   private YarnScheduler rScheduler;
-  private ApplicationTokenSecretManager appTokenManager;
-  private InetSocketAddress masterServiceAddress;
+  private InetSocketAddress bindAddress;
   private Server server;
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private final ConcurrentMap<ApplicationAttemptId, AMResponse> responseMap =
@@ -87,35 +86,31 @@ public class ApplicationMasterService extends AbstractService implements
   private final AMResponse reboot = recordFactory.newRecordInstance(AMResponse.class);
   private final RMContext rmContext;
 
-  public ApplicationMasterService(RMContext rmContext,
-      ApplicationTokenSecretManager appTokenManager, YarnScheduler scheduler) {
+  public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
     super(ApplicationMasterService.class.getName());
     this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
-    this.appTokenManager = appTokenManager;
     this.rScheduler = scheduler;
     this.reboot.setReboot(true);
 //    this.reboot.containers = new ArrayList<Container>();
     this.rmContext = rmContext;
   }
 
-  @Override
-  public void init(Configuration conf) {
-    String bindAddress =
-      conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
-          YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
-    masterServiceAddress =  NetUtils.createSocketAddr(bindAddress,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT,
-      YarnConfiguration.RM_SCHEDULER_ADDRESS);
-    super.init(conf);
-  }
-
   @Override
   public void start() {
     Configuration conf = getConfig();
     YarnRPC rpc = YarnRPC.create(conf);
+
+    String bindAddressStr =
+        conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
+    InetSocketAddress masterServiceAddress =
+        NetUtils.createSocketAddr(bindAddressStr,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT,
+          YarnConfiguration.RM_SCHEDULER_ADDRESS);
+
     this.server =
       rpc.getServer(AMRMProtocol.class, this, masterServiceAddress,
-          conf, this.appTokenManager,
+          conf, this.rmContext.getApplicationTokenSecretManager(),
           conf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, 
               YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
     
@@ -127,9 +122,19 @@ public class ApplicationMasterService extends AbstractService implements
     }
     
     this.server.start();
+
+    this.bindAddress =
+        NetUtils.createSocketAddr(masterServiceAddress.getHostName(),
+          this.server.getPort());
+
     super.start();
   }
 
+  @Private
+  public InetSocketAddress getBindAddress() {
+    return this.bindAddress;
+  }
+
   private void authorizeRequest(ApplicationAttemptId appAttemptID)
       throws YarnRemoteException {
 

+ 3 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 
 /**
@@ -53,4 +54,6 @@ public interface RMContext {
   ContainerAllocationExpirer getContainerAllocationExpirer();
   
   DelegationTokenRenewer getDelegationTokenRenewer();
+
+  ApplicationTokenSecretManager getApplicationTokenSecretManager();
 }

+ 10 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 
 public class RMContextImpl implements RMContext {
@@ -50,16 +51,19 @@ public class RMContextImpl implements RMContext {
   private AMLivelinessMonitor amLivelinessMonitor;
   private ContainerAllocationExpirer containerAllocationExpirer;
   private final DelegationTokenRenewer tokenRenewer;
+  private final ApplicationTokenSecretManager appTokenSecretManager;
 
   public RMContextImpl(Store store, Dispatcher rmDispatcher,
       ContainerAllocationExpirer containerAllocationExpirer,
       AMLivelinessMonitor amLivelinessMonitor,
-      DelegationTokenRenewer tokenRenewer) {
+      DelegationTokenRenewer tokenRenewer,
+      ApplicationTokenSecretManager appTokenSecretManager) {
     this.store = store;
     this.rmDispatcher = rmDispatcher;
     this.containerAllocationExpirer = containerAllocationExpirer;
     this.amLivelinessMonitor = amLivelinessMonitor;
     this.tokenRenewer = tokenRenewer;
+    this.appTokenSecretManager = appTokenSecretManager;
   }
   
   @Override
@@ -106,4 +110,9 @@ public class RMContextImpl implements RMContext {
   public DelegationTokenRenewer getDelegationTokenRenewer() {
     return tokenRenewer;
   }
+
+  @Override
+  public ApplicationTokenSecretManager getApplicationTokenSecretManager() {
+    return this.appTokenSecretManager;
+  }
 }

+ 37 - 17
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.security.SecurityUtil;
@@ -41,7 +40,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
@@ -65,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -82,8 +81,10 @@ import org.apache.hadoop.yarn.webapp.WebApps.Builder;
 
 /**
  * The ResourceManager is the main class that is a set of components.
+ * "I am the ResourceManager. All your resources are belong to us..."
  *
  */
+@SuppressWarnings("unchecked")
 public class ResourceManager extends CompositeService implements Recoverable {
   private static final Log LOG = LogFactory.getLog(ResourceManager.class);
   public static final long clusterTimeStamp = System.currentTimeMillis();
@@ -94,8 +95,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   protected ContainerTokenSecretManager containerTokenSecretManager =
       new ContainerTokenSecretManager();
 
-  protected ApplicationTokenSecretManager appTokenSecretManager =
-      new ApplicationTokenSecretManager();
+  protected ApplicationTokenSecretManager appTokenSecretManager;
 
   private Dispatcher rmDispatcher;
 
@@ -137,6 +137,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
     this.rmDispatcher = createDispatcher();
     addIfService(this.rmDispatcher);
 
+    this.appTokenSecretManager = createApplicationTokenSecretManager(conf);
+
     this.containerAllocationExpirer = new ContainerAllocationExpirer(
         this.rmDispatcher);
     addService(this.containerAllocationExpirer);
@@ -147,8 +149,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
     DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer();
     addService(tokenRenewer);
     
-    this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
-        this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer);
+    this.rmContext =
+        new RMContextImpl(this.store, this.rmDispatcher,
+          this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer,
+          this.appTokenSecretManager);
 
     // Register event handler for NodesListManager
     this.nodesListManager = new NodesListManager(this.rmContext);
@@ -175,10 +179,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
     this.rmDispatcher.register(RMNodeEventType.class,
         new NodeEventDispatcher(this.rmContext));    
 
-    //TODO change this to be random
-    this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
-        .createSecretKey("Dummy".getBytes()));
-
     this.nmLivelinessMonitor = createNMLivelinessMonitor();
     addService(this.nmLivelinessMonitor);
 
@@ -233,6 +233,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
     }
   }
 
+  protected ApplicationTokenSecretManager createApplicationTokenSecretManager(
+      Configuration conf) {
+    return new ApplicationTokenSecretManager(conf);
+  }
+
   protected ResourceScheduler createScheduler() {
     return ReflectionUtils.newInstance(this.conf.getClass(
         YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
@@ -240,9 +245,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected ApplicationMasterLauncher createAMLauncher() {
-    return new ApplicationMasterLauncher(
-        this.appTokenSecretManager, this.clientToAMSecretManager,
-        this.rmContext);
+    return new ApplicationMasterLauncher(this.clientToAMSecretManager,
+      this.rmContext);
   }
 
   private NMLivelinessMonitor createNMLivelinessMonitor() {
@@ -273,6 +277,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
       new LinkedBlockingQueue<SchedulerEvent>();
     private final Thread eventProcessor;
     private volatile boolean stopped = false;
+    private boolean shouldExitOnError = false;
 
     public SchedulerEventDispatcher(ResourceScheduler scheduler) {
       super(SchedulerEventDispatcher.class.getName());
@@ -281,6 +286,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
       this.eventProcessor.setName("ResourceManager Event Processor");
     }
 
+    @Override
+    public synchronized void init(Configuration conf) {
+      this.shouldExitOnError =
+          conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+      super.init(conf);
+    }
+
     @Override
     public synchronized void start() {
       this.eventProcessor.start();
@@ -306,8 +319,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
           } catch (Throwable t) {
             LOG.fatal("Error in handling event type " + event.getType()
                 + " to the scheduler", t);
-            if (getConfig().getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
-              Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR)) {
+            if (shouldExitOnError) {
               LOG.info("Exiting, bbye..");
               System.exit(-1);
             }
@@ -453,6 +465,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
       throw new YarnException("Failed to login", ie);
     }
 
+    this.appTokenSecretManager.start();
+
     startWepApp();
     DefaultMetricsSystem.initialize("ResourceManager");
     JvmMetrics.initSingleton("ResourceManager", null);
@@ -487,6 +501,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
     }
     rmDTSecretManager.stopThreads();
 
+    this.appTokenSecretManager.stop();
+
     /*synchronized(shutdown) {
       shutdown.set(true);
       shutdown.notifyAll();
@@ -524,8 +540,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   protected ApplicationMasterService createApplicationMasterService() {
-    return new ApplicationMasterService(this.rmContext,
-        this.appTokenSecretManager, scheduler);
+    return new ApplicationMasterService(this.rmContext, scheduler);
   }
   
 
@@ -571,6 +586,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
     return this.applicationACLsManager;
   }
 
+  @Private
+  public ApplicationTokenSecretManager getApplicationTokenSecretManager(){
+    return this.appTokenSecretManager;
+  }
+
   @Override
   public void recover(RMState state) throws Exception {
     resourceTracker.recover(state);

+ 2 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java

@@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
@@ -76,7 +75,6 @@ public class AMLauncher implements Runnable {
   private final Configuration conf;
   private final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
-  private final ApplicationTokenSecretManager applicationTokenSecretManager;
   private final ClientToAMSecretManager clientToAMSecretManager;
   private final AMLauncherEventType eventType;
   private final RMContext rmContext;
@@ -86,11 +84,9 @@ public class AMLauncher implements Runnable {
   
   public AMLauncher(RMContext rmContext, RMAppAttempt application,
       AMLauncherEventType eventType,
-      ApplicationTokenSecretManager applicationTokenSecretManager,
       ClientToAMSecretManager clientToAMSecretManager, Configuration conf) {
     this.application = application;
     this.conf = conf;
-    this.applicationTokenSecretManager = applicationTokenSecretManager;
     this.clientToAMSecretManager = clientToAMSecretManager;
     this.eventType = eventType;
     this.rmContext = rmContext;
@@ -129,6 +125,7 @@ public class AMLauncher implements Runnable {
     containerMgrProxy.stopContainer(stopRequest);
   }
 
+  // Protected. For tests.
   protected ContainerManager getContainerMgrProxy(
       final ContainerId containerId) {
 
@@ -220,7 +217,7 @@ public class AMLauncher implements Runnable {
           application.getAppAttemptId());
       Token<ApplicationTokenIdentifier> token =
           new Token<ApplicationTokenIdentifier>(id,
-              this.applicationTokenSecretManager);
+              this.rmContext.getApplicationTokenSecretManager());
       String schedulerAddressStr =
           this.conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
               YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);

+ 4 - 8
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java

@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -42,20 +41,16 @@ public class ApplicationMasterLauncher extends AbstractService implements
   private final BlockingQueue<Runnable> masterEvents
     = new LinkedBlockingQueue<Runnable>();
   
-  protected ApplicationTokenSecretManager applicationTokenSecretManager;
   private ClientToAMSecretManager clientToAMSecretManager;
   protected final RMContext context;
   
   public ApplicationMasterLauncher(
-      ApplicationTokenSecretManager applicationTokenSecretManager, 
-      ClientToAMSecretManager clientToAMSecretManager,
-      RMContext context) {
+      ClientToAMSecretManager clientToAMSecretManager, RMContext context) {
     super(ApplicationMasterLauncher.class.getName());
     this.context = context;
     this.launcherPool = new ThreadPoolExecutor(10, 10, 1, 
         TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
     this.launcherHandlingThread = new LauncherThread();
-    this.applicationTokenSecretManager = applicationTokenSecretManager;
     this.clientToAMSecretManager = clientToAMSecretManager;
   }
   
@@ -66,8 +61,9 @@ public class ApplicationMasterLauncher extends AbstractService implements
   
   protected Runnable createRunnableLauncher(RMAppAttempt application, 
       AMLauncherEventType event) {
-    Runnable launcher = new AMLauncher(context, application, event,
-        applicationTokenSecretManager, clientToAMSecretManager, getConfig());
+    Runnable launcher =
+        new AMLauncher(context, application, event, clientToAMSecretManager,
+          getConfig());
     return launcher;
   }
   

+ 3 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -33,13 +33,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;

+ 11 - 7
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class RMAppAttemptImpl implements RMAppAttempt {
 
   private static final Log LOG = LogFactory.getLog(RMAppAttemptImpl.class);
@@ -95,7 +96,6 @@ public class RMAppAttemptImpl implements RMAppAttempt {
                              RMAppAttemptEvent> stateMachine;
 
   private final RMContext rmContext;
-  @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
   private final YarnScheduler scheduler;
   private final ApplicationMasterService masterService;
@@ -539,7 +539,6 @@ public class RMAppAttemptImpl implements RMAppAttempt {
   }
 
   private static final class AttemptStartedTransition extends BaseTransition {
-    @SuppressWarnings("unchecked")
 	@Override
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
@@ -638,12 +637,13 @@ public class RMAppAttemptImpl implements RMAppAttempt {
     public void transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
 
+      ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
+
       // Tell the AMS. Unregister from the ApplicationMasterService
-      appAttempt.masterService
-          .unregisterAttempt(appAttempt.applicationAttemptId);
+      appAttempt.masterService.unregisterAttempt(appAttemptId);
 
       // Tell the application and the scheduler
-      ApplicationId applicationId = appAttempt.getAppAttemptId().getApplicationId();
+      ApplicationId applicationId = appAttemptId.getApplicationId();
       RMAppEvent appEvent = null;
       switch (finalAttemptState) {
         case FINISHED:
@@ -676,8 +676,12 @@ public class RMAppAttemptImpl implements RMAppAttempt {
       }
 
       appAttempt.eventHandler.handle(appEvent);
-      appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttempt
-          .getAppAttemptId(), finalAttemptState));
+      appAttempt.eventHandler.handle(new AppRemovedSchedulerEvent(appAttemptId,
+        finalAttemptState));
+
+      // Remove the AppAttempt from the ApplicationTokenSecretManager
+      appAttempt.rmContext.getApplicationTokenSecretManager()
+        .applicationMasterFinished(appAttemptId);
     }
   }
 

+ 3 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -185,8 +185,6 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
   @Override
   public synchronized void setConf(Configuration conf) {
     this.conf = conf;
-    metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, conf);
-    activeUsersManager = new ActiveUsersManager(metrics);
   }
   
   @Override
@@ -223,6 +221,9 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
         Resources.createResource(conf.getInt(MINIMUM_ALLOCATION, MINIMUM_MEMORY));
       this.maximumAllocation = 
         Resources.createResource(conf.getInt(MAXIMUM_ALLOCATION, MAXIMUM_MEMORY));
+      this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
+          conf);
+      this.activeUsersManager = new ActiveUsersManager(metrics);
       this.initialized = true;
     }
   }

+ 155 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/ApplicationTokenSecretManager.java

@@ -0,0 +1,155 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+
+/**
+ * Application-tokens are per ApplicationAttempt. If users redistribute their
+ * tokens, it is their headache, god save them. I mean you are not supposed to
+ * distribute keys to your vault, right? Anyways, ResourceManager saves each
+ * token locally in memory till application finishes and to a store for restart,
+ * so no need to remember master-keys even after rolling them.
+ */
+public class ApplicationTokenSecretManager extends
+    SecretManager<ApplicationTokenIdentifier> {
+
+  private static final Log LOG = LogFactory
+    .getLog(ApplicationTokenSecretManager.class);
+
+  private SecretKey masterKey;
+  private final Timer timer;
+  private final long rollingInterval;
+
+  private final Map<ApplicationAttemptId, byte[]> passwords =
+      new HashMap<ApplicationAttemptId, byte[]>();
+
+  /**
+   * Create an {@link ApplicationTokenSecretManager}
+   */
+  public ApplicationTokenSecretManager(Configuration conf) {
+    rollMasterKey();
+    this.timer = new Timer();
+    this.rollingInterval =
+        conf
+          .getLong(
+            YarnConfiguration.RM_APP_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+            YarnConfiguration.DEFAULT_RM_APP_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+  }
+
+  public void start() {
+    this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval);
+  }
+
+  public void stop() {
+    this.timer.cancel();
+  }
+
+  public synchronized void applicationMasterFinished(
+      ApplicationAttemptId appAttemptId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Application finished, removing password for " + appAttemptId);
+    }
+    this.passwords.remove(appAttemptId);
+  }
+
+  private class MasterKeyRoller extends TimerTask {
+    @Override
+    public void run() {
+      rollMasterKey();
+    }
+  }
+
+  @Private
+  public synchronized void setMasterKey(SecretKey masterKey) {
+    this.masterKey = masterKey;
+  }
+
+  @Private
+  public synchronized SecretKey getMasterKey() {
+    return this.masterKey;
+  }
+
+  @Private
+  synchronized void rollMasterKey() {
+    LOG.info("Rolling master-key for application-tokens");
+    this.masterKey = generateSecret();
+  }
+
+  /**
+   * Create a password for a given {@link ApplicationTokenIdentifier}. Used to
+   * send to the AppicationAttempt which can give it back during authentication.
+   */
+  @Override
+  public synchronized byte[] createPassword(
+      ApplicationTokenIdentifier identifier) {
+    ApplicationAttemptId applicationAttemptId =
+        identifier.getApplicationAttemptId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating password for " + applicationAttemptId);
+    }
+    byte[] password = createPassword(identifier.getBytes(), masterKey);
+    this.passwords.put(applicationAttemptId, password);
+    return password;
+  }
+
+  /**
+   * Retrieve the password for the given {@link ApplicationTokenIdentifier}.
+   * Used by RPC layer to validate a remote {@link ApplicationTokenIdentifier}.
+   */
+  @Override
+  public synchronized byte[] retrievePassword(
+      ApplicationTokenIdentifier identifier) throws InvalidToken {
+    ApplicationAttemptId applicationAttemptId =
+        identifier.getApplicationAttemptId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to retrieve password for " + applicationAttemptId);
+    }
+    byte[] password = this.passwords.get(applicationAttemptId);
+    if (password == null) {
+      throw new InvalidToken("Password not found for ApplicationAttempt "
+          + applicationAttemptId);
+    }
+    return password;
+  }
+
+  /**
+   * Creates an empty TokenId to be used for de-serializing an
+   * {@link ApplicationTokenIdentifier} by the RPC layer.
+   */
+  @Override
+  public ApplicationTokenIdentifier createIdentifier() {
+    return new ApplicationTokenIdentifier();
+  }
+
+}

+ 5 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -24,9 +24,9 @@ import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
@@ -55,6 +54,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+@SuppressWarnings("unchecked")
 public class MockRM extends ResourceManager {
 
   public MockRM() {
@@ -224,8 +224,7 @@ public class MockRM extends ResourceManager {
 
   @Override
   protected ApplicationMasterService createApplicationMasterService() {
-    return new ApplicationMasterService(getRMContext(),
-        this.appTokenSecretManager, scheduler) {
+    return new ApplicationMasterService(getRMContext(), scheduler) {
       @Override
       public void start() {
         // override to not start rpc handler
@@ -240,8 +239,8 @@ public class MockRM extends ResourceManager {
 
   @Override
   protected ApplicationMasterLauncher createAMLauncher() {
-    return new ApplicationMasterLauncher(this.appTokenSecretManager,
-        this.clientToAMSecretManager, getRMContext()) {
+    return new ApplicationMasterLauncher(this.clientToAMSecretManager,
+      getRMContext()) {
       @Override
       public void start() {
         // override to not start rpc handler

+ 17 - 25
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAMAuthorization.java

@@ -60,9 +60,9 @@ public class TestAMAuthorization {
 
   private static final Log LOG = LogFactory.getLog(TestAMAuthorization.class);
 
-  private static final class MyContainerManager implements ContainerManager {
+  public static final class MyContainerManager implements ContainerManager {
 
-    Map<String, String> containerEnv;
+    public Map<String, String> amContainerEnv;
 
     public MyContainerManager() {
     }
@@ -71,7 +71,7 @@ public class TestAMAuthorization {
     public StartContainerResponse
         startContainer(StartContainerRequest request)
             throws YarnRemoteException {
-      containerEnv = request.getContainerLaunchContext().getEnvironment();
+      amContainerEnv = request.getContainerLaunchContext().getEnvironment();
       return null;
     }
 
@@ -90,19 +90,15 @@ public class TestAMAuthorization {
     }
   }
 
-  private static class MockRMWithAMS extends MockRMWithCustomAMLauncher {
+  public static class MockRMWithAMS extends MockRMWithCustomAMLauncher {
 
-    private static final Configuration conf = new Configuration();
-    static {
+    public MockRMWithAMS(Configuration conf, ContainerManager containerManager) {
+      super(conf, containerManager);
       conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
           "kerberos");
       UserGroupInformation.setConfiguration(conf);
     }
 
-    public MockRMWithAMS(ContainerManager containerManager) {
-      super(conf, containerManager);
-    }
-
     @Override
     protected void doSecureLogin() throws IOException {
       // Skip the login.
@@ -111,15 +107,14 @@ public class TestAMAuthorization {
     @Override
     protected ApplicationMasterService createApplicationMasterService() {
 
-      return new ApplicationMasterService(getRMContext(),
-          this.appTokenSecretManager, this.scheduler);
+      return new ApplicationMasterService(getRMContext(), this.scheduler);
     }
   }
 
   @Test
   public void testAuthorizedAccess() throws Exception {
     MyContainerManager containerManager = new MyContainerManager();
-    MockRM rm = new MockRMWithAMS(containerManager);
+    final MockRM rm = new MockRMWithAMS(new Configuration(), containerManager);
     rm.start();
 
     MockNM nm1 = rm.registerNode("localhost:1234", 5120);
@@ -132,11 +127,11 @@ public class TestAMAuthorization {
     nm1.nodeHeartbeat(true);
 
     int waitCount = 0;
-    while (containerManager.containerEnv == null && waitCount++ < 20) {
+    while (containerManager.amContainerEnv == null && waitCount++ < 20) {
       LOG.info("Waiting for AM Launch to happen..");
       Thread.sleep(1000);
     }
-    Assert.assertNotNull(containerManager.containerEnv);
+    Assert.assertNotNull(containerManager.amContainerEnv);
 
     RMAppAttempt attempt = app.getCurrentAppAttempt();
     ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
@@ -145,13 +140,10 @@ public class TestAMAuthorization {
     // Create a client to the RM.
     final Configuration conf = rm.getConfig();
     final YarnRPC rpc = YarnRPC.create(conf);
-    final String serviceAddr = conf.get(
-        YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
 
     UserGroupInformation currentUser = UserGroupInformation
         .createRemoteUser(applicationAttemptId.toString());
-    String tokenURLEncodedStr = containerManager.containerEnv
+    String tokenURLEncodedStr = containerManager.amContainerEnv
         .get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
     LOG.info("AppMasterToken is " + tokenURLEncodedStr);
     Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
@@ -162,8 +154,8 @@ public class TestAMAuthorization {
         .doAs(new PrivilegedAction<AMRMProtocol>() {
           @Override
           public AMRMProtocol run() {
-            return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, NetUtils
-                .createSocketAddr(serviceAddr), conf);
+            return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rm
+              .getApplicationMasterService().getBindAddress(), conf);
           }
         });
 
@@ -181,7 +173,7 @@ public class TestAMAuthorization {
   @Test
   public void testUnauthorizedAccess() throws Exception {
     MyContainerManager containerManager = new MyContainerManager();
-    MockRM rm = new MockRMWithAMS(containerManager);
+    MockRM rm = new MockRMWithAMS(new Configuration(), containerManager);
     rm.start();
 
     MockNM nm1 = rm.registerNode("localhost:1234", 5120);
@@ -191,11 +183,11 @@ public class TestAMAuthorization {
     nm1.nodeHeartbeat(true);
 
     int waitCount = 0;
-    while (containerManager.containerEnv == null && waitCount++ < 20) {
+    while (containerManager.amContainerEnv == null && waitCount++ < 20) {
       LOG.info("Waiting for AM Launch to happen..");
       Thread.sleep(1000);
     }
-    Assert.assertNotNull(containerManager.containerEnv);
+    Assert.assertNotNull(containerManager.amContainerEnv);
 
     RMAppAttempt attempt = app.getCurrentAppAttempt();
     ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
@@ -210,7 +202,7 @@ public class TestAMAuthorization {
 
     UserGroupInformation currentUser = UserGroupInformation
         .createRemoteUser(applicationAttemptId.toString());
-    String tokenURLEncodedStr = containerManager.containerEnv
+    String tokenURLEncodedStr = containerManager.amContainerEnv
         .get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
     LOG.info("AppMasterToken is " + tokenURLEncodedStr);
     Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();

+ 8 - 9
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java

@@ -27,8 +27,8 @@ import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.MockApps;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
@@ -93,7 +92,7 @@ public class TestAppManager{
     AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(
         rmDispatcher);
     return new RMContextImpl(new MemStore(), rmDispatcher,
-        containerAllocationExpirer, amLivelinessMonitor, null) {
+        containerAllocationExpirer, amLivelinessMonitor, null, null) {
       @Override
       public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
         return map;
@@ -336,9 +335,9 @@ public class TestAppManager{
 
     RMContext rmContext = mockRMContext(0, now - 10);
     ResourceScheduler scheduler = new CapacityScheduler();
-    ApplicationMasterService masterService =  new ApplicationMasterService(rmContext,
-        new ApplicationTokenSecretManager(), scheduler);
     Configuration conf = new Configuration();
+    ApplicationMasterService masterService =
+        new ApplicationMasterService(rmContext, scheduler);
     TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
         new ClientToAMSecretManager(), scheduler, masterService,
         new ApplicationACLsManager(conf), conf);
@@ -384,9 +383,9 @@ public class TestAppManager{
 
     RMContext rmContext = mockRMContext(1, now - 10);
     ResourceScheduler scheduler = new CapacityScheduler();
-    ApplicationMasterService masterService =  new ApplicationMasterService(rmContext,
-        new ApplicationTokenSecretManager(), scheduler);
     Configuration conf = new Configuration();
+    ApplicationMasterService masterService =
+        new ApplicationMasterService(rmContext, scheduler);
     TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
         new ClientToAMSecretManager(), scheduler, masterService,
         new ApplicationACLsManager(conf), conf);
@@ -432,9 +431,9 @@ public class TestAppManager{
     // specify 1 here and use same appId below so it gets duplicate entry
     RMContext rmContext = mockRMContext(1, now - 10);
     ResourceScheduler scheduler = new CapacityScheduler();
-    ApplicationMasterService masterService =  new ApplicationMasterService(rmContext,
-        new ApplicationTokenSecretManager(), scheduler);
     Configuration conf = new Configuration();
+    ApplicationMasterService masterService =
+        new ApplicationMasterService(rmContext, scheduler);
     TestRMAppManager appMonitor = new TestRMAppManager(rmContext,
         new ClientToAMSecretManager(), scheduler, masterService,
         new ApplicationACLsManager(conf), conf);

+ 3 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import java.io.IOException;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -33,7 +32,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher;
@@ -123,14 +121,13 @@ public class TestApplicationMasterLauncher {
 
     @Override
     protected ApplicationMasterLauncher createAMLauncher() {
-      return new ApplicationMasterLauncher(super.appTokenSecretManager,
-          super.clientToAMSecretManager, getRMContext()) {
+      return new ApplicationMasterLauncher(super.clientToAMSecretManager,
+        getRMContext()) {
         @Override
         protected Runnable createRunnableLauncher(RMAppAttempt application,
             AMLauncherEventType event) {
           return new AMLauncher(context, application, event,
-              applicationTokenSecretManager, clientToAMSecretManager,
-              getConfig()) {
+            clientToAMSecretManager, getConfig()) {
             @Override
             protected ContainerManager getContainerMgrProxy(
                 ContainerId containerId) {

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -78,7 +78,7 @@ public class TestRMNodeTransitions {
     
     rmContext =
         new RMContextImpl(new MemStore(), rmDispatcher, null, null,
-            mock(DelegationTokenRenewer.class));
+            mock(DelegationTokenRenewer.class), null);
     scheduler = mock(YarnScheduler.class);
     doAnswer(
         new Answer<Void>() {

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMLaunchFailure.java

@@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
@@ -55,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java

@@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
@@ -62,6 +61,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.junit.After;
 import org.junit.Before;

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMExpiry.java

@@ -73,7 +73,7 @@ public class TestNMExpiry {
     // Dispatcher that processes events inline
     Dispatcher dispatcher = new InlineDispatcher();
     RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
-        null, null);
+        null, null, null);
     dispatcher.register(SchedulerEventType.class,
         new InlineDispatcher.EmptyEventHandler());
     dispatcher.register(RMNodeEventType.class,

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java

@@ -67,7 +67,7 @@ public class TestRMNMRPCResponseId {
       }
     });
     RMContext context = 
-        new RMContextImpl(new MemStore(), dispatcher, null, null, null);
+        new RMContextImpl(new MemStore(), dispatcher, null, null, null, null);
     dispatcher.register(RMNodeEventType.class,
         new ResourceManager.NodeEventDispatcher(context));
     NodesListManager nodesListManager = new NodesListManager(context);

+ 7 - 7
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -34,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
@@ -48,7 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-
+import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -118,8 +117,10 @@ public class TestRMAppTransitions {
     ContainerAllocationExpirer containerAllocationExpirer = 
         mock(ContainerAllocationExpirer.class);
     AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
-    this.rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
-        containerAllocationExpirer, amLivelinessMonitor, null);
+    this.rmContext =
+        new RMContextImpl(new MemStore(), rmDispatcher,
+          containerAllocationExpirer, amLivelinessMonitor, null,
+          new ApplicationTokenSecretManager(conf));
 
     rmDispatcher.register(RMAppAttemptEventType.class,
         new TestApplicationAttemptEventDispatcher(this.rmContext));
@@ -142,9 +143,8 @@ public class TestRMAppTransitions {
     String clientTokenStr = "bogusstring";
     ApplicationStore appStore = mock(ApplicationStore.class);
     YarnScheduler scheduler = mock(YarnScheduler.class);
-    ApplicationMasterService masterService = 
-        new ApplicationMasterService(rmContext,
-            new ApplicationTokenSecretManager(), scheduler);
+    ApplicationMasterService masterService =
+        new ApplicationMasterService(rmContext, scheduler);
 
     RMApp application = new RMAppImpl(applicationId, rmContext,
         conf, name, user,

+ 14 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java

@@ -17,9 +17,15 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
-import static org.junit.Assert.*;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.Collections;
 import java.util.List;
@@ -61,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -136,8 +143,10 @@ public class TestRMAppAttemptTransitions {
     ContainerAllocationExpirer containerAllocationExpirer =
         mock(ContainerAllocationExpirer.class);
     AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
-    rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
-      containerAllocationExpirer, amLivelinessMonitor, null);
+    rmContext =
+        new RMContextImpl(new MemStore(), rmDispatcher,
+          containerAllocationExpirer, amLivelinessMonitor, null,
+          new ApplicationTokenSecretManager(new Configuration()));
     
     scheduler = mock(YarnScheduler.class);
     masterService = mock(ApplicationMasterService.class);

+ 9 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java

@@ -18,10 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -42,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
 
 public class TestUtils {
   private static final Log LOG = LogFactory.getLog(TestUtils.class);
@@ -74,8 +79,9 @@ public class TestUtils {
     ContainerAllocationExpirer cae = 
         new ContainerAllocationExpirer(nullDispatcher);
     
-    RMContext rmContext = 
-        new RMContextImpl(null, nullDispatcher, cae, null, null);
+    RMContext rmContext =
+        new RMContextImpl(null, nullDispatcher, cae, null, null,
+          new ApplicationTokenSecretManager(new Configuration()));
     
     return rmContext;
   }

+ 2 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

@@ -85,7 +85,8 @@ public class TestFifoScheduler {
   @Test
   public void testAppAttemptMetrics() throws Exception {
     AsyncDispatcher dispatcher = new InlineDispatcher();
-    RMContext rmContext = new RMContextImpl(null, dispatcher, null, null, null);
+    RMContext rmContext =
+        new RMContextImpl(null, dispatcher, null, null, null, null);
 
     FifoScheduler schedular = new FifoScheduler();
     schedular.reinitialize(new Configuration(), null, rmContext);

+ 234 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestApplicationTokens.java

@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import java.security.PrivilegedAction;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
+import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestApplicationTokens {
+
+  private static final Log LOG = LogFactory.getLog(TestApplicationTokens.class);
+
+  /**
+   * Validate that application tokens are unusable after the
+   * application-finishes.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testTokenExpiry() throws Exception {
+
+    MyContainerManager containerManager = new MyContainerManager();
+    final MockRM rm = new MockRMWithAMS(new Configuration(), containerManager);
+    rm.start();
+
+    try {
+      MockNM nm1 = rm.registerNode("localhost:1234", 5120);
+
+      RMApp app = rm.submitApp(1024);
+
+      nm1.nodeHeartbeat(true);
+
+      int waitCount = 0;
+      while (containerManager.amContainerEnv == null && waitCount++ < 20) {
+        LOG.info("Waiting for AM Launch to happen..");
+        Thread.sleep(1000);
+      }
+      Assert.assertNotNull(containerManager.amContainerEnv);
+
+      RMAppAttempt attempt = app.getCurrentAppAttempt();
+      ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
+
+      // Create a client to the RM.
+      final Configuration conf = rm.getConfig();
+      final YarnRPC rpc = YarnRPC.create(conf);
+
+      UserGroupInformation currentUser =
+          UserGroupInformation
+            .createRemoteUser(applicationAttemptId.toString());
+      String tokenURLEncodedStr =
+          containerManager.amContainerEnv
+            .get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      LOG.info("AppMasterToken is " + tokenURLEncodedStr);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+      token.decodeFromUrlString(tokenURLEncodedStr);
+      currentUser.addToken(token);
+
+      AMRMProtocol rmClient = createRMClient(rm, conf, rpc, currentUser);
+
+      RegisterApplicationMasterRequest request =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      request.setApplicationAttemptId(applicationAttemptId);
+      rmClient.registerApplicationMaster(request);
+
+      FinishApplicationMasterRequest finishAMRequest =
+          Records.newRecord(FinishApplicationMasterRequest.class);
+      finishAMRequest.setAppAttemptId(applicationAttemptId);
+      finishAMRequest
+        .setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+      finishAMRequest.setDiagnostics("diagnostics");
+      finishAMRequest.setTrackingUrl("url");
+      rmClient.finishApplicationMaster(finishAMRequest);
+
+      // Now simulate trying to allocate. RPC call itself should throw auth
+      // exception.
+      rpc.stopProxy(rmClient, conf); // To avoid using cached client
+      rmClient = createRMClient(rm, conf, rpc, currentUser);
+      request.setApplicationAttemptId(BuilderUtils.newApplicationAttemptId(
+        BuilderUtils.newApplicationId(12345, 78), 987));
+      AllocateRequest allocateRequest =
+          Records.newRecord(AllocateRequest.class);
+      allocateRequest.setApplicationAttemptId(applicationAttemptId);
+      try {
+        rmClient.allocate(allocateRequest);
+        Assert.fail("You got to be kidding me! "
+            + "Using App tokens after app-finish should fail!");
+      } catch (Throwable t) {
+        LOG.info("Exception found is ", t);
+        // The exception will still have the earlier appAttemptId as it picks it
+        // up from the token.
+        Assert.assertTrue(t.getCause().getMessage().contains(
+            "Password not found for ApplicationAttempt " +
+            applicationAttemptId.toString()));
+      }
+
+    } finally {
+      rm.stop();
+    }
+  }
+
+  /**
+   * Validate master-key-roll-over and that tokens are usable even after
+   * master-key-roll-over.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testMasterKeyRollOver() throws Exception {
+
+    Configuration config = new Configuration();
+    MyContainerManager containerManager = new MyContainerManager();
+    final MockRM rm = new MockRMWithAMS(config, containerManager);
+    rm.start();
+
+    try {
+      MockNM nm1 = rm.registerNode("localhost:1234", 5120);
+
+      RMApp app = rm.submitApp(1024);
+
+      nm1.nodeHeartbeat(true);
+
+      int waitCount = 0;
+      while (containerManager.amContainerEnv == null && waitCount++ < 20) {
+        LOG.info("Waiting for AM Launch to happen..");
+        Thread.sleep(1000);
+      }
+      Assert.assertNotNull(containerManager.amContainerEnv);
+
+      RMAppAttempt attempt = app.getCurrentAppAttempt();
+      ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId();
+
+      // Create a client to the RM.
+      final Configuration conf = rm.getConfig();
+      final YarnRPC rpc = YarnRPC.create(conf);
+
+      UserGroupInformation currentUser =
+          UserGroupInformation
+            .createRemoteUser(applicationAttemptId.toString());
+      String tokenURLEncodedStr =
+          containerManager.amContainerEnv
+            .get(ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      LOG.info("AppMasterToken is " + tokenURLEncodedStr);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+      token.decodeFromUrlString(tokenURLEncodedStr);
+      currentUser.addToken(token);
+
+      AMRMProtocol rmClient = createRMClient(rm, conf, rpc, currentUser);
+
+      RegisterApplicationMasterRequest request =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      request.setApplicationAttemptId(applicationAttemptId);
+      rmClient.registerApplicationMaster(request);
+
+      // One allocate call.
+      AllocateRequest allocateRequest =
+          Records.newRecord(AllocateRequest.class);
+      allocateRequest.setApplicationAttemptId(applicationAttemptId);
+      Assert.assertFalse(rmClient.allocate(allocateRequest).getAMResponse()
+        .getReboot());
+
+      // Simulate a master-key-roll-over
+      ApplicationTokenSecretManager appTokenSecretManager =
+          rm.getRMContext().getApplicationTokenSecretManager();
+      SecretKey oldKey = appTokenSecretManager.getMasterKey();
+      appTokenSecretManager.rollMasterKey();
+      SecretKey newKey = appTokenSecretManager.getMasterKey();
+      Assert.assertFalse("Master key should have changed!",
+        oldKey.equals(newKey));
+
+      // Another allocate call. Should continue to work.
+      rpc.stopProxy(rmClient, conf); // To avoid using cached client
+      rmClient = createRMClient(rm, conf, rpc, currentUser);
+      allocateRequest = Records.newRecord(AllocateRequest.class);
+      allocateRequest.setApplicationAttemptId(applicationAttemptId);
+      Assert.assertFalse(rmClient.allocate(allocateRequest).getAMResponse()
+        .getReboot());
+    } finally {
+      rm.stop();
+    }
+  }
+
+  private AMRMProtocol createRMClient(final MockRM rm,
+      final Configuration conf, final YarnRPC rpc,
+      UserGroupInformation currentUser) {
+    return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rm
+          .getApplicationMasterService().getBindAddress(), conf);
+      }
+    });
+  }
+}

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java

@@ -152,7 +152,7 @@ public class TestRMWebApp {
     for (RMNode node : deactivatedNodes) {
       deactivatedNodesMap.put(node.getHostName(), node);
     }
-   return new RMContextImpl(new MemStore(), null, null, null, null) {
+   return new RMContextImpl(new MemStore(), null, null, null, null, null) {
       @Override
       public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
         return applicationsMaps;

+ 12 - 13
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java

@@ -78,12 +78,12 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
@@ -387,20 +387,19 @@ public class TestContainerManagerSecurity {
                                        appAttempt.getAppAttemptId().toString());
 
     // Ask for a container from the RM
-    String schedulerAddressString = conf.get(
-        YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
-    final InetSocketAddress schedulerAddr = NetUtils
-        .createSocketAddr(schedulerAddressString);
+    final InetSocketAddress schedulerAddr =
+        resourceManager.getApplicationMasterService().getBindAddress();
     ApplicationTokenIdentifier appTokenIdentifier = new ApplicationTokenIdentifier(
         appAttempt.getAppAttemptId());
-    ApplicationTokenSecretManager appTokenSecretManager = new ApplicationTokenSecretManager();
-    appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
-        .createSecretKey("Dummy".getBytes())); // TODO: FIX. Be in Sync with
-                                               // ResourceManager.java
-    Token<ApplicationTokenIdentifier> appToken = new Token<ApplicationTokenIdentifier>(
-        appTokenIdentifier, appTokenSecretManager);
-    appToken.setService(new Text(schedulerAddressString));
+    ApplicationTokenSecretManager appTokenSecretManager =
+        new ApplicationTokenSecretManager(conf);
+    appTokenSecretManager.setMasterKey(resourceManager
+      .getApplicationTokenSecretManager().getMasterKey());
+    Token<ApplicationTokenIdentifier> appToken =
+        new Token<ApplicationTokenIdentifier>(appTokenIdentifier,
+          appTokenSecretManager);
+    appToken.setService(new Text(schedulerAddr.getHostName() + ":"
+        + schedulerAddr.getPort()));
     currentUser.addToken(appToken);
 
     AMRMProtocol scheduler = currentUser

+ 1 - 1
hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -2731,7 +2731,7 @@ public class JobInProgress {
     }
 
     TaskFinishedEvent tfe = new TaskFinishedEvent(tip.getTIPId(),
-        tip.getExecFinishTime(), taskType, 
+        statusAttemptID, tip.getExecFinishTime(), taskType, 
         TaskStatus.State.SUCCEEDED.toString(),
         new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
     

+ 1 - 1
hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java

@@ -97,7 +97,7 @@ public class TestJobHistoryParsing  extends TestCase {
     // Try to write one more event now, should not fail
     TaskID tid = TaskID.forName("task_200809171136_0001_m_000002");
     TaskFinishedEvent tfe =
-      new TaskFinishedEvent(tid, 0, TaskType.MAP, "", null);
+      new TaskFinishedEvent(tid, null, 0, TaskType.MAP, "", null);
     boolean caughtException = false;
 
     try {

+ 1 - 1
hadoop-project/pom.xml

@@ -668,7 +668,7 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-surefire-plugin</artifactId>
-          <version>2.10</version>
+          <version>2.12</version>
         </plugin>
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>