Browse Source

YARN-714. Added NMTokens to be sent to AMs as part of heart-beat response. Contributed by Omkar Vinit Joshi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1487704 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 12 years ago
parent
commit
1858c7e52e

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -221,6 +221,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-711. Copied BuilderUtil methods in individual API records as
     BuilderUtils is going to be dismantled. (Jian He via vinodkv)
 
+    YARN-714. Added NMTokens to be sent to AMs as part of heart-beat response.
+    (Omkar Vinit Joshi via vinodkv)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 
 /**
  * <p>The response sent by the <code>ResourceManager</code> the  
@@ -176,5 +177,24 @@ public interface AllocateResponse {
   @Private
   @Unstable
   public void setPreemptionMessage(PreemptionMessage request);
+  
+  @Public
+  @Stable
+  public void setNMTokens(List<Token> nmTokens);
+  
+  /**
+   * Get the list of NMTokens required for communicating with NM. New NMTokens
+   * issued only if
+   * 1) AM is receiving first container on underlying NodeManager.
+   * OR
+   * 2) NMToken master key rolled over in ResourceManager and AM is getting new
+   * container on the same underlying NodeManager.
+   * AM will receive one NMToken per NM irrespective of the number of containers
+   * issued on same NM. AM is expected to store these tokens until issued a
+   * new token for the same NM.
+   */
+  @Public
+  @Stable
+  public List<Token> getNMTokens();
 
 }

+ 76 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java

@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -30,10 +31,12 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
@@ -52,6 +55,7 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
   Resource limit;
 
   private List<Container> allocatedContainers = null;
+  private List<Token> nmTokens = null;
   private List<ContainerStatus> completedContainersStatuses = null;
 
   private List<NodeReport> updatedNodes = null;
@@ -81,6 +85,11 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
           getProtoIterable(this.allocatedContainers);
       builder.addAllAllocatedContainers(iterable);
     }
+    if (nmTokens != null) {
+      builder.clearNmTokens();
+      Iterable<TokenProto> iterable = getTokenProtoIterable(nmTokens);
+      builder.addAllNmTokens(iterable);
+    }
     if (this.completedContainersStatuses != null) {
       builder.clearCompletedContainerStatuses();
       Iterable<ContainerStatusProto> iterable =
@@ -210,6 +219,24 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
     completedContainersStatuses.addAll(containers);
   }
 
+  @Override
+  public synchronized void setNMTokens(List<Token> nmTokens) {
+    if (nmTokens == null || nmTokens.isEmpty()) {
+      this.nmTokens.clear();
+      builder.clearNmTokens();
+      return;
+    }
+    // Implementing it as an append rather than set for consistency
+    initLocalNewNMTokenList();
+    this.nmTokens.addAll(nmTokens);
+  }
+
+  @Override
+  public synchronized List<Token> getNMTokens() {
+    initLocalNewNMTokenList();
+    return nmTokens;
+  }
+  
   @Override
   public synchronized int getNumClusterNodes() {
     AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
@@ -274,6 +301,18 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
     }
   }
 
+  private synchronized void initLocalNewNMTokenList() {
+    if (nmTokens != null) {
+      return;
+    }
+    AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<TokenProto> list = p.getNmTokensList();
+    nmTokens = new ArrayList<Token>();
+    for (TokenProto t : list) {
+      nmTokens.add(convertFromProtoFormat(t));
+    }
+  }
+
   private synchronized Iterable<ContainerProto> getProtoIterable(
       final List<Container> newContainersList) {
     maybeInitBuilder();
@@ -305,6 +344,35 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
     };
   }
 
+  private synchronized Iterable<TokenProto> getTokenProtoIterable(
+      final List<Token> nmTokenList) {
+    maybeInitBuilder();
+    return new Iterable<TokenProto>() {
+      @Override
+      public synchronized Iterator<TokenProto> iterator() {
+        return new Iterator<TokenProto>() {
+
+          Iterator<Token> iter = nmTokenList.iterator();
+          
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+          
+          @Override
+          public TokenProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+          
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+  }
+  
   private synchronized Iterable<ContainerStatusProto>
   getContainerStatusProtoIterable(
       final List<ContainerStatus> newContainersList) {
@@ -427,4 +495,12 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
   private synchronized PreemptionMessageProto convertToProtoFormat(PreemptionMessage r) {
     return ((PreemptionMessagePBImpl)r).getProto();
   }
+  
+  private synchronized TokenProto convertToProtoFormat(Token token) {
+    return ((TokenPBImpl)token).getProto();
+  }
+  
+  private synchronized Token convertFromProtoFormat(TokenProto proto) {
+    return new TokenPBImpl(proto);
+  }
 }  

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -67,6 +67,7 @@ message AllocateResponseProto {
   repeated NodeReportProto updated_nodes = 6;
   optional int32 num_cluster_nodes = 7;
   optional PreemptionMessageProto preempt = 8;
+  repeated hadoop.common.TokenProto nm_tokens = 9;
 }
 
 message PreemptionMessageProto {