Browse Source

YARN-3116. RM notifies NM whether a container is an AM container or normal task container. Contributed by Giovanni Matteo Fumarola.

(cherry picked from commit 1ea36299a47af302379ae0750b571ec021eb54ad)
Zhijie Shen 10 years ago
parent
commit
37a93c2d78
18 changed files with 253 additions and 22 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 19 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java
  3. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java
  4. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java
  5. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerType.java
  6. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  7. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
  8. 38 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
  9. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto
  10. 53 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java
  11. 4 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
  12. 15 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  13. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  14. 8 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
  15. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
  16. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
  17. 26 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
  18. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -83,6 +83,9 @@ Release 2.8.0 - UNRELEASED
     YARN-1012. Report NM aggregated container resource utilization in heartbeat. 
     YARN-1012. Report NM aggregated container resource utilization in heartbeat. 
     (Inigo Goiri via kasha)
     (Inigo Goiri via kasha)
 
 
+    YARN-3116. RM notifies NM whether a container is an AM container or normal
+    task container. (Giovanni Matteo Fumarola via zjshen)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     YARN-644. Basic null check is not performed on passed in arguments before
     YARN-644. Basic null check is not performed on passed in arguments before

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerContext.java

@@ -35,14 +35,23 @@ public class ContainerContext {
   private final String user;
   private final String user;
   private final ContainerId containerId;
   private final ContainerId containerId;
   private final Resource resource;
   private final Resource resource;
+  private final ContainerType containerType;
 
 
   @Private
   @Private
   @Unstable
   @Unstable
   public ContainerContext(String user, ContainerId containerId,
   public ContainerContext(String user, ContainerId containerId,
       Resource resource) {
       Resource resource) {
+    this(user, containerId, resource, ContainerType.TASK);
+  }
+
+  @Private
+  @Unstable
+  public ContainerContext(String user, ContainerId containerId,
+      Resource resource, ContainerType containerType) {
     this.user = user;
     this.user = user;
     this.containerId = containerId;
     this.containerId = containerId;
     this.resource = resource;
     this.resource = resource;
+    this.containerType = containerType;
   }
   }
 
 
   /**
   /**
@@ -72,4 +81,14 @@ public class ContainerContext {
   public Resource getResource() {
   public Resource getResource() {
     return resource;
     return resource;
   }
   }
+
+  /**
+   * Get {@link ContainerType} the type of the container
+   * being initialized or stopped.
+   *
+   * @return the type of the container
+   */
+  public ContainerType getContainerType() {
+    return containerType;
+  }
 }
 }

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerInitializationContext.java

@@ -41,4 +41,11 @@ public class ContainerInitializationContext extends ContainerContext {
     super(user, containerId, resource);
     super(user, containerId, resource);
   }
   }
 
 
+  @Private
+  @Unstable
+  public ContainerInitializationContext(String user, ContainerId containerId,
+      Resource resource, ContainerType containerType) {
+    super(user, containerId, resource, containerType);
+  }
+
 }
 }

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerTerminationContext.java

@@ -41,4 +41,11 @@ public class ContainerTerminationContext extends ContainerContext {
     super(user, containerId, resource);
     super(user, containerId, resource);
   }
   }
 
 
+  @Private
+  @Unstable
+  public ContainerTerminationContext(String user, ContainerId containerId,
+      Resource resource, ContainerType containerType) {
+    super(user, containerId, resource, containerType);
+  }
+
 }
 }

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerType.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+/**
+ * Container property encoding allocation and execution semantics.
+ * 
+ * <p>
+ * The container types are the following:
+ * <ul>
+ * <li>{@link #APPLICATION_MASTER}
+ * <li>{@link #TASK}
+ * </ul>
+ * </p>
+ */
+public enum ContainerType {
+  APPLICATION_MASTER, TASK
+}

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -264,6 +264,11 @@ message NodeLabelProto {
   optional bool isExclusive = 2 [default = true]; 
   optional bool isExclusive = 2 [default = true]; 
 }
 }
 
 
+enum ContainerTypeProto {
+  APPLICATION_MASTER = 1;
+  TASK = 2;
+}
+
 ////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////
 ////// From AM_RM_Protocol /////////////////////////////////////////////
 ////// From AM_RM_Protocol /////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java

@@ -53,7 +53,9 @@ import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString;
 
 
@@ -270,4 +272,14 @@ public class ProtoUtils {
     return LogAggregationStatus.valueOf(e.name().replace(
     return LogAggregationStatus.valueOf(e.name().replace(
       LOG_AGGREGATION_STATUS_PREFIX, ""));
       LOG_AGGREGATION_STATUS_PREFIX, ""));
   }
   }
+
+  /*
+   * ContainerType
+   */
+  public static ContainerTypeProto convertToProtoFormat(ContainerType e) {
+    return ContainerTypeProto.valueOf(e.name());
+  }
+  public static ContainerType convertFromProtoFormat(ContainerTypeProto e) {
+    return ContainerType.valueOf(e.name());
+  }
 }
 }

+ 38 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java

@@ -39,13 +39,15 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
 import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
 import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 
 
 import com.google.protobuf.TextFormat;
 import com.google.protobuf.TextFormat;
 
 
-
 /**
 /**
  * TokenIdentifier for a container. Encodes {@link ContainerId},
  * TokenIdentifier for a container. Encodes {@link ContainerId},
  * {@link Resource} needed by the container and the target NMs host-address.
  * {@link Resource} needed by the container and the target NMs host-address.
@@ -66,14 +68,24 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
       int masterKeyId, long rmIdentifier, Priority priority, long creationTime) {
       int masterKeyId, long rmIdentifier, Priority priority, long creationTime) {
     this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
     this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
         rmIdentifier, priority, creationTime, null,
         rmIdentifier, priority, creationTime, null,
-        CommonNodeLabelsManager.NO_LABEL);
+        CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK);
   }
   }
 
 
   public ContainerTokenIdentifier(ContainerId containerID, String hostName,
   public ContainerTokenIdentifier(ContainerId containerID, String hostName,
       String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
       String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
       long rmIdentifier, Priority priority, long creationTime,
       long rmIdentifier, Priority priority, long creationTime,
       LogAggregationContext logAggregationContext, String nodeLabelExpression) {
       LogAggregationContext logAggregationContext, String nodeLabelExpression) {
-    ContainerTokenIdentifierProto.Builder builder = 
+    this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
+        rmIdentifier, priority, creationTime, logAggregationContext,
+        nodeLabelExpression, ContainerType.TASK);
+  }
+
+  public ContainerTokenIdentifier(ContainerId containerID, String hostName,
+      String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
+      long rmIdentifier, Priority priority, long creationTime,
+      LogAggregationContext logAggregationContext, String nodeLabelExpression,
+      ContainerType containerType) {
+    ContainerTokenIdentifierProto.Builder builder =
         ContainerTokenIdentifierProto.newBuilder();
         ContainerTokenIdentifierProto.newBuilder();
     if (containerID != null) {
     if (containerID != null) {
       builder.setContainerId(((ContainerIdPBImpl)containerID).getProto());
       builder.setContainerId(((ContainerIdPBImpl)containerID).getProto());
@@ -99,7 +111,8 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
     if (nodeLabelExpression != null) {
     if (nodeLabelExpression != null) {
       builder.setNodeLabelExpression(nodeLabelExpression);
       builder.setNodeLabelExpression(nodeLabelExpression);
     }
     }
-    
+    builder.setContainerType(convertToProtoFormat(containerType));
+
     proto = builder.build();
     proto = builder.build();
   }
   }
 
 
@@ -156,7 +169,18 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
   public long getRMIdentifier() {
   public long getRMIdentifier() {
     return proto.getRmIdentifier();
     return proto.getRmIdentifier();
   }
   }
-  
+
+  /**
+   * Get the ContainerType of container to allocate
+   * @return ContainerType
+   */
+  public ContainerType getContainerType(){
+    if (!proto.hasContainerType()) {
+      return null;
+    }
+    return convertFromProtoFormat(proto.getContainerType());
+  }
+
   public ContainerTokenIdentifierProto getProto() {
   public ContainerTokenIdentifierProto getProto() {
     return proto;
     return proto;
   }
   }
@@ -232,4 +256,13 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
   public String toString() {
   public String toString() {
     return TextFormat.shortDebugString(getProto());
     return TextFormat.shortDebugString(getProto());
   }
   }
+
+  private ContainerTypeProto convertToProtoFormat(ContainerType containerType) {
+    return ProtoUtils.convertToProtoFormat(containerType);
+  }
+
+  private ContainerType convertFromProtoFormat(
+      ContainerTypeProto containerType) {
+    return ProtoUtils.convertFromProtoFormat(containerType);
+  }
 }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto

@@ -50,6 +50,7 @@ message ContainerTokenIdentifierProto {
   optional int64 creationTime = 9;
   optional int64 creationTime = 9;
   optional LogAggregationContextProto logAggregationContext = 10;
   optional LogAggregationContextProto logAggregationContext = 10;
   optional string nodeLabelExpression = 11;
   optional string nodeLabelExpression = 11;
+  optional ContainerTypeProto containerType = 12;
 }
 }
 
 
 message ClientToAMTokenIdentifierProto {
 message ClientToAMTokenIdentifierProto {

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/security/TestYARNTokenIdentifier.java

@@ -33,10 +33,12 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto;
 import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 
 
@@ -201,6 +203,12 @@ public class TestYARNTokenIdentifier {
         anotherToken.getCreationTime(), creationTime);
         anotherToken.getCreationTime(), creationTime);
     
     
     Assert.assertNull(anotherToken.getLogAggregationContext());
     Assert.assertNull(anotherToken.getLogAggregationContext());
+
+    Assert.assertEquals(CommonNodeLabelsManager.NO_LABEL,
+        anotherToken.getNodeLabelExpression());
+
+    Assert.assertEquals(ContainerType.TASK,
+        anotherToken.getContainerType());
   }
   }
   
   
   @Test
   @Test
@@ -347,4 +355,49 @@ public class TestYARNTokenIdentifier {
     Assert.assertEquals(new Text("yarn"), token.getRenewer());
     Assert.assertEquals(new Text("yarn"), token.getRenewer());
   }
   }
 
 
+  @Test
+  public void testAMContainerTokenIdentifier() throws IOException {
+    ContainerId containerID = ContainerId.newContainerId(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(
+            1, 1), 1), 1);
+    String hostName = "host0";
+    String appSubmitter = "usr0";
+    Resource r = Resource.newInstance(1024, 1);
+    long expiryTimeStamp = 1000;
+    int masterKeyId = 1;
+    long rmIdentifier = 1;
+    Priority priority = Priority.newInstance(1);
+    long creationTime = 1000;
+
+    ContainerTokenIdentifier token =
+        new ContainerTokenIdentifier(containerID, hostName, appSubmitter, r,
+            expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime,
+            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.APPLICATION_MASTER);
+
+    ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier();
+
+    byte[] tokenContent = token.getBytes();
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(tokenContent, tokenContent.length);
+    anotherToken.readFields(dib);
+
+    Assert.assertEquals(ContainerType.APPLICATION_MASTER,
+        anotherToken.getContainerType());
+
+    token =
+        new ContainerTokenIdentifier(containerID, hostName, appSubmitter, r,
+            expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime,
+            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK);
+
+    anotherToken = new ContainerTokenIdentifier();
+
+    tokenContent = token.getBytes();
+    dib = new DataInputBuffer();
+    dib.reset(tokenContent, tokenContent.length);
+    anotherToken.readFields(dib);
+
+    Assert.assertEquals(ContainerType.TASK,
+        anotherToken.getContainerType());
+  }
+
 }
 }

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java

@@ -225,7 +225,8 @@ public class AuxServices extends AbstractService
           try {
           try {
             serv.initializeContainer(new ContainerInitializationContext(
             serv.initializeContainer(new ContainerInitializationContext(
                 event.getUser(), event.getContainer().getContainerId(),
                 event.getUser(), event.getContainer().getContainerId(),
-                event.getContainer().getResource()));
+                event.getContainer().getResource(), event.getContainer()
+                .getContainerTokenIdentifier().getContainerType()));
           } catch (Throwable th) {
           } catch (Throwable th) {
             logWarningWhenAuxServiceThrowExceptions(serv,
             logWarningWhenAuxServiceThrowExceptions(serv,
                 AuxServicesEventType.CONTAINER_INIT, th);
                 AuxServicesEventType.CONTAINER_INIT, th);
@@ -237,7 +238,8 @@ public class AuxServices extends AbstractService
           try {
           try {
             serv.stopContainer(new ContainerTerminationContext(
             serv.stopContainer(new ContainerTerminationContext(
                 event.getUser(), event.getContainer().getContainerId(),
                 event.getUser(), event.getContainer().getContainerId(),
-                event.getContainer().getResource()));
+                event.getContainer().getResource(), event.getContainer()
+                .getContainerTokenIdentifier().getContainerType()));
           } catch (Throwable th) {
           } catch (Throwable th) {
             logWarningWhenAuxServiceThrowExceptions(serv,
             logWarningWhenAuxServiceThrowExceptions(serv,
                 AuxServicesEventType.CONTAINER_STOP, th);
                 AuxServicesEventType.CONTAINER_STOP, th);

+ 15 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 
 
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.HashMultiset;
@@ -467,13 +467,26 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       .hasNext();) {
       .hasNext();) {
       RMContainer rmContainer = i.next();
       RMContainer rmContainer = i.next();
       Container container = rmContainer.getContainer();
       Container container = rmContainer.getContainer();
+      ContainerType containerType = ContainerType.TASK;
+      // The working knowledge is that masterContainer for AM is null as it
+      // itself is the master container.
+      RMAppAttempt appAttempt =
+          rmContext
+              .getRMApps()
+              .get(
+                  container.getId().getApplicationAttemptId()
+                      .getApplicationId()).getCurrentAppAttempt();
+      if (appAttempt.getMasterContainer() == null
+          && appAttempt.getSubmissionContext().getUnmanagedAM() == false) {
+        containerType = ContainerType.APPLICATION_MASTER;
+      }
       try {
       try {
         // create container token and NMToken altogether.
         // create container token and NMToken altogether.
         container.setContainerToken(rmContext.getContainerTokenSecretManager()
         container.setContainerToken(rmContext.getContainerTokenSecretManager()
             .createContainerToken(container.getId(), container.getNodeId(),
             .createContainerToken(container.getId(), container.getNodeId(),
                 getUser(), container.getResource(), container.getPriority(),
                 getUser(), container.getResource(), container.getPriority(),
                 rmContainer.getCreationTime(), this.logAggregationContext,
                 rmContainer.getCreationTime(), this.logAggregationContext,
-                rmContainer.getNodeLabelExpression()));
+                rmContainer.getNodeLabelExpression(), containerType));
         NMToken nmToken =
         NMToken nmToken =
             rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
             rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
               getApplicationAttemptId(), container);
               getApplicationAttemptId(), container);

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -829,7 +829,8 @@ public class LeafQueue extends AbstractCSQueue {
             RMAppAttempt rmAppAttempt =
             RMAppAttempt rmAppAttempt =
                 csContext.getRMContext().getRMApps()
                 csContext.getRMContext().getRMApps()
                     .get(application.getApplicationId()).getCurrentAppAttempt();
                     .get(application.getApplicationId()).getCurrentAppAttempt();
-            if (null == rmAppAttempt.getMasterContainer()) {
+            if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
+                && null == rmAppAttempt.getMasterContainer()) {
               if (LOG.isDebugEnabled()) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Skip allocating AM container to app_attempt="
                 LOG.debug("Skip allocating AM container to app_attempt="
                     + application.getApplicationAttemptId()
                     + application.getApplicationAttemptId()

+ 8 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
@@ -166,7 +167,7 @@ public class RMContainerTokenSecretManager extends
 
 
   /**
   /**
    * Helper function for creating ContainerTokens
    * Helper function for creating ContainerTokens
-   * 
+   *
    * @param containerId
    * @param containerId
    * @param nodeId
    * @param nodeId
    * @param appSubmitter
    * @param appSubmitter
@@ -179,12 +180,12 @@ public class RMContainerTokenSecretManager extends
       String appSubmitter, Resource capability, Priority priority,
       String appSubmitter, Resource capability, Priority priority,
       long createTime) {
       long createTime) {
     return createContainerToken(containerId, nodeId, appSubmitter, capability,
     return createContainerToken(containerId, nodeId, appSubmitter, capability,
-      priority, createTime, null, null);
+      priority, createTime, null, null, ContainerType.TASK);
   }
   }
 
 
   /**
   /**
    * Helper function for creating ContainerTokens
    * Helper function for creating ContainerTokens
-   * 
+   *
    * @param containerId
    * @param containerId
    * @param nodeId
    * @param nodeId
    * @param appSubmitter
    * @param appSubmitter
@@ -192,12 +193,14 @@ public class RMContainerTokenSecretManager extends
    * @param priority
    * @param priority
    * @param createTime
    * @param createTime
    * @param logAggregationContext
    * @param logAggregationContext
+   * @param nodeLabelExpression
+   * @param containerType
    * @return the container-token
    * @return the container-token
    */
    */
   public Token createContainerToken(ContainerId containerId, NodeId nodeId,
   public Token createContainerToken(ContainerId containerId, NodeId nodeId,
       String appSubmitter, Resource capability, Priority priority,
       String appSubmitter, Resource capability, Priority priority,
       long createTime, LogAggregationContext logAggregationContext,
       long createTime, LogAggregationContext logAggregationContext,
-      String nodeLabelExpression) {
+      String nodeLabelExpression, ContainerType containerType) {
     byte[] password;
     byte[] password;
     ContainerTokenIdentifier tokenIdentifier;
     ContainerTokenIdentifier tokenIdentifier;
     long expiryTimeStamp =
     long expiryTimeStamp =
@@ -211,7 +214,7 @@ public class RMContainerTokenSecretManager extends
             appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
             appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
               .getMasterKey().getKeyId(),
               .getMasterKey().getKeyId(),
             ResourceManager.getClusterTimeStamp(), priority, createTime,
             ResourceManager.getClusterTimeStamp(), priority, createTime,
-            logAggregationContext, nodeLabelExpression);
+            logAggregationContext, nodeLabelExpression, containerType);
       password = this.createPassword(tokenIdentifier);
       password = this.createPassword(tokenIdentifier);
 
 
     } finally {
     } finally {

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java

@@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
 import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -168,6 +170,16 @@ public class Application {
     
     
     resourceManager.getClientRMService().submitApplication(request);
     resourceManager.getClientRMService().submitApplication(request);
 
 
+    RMAppEvent event =
+        new RMAppEvent(this.applicationId, RMAppEventType.START);
+    resourceManager.getRMContext().getRMApps().get(applicationId).handle(event);
+    event =
+        new RMAppEvent(this.applicationId, RMAppEventType.APP_NEW_SAVED);
+    resourceManager.getRMContext().getRMApps().get(applicationId).handle(event);
+    event =
+        new RMAppEvent(this.applicationId, RMAppEventType.APP_ACCEPTED);
+    resourceManager.getRMContext().getRMApps().get(applicationId).handle(event);
+
     // Notify scheduler
     // Notify scheduler
     AppAddedSchedulerEvent addAppEvent =
     AppAddedSchedulerEvent addAppEvent =
         new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
         new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -286,11 +287,11 @@ public class TestContainerAllocation {
         public Token createContainerToken(ContainerId containerId,
         public Token createContainerToken(ContainerId containerId,
             NodeId nodeId, String appSubmitter, Resource capability,
             NodeId nodeId, String appSubmitter, Resource capability,
             Priority priority, long createTime,
             Priority priority, long createTime,
-            LogAggregationContext logAggregationContext, String nodeLabelExp) {
+            LogAggregationContext logAggregationContext, String nodeLabelExp, ContainerType containerType) {
           numRetries++;
           numRetries++;
           return super.createContainerToken(containerId, nodeId, appSubmitter,
           return super.createContainerToken(containerId, nodeId, appSubmitter,
               capability, priority, createTime, logAggregationContext,
               capability, priority, createTime, logAggregationContext,
-              nodeLabelExp);
+              nodeLabelExp, containerType);
         }
         }
       };
       };
     }
     }

+ 26 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
@@ -152,6 +155,11 @@ public class FairSchedulerTestBase {
     when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
     when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
     when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
     when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
         new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
         new RMAppAttemptMetrics(id, resourceManager.getRMContext()));
+    ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
+    when(submissionContext.getUnmanagedAM()).thenReturn(false);
+    when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext);
+    Container container = mock(Container.class);
+    when(rmAppAttempt.getMasterContainer()).thenReturn(container);
     resourceManager.getRMContext().getRMApps()
     resourceManager.getRMContext().getRMApps()
         .put(id.getApplicationId(), rmApp);
         .put(id.getApplicationId(), rmApp);
 
 
@@ -175,6 +183,9 @@ public class FairSchedulerTestBase {
     when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
     when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
     when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
     when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
         new RMAppAttemptMetrics(id,resourceManager.getRMContext()));
         new RMAppAttemptMetrics(id,resourceManager.getRMContext()));
+    ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
+    when(submissionContext.getUnmanagedAM()).thenReturn(false);
+    when(rmAppAttempt.getSubmissionContext()).thenReturn(submissionContext);
     resourceManager.getRMContext().getRMApps()
     resourceManager.getRMContext().getRMApps()
         .put(id.getApplicationId(), rmApp);
         .put(id.getApplicationId(), rmApp);
 
 
@@ -206,13 +217,20 @@ public class FairSchedulerTestBase {
   protected void createApplicationWithAMResource(ApplicationAttemptId attId,
   protected void createApplicationWithAMResource(ApplicationAttemptId attId,
       String queue, String user, Resource amResource) {
       String queue, String user, Resource amResource) {
     RMContext rmContext = resourceManager.getRMContext();
     RMContext rmContext = resourceManager.getRMContext();
-    RMApp rmApp = new RMAppImpl(attId.getApplicationId(), rmContext, conf,
-        null, null, null, ApplicationSubmissionContext.newInstance(null, null,
-        null, null, null, false, false, 0, amResource, null), null, null,
+    ApplicationId appId = attId.getApplicationId();
+    RMApp rmApp = new RMAppImpl(appId, rmContext, conf,
+        null, user, null, ApplicationSubmissionContext.newInstance(appId, null,
+        queue, null, null, false, false, 0, amResource, null), null, null,
         0, null, null, null);
         0, null, null, null);
-    rmContext.getRMApps().put(attId.getApplicationId(), rmApp);
+    rmContext.getRMApps().put(appId, rmApp);
+    RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START);
+    resourceManager.getRMContext().getRMApps().get(appId).handle(event);
+    event = new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED);
+    resourceManager.getRMContext().getRMApps().get(appId).handle(event);
+    event = new RMAppEvent(appId, RMAppEventType.APP_ACCEPTED);
+    resourceManager.getRMContext().getRMApps().get(appId).handle(event);
     AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
     AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
-        attId.getApplicationId(), queue, user);
+        appId, queue, user);
     scheduler.handle(appAddedEvent);
     scheduler.handle(appAddedEvent);
     AppAttemptAddedSchedulerEvent attempAddedEvent =
     AppAttemptAddedSchedulerEvent attempAddedEvent =
         new AppAttemptAddedSchedulerEvent(attId, false);
         new AppAttemptAddedSchedulerEvent(attId, false);
@@ -227,6 +245,9 @@ public class FairSchedulerTestBase {
     RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
     RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
     when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
     when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
     when(app.getCurrentAppAttempt()).thenReturn(attempt);
     when(app.getCurrentAppAttempt()).thenReturn(attempt);
+    ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
+    when(submissionContext.getUnmanagedAM()).thenReturn(false);
+    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
     resourceManager.getRMContext().getRMApps()
     resourceManager.getRMContext().getRMApps()
         .put(attemptId.getApplicationId(), app);
         .put(attemptId.getApplicationId(), app);
     return app;
     return app;

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -1184,6 +1185,9 @@ public class TestFifoScheduler {
     RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
     RMAppAttemptMetrics attemptMetric = mock(RMAppAttemptMetrics.class);
     when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
     when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
     when(app.getCurrentAppAttempt()).thenReturn(attempt);
     when(app.getCurrentAppAttempt()).thenReturn(attempt);
+    ApplicationSubmissionContext submissionContext = mock(ApplicationSubmissionContext.class);
+    when(submissionContext.getUnmanagedAM()).thenReturn(false);
+    when(attempt.getSubmissionContext()).thenReturn(submissionContext);
     context.getRMApps().putIfAbsent(attemptId.getApplicationId(), app);
     context.getRMApps().putIfAbsent(attemptId.getApplicationId(), app);
     return app;
     return app;
   }
   }