Browse Source

YARN-562. Modified NM to reject any containers allocated by a previous ResourceManager. Contributed by Jian He.
MAPREDUCE-5167. Update MR App after YARN-562 to use the new builder API for the container. Contributed by Jian He.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1476346 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 12 years ago
parent
commit
ffe3368d94
39 changed files with 763 additions and 106 deletions
  1. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 4 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  3. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  4. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
  5. 3 0
      hadoop-yarn-project/CHANGES.txt
  6. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
  7. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
  8. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  9. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
  10. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
  11. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
  12. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
  13. 24 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerConstants.java
  14. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
  15. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
  16. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  17. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  18. 17 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  19. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
  20. 16 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  21. 27 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  22. 33 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/InvalidContainerException.java
  23. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/NMNotYetReadyException.java
  24. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
  25. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
  26. 313 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
  27. 9 75
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
  28. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  29. 7 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
  30. 86 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
  31. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  32. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  33. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
  34. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  35. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
  36. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
  37. 84 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java
  38. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
  39. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -197,6 +197,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5178. Update MR App to set progress in ApplicationReport after
     MAPREDUCE-5178. Update MR App to set progress in ApplicationReport after
     YARN-577. (Hitesh Shah via vinodkv)
     YARN-577. (Hitesh Shah via vinodkv)
 
 
+    MAPREDUCE-5167. Update MR App after YARN-562 to use the new builder API
+    for the container. (Jian He via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 Release 2.0.4-alpha - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 4 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -1094,12 +1094,12 @@ public abstract class TaskAttemptImpl implements
         + taInfo.getPort());
         + taInfo.getPort());
     String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
     String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
         + taInfo.getHttpPort());
         + taInfo.getHttpPort());
-    // Resource/Priority/Tokens are only needed while launching the
-    // container on an NM, these are already completed tasks, so setting them to
-    // null
+    // Resource/Priority/Tokens and RMIdentifier are only needed while
+    // launching the container on an NM, these are already completed tasks, so
+    // setting them to null and RMIdentifier as 0
     container =
     container =
         BuilderUtils.newContainer(containerId, containerNodeId,
         BuilderUtils.newContainer(containerId, containerNodeId,
-          nodeHttpAddress, null, null, null);
+          nodeHttpAddress, null, null, null, 0);
     computeRackAndLocality();
     computeRackAndLocality();
     launchTime = taInfo.getStartTime();
     launchTime = taInfo.getStartTime();
     finishTime = (taInfo.getFinishTime() != -1) ?
     finishTime = (taInfo.getFinishTime() != -1) ?

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -519,7 +519,7 @@ public class MRApp extends MRAppMaster {
         cId.setId(containerCount++);
         cId.setId(containerCount++);
         NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
         NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
         Container container = BuilderUtils.newContainer(cId, nodeId,
         Container container = BuilderUtils.newContainer(cId, nodeId,
-            NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
+            NM_HOST + ":" + NM_HTTP_PORT, null, null, null, 0);
         JobID id = TypeConverter.fromYarn(applicationId);
         JobID id = TypeConverter.fromYarn(applicationId);
         JobId jobId = TypeConverter.toYarn(id);
         JobId jobId = TypeConverter.toYarn(id);
         getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 
         getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java

@@ -243,7 +243,7 @@ public class MRAppBenchmark {
                       .newContainer(containerId, BuilderUtils.newNodeId("host"
                       .newContainer(containerId, BuilderUtils.newNodeId("host"
                           + containerId.getId(), 2345),
                           + containerId.getId(), 2345),
                         "host" + containerId.getId() + ":5678", req
                         "host" + containerId.getId() + ":5678", req
-                          .getCapability(), req.getPriority(), null));
+                          .getCapability(), req.getPriority(), null, 0));
                   }
                   }
                 }
                 }
 
 

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -116,6 +116,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-595. Refactor fair scheduler to use common Resources. (Sandy Ryza
     YARN-595. Refactor fair scheduler to use common Resources. (Sandy Ryza
     via tomwhite)
     via tomwhite)
 
 
+    YARN-562. Modified NM to reject any containers allocated by a previous
+    ResourceManager. (Jian He via vinodkv)
+
     YARN-591. Moved RM recovery related records out of public API as they do not
     YARN-591. Moved RM recovery related records out of public API as they do not
     belong there. (vinodkv)
     belong there. (vinodkv)
 
 

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java

@@ -135,4 +135,16 @@ public interface Container extends Comparable<Container> {
   @Private
   @Private
   @Unstable
   @Unstable
   void setContainerToken(ContainerToken containerToken);
   void setContainerToken(ContainerToken containerToken);
+
+  /**
+   * Get the RMIdentifier of RM in which containers are allocated
+   * @return RMIdentifier
+   */
+  @Private
+  @Unstable
+  long getRMIdentifer();
+
+  @Private
+  @Unstable
+  void setRMIdentifier(long rmIdentifier);
 }
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java

@@ -230,6 +230,18 @@ public class ContainerPBImpl extends ProtoBase<ContainerProto> implements Contai
     this.containerToken = containerToken;
     this.containerToken = containerToken;
   }
   }
 
 
+  @Override
+  public long getRMIdentifer() {
+    ContainerProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getRmIdentifier();
+  }
+
+  @Override
+  public void setRMIdentifier(long rmIdentifier) {
+    maybeInitBuilder();
+    builder.setRmIdentifier((rmIdentifier));
+  }
+
   private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
   private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
     return new ContainerIdPBImpl(p);
     return new ContainerIdPBImpl(p);
   }
   }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -68,6 +68,7 @@ message ContainerProto {
   optional ResourceProto resource = 4;
   optional ResourceProto resource = 4;
   optional PriorityProto priority = 5;
   optional PriorityProto priority = 5;
   optional hadoop.common.TokenProto container_token = 6;
   optional hadoop.common.TokenProto container_token = 6;
+  optional int64 rm_identifier = 7;
 }
 }
 
 
 enum YarnApplicationStateProto {
 enum YarnApplicationStateProto {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java

@@ -56,7 +56,7 @@ public class TestAMRMClientAsync {
             BuilderUtils.newContainerId(0, 0, 0, 0),
             BuilderUtils.newContainerId(0, 0, 0, 0),
             ContainerState.COMPLETE, "", 0));
             ContainerState.COMPLETE, "", 0));
     List<Container> allocated1 = Arrays.asList(
     List<Container> allocated1 = Arrays.asList(
-        BuilderUtils.newContainer(null, null, null, null, null, null));
+        BuilderUtils.newContainer(null, null, null, null, null, null, 0));
     final AllocateResponse response1 = createAllocateResponse(
     final AllocateResponse response1 = createAllocateResponse(
         new ArrayList<ContainerStatus>(), allocated1);
         new ArrayList<ContainerStatus>(), allocated1);
     final AllocateResponse response2 = createAllocateResponse(completed1,
     final AllocateResponse response2 = createAllocateResponse(completed1,

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java

@@ -237,9 +237,9 @@ public class BuilderUtils {
     return containerStatus;
     return containerStatus;
   }
   }
 
 
-  public static Container newContainer(ContainerId containerId,
-      NodeId nodeId, String nodeHttpAddress,
-      Resource resource, Priority priority, ContainerToken containerToken) {
+  public static Container newContainer(ContainerId containerId, NodeId nodeId,
+      String nodeHttpAddress, Resource resource, Priority priority,
+      ContainerToken containerToken, long rmIdentifier) {
     Container container = recordFactory.newRecordInstance(Container.class);
     Container container = recordFactory.newRecordInstance(Container.class);
     container.setId(containerId);
     container.setId(containerId);
     container.setNodeId(nodeId);
     container.setNodeId(nodeId);
@@ -247,6 +247,7 @@ public class BuilderUtils {
     container.setResource(resource);
     container.setResource(resource);
     container.setPriority(priority);
     container.setPriority(priority);
     container.setContainerToken(containerToken);
     container.setContainerToken(containerToken);
+    container.setRMIdentifier(rmIdentifier);
     return container;
     return container;
   }
   }
 
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java

@@ -105,7 +105,7 @@ public class TestContainerLaunchRPC {
       containerId.setId(100);
       containerId.setId(100);
       Container container =
       Container container =
           BuilderUtils.newContainer(containerId, null, null, recordFactory
           BuilderUtils.newContainer(containerId, null, null, recordFactory
-              .newRecordInstance(Resource.class), null, null);
+              .newRecordInstance(Resource.class), null, null, 0);
 
 
       StartContainerRequest scRequest = recordFactory
       StartContainerRequest scRequest = recordFactory
           .newRecordInstance(StartContainerRequest.class);
           .newRecordInstance(StartContainerRequest.class);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java

@@ -128,7 +128,7 @@ public class TestRPC {
     containerId.setId(100);
     containerId.setId(100);
     Container mockContainer =
     Container mockContainer =
         BuilderUtils.newContainer(containerId, null, null, recordFactory
         BuilderUtils.newContainer(containerId, null, null, recordFactory
-            .newRecordInstance(Resource.class), null, null);
+            .newRecordInstance(Resource.class), null, null, 0);
 //    containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
 //    containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
 //    containerLaunchContext.command = new ArrayList<CharSequence>();
 //    containerLaunchContext.command = new ArrayList<CharSequence>();
     
     

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerConstants.java

@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+public interface ResourceManagerConstants {
+
+  public static final long RM_INVALID_IDENTIFIER = 0;
+}

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java

@@ -30,4 +30,7 @@ public interface RegisterNodeManagerResponse {
 
 
   void setNodeAction(NodeAction nodeAction);
   void setNodeAction(NodeAction nodeAction);
 
 
+  long getRMIdentifier();
+
+  void setRMIdentifier(long rmIdentifier);
 }
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java

@@ -121,6 +121,18 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
     rebuild = true;
     rebuild = true;
   }
   }
 
 
+  @Override
+  public long getRMIdentifier() {
+    RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.getRmIdentifier());
+  }
+
+  @Override
+  public void setRMIdentifier(long rmIdentifier) {
+    maybeInitBuilder();
+    builder.setRmIdentifier(rmIdentifier);
+  }
+
   private NodeAction convertFromProtoFormat(NodeActionProto p) {
   private NodeAction convertFromProtoFormat(NodeActionProto p) {
     return  NodeAction.valueOf(p.name());
     return  NodeAction.valueOf(p.name());
   }
   }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -33,6 +33,7 @@ message RegisterNodeManagerRequestProto {
 message RegisterNodeManagerResponseProto {
 message RegisterNodeManagerResponseProto {
   optional MasterKeyProto master_key = 1;
   optional MasterKeyProto master_key = 1;
   optional NodeActionProto nodeAction = 2;
   optional NodeActionProto nodeAction = 2;
+  optional int64 rm_identifier = 3;
 }
 }
 
 
 message NodeHeartbeatRequestProto {
 message NodeHeartbeatRequestProto {

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
 
 
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
 
 
+import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
@@ -48,4 +49,6 @@ public interface Context {
   NMContainerTokenSecretManager getContainerTokenSecretManager();
   NMContainerTokenSecretManager getContainerTokenSecretManager();
 
 
   NodeHealthStatus getNodeHealthStatus();
   NodeHealthStatus getNodeHealthStatus();
+
+  ContainerManager getContainerManager();
 }
 }

+ 17 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
@@ -164,6 +165,7 @@ public class NodeManager extends CompositeService
     addService(nodeHealthChecker);
     addService(nodeHealthChecker);
     dirsHandler = nodeHealthChecker.getDiskHandler();
     dirsHandler = nodeHealthChecker.getDiskHandler();
 
 
+
     nodeStatusUpdater =
     nodeStatusUpdater =
         createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
         createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
 
 
@@ -174,6 +176,7 @@ public class NodeManager extends CompositeService
         createContainerManager(context, exec, del, nodeStatusUpdater,
         createContainerManager(context, exec, del, nodeStatusUpdater,
         this.aclsManager, dirsHandler);
         this.aclsManager, dirsHandler);
     addService(containerManager);
     addService(containerManager);
+    ((NMContext) context).setContainerManager(containerManager);
 
 
     Service webServer = createWebServer(context, containerManager
     Service webServer = createWebServer(context, containerManager
         .getContainersMonitor(), this.aclsManager, dirsHandler);
         .getContainersMonitor(), this.aclsManager, dirsHandler);
@@ -221,11 +224,13 @@ public class NodeManager extends CompositeService
     DefaultMetricsSystem.shutdown();
     DefaultMetricsSystem.shutdown();
   }
   }
 
 
-  protected void cleanupContainersOnResync() {
+  protected void resyncWithRM() {
     //we do not want to block dispatcher thread here
     //we do not want to block dispatcher thread here
     new Thread() {
     new Thread() {
       @Override
       @Override
       public void run() {
       public void run() {
+        LOG.info("Notifying ContainerManager to block new container-requests");
+        containerManager.setBlockNewContainerRequests(true);
         cleanupContainers(NodeManagerEventType.RESYNC);
         cleanupContainers(NodeManagerEventType.RESYNC);
         ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
         ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
       }
       }
@@ -296,7 +301,7 @@ public class NodeManager extends CompositeService
         new ConcurrentSkipListMap<ContainerId, Container>();
         new ConcurrentSkipListMap<ContainerId, Container>();
 
 
     private final NMContainerTokenSecretManager containerTokenSecretManager;
     private final NMContainerTokenSecretManager containerTokenSecretManager;
-
+    private ContainerManager containerManager;
     private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
     private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
         .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
         .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
 
 
@@ -333,6 +338,15 @@ public class NodeManager extends CompositeService
     public NodeHealthStatus getNodeHealthStatus() {
     public NodeHealthStatus getNodeHealthStatus() {
       return this.nodeHealthStatus;
       return this.nodeHealthStatus;
     }
     }
+
+    @Override
+    public ContainerManager getContainerManager() {
+      return this.containerManager;
+    }
+
+    public void setContainerManager(ContainerManager containerManager) {
+      this.containerManager = containerManager;
+    }
   }
   }
 
 
 
 
@@ -376,7 +390,7 @@ public class NodeManager extends CompositeService
       stop();
       stop();
       break;
       break;
     case RESYNC:
     case RESYNC:
-      cleanupContainersOnResync();
+      resyncWithRM();
       break;
       break;
     default:
     default:
       LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
       LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java

@@ -24,5 +24,8 @@ import org.apache.hadoop.yarn.service.Service;
 public interface NodeStatusUpdater extends Service {
 public interface NodeStatusUpdater extends Service {
 
 
   void sendOutofBandHeartBeat();
   void sendOutofBandHeartBeat();
+
   NodeStatus getNodeStatusAndUpdateContainersInContext();
   NodeStatus getNodeStatusAndUpdateContainersInContext();
+
+  long getRMIdentifier();
 }
 }

+ 16 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -95,6 +97,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
 
   private Runnable statusUpdaterRunnable;
   private Runnable statusUpdaterRunnable;
   private Thread  statusUpdater;
   private Thread  statusUpdater;
+  private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
 
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -267,6 +270,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         this.resourceTracker = getRMClient();
         this.resourceTracker = getRMClient();
         regNMResponse =
         regNMResponse =
             this.resourceTracker.registerNodeManager(request);
             this.resourceTracker.registerNodeManager(request);
+        this.rmIdentifier = regNMResponse.getRMIdentifier();
         break;
         break;
       } catch(Throwable e) {
       } catch(Throwable e) {
         LOG.warn("Trying to connect to ResourceManager, " +
         LOG.warn("Trying to connect to ResourceManager, " +
@@ -308,7 +312,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
 
     LOG.info("Registered with ResourceManager as " + this.nodeId
     LOG.info("Registered with ResourceManager as " + this.nodeId
         + " with total resource of " + this.totalResource);
         + " with total resource of " + this.totalResource);
-
+    LOG.info("Notifying ContainerManager to unblock new container-requests");
+    ((ContainerManagerImpl) this.context.getContainerManager())
+      .setBlockNewContainerRequests(false);
   }
   }
 
 
   private List<ApplicationId> createKeepAliveApplicationList() {
   private List<ApplicationId> createKeepAliveApplicationList() {
@@ -334,6 +340,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return appList;
     return appList;
   }
   }
 
 
+  @Override
   public NodeStatus getNodeStatusAndUpdateContainersInContext() {
   public NodeStatus getNodeStatusAndUpdateContainersInContext() {
 
 
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
@@ -407,6 +414,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     }
     }
   }
   }
 
 
+  @Override
+  public long getRMIdentifier() {
+    return this.rmIdentifier;
+  }
+
   protected void startStatusUpdater() {
   protected void startStatusUpdater() {
 
 
     statusUpdaterRunnable = new Runnable() {
     statusUpdaterRunnable = new Runnable() {
@@ -478,6 +490,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             if (response.getNodeAction() == NodeAction.RESYNC) {
             if (response.getNodeAction() == NodeAction.RESYNC) {
               LOG.info("Node is out of sync with ResourceManager,"
               LOG.info("Node is out of sync with ResourceManager,"
                   + " hence rebooting.");
                   + " hence rebooting.");
+              // Invalidate the RMIdentifier while resync
+              NodeStatusUpdaterImpl.this.rmIdentifier =
+                  ResourceManagerConstants.RM_INVALID_IDENTIFIER;
               dispatcher.getEventHandler().handle(
               dispatcher.getEventHandler().handle(
                   new NodeManagerEvent(NodeManagerEventType.RESYNC));
                   new NodeManagerEvent(NodeManagerEventType.RESYNC));
               break;
               break;

+ 27 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -23,10 +23,9 @@ import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
 import java.io.IOException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -125,6 +124,7 @@ public class ContainerManagerImpl extends CompositeService implements
   private final ApplicationACLsManager aclsManager;
   private final ApplicationACLsManager aclsManager;
 
 
   private final DeletionService deletionService;
   private final DeletionService deletionService;
+  private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false);
 
 
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@@ -239,7 +239,10 @@ public class ContainerManagerImpl extends CompositeService implements
         false)) {
         false)) {
       refreshServiceAcls(conf, new NMPolicyProvider());
       refreshServiceAcls(conf, new NMPolicyProvider());
     }
     }
-    
+
+    LOG.info("Blocking new container-requests as container manager rpc" +
+    		" server is still starting.");
+    this.setBlockNewContainerRequests(true);
     server.start();
     server.start();
     InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
     InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
     this.context.getNodeId().setHost(connectAddress.getHostName());
     this.context.getNodeId().setHost(connectAddress.getHostName());
@@ -393,6 +396,13 @@ public class ContainerManagerImpl extends CompositeService implements
   @Override
   @Override
   public StartContainerResponse startContainer(StartContainerRequest request)
   public StartContainerResponse startContainer(StartContainerRequest request)
       throws YarnRemoteException {
       throws YarnRemoteException {
+
+    if (blockNewContainerRequests.get()) {
+      throw RPCUtil.getRemoteException(new NMNotYetReadyException(
+          "Rejecting new containers as NodeManager has not" +
+          " yet connected with ResourceManager"));
+    }
+
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
     org.apache.hadoop.yarn.api.records.Container lauchContainer =
     org.apache.hadoop.yarn.api.records.Container lauchContainer =
         request.getContainer();
         request.getContainer();
@@ -402,6 +412,16 @@ public class ContainerManagerImpl extends CompositeService implements
     UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
     UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
     authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi);
     authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi);
 
 
+    // Is the container coming from unknown RM
+    if (lauchContainer.getRMIdentifer() != nodeStatusUpdater
+      .getRMIdentifier()) {
+      String msg = "\nContainer "+ containerIDStr
+          + " rejected as it is allocated by a previous RM";
+      LOG.error(msg);
+      throw RPCUtil
+        .getRemoteException(new InvalidContainerException(msg));
+    }
+
     LOG.info("Start request for " + containerIDStr + " by user "
     LOG.info("Start request for " + containerIDStr + " by user "
         + launchContext.getUser());
         + launchContext.getUser());
 
 
@@ -615,6 +635,10 @@ public class ContainerManagerImpl extends CompositeService implements
     }
     }
   }
   }
 
 
+  public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+    this.blockNewContainerRequests.set(blockNewContainerRequests);
+  }
+
   @Override
   @Override
   public void stateChanged(Service service) {
   public void stateChanged(Service service) {
     // TODO Auto-generated method stub
     // TODO Auto-generated method stub

+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/InvalidContainerException.java

@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.yarn.YarnException;
+
+/**
+ * This Exception happens when NM is rejecting container requests from RM
+ */
+public class InvalidContainerException extends YarnException {
+
+  private static final long serialVersionUID = 1L;
+
+  public InvalidContainerException(String msg) {
+    super(msg);
+  }
+}

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/NMNotYetReadyException.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.yarn.YarnException;
+
+/**
+ * This exception happens when NM starts from scratch but has not yet connected
+ * with RM.
+ */
+public class NMNotYetReadyException extends YarnException {
+
+  private static final long serialVersionUID = 1L;
+
+  public NMNotYetReadyException(String msg) {
+    super(msg);
+  }
+}

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java

@@ -168,4 +168,9 @@ public class DummyContainerManager extends ContainerManagerImpl {
       }
       }
     };
     };
   }
   }
+
+  @Override
+  public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+    // do nothing
+  }
 }
 }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java

@@ -142,6 +142,17 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
     super.testLocalFilesCleanup();
     super.testLocalFilesCleanup();
   }
   }
 
 
+  @Override
+  public void testContainerLaunchFromPreviousRM() throws InterruptedException,
+      IOException {
+    // Don't run the test if the binary is not available.
+    if (!shouldRunTest()) {
+      LOG.info("LCE binary path is not passed. Not running the test");
+      return;
+    }
+    LOG.info("Running testContainerLaunchFromPreviousRM");
+    super.testContainerLaunchFromPreviousRM();
+  }
   private boolean shouldRunTest() {
   private boolean shouldRunTest() {
     return System
     return System
         .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
         .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;

+ 313 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java

@@ -0,0 +1,313 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.NMNotYetReadyException;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNodeManagerResync {
+  static final File basedir =
+      new File("target", TestNodeManagerResync.class.getName());
+  static final File tmpDir = new File(basedir, "tmpDir");
+  static final File logsDir = new File(basedir, "logs");
+  static final File remoteLogsDir = new File(basedir, "remotelogs");
+  static final File nmLocalDir = new File(basedir, "nm0");
+  static final File processStartFile = new File(tmpDir, "start_file.txt")
+    .getAbsoluteFile();
+
+  static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+  static final String user = "nobody";
+  private FileContext localFS;
+  private CyclicBarrier syncBarrier;
+  private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
+
+  @Before
+  public void setup() throws UnsupportedFileSystemException {
+    localFS = FileContext.getLocalFSFileContext();
+    tmpDir.mkdirs();
+    logsDir.mkdirs();
+    remoteLogsDir.mkdirs();
+    nmLocalDir.mkdirs();
+    syncBarrier = new CyclicBarrier(2);
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException {
+    localFS.delete(new Path(basedir.getPath()), true);
+    assertionFailedInThread.set(false);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testKillContainersOnResync() throws IOException,
+      InterruptedException {
+    NodeManager nm = new TestNodeManager1();
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+    TestNodeManagerShutdown.startContainer(nm, localFS, tmpDir,
+      processStartFile);
+
+    Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount());
+    nm.getNMDispatcher().getEventHandler().
+        handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
+    try {
+      syncBarrier.await();
+    } catch (BrokenBarrierException e) {
+    }
+    Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount());
+
+    Assert.assertFalse(assertionFailedInThread.get());
+
+    nm.stop();
+  }
+
+  // This test tests new container requests are blocked when NM starts from
+  // scratch until it register with RM AND while NM is resyncing with RM
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testBlockNewContainerRequestsOnStartAndResync()
+      throws IOException, InterruptedException {
+    NodeManager nm = new TestNodeManager2();
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+
+    // Start the container in running state
+    TestNodeManagerShutdown.startContainer(nm, localFS, tmpDir,
+      processStartFile);
+
+    nm.getNMDispatcher().getEventHandler()
+      .handle(new NodeManagerEvent(NodeManagerEventType.RESYNC));
+    try {
+      syncBarrier.await();
+    } catch (BrokenBarrierException e) {
+    }
+    Assert.assertFalse(assertionFailedInThread.get());
+    nm.stop();
+  }
+
+  private YarnConfiguration createNMConfig() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
+    conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
+    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
+    conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+      remoteLogsDir.getAbsolutePath());
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
+    return conf;
+  }
+
+  class TestNodeManager1 extends NodeManager {
+
+    private int registrationCount = 0;
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      return new TestNodeStatusUpdaterImpl1(context, dispatcher,
+          healthChecker, metrics);
+    }
+
+    public int getNMRegistrationCount() {
+      return registrationCount;
+    }
+
+    class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater {
+
+      public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher,
+          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+        super(context, dispatcher, healthChecker, metrics);
+      }
+
+      @Override
+      protected void registerWithRM() throws YarnRemoteException {
+        super.registerWithRM();
+        registrationCount++;
+      }
+
+      @Override
+      protected void rebootNodeStatusUpdater() {
+        ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
+        .containermanager.container.Container> containers =
+            getNMContext().getContainers();
+        try {
+          // ensure that containers are empty before restart nodeStatusUpdater
+          Assert.assertTrue(containers.isEmpty());
+          super.rebootNodeStatusUpdater();
+          syncBarrier.await();
+        } catch (InterruptedException e) {
+        } catch (BrokenBarrierException e) {
+        } catch (AssertionError ae) {
+          assertionFailedInThread.set(true);
+        }
+      }
+    }
+  }
+
+  class TestNodeManager2 extends NodeManager {
+
+    Thread launchContainersThread = null;
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      return new TestNodeStatusUpdaterImpl2(context, dispatcher,
+        healthChecker, metrics);
+    }
+
+    @Override
+    protected ContainerManagerImpl createContainerManager(Context context,
+        ContainerExecutor exec, DeletionService del,
+        NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
+        LocalDirsHandlerService dirsHandler) {
+      return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
+        metrics, aclsManager, dirsHandler){
+        @Override
+        public void setBlockNewContainerRequests(
+            boolean blockNewContainerRequests) {
+          if (blockNewContainerRequests) {
+            // start test thread right after blockNewContainerRequests is set
+            // true
+            super.setBlockNewContainerRequests(blockNewContainerRequests);
+            launchContainersThread = new RejectedContainersLauncherThread();
+            launchContainersThread.start();
+          } else {
+            // join the test thread right before blockNewContainerRequests is
+            // reset
+            try {
+              // stop the test thread
+              ((RejectedContainersLauncherThread) launchContainersThread)
+                .setStopThreadFlag(true);
+              launchContainersThread.join();
+              ((RejectedContainersLauncherThread) launchContainersThread)
+              .setStopThreadFlag(false);
+              super.setBlockNewContainerRequests(blockNewContainerRequests);
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+      };
+    }
+
+    class TestNodeStatusUpdaterImpl2 extends MockNodeStatusUpdater {
+
+      public TestNodeStatusUpdaterImpl2(Context context, Dispatcher dispatcher,
+          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+        super(context, dispatcher, healthChecker, metrics);
+      }
+
+      @Override
+      protected void rebootNodeStatusUpdater() {
+        ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
+        .containermanager.container.Container> containers =
+            getNMContext().getContainers();
+
+        try {
+          // ensure that containers are empty before restart nodeStatusUpdater
+          Assert.assertTrue(containers.isEmpty());
+          super.rebootNodeStatusUpdater();
+          // After this point new containers are free to be launched, except
+          // containers from previous RM
+          // Wait here so as to sync with the main test thread.
+          syncBarrier.await();
+        } catch (InterruptedException e) {
+        } catch (BrokenBarrierException e) {
+        } catch (AssertionError ae) {
+          assertionFailedInThread.set(true);
+        }
+      }
+    }
+
+    class RejectedContainersLauncherThread extends Thread {
+
+      boolean isStopped = false;
+      public void setStopThreadFlag(boolean isStopped) {
+        this.isStopped = isStopped;
+      }
+
+      @Override
+      public void run() {
+        int numContainers = 0;
+        int numContainersRejected = 0;
+        ContainerLaunchContext containerLaunchContext =
+            recordFactory.newRecordInstance(ContainerLaunchContext.class);
+        try {
+          while (!isStopped && numContainers < 10) {
+            ContainerId cId = TestNodeManagerShutdown.createContainerId();
+            Container container =
+                BuilderUtils.newContainer(cId, null, null, null, null, null, 0);
+            StartContainerRequest startRequest =
+                recordFactory.newRecordInstance(StartContainerRequest.class);
+            startRequest.setContainerLaunchContext(containerLaunchContext);
+            startRequest.setContainer(container);
+            System.out.println("no. of containers to be launched: "
+                + numContainers);
+            numContainers++;
+            try {
+              getContainerManager().startContainer(startRequest);
+            } catch (YarnRemoteException e) {
+              numContainersRejected++;
+              Assert.assertTrue(e.getMessage().contains(
+                "Rejecting new containers as NodeManager has not" +
+                " yet connected with ResourceManager"));
+              // TO DO: This should be replaced to explicitly check exception
+              // class name after YARN-142
+              Assert.assertTrue(e.getRemoteTrace().contains(
+                NMNotYetReadyException.class.getName()));
+            }
+          }
+          // no. of containers to be launched should equal to no. of
+          // containers rejected
+          Assert.assertEquals(numContainers, numContainersRejected);
+        } catch (AssertionError ae) {
+          assertionFailedInThread.set(true);
+        }
+      }
+    }
+  }
+}

+ 9 - 75
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java

@@ -31,9 +31,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CyclicBarrier;
 
 
 import junit.framework.Assert;
 import junit.framework.Assert;
 
 
@@ -57,11 +54,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
 import org.junit.After;
@@ -82,7 +77,6 @@ public class TestNodeManagerShutdown {
       .getRecordFactory(null);
       .getRecordFactory(null);
   static final String user = "nobody";
   static final String user = "nobody";
   private FileContext localFS;
   private FileContext localFS;
-  private CyclicBarrier syncBarrier = new CyclicBarrier(2);
 
 
   @Before
   @Before
   public void setup() throws UnsupportedFileSystemException {
   public void setup() throws UnsupportedFileSystemException {
@@ -103,7 +97,7 @@ public class TestNodeManagerShutdown {
     NodeManager nm = getNodeManager();
     NodeManager nm = getNodeManager();
     nm.init(createNMConfig());
     nm.init(createNMConfig());
     nm.start();
     nm.start();
-    startContainers(nm);
+    startContainer(nm, localFS, tmpDir, processStartFile);
     
     
     final int MAX_TRIES=20;
     final int MAX_TRIES=20;
     int numTries = 0;
     int numTries = 0;
@@ -136,29 +130,12 @@ public class TestNodeManagerShutdown {
     Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
     Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
     reader.close();
     reader.close();
   }
   }
-  
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testKillContainersOnResync() throws IOException, InterruptedException {
-    NodeManager nm = new TestNodeManager();
-    YarnConfiguration conf = createNMConfig();
-    nm.init(conf);
-    nm.start();
-    startContainers(nm);
-
-    assert ((TestNodeManager) nm).getNMRegistrationCount() == 1;
-    nm.getNMDispatcher().getEventHandler().
-        handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
-    try {
-      syncBarrier.await();
-    } catch (BrokenBarrierException e) {
-    }
-    assert ((TestNodeManager) nm).getNMRegistrationCount() == 2;
-  }
 
 
-  private void startContainers(NodeManager nm) throws IOException {
+  public static void startContainer(NodeManager nm, FileContext localFS,
+      File scriptFileDir, File processStartFile) throws IOException {
     ContainerManagerImpl containerManager = nm.getContainerManager();
     ContainerManagerImpl containerManager = nm.getContainerManager();
-    File scriptFile = createUnhaltingScriptFile();
+    File scriptFile =
+        createUnhaltingScriptFile(scriptFileDir, processStartFile);
     
     
     ContainerLaunchContext containerLaunchContext =
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
@@ -208,7 +185,7 @@ public class TestNodeManagerShutdown {
     Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
     Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
   }
   }
   
   
-  private ContainerId createContainerId() {
+  public static ContainerId createContainerId() {
     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
     appId.setClusterTimestamp(0);
     appId.setClusterTimestamp(0);
     appId.setId(0);
     appId.setId(0);
@@ -237,8 +214,9 @@ public class TestNodeManagerShutdown {
    * Creates a script to run a container that will run forever unless
    * Creates a script to run a container that will run forever unless
    * stopped by external means.
    * stopped by external means.
    */
    */
-  private File createUnhaltingScriptFile() throws IOException {
-    File scriptFile = new File(tmpDir, "scriptFile.sh");
+  private static File createUnhaltingScriptFile(File scriptFileDir,
+      File processStartFile) throws IOException {
+    File scriptFile = new File(scriptFileDir, "scriptFile.sh");
     BufferedWriter fileWriter = new BufferedWriter(new FileWriter(scriptFile));
     BufferedWriter fileWriter = new BufferedWriter(new FileWriter(scriptFile));
     fileWriter.write("#!/bin/bash\n\n");
     fileWriter.write("#!/bin/bash\n\n");
     fileWriter.write("echo \"Running testscript for delayed kill\"\n");
     fileWriter.write("echo \"Running testscript for delayed kill\"\n");
@@ -264,48 +242,4 @@ public class TestNodeManagerShutdown {
       }
       }
     };
     };
   }
   }
-
-  class TestNodeManager extends NodeManager {
-
-    private int registrationCount = 0;
-
-    @Override
-    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
-      return new TestNodeStatusUpdaterImpl(context, dispatcher,
-          healthChecker, metrics);
-    }
-
-    public int getNMRegistrationCount() {
-      return registrationCount;
-    }
-
-    class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater {
-
-      public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
-          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
-        super(context, dispatcher, healthChecker, metrics);
-      }
-
-      @Override
-      protected void registerWithRM() throws YarnRemoteException {
-        super.registerWithRM();
-        registrationCount++;
-      }
-
-      @Override
-      protected void rebootNodeStatusUpdater() {
-        ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container> containers =
-            getNMContext().getContainers();
-        // ensure that containers are empty before restart nodeStatusUpdater
-        Assert.assertTrue(containers.isEmpty());
-        super.rebootNodeStatusUpdater();
-        try {
-          syncBarrier.await();
-        } catch (InterruptedException e) {
-        } catch (BrokenBarrierException e) {
-        }
-      }
-    }
-  }
 }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java

@@ -156,7 +156,13 @@ public abstract class BaseContainerManagerTest {
     dirsHandler = nodeHealthChecker.getDiskHandler();
     dirsHandler = nodeHealthChecker.getDiskHandler();
     containerManager =
     containerManager =
         new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
         new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
-          metrics, new ApplicationACLsManager(conf), dirsHandler);
+          metrics, new ApplicationACLsManager(conf), dirsHandler) {
+          @Override
+          public void setBlockNewContainerRequests(
+              boolean blockNewContainerRequests) {
+            // do nothing
+          }
+        };
     containerManager.init(conf);
     containerManager.init(conf);
   }
   }
 
 

+ 86 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java

@@ -18,6 +18,9 @@
 
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.BufferedReader;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.File;
 import java.io.FileReader;
 import java.io.FileReader;
@@ -48,12 +51,15 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -61,7 +67,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
 import org.junit.Test;
-import static org.mockito.Mockito.*;
 
 
 public class TestContainerManager extends BaseContainerManagerTest {
 public class TestContainerManager extends BaseContainerManagerTest {
 
 
@@ -400,7 +405,13 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
 
     containerManager =
     containerManager =
         new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
         new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
-          metrics, new ApplicationACLsManager(conf), dirsHandler);
+          metrics, new ApplicationACLsManager(conf), dirsHandler) {
+          @Override
+          public void setBlockNewContainerRequests(
+              boolean blockNewContainerRequests) {
+            // do nothing
+          }
+        };
     containerManager.init(conf);
     containerManager.init(conf);
     containerManager.start();
     containerManager.start();
 
 
@@ -513,4 +524,77 @@ public class TestContainerManager extends BaseContainerManagerTest {
     Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!",
     Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!",
         targetFile.exists());
         targetFile.exists());
   }
   }
+
+  @Test
+  public void testContainerLaunchFromPreviousRM() throws IOException,
+      InterruptedException {
+    // There is no real RM registration, simulate and set RMIdentifier
+    NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
+    when(nodeStatusUpdater.getRMIdentifier()).thenReturn((long) 1234);
+    containerManager =
+        new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
+          metrics, new ApplicationACLsManager(conf), dirsHandler) {
+          @Override
+          public void setBlockNewContainerRequests(
+              boolean blockNewContainerRequests) {
+            // do nothing
+          }
+        };
+    containerManager.init(conf);
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    ContainerId cId1 = createContainerId();
+    ContainerId cId2 = createContainerId();
+    containerLaunchContext.setUser(user);
+    containerLaunchContext
+      .setLocalResources(new HashMap<String, LocalResource>());
+    containerLaunchContext.setUser(containerLaunchContext.getUser());
+    Resource mockResource = mock(Resource.class);
+
+    Container mockContainer1 = mock(Container.class);
+    when(mockContainer1.getId()).thenReturn(cId1);
+    // Construct the Container with Invalid RMIdentifier
+    when(mockContainer1.getRMIdentifer()).thenReturn(
+      (long) ResourceManagerConstants.RM_INVALID_IDENTIFIER);
+    StartContainerRequest startRequest1 =
+        recordFactory.newRecordInstance(StartContainerRequest.class);
+    startRequest1.setContainerLaunchContext(containerLaunchContext);
+    startRequest1.setContainer(mockContainer1);
+    boolean catchException = false;
+    try {
+      containerManager.startContainer(startRequest1);
+    } catch (YarnRemoteException e) {
+      catchException = true;
+      Assert.assertTrue(e.getMessage().contains(
+        "Container " + cId1 + " rejected as it is allocated by a previous RM"));
+      // TO DO: This should be replaced to explicitly check exception
+      // class name after YARN-142
+      Assert.assertTrue(e.getRemoteTrace().contains(
+        InvalidContainerException.class.getName()));
+    }
+
+    // Verify that startContainer fail because of invalid container request
+    Assert.assertTrue(catchException);
+
+    // Construct the Container with a RMIdentifier within current RM
+    Container mockContainer2 = mock(Container.class);
+    when(mockContainer2.getId()).thenReturn(cId2);
+    when(mockContainer2.getRMIdentifer()).thenReturn((long) 1234);
+    when(mockContainer2.getResource()).thenReturn(mockResource);
+    StartContainerRequest startRequest2 =
+        recordFactory.newRecordInstance(StartContainerRequest.class);
+    startRequest2.setContainerLaunchContext(containerLaunchContext);
+    startRequest2.setContainer(mockContainer2);
+    boolean noException = true;
+    try {
+      containerManager.startContainer(startRequest2);
+    } catch (YarnRemoteException e) {
+      noException = false;
+    }
+    // Verify that startContainer get no YarnRemoteException
+    Assert.assertTrue(noException);
+  }
 }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -196,6 +196,7 @@ public class ResourceTrackerService extends AbstractService implements
         + capability + ", assigned nodeId " + nodeId);
         + capability + ", assigned nodeId " + nodeId);
 
 
     response.setNodeAction(NodeAction.NORMAL);
     response.setNodeAction(NodeAction.NORMAL);
+    response.setRMIdentifier(ResourceManager.clusterTimeStamp);
     return response;
     return response;
   }
   }
 
 

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -1243,7 +1244,7 @@ public class LeafQueue implements CSQueue {
     // Create the container
     // Create the container
     Container container = BuilderUtils.newContainer(containerId, nodeId,
     Container container = BuilderUtils.newContainer(containerId, nodeId,
         node.getRMNode().getHttpAddress(), capability, priority,
         node.getRMNode().getHttpAddress(), capability, priority,
-        null);
+        null, ResourceManager.clusterTimeStamp);
   
   
     return container;
     return container;
   }
   }

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -173,7 +174,7 @@ public class AppSchedulable extends Schedulable {
     // Create the container
     // Create the container
     Container container = BuilderUtils.newContainer(containerId, nodeId,
     Container container = BuilderUtils.newContainer(containerId, nodeId,
         node.getRMNode().getHttpAddress(), capability, priority,
         node.getRMNode().getHttpAddress(), capability, priority,
-        containerToken);
+        containerToken, ResourceManager.clusterTimeStamp);
 
 
     return container;
     return container;
   }
   }
@@ -371,4 +372,4 @@ public class AppSchedulable extends Schedulable {
         Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
         Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
             request.getCapability(), node.getRMNode().getTotalCapability());
             request.getCapability(), node.getRMNode().getTotalCapability());
   }
   }
-}
+}

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
@@ -565,7 +566,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
         // Create the container
         // Create the container
         Container container = BuilderUtils.newContainer(containerId, nodeId,
         Container container = BuilderUtils.newContainer(containerId, nodeId,
             node.getRMNode().getHttpAddress(), capability, priority,
             node.getRMNode().getHttpAddress(), capability, priority,
-            containerToken);
+            containerToken, ResourceManager.clusterTimeStamp);
         
         
         // Allocate!
         // Allocate!
         
         

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java

@@ -188,6 +188,7 @@ public class NodeManager implements ContainerManager {
             this.nodeId, nodeHttpAddress,
             this.nodeId, nodeHttpAddress,
             requestContainer.getResource(),
             requestContainer.getResource(),
             null, null                                 // DKDC - Doesn't matter
             null, null                                 // DKDC - Doesn't matter
+            , 0
             );
             );
 
 
     ContainerStatus containerStatus =
     ContainerStatus containerStatus =

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
@@ -267,6 +268,21 @@ public class TestResourceTrackerService {
     Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
     Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
   }
   }
 
 
+  @Test
+  public void testSetRMIdentifierInRegistration() throws Exception {
+
+    Configuration conf = new Configuration();
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
+    RegisterNodeManagerResponse response = nm.registerNode();
+
+    // Verify the RMIdentifier is correctly set in RegisterNodeManagerResponse
+    Assert.assertEquals(ResourceManager.clusterTimeStamp,
+      response.getRMIdentifier());
+  }
+
   @Test
   @Test
   public void testReboot() throws Exception {
   public void testReboot() throws Exception {
     Configuration conf = new Configuration();
     Configuration conf = new Configuration();

+ 84 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationmasterservice;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestApplicationMasterService {
+  private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
+
+  private final int GB = 1024;
+  private static YarnConfiguration conf;
+
+  @BeforeClass
+  public static void setup() {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
+      ResourceScheduler.class);
+  }
+
+  @Test(timeout = 30000)
+  public void testRMIdentifierOnContainerAllocation() throws Exception {
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("h1:1234", 6 * GB);
+
+    // Submit an application
+    RMApp app1 = rm.submitApp(2048);
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    am1.addRequests(new String[] { "h1" }, GB, 1, 1);
+    AllocateResponse alloc1Response = am1.schedule(); // send the request
+
+    // kick the scheduler
+    nm1.nodeHeartbeat(true);
+    while (alloc1Response.getAllocatedContainers().size() < 1) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(1000);
+      alloc1Response = am1.schedule();
+    }
+
+    // assert RMIdentifer is set properly in allocated containers
+    Assert.assertEquals(rm.clusterTimeStamp, alloc1Response
+      .getAllocatedContainers().get(0).getRMIdentifer());
+    rm.stop();
+  }
+}

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java

@@ -69,7 +69,7 @@ public class TestRMContainerImpl {
     Priority priority = BuilderUtils.newPriority(5);
     Priority priority = BuilderUtils.newPriority(5);
 
 
     Container container = BuilderUtils.newContainer(containerId, nodeId,
     Container container = BuilderUtils.newContainer(containerId, nodeId,
-        "host:3465", resource, priority, null);
+        "host:3465", resource, priority, null, 0);
 
 
     RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
     RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
         nodeId, eventHandler, expirer);
         nodeId, eventHandler, expirer);
@@ -139,7 +139,7 @@ public class TestRMContainerImpl {
     Priority priority = BuilderUtils.newPriority(5);
     Priority priority = BuilderUtils.newPriority(5);
 
 
     Container container = BuilderUtils.newContainer(containerId, nodeId,
     Container container = BuilderUtils.newContainer(containerId, nodeId,
-        "host:3465", resource, priority, null);
+        "host:3465", resource, priority, null, 0);
 
 
     RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
     RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
         nodeId, eventHandler, expirer);
         nodeId, eventHandler, expirer);

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java

@@ -359,7 +359,7 @@ public class TestContainerManagerSecurity {
         Container container =
         Container container =
             BuilderUtils.newContainer(newTokenId.getContainerID(), null, null,
             BuilderUtils.newContainer(newTokenId.getContainerID(), null, null,
                 BuilderUtils.newResource(newTokenId.getResource().getMemory(),
                 BuilderUtils.newResource(newTokenId.getResource().getMemory(),
-                    newTokenId.getResource().getVirtualCores()), null, null);
+                    newTokenId.getResource().getVirtualCores()), null, null, 0);
         StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
         StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
         request.setContainerLaunchContext(context);
         request.setContainerLaunchContext(context);
         request.setContainer(container);
         request.setContainer(container);
@@ -541,7 +541,7 @@ public class TestContainerManagerSecurity {
         createContainerLaunchContextForTest(tokenId);
         createContainerLaunchContextForTest(tokenId);
     Container container =
     Container container =
         BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
         BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
-            BuilderUtils.newResource(2048, 1), null, null);
+            BuilderUtils.newResource(2048, 1), null, null, 0);
     request.setContainerLaunchContext(context);
     request.setContainerLaunchContext(context);
     request.setContainer(container);
     request.setContainer(container);
     try {
     try {
@@ -569,7 +569,7 @@ public class TestContainerManagerSecurity {
     Container container =
     Container container =
         BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
         BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
             BuilderUtils.newResource(tokenId.getResource().getMemory(), tokenId
             BuilderUtils.newResource(tokenId.getResource().getMemory(), tokenId
-                .getResource().getVirtualCores()), null, null);
+                .getResource().getVirtualCores()), null, null, 0);
     request.setContainerLaunchContext(context);
     request.setContainerLaunchContext(context);
     request.setContainer(container);
     request.setContainer(container);
     try {
     try {