Browse Source

YARN-6523. Optimize system credentials sent in node heartbeat responses. Contributed by Manikandan R

Jason Lowe 6 years ago
parent
commit
6a923464af
20 changed files with 632 additions and 124 deletions
  1. 18 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
  2. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
  3. 13 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
  4. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
  5. 29 62
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
  6. 68 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java
  7. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  8. 52 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
  9. 12 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
  10. 8 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  11. 25 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  12. 25 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
  13. 8 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
  14. 13 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  15. 26 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  16. 15 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
  17. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
  18. 43 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  19. 157 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
  20. 98 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

+ 18 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TimedPlacementCon
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutTypeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
@@ -92,12 +94,16 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.ContainerUpdateTypeProto;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
 import com.google.protobuf.ByteString;
 
 @Private
 @Unstable
 public class ProtoUtils {
 
+  public static final Interner<ByteString> BYTE_STRING_INTERNER =
+      Interners.newWeakInterner();
 
   /*
    * ContainerState
@@ -578,6 +584,18 @@ public class ProtoUtils {
           TimedPlacementConstraintProto.DelayUnit u) {
     return TimedPlacementConstraint.DelayUnit.valueOf(u.name());
   }
+
+  /*
+   * ApplicationId
+   */
+  public static ApplicationIdPBImpl convertFromProtoFormat(
+      ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  public static ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
 }
 
 

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java

@@ -102,4 +102,8 @@ public abstract class NodeHeartbeatRequest {
   public abstract Set<NodeAttribute> getNodeAttributes();
 
   public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes);
+
+  public abstract void setTokenSequenceNo(long tokenSequenceNo);
+
+  public abstract long getTokenSequenceNo();
 }

+ 13 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
@@ -88,13 +88,6 @@ public abstract class NodeHeartbeatResponse {
 
   public abstract void setDiagnosticsMessage(String diagnosticsMessage);
 
-  // Credentials (i.e. hdfs tokens) needed by NodeManagers for application
-  // localizations and logAggreations.
-  public abstract Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
-
-  public abstract void setSystemCredentialsForApps(
-      Map<ApplicationId, ByteBuffer> systemCredentials);
-  
   public abstract boolean getAreNodeLabelsAcceptedByRM();
 
   public abstract void setAreNodeLabelsAcceptedByRM(
@@ -123,4 +116,16 @@ public abstract class NodeHeartbeatResponse {
 
   public abstract void setAreNodeAttributesAcceptedByRM(
       boolean areNodeAttributesAcceptedByRM);
+
+  public abstract void setTokenSequenceNo(long tokenSequenceNo);
+
+  public abstract long getTokenSequenceNo();
+
+  // Credentials (i.e. hdfs tokens) needed by NodeManagers for application
+  // localizations and logAggregations.
+  public abstract void setSystemCredentialsForApps(
+      Collection<SystemCredentialsForAppsProto> systemCredentials);
+
+  public abstract Collection<SystemCredentialsForAppsProto>
+      getSystemCredentialsForApps();
 }

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java

@@ -455,4 +455,17 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     }
     this.logAggregationReportsForApps = logAggregationStatusForApps;
   }
+
+  @Override
+  public void setTokenSequenceNo(long tokenSequenceNo) {
+    maybeInitBuilder();
+    this.builder.setTokenSequenceNo(tokenSequenceNo);
+  }
+
+  @Override
+  public long getTokenSequenceNo() {
+    NodeHeartbeatRequestProtoOrBuilder p =
+        this.viaProto ? this.proto : this.builder;
+    return p.getTokenSequenceNo();
+  }
 }  

+ 29 - 62
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -26,10 +25,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
-import com.google.protobuf.ByteString;
-
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
@@ -39,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
@@ -76,7 +70,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
   private List<ContainerId> containersToCleanup = null;
   private List<ContainerId> containersToBeRemovedFromNM = null;
   private List<ApplicationId> applicationsToCleanup = null;
-  private Map<ApplicationId, ByteBuffer> systemCredentials = null;
   private Resource resource = null;
   private Map<ApplicationId, AppCollectorData> appCollectorsMap = null;
 
@@ -88,9 +81,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
   private List<Container> containersToDecrease = null;
   private List<SignalContainerRequest> containersToSignal = null;
 
-  private static final Interner<ByteString> BYTE_STRING_INTERNER =
-      Interners.newWeakInterner();
-
   public NodeHeartbeatResponsePBImpl() {
     builder = NodeHeartbeatResponseProto.newBuilder();
   }
@@ -129,9 +119,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
       builder.setContainerQueuingLimit(
           convertToProtoFormat(this.containerQueuingLimit));
     }
-    if (this.systemCredentials != null) {
-      addSystemCredentialsToProto();
-    }
     if (this.containersToUpdate != null) {
       addContainersToUpdateToProto();
     }
@@ -149,17 +136,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
     }
   }
 
-  private void addSystemCredentialsToProto() {
-    maybeInitBuilder();
-    builder.clearSystemCredentialsForApps();
-    for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
-      builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder()
-        .setAppId(convertToProtoFormat(entry.getKey()))
-        .setCredentialsForApp(BYTE_STRING_INTERNER.intern(
-            ProtoUtils.convertToProtoFormat(entry.getValue().duplicate()))));
-    }
-  }
-
   private void addAppCollectorsMapToProto() {
     maybeInitBuilder();
     builder.clearAppCollectors();
@@ -168,7 +144,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
       AppCollectorData data = entry.getValue();
       AppCollectorDataProto.Builder appCollectorDataBuilder =
           AppCollectorDataProto.newBuilder()
-              .setAppId(convertToProtoFormat(entry.getKey()))
+              .setAppId(ProtoUtils.convertToProtoFormat(entry.getKey()))
               .setAppCollectorAddr(data.getCollectorAddr())
               .setRmIdentifier(data.getRMIdentifier())
               .setVersion(data.getVersion());
@@ -477,7 +453,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
     this.applicationsToCleanup = new ArrayList<ApplicationId>();
 
     for (ApplicationIdProto c : list) {
-      this.applicationsToCleanup.add(convertFromProtoFormat(c));
+      this.applicationsToCleanup.add(ProtoUtils.convertFromProtoFormat(c));
     }
   }
 
@@ -510,7 +486,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
 
           @Override
           public ApplicationIdProto next() {
-            return convertToProtoFormat(iter.next());
+            return ProtoUtils.convertToProtoFormat(iter.next());
           }
 
           @Override
@@ -644,15 +620,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
     builder.addAllContainersToDecrease(iterable);
   }
 
-  @Override
-  public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
-    if (this.systemCredentials != null) {
-      return this.systemCredentials;
-    }
-    initSystemCredentials();
-    return systemCredentials;
-  }
-
   @Override
   public Map<ApplicationId, AppCollectorData> getAppCollectors() {
     if (this.appCollectorsMap != null) {
@@ -662,24 +629,13 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
     return appCollectorsMap;
   }
 
-  private void initSystemCredentials() {
-    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<SystemCredentialsForAppsProto> list = p.getSystemCredentialsForAppsList();
-    this.systemCredentials = new HashMap<ApplicationId, ByteBuffer> ();
-    for (SystemCredentialsForAppsProto c : list) {
-      ApplicationId appId = convertFromProtoFormat(c.getAppId());
-      ByteBuffer byteBuffer = ProtoUtils.convertFromProtoFormat(c.getCredentialsForApp());
-      this.systemCredentials.put(appId, byteBuffer);
-    }
-  }
-
   private void initAppCollectorsMap() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     List<AppCollectorDataProto> list = p.getAppCollectorsList();
     if (!list.isEmpty()) {
       this.appCollectorsMap = new HashMap<>();
       for (AppCollectorDataProto c : list) {
-        ApplicationId appId = convertFromProtoFormat(c.getAppId());
+        ApplicationId appId = ProtoUtils.convertFromProtoFormat(c.getAppId());
         Token collectorToken = null;
         if (c.hasAppCollectorToken()){
           collectorToken = convertFromProtoFormat(c.getAppCollectorToken());
@@ -694,13 +650,19 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
 
   @Override
   public void setSystemCredentialsForApps(
-      Map<ApplicationId, ByteBuffer> systemCredentials) {
-    if (systemCredentials == null || systemCredentials.isEmpty()) {
-      return;
-    }
+      Collection<SystemCredentialsForAppsProto> systemCredentialsForAppsProto) {
     maybeInitBuilder();
-    this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>();
-    this.systemCredentials.putAll(systemCredentials);
+    builder.clearSystemCredentialsForApps();
+    if (systemCredentialsForAppsProto != null) {
+      builder.addAllSystemCredentialsForApps(systemCredentialsForAppsProto);
+    }
+  }
+
+  @Override
+  public Collection<SystemCredentialsForAppsProto>
+      getSystemCredentialsForApps() {
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getSystemCredentialsForAppsList();
   }
 
   @Override
@@ -742,14 +704,6 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
     return ProtoUtils.convertToProtoFormat(t);
   }
 
-  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
-    return new ApplicationIdPBImpl(p);
-  }
-
-  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
-    return ((ApplicationIdPBImpl) t).getProto();
-  }
-
   private NodeAction convertFromProtoFormat(NodeActionProto p) {
     return NodeAction.valueOf(p.name());
   }
@@ -889,5 +843,18 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
   private TokenPBImpl convertFromProtoFormat(TokenProto p) {
     return new TokenPBImpl(p);
   }
+
+  @Override
+  public void setTokenSequenceNo(long tokenSequenceNo) {
+    maybeInitBuilder();
+    this.builder.setTokenSequenceNo(tokenSequenceNo);
+  }
+
+  @Override
+  public long getTokenSequenceNo() {
+    NodeHeartbeatResponseProtoOrBuilder p =
+        this.viaProto ? this.proto : this.builder;
+    return p.getTokenSequenceNo();
+  }
 }
 

+ 68 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java

@@ -18,12 +18,19 @@
 
 package org.apache.hadoop.yarn.server.utils;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -66,4 +73,65 @@ public class YarnServerBuilderUtils {
     }
     return response;
   }
+
+  /**
+   * Build SystemCredentialsForAppsProto objects.
+   *
+   * @param applicationId Application ID
+   * @param credentials HDFS Tokens
+   * @return systemCredentialsForAppsProto SystemCredentialsForAppsProto
+   */
+  public static SystemCredentialsForAppsProto newSystemCredentialsForAppsProto(
+      ApplicationId applicationId, ByteBuffer credentials) {
+    SystemCredentialsForAppsProto systemCredentialsForAppsProto =
+        SystemCredentialsForAppsProto.newBuilder()
+            .setAppId(ProtoUtils.convertToProtoFormat(applicationId))
+            .setCredentialsForApp(ProtoUtils.BYTE_STRING_INTERNER.intern(
+                ProtoUtils.convertToProtoFormat(credentials.duplicate())))
+            .build();
+    return systemCredentialsForAppsProto;
+  }
+
+  /**
+   * Convert Collection of SystemCredentialsForAppsProto proto objects to a Map
+   * of ApplicationId to ByteBuffer.
+   *
+   * @param systemCredentials List of SystemCredentialsForAppsProto proto
+   *          objects
+   * @return systemCredentialsForApps Map of Application Id to ByteBuffer
+   */
+  public static Map<ApplicationId, ByteBuffer> convertFromProtoFormat(
+      Collection<SystemCredentialsForAppsProto> systemCredentials) {
+
+    Map<ApplicationId, ByteBuffer> systemCredentialsForApps =
+        new HashMap<ApplicationId, ByteBuffer>(systemCredentials.size());
+    for (SystemCredentialsForAppsProto proto : systemCredentials) {
+      systemCredentialsForApps.put(
+          ProtoUtils.convertFromProtoFormat(proto.getAppId()),
+          ProtoUtils.convertFromProtoFormat(proto.getCredentialsForApp()));
+    }
+    return systemCredentialsForApps;
+  }
+
+  /**
+   * Convert Map of Application Id to ByteBuffer to Collection of
+   * SystemCredentialsForAppsProto proto objects.
+   *
+   * @param systemCredentialsForApps Map of Application Id to ByteBuffer
+   * @return systemCredentials List of SystemCredentialsForAppsProto proto
+   *         objects
+   */
+  public static List<SystemCredentialsForAppsProto> convertToProtoFormat(
+      Map<ApplicationId, ByteBuffer> systemCredentialsForApps) {
+    List<SystemCredentialsForAppsProto> systemCredentials =
+        new ArrayList<SystemCredentialsForAppsProto>(
+            systemCredentialsForApps.size());
+    for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentialsForApps
+        .entrySet()) {
+      SystemCredentialsForAppsProto proto =
+          newSystemCredentialsForAppsProto(entry.getKey(), entry.getValue());
+      systemCredentials.add(proto);
+    }
+    return systemCredentials;
+  }
 }

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -102,6 +102,7 @@ message NodeHeartbeatRequestProto {
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
   repeated AppCollectorDataProto registering_collectors = 6;
   optional NodeAttributesProto nodeAttributes = 7;
+  optional int64 tokenSequenceNo = 8;
 }
 
 message LogAggregationReportProto {
@@ -131,6 +132,7 @@ message NodeHeartbeatResponseProto {
   // to be used in place of containers_to_decrease
   repeated ContainerProto containers_to_update = 17;
   optional bool areNodeAttributesAcceptedByRM = 18 [default = false];
+  optional int64 tokenSequenceNo = 19;
 }
 
 message ContainerQueuingLimitProto {

+ 52 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java

@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,6 +30,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -37,7 +44,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
@@ -56,6 +62,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -153,10 +160,12 @@ public class TestYarnServerApiClasses {
 
   /**
    * Test NodeHeartbeatResponsePBImpl.
+   *
+   * @throws IOException
    */
 
   @Test
-  public void testNodeHeartbeatResponsePBImpl() {
+  public void testNodeHeartbeatResponsePBImpl() throws IOException {
     NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
 
     original.setDiagnosticsMessage("testDiagnosticMessage");
@@ -168,6 +177,29 @@ public class TestYarnServerApiClasses {
     Map<ApplicationId, AppCollectorData> collectors = getCollectors(false);
     original.setAppCollectors(collectors);
 
+    // create token1
+    Text userText1 = new Text("user1");
+    DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(userText1,
+        new Text("renewer1"), userText1);
+    final Token<DelegationTokenIdentifier> expectedToken1 =
+        new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+            "password12".getBytes(), dtId1.getKind(), new Text("service1"));
+
+    Credentials credentials1 = new Credentials();
+    credentials1.addToken(expectedToken1.getService(), expectedToken1);
+
+    DataOutputBuffer dob1 = new DataOutputBuffer();
+    credentials1.writeTokenStorageToStream(dob1);
+
+    ByteBuffer byteBuffer1 =
+        ByteBuffer.wrap(dob1.getData(), 0, dob1.getLength());
+
+    Map<ApplicationId, ByteBuffer> systemCredentials =
+        new HashMap<ApplicationId, ByteBuffer>();
+    systemCredentials.put(getApplicationId(1), byteBuffer1);
+    original.setSystemCredentialsForApps(
+        YarnServerBuilderUtils.convertToProtoFormat(systemCredentials));
+
     NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
         original.getProto());
     assertEquals(100, copy.getResponseId());
@@ -178,6 +210,22 @@ public class TestYarnServerApiClasses {
     assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
     assertEquals(collectors, copy.getAppCollectors());
     assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
+    assertEquals(1, copy.getSystemCredentialsForApps().size());
+
+    Credentials credentials1Out = new Credentials();
+    DataInputByteBuffer buf = new DataInputByteBuffer();
+    ByteBuffer buffer =
+        YarnServerBuilderUtils
+            .convertFromProtoFormat(copy.getSystemCredentialsForApps())
+            .get(getApplicationId(1));
+    Assert.assertNotNull(buffer);
+    buffer.rewind();
+    buf.reset(buffer);
+    credentials1Out.readTokenStorageStream(buf);
+    assertEquals(1, credentials1Out.getAllTokens().size());
+    // Ensure token1's password "password12" is available from proto response
+    assertEquals(10,
+        credentials1Out.getAllTokens().iterator().next().getPassword().length);
    }
 
   @Test
@@ -376,7 +424,8 @@ public class TestYarnServerApiClasses {
     AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr);
     if (!hasNullCollectorToken) {
       data.setCollectorToken(
-          Token.newInstance(new byte[0], "kind", new byte[0], "s"));
+          org.apache.hadoop.yarn.api.records.Token.newInstance(new byte[0],
+              "kind", new byte[0], "s"));
     }
     Map<ApplicationId, AppCollectorData> collectorMap =
         new HashMap<>();

+ 12 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java

@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.Sets;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
@@ -38,28 +37,29 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.NodeAttribute;
-import org.apache.hadoop.yarn.api.records.NodeAttributeType;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
-
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
     .NodeHeartbeatRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
-
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 public class TestProtocolRecords {
 
   @Test
@@ -154,14 +154,17 @@ public class TestProtocolRecords {
 
     DataOutputBuffer dob = new DataOutputBuffer();
     app1Cred.writeTokenStorageToStream(dob);
-    ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
-    record.setSystemCredentialsForApps(appCredentials);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer);
+    record.setSystemCredentialsForApps(
+        YarnServerBuilderUtils.convertToProtoFormat(appCredentials));
 
     NodeHeartbeatResponse proto =
         new NodeHeartbeatResponsePBImpl(
           ((NodeHeartbeatResponsePBImpl) record).getProto());
-    Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
+    Assert.assertEquals(appCredentials, YarnServerBuilderUtils
+        .convertFromProtoFormat(proto.getSystemCredentialsForApps()));
   }
 
   @Test

+ 8 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvid
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -158,6 +159,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private NMNodeAttributesHandler nodeAttributesHandler;
   private NodeLabelsProvider nodeLabelsProvider;
   private NodeAttributesProvider nodeAttributesProvider;
+  private long tokenSequenceNo;
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -1320,6 +1322,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             }
           }
 
+          request.setTokenSequenceNo(
+              NodeStatusUpdaterImpl.this.tokenSequenceNo);
           response = resourceTracker.nodeHeartbeat(request);
           //get next heartbeat interval from response
           nextHeartBeatInterval = response.getNextHeartBeatInterval();
@@ -1360,7 +1364,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                       CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
             }
             Map<ApplicationId, ByteBuffer> systemCredentials =
-                response.getSystemCredentialsForApps();
+                YarnServerBuilderUtils.convertFromProtoFormat(
+                    response.getSystemCredentialsForApps());
             if (systemCredentials != null && !systemCredentials.isEmpty()) {
               ((NMContext) context).setSystemCrendentialsForApps(
                   parseCredentials(systemCredentials));
@@ -1404,6 +1409,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             updateTimelineCollectorData(response);
           }
 
+          NodeStatusUpdaterImpl.this.tokenSequenceNo =
+              response.getTokenSequenceNo();
         } catch (ConnectException e) {
           //catch and throw the exception if tried MAX wait time to connect RM
           dispatcher.getEventHandler().handle(

+ 25 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager;
 
 import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.EOFException;
@@ -79,6 +78,8 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
+import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
@@ -112,6 +113,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
 
 @SuppressWarnings("rawtypes")
 public class TestNodeStatusUpdater extends NodeManagerTestBase {
@@ -325,16 +328,28 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
     public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher,
         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
       super(context, dispatcher, healthChecker, metrics);
-      resourceTracker = new MyResourceTracker4(context);
+      InetSocketAddress address = new InetSocketAddress(0);
+      Configuration configuration = new Configuration();
+      Server server = RpcServerFactoryPBImpl.get().getServer(
+          ResourceTracker.class, new MyResourceTracker4(context), address,
+          configuration, null, 1);
+      server.start();
+      this.resourceTracker = (ResourceTracker) RpcClientFactoryPBImpl.get()
+          .getClient(
+          ResourceTracker.class, 1, NetUtils.getConnectAddress(server),
+              configuration);
     }
 
     @Override
-    protected ResourceTracker getRMClient() {
+    protected ResourceTracker getRMClient() throws IOException {
       return resourceTracker;
     }
 
     @Override
     protected void stopRMProxy() {
+      if (this.resourceTracker != null) {
+        RPC.stopProxy(this.resourceTracker);
+      }
       return;
     }
   }
@@ -780,7 +795,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
       ByteBuffer byteBuffer1 =
           ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
       appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
-      nhResponse.setSystemCredentialsForApps(appCredentials);
+      nhResponse.setSystemCredentialsForApps(
+          YarnServerBuilderUtils.convertToProtoFormat(appCredentials));
       return nhResponse;
     }
 
@@ -1702,7 +1718,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
 
   @Test
   public void testConcurrentAccessToSystemCredentials(){
-    final Map<ApplicationId, ByteBuffer> testCredentials = new HashMap<>();
+    final Map<ApplicationId, ByteBuffer> testCredentials =
+        new HashMap<>();
     ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[300]);
     ApplicationId applicationId = ApplicationId.newInstance(123456, 120);
     testCredentials.put(applicationId, byteBuffer);
@@ -1727,8 +1744,9 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
                 NodeHeartbeatResponse nodeHeartBeatResponse =
                     newNodeHeartbeatResponse(0, NodeAction.NORMAL,
                         null, null, null, null, 0);
-                nodeHeartBeatResponse.setSystemCredentialsForApps(
-                    testCredentials);
+                nodeHeartBeatResponse
+                    .setSystemCredentialsForApps(YarnServerBuilderUtils
+                        .convertToProtoFormat(testCredentials));
                 NodeHeartbeatResponseProto proto =
                     ((NodeHeartbeatResponsePBImpl)nodeHeartBeatResponse)
                         .getProto();

+ 25 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java

@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
@@ -81,8 +82,8 @@ public class RMActiveServiceContext {
   private final ConcurrentMap<NodeId, RMNode> inactiveNodes =
       new ConcurrentHashMap<NodeId, RMNode>();
 
-  private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
-      new ConcurrentHashMap<ApplicationId, ByteBuffer>();
+  private final ConcurrentMap<ApplicationId, SystemCredentialsForAppsProto> systemCredentials =
+    new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>();
 
   private boolean isWorkPreservingRecoveryEnabled;
 
@@ -124,6 +125,8 @@ public class RMActiveServiceContext {
   private ProxyCAManager proxyCAManager;
   private VolumeManager volumeManager;
 
+  private AtomicLong tokenSequenceNo = new AtomicLong(1);
+
   public RMActiveServiceContext() {
     queuePlacementManager = new PlacementManager();
   }
@@ -509,7 +512,8 @@ public class RMActiveServiceContext {
 
   @Private
   @Unstable
-  public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
+  public ConcurrentMap<ApplicationId, SystemCredentialsForAppsProto>
+      getSystemCredentialsForApps() {
     return systemCredentials;
   }
   
@@ -583,4 +587,21 @@ public class RMActiveServiceContext {
   public void setVolumeManager(VolumeManager volumeManager) {
     this.volumeManager = volumeManager;
   }
+
+  /**
+   * Get token sequence no.
+   *
+   * @return the tokenSequenceNo
+   */
+  public Long getTokenSequenceNo() {
+    return tokenSequenceNo.get();
+  }
+
+  /**
+   * Increment token sequence no.
+   *
+   */
+  public void incrTokenSequenceNo() {
+    this.tokenSequenceNo.incrementAndGet();
+  }
 }

+ 8 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.ConfigurationProvider;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -72,8 +72,9 @@ public interface RMContext extends ApplicationMasterServiceContext {
   RMStateStore getStateStore();
 
   ConcurrentMap<ApplicationId, RMApp> getRMApps();
-  
-  ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
+
+  ConcurrentMap<ApplicationId, SystemCredentialsForAppsProto>
+      getSystemCredentialsForApps();
 
   ConcurrentMap<NodeId, RMNode> getInactiveRMNodes();
 
@@ -198,4 +199,8 @@ public interface RMContext extends ApplicationMasterServiceContext {
   VolumeManager getVolumeManager();
 
   void setVolumeManager(VolumeManager volumeManager);
+
+  long getTokenSequenceNo();
+
+  void incrTokenSequenceNo();
 }

+ 13 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
@@ -35,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.ConfigurationProvider;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -572,7 +572,8 @@ public class RMContextImpl implements RMContext {
     activeServiceContext.setSystemClock(clock);
   }
 
-  public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
+  public ConcurrentMap<ApplicationId, SystemCredentialsForAppsProto>
+      getSystemCredentialsForApps() {
     return activeServiceContext.getSystemCredentialsForApps();
   }
 
@@ -666,4 +667,14 @@ public class RMContextImpl implements RMContext {
   public NodeAttributesManager getNodeAttributesManager() {
     return activeServiceContext.getNodeAttributesManager();
   }
+
+  @Override
+  public long getTokenSequenceNo() {
+    return this.activeServiceContext.getTokenSequenceNo();
+  }
+
+  @Override
+  public void incrTokenSequenceNo() {
+    this.activeServiceContext.incrTokenSequenceNo();
+  }
 }

+ 26 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -21,13 +21,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -639,11 +637,7 @@ public class ResourceTrackerService extends AbstractService implements
 
     populateKeys(request, nodeHeartBeatResponse);
 
-    ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
-        rmContext.getSystemCredentialsForApps();
-    if (!systemCredentials.isEmpty()) {
-      nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
-    }
+    populateTokenSequenceNo(request, nodeHeartBeatResponse);
 
     if (timelineV2Enabled) {
       // Return collectors' map that NM needs to know
@@ -952,4 +946,29 @@ public class ResourceTrackerService extends AbstractService implements
   public Server getServer() {
     return this.server;
   }
+
+  private void populateTokenSequenceNo(NodeHeartbeatRequest request,
+      NodeHeartbeatResponse nodeHeartBeatResponse) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Token sequence no received from heartbeat request: "
+          + request.getTokenSequenceNo() + ". Current token sequeunce no: "
+          + this.rmContext.getTokenSequenceNo()
+          + ". System credentials for apps size: "
+          + rmContext.getSystemCredentialsForApps().size());
+    }
+    if(request.getTokenSequenceNo() != this.rmContext.getTokenSequenceNo()) {
+      if (!rmContext.getSystemCredentialsForApps().isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "Sending System credentials for apps as part of NodeHeartbeat "
+                  + "response.");
+        }
+        nodeHeartBeatResponse
+            .setSystemCredentialsForApps(
+                rmContext.getSystemCredentialsForApps().values());
+      }
+    }
+    nodeHeartBeatResponse.setTokenSequenceNo(
+        this.rmContext.getTokenSequenceNo());
+  }
 }

+ 15 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

@@ -64,10 +64,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -637,7 +639,7 @@ public class DelegationTokenRenewer extends AbstractService {
 
   // Request new hdfs token if the token is about to expire, and remove the old
   // token from the tokenToRenew list
-  private void requestNewHdfsDelegationTokenIfNeeded(
+  void requestNewHdfsDelegationTokenIfNeeded(
       final DelegationTokenToRenew dttr) throws IOException,
       InterruptedException {
 
@@ -679,6 +681,7 @@ public class DelegationTokenRenewer extends AbstractService {
       Collection<ApplicationId> referringAppIds,
       String user, boolean shouldCancelAtEnd) throws IOException,
       InterruptedException {
+    boolean incrTokenSequenceNo = false;
     if (!hasProxyUserPrivileges) {
       LOG.info("RM proxy-user privilege is not enabled. Skip requesting hdfs tokens.");
       return;
@@ -703,14 +706,24 @@ public class DelegationTokenRenewer extends AbstractService {
             appTokens.get(applicationId).add(tokenToRenew);
           }
           LOG.info("Received new token " + token);
+          incrTokenSequenceNo = true;
         }
       }
     }
+
+    if(incrTokenSequenceNo) {
+      this.rmContext.incrTokenSequenceNo();
+    }
+
     DataOutputBuffer dob = new DataOutputBuffer();
     credentials.writeTokenStorageToStream(dob);
     ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
     for (ApplicationId applicationId : referringAppIds) {
-      rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
+      SystemCredentialsForAppsProto systemCredentialsForAppsProto =
+          YarnServerBuilderUtils.newSystemCredentialsForAppsProto(applicationId,
+              byteBuffer);
+      rmContext.getSystemCredentialsForApps().put(applicationId,
+          systemCredentialsForAppsProto);
     }
   }
 

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java

@@ -68,6 +68,7 @@ public class MockNM {
   private Map<ApplicationId, AppCollectorData> registeringCollectors
       = new ConcurrentHashMap<>();
   private Set<NodeLabel> nodeLabels;
+  private long tokenSequenceNo;
 
   public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
     // scale vcores based on the requested memory
@@ -278,6 +279,7 @@ public class MockNM {
     req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey);
 
     req.setRegisteringCollectors(this.registeringCollectors);
+    req.setTokenSequenceNo(this.tokenSequenceNo);
 
     NodeHeartbeatResponse heartbeatResponse =
         resourceTracker.nodeHeartbeat(req);
@@ -302,6 +304,7 @@ public class MockNM {
       capability = Resources.clone(newResource);
     }
 
+    this.tokenSequenceNo = heartbeatResponse.getTokenSequenceNo();
     return heartbeatResponse;
   }
 

+ 43 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -119,11 +119,13 @@ public class MockNodes {
     private ResourceUtilization containersUtilization;
     private ResourceUtilization nodeUtilization;
     private Resource physicalResource;
+    private RMContext rmContext;
 
-    public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
+    MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
         Resource perNode, String rackName, String healthReport,
-        long lastHealthReportTime, int cmdPort, String hostName, NodeState state,
-        Set<String> labels, ResourceUtilization containersUtilization,
+        long lastHealthReportTime, int cmdPort, String hostName,
+        NodeState state, Set<String> labels,
+        ResourceUtilization containersUtilization,
         ResourceUtilization nodeUtilization, Resource pPhysicalResource) {
       this.nodeId = nodeId;
       this.nodeAddr = nodeAddr;
@@ -141,6 +143,18 @@ public class MockNodes {
       this.physicalResource = pPhysicalResource;
     }
 
+    public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
+        Resource perNode, String rackName, String healthReport,
+        long lastHealthReportTime, int cmdPort, String hostName,
+        NodeState state, Set<String> labels,
+        ResourceUtilization containersUtilization,
+        ResourceUtilization nodeUtilization, Resource pPhysicalResource,
+        RMContext rmContext) {
+      this(nodeId, nodeAddr, httpAddress, perNode, rackName, healthReport,
+          lastHealthReportTime, cmdPort, hostName, state, labels,
+          containersUtilization, nodeUtilization, pPhysicalResource);
+      this.rmContext = rmContext;
+    }
     @Override
     public NodeId getNodeID() {
       return this.nodeId;
@@ -298,7 +312,7 @@ public class MockNodes {
 
     @Override
     public RMContext getRMContext() {
-      return null;
+      return this.rmContext;
     }
 
     @Override
@@ -343,6 +357,26 @@ public class MockNodes {
         containersUtilization, nodeUtilization, physicalResource);
   }
 
+  private static RMNode buildRMNode(int rack, final Resource perNode,
+      NodeState state, String httpAddr, int hostnum, String hostName, int port,
+      Set<String> labels, ResourceUtilization containersUtilization,
+      ResourceUtilization nodeUtilization, Resource physicalResource,
+      RMContext rmContext) {
+    final String rackName = "rack" + rack;
+    final int nid = hostnum;
+    final String nodeAddr = hostName + ":" + nid;
+    if (hostName == null) {
+      hostName = "host" + nid;
+    }
+    final NodeId nodeID = NodeId.newInstance(hostName, port);
+
+    final String httpAddress = httpAddr;
+    String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
+    return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, rackName,
+        healthReport, 0, nid, hostName, state, labels, containersUtilization,
+        nodeUtilization, physicalResource, rmContext);
+  }
+
   public static RMNode nodeInfo(int rack, final Resource perNode,
       NodeState state) {
     return buildRMNode(rack, perNode, state, "N/A");
@@ -371,4 +405,9 @@ public class MockNodes {
     return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, port);
   }
 
+  public static RMNode newNodeInfo(int rack, final Resource perNode,
+      int hostnum, String hostName, int port, RMContext rmContext) {
+    return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum,
+        hostName, port, null, null, null, null, rmContext);
+  }
 }

+ 157 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -21,12 +21,17 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
 import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -37,6 +42,7 @@ import static org.mockito.Mockito.when;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -47,6 +53,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -59,10 +67,13 @@ import javax.xml.transform.OutputKeys;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -83,9 +94,13 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.AttributeValue;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
 import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -105,6 +120,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -112,10 +128,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 import org.junit.After;
@@ -2891,4 +2911,141 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     public void close() {
     }
   }
+
+  @Test(timeout = 5000)
+  public void testSystemCredentialsAfterTokenSequenceNoChange()
+      throws Exception {
+
+    Configuration conf = new Configuration();
+
+    RMContext rmContext = mock(RMContextImpl.class);
+
+    Dispatcher dispatcher = new InlineDispatcher();
+    when(rmContext.getDispatcher()).thenReturn(dispatcher);
+
+    NodeId nodeId = NodeId.newInstance("localhost", 1234);
+    ConcurrentMap<NodeId, RMNode> rmNodes =
+        new ConcurrentHashMap<NodeId, RMNode>();
+    RMNode rmNode = MockNodes.newNodeInfo(1, Resource.newInstance(1024, 1), 1,
+        "localhost", 1234, rmContext);
+    rmNodes.put(nodeId, rmNode);
+    when(rmContext.getRMNodes()).thenReturn(rmNodes);
+
+    ConcurrentMap<NodeId, RMNode> inactiveNodes =
+        new ConcurrentHashMap<NodeId, RMNode>();
+    when(rmContext.getInactiveRMNodes()).thenReturn(inactiveNodes);
+    when(rmContext.getConfigurationProvider())
+        .thenReturn(new LocalConfigurationProvider());
+
+    dispatcher.register(SchedulerEventType.class,
+        new InlineDispatcher.EmptyEventHandler());
+    dispatcher.register(RMNodeEventType.class,
+        new NodeEventDispatcher(rmContext));
+
+    NMLivelinessMonitor nmLivelinessMonitor =
+        new NMLivelinessMonitor(dispatcher);
+    nmLivelinessMonitor.init(conf);
+    nmLivelinessMonitor.start();
+    NodesListManager nodesListManager = new NodesListManager(rmContext);
+    nodesListManager.init(conf);
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        new RMContainerTokenSecretManager(conf);
+    containerTokenSecretManager.start();
+    NMTokenSecretManagerInRM nmTokenSecretManager =
+        new NMTokenSecretManagerInRM(conf);
+    nmTokenSecretManager.start();
+    ResourceTrackerService resourceTrackerService = new ResourceTrackerService(
+        rmContext, nodesListManager, nmLivelinessMonitor,
+        containerTokenSecretManager, nmTokenSecretManager);
+
+    resourceTrackerService.init(conf);
+    resourceTrackerService.start();
+
+    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+    RegisterNodeManagerRequest request =
+        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+    request.setNodeId(nodeId);
+    request.setHttpPort(1234);
+    request.setResource(BuilderUtils.newResource(1024, 1));
+    resourceTrackerService.registerNodeManager(request);
+
+    org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus =
+        recordFactory.newRecordInstance(
+            org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
+    nodeStatus.setNodeId(nodeId);
+    nodeStatus.setResponseId(0);
+    nodeStatus.setNodeHealthStatus(
+        recordFactory.newRecordInstance(NodeHealthStatus.class));
+    nodeStatus.getNodeHealthStatus().setIsNodeHealthy(true);
+
+    NodeHeartbeatRequest request1 =
+        recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
+    request1.setNodeStatus(nodeStatus);
+
+    // Set NM's token sequence no as 1
+    request1.setTokenSequenceNo(1);
+
+    // Set RM's token sequence no as 1
+    when(rmContext.getTokenSequenceNo()).thenReturn((long) 1);
+
+    // Populate SystemCredentialsForApps
+    final ApplicationId appId = ApplicationId.newInstance(1234, 1);
+    Credentials app1Cred = new Credentials();
+
+    Token<DelegationTokenIdentifier> token =
+        new Token<DelegationTokenIdentifier>();
+    token.setKind(new Text("kind1"));
+    app1Cred.addToken(new Text("token1"), token);
+    Token<DelegationTokenIdentifier> token2 =
+        new Token<DelegationTokenIdentifier>();
+    token2.setKind(new Text("kind2"));
+    app1Cred.addToken(new Text("token2"), token2);
+
+    DataOutputBuffer dob = new DataOutputBuffer();
+    app1Cred.writeTokenStorageToStream(dob);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    SystemCredentialsForAppsProto systemCredentialsForAppsProto =
+        YarnServerBuilderUtils.newSystemCredentialsForAppsProto(appId,
+            byteBuffer);
+
+    ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto> systemCredentialsForApps =
+        new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>(1);
+
+    systemCredentialsForApps.put(appId, systemCredentialsForAppsProto);
+
+    when(rmContext.getSystemCredentialsForApps())
+        .thenReturn(systemCredentialsForApps);
+
+    // first ping
+    NodeHeartbeatResponse response =
+        resourceTrackerService.nodeHeartbeat(request1);
+
+    // Though SystemCredentialsForApps size is 1, it is not being sent as part
+    // of response as there is no difference between NM's and RM's token
+    // sequence no
+    assertEquals(1, rmContext.getTokenSequenceNo());
+    assertEquals(1, rmContext.getSystemCredentialsForApps().size());
+    assertEquals(1, response.getTokenSequenceNo());
+    assertEquals(0, response.getSystemCredentialsForApps().size());
+
+    // Set RM's token sequence no as 2
+    when(rmContext.getTokenSequenceNo()).thenReturn((long) 2);
+
+    // Ensure new heartbeat has been sent to avoid duplicate issues
+    nodeStatus.setResponseId(1);
+    request1.setNodeStatus(nodeStatus);
+
+    // second ping
+    NodeHeartbeatResponse response1 =
+        resourceTrackerService.nodeHeartbeat(request1);
+
+    // Since NM's and RM's token sequence no is different, response should
+    // contain SystemCredentialsForApps
+    assertEquals(2, response1.getTokenSequenceNo());
+    assertEquals(1, response1.getSystemCredentialsForApps().size());
+
+    resourceTrackerService.close();
+  }
 }

+ 98 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

@@ -35,6 +35,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
@@ -80,7 +81,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -95,12 +98,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer.DelegationTokenToRenew;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -146,8 +151,13 @@ public class TestDelegationTokenRenewer {
     @Override
     public long renew(Token<?> t, Configuration conf) throws IOException {
       if ( !(t instanceof MyToken)) {
-        // renew in 3 seconds
-        return System.currentTimeMillis() + 3000;
+        if(conf.get("override_token_expire_time") != null) {
+          return System.currentTimeMillis() +
+              Long.parseLong(conf.get("override_token_expire_time"));
+        } else {
+          // renew in 3 seconds
+          return System.currentTimeMillis() + 3000;
+        }
       }
       MyToken token = (MyToken)t;
       if(token.isCanceled()) {
@@ -201,6 +211,7 @@ public class TestDelegationTokenRenewer {
     counter = new AtomicInteger(0);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "kerberos");
+    conf.set("override_token_expire_time", "3000");
     UserGroupInformation.setConfiguration(conf);
     eventQueue = new LinkedBlockingQueue<Event>();
     dispatcher = new AsyncDispatcher(eventQueue);
@@ -209,7 +220,7 @@ public class TestDelegationTokenRenewer {
     RMContext mockContext =  mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
-      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
+        new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
         delegationTokenRenewer);
     when(mockContext.getDispatcher()).thenReturn(dispatcher);
@@ -581,7 +592,7 @@ public class TestDelegationTokenRenewer {
         createNewDelegationTokenRenewer(lconf, counter);
     RMContext mockContext = mock(RMContext.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
-      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
+        new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
@@ -661,7 +672,7 @@ public class TestDelegationTokenRenewer {
         createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
-      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
+        new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
@@ -766,7 +777,7 @@ public class TestDelegationTokenRenewer {
          createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
-      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
+        new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     InetSocketAddress sockAddr =
@@ -825,7 +836,7 @@ public class TestDelegationTokenRenewer {
         createNewDelegationTokenRenewer(conf, counter);           
     RMContext mockContext = mock(RMContext.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
-      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
+        new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);         
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);    
     InetSocketAddress sockAddr =                                               
@@ -890,7 +901,7 @@ public class TestDelegationTokenRenewer {
   }
 
 
-  @Test (timeout = 20000)
+  @Test(timeout = 30000)
   public void testReplaceExpiringDelegationToken() throws Exception {
     conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
@@ -969,8 +980,14 @@ public class TestDelegationTokenRenewer {
         new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
     nm1.registerNode();
     NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+
+    NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl(
+        ((NodeHeartbeatResponsePBImpl) response).getProto());
+
     ByteBuffer tokenBuffer =
-        response.getSystemCredentialsForApps().get(app.getApplicationId());
+        YarnServerBuilderUtils
+            .convertFromProtoFormat(proto.getSystemCredentialsForApps())
+            .get(app.getApplicationId());
     Assert.assertNotNull(tokenBuffer);
     Credentials appCredentials = new Credentials();
     DataInputByteBuffer buf = new DataInputByteBuffer();
@@ -1062,8 +1079,14 @@ public class TestDelegationTokenRenewer {
         new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
     nm1.registerNode();
     NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+
+    NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl(
+        ((NodeHeartbeatResponsePBImpl) response).getProto());
+
     ByteBuffer tokenBuffer =
-        response.getSystemCredentialsForApps().get(app.getApplicationId());
+        YarnServerBuilderUtils
+            .convertFromProtoFormat(proto.getSystemCredentialsForApps())
+            .get(app.getApplicationId());
     Assert.assertNotNull(tokenBuffer);
     Credentials appCredentials = new Credentials();
     DataInputByteBuffer buf = new DataInputByteBuffer();
@@ -1117,8 +1140,14 @@ public class TestDelegationTokenRenewer {
         new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
     nm1.registerNode();
     NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+
+    NodeHeartbeatResponse proto = new NodeHeartbeatResponsePBImpl(
+        ((NodeHeartbeatResponsePBImpl) response).getProto());
+
     ByteBuffer tokenBuffer =
-        response.getSystemCredentialsForApps().get(app.getApplicationId());
+        YarnServerBuilderUtils
+            .convertFromProtoFormat(proto.getSystemCredentialsForApps())
+            .get(app.getApplicationId());
     Assert.assertNotNull(tokenBuffer);
     Credentials appCredentials = new Credentials();
     DataInputByteBuffer buf = new DataInputByteBuffer();
@@ -1430,7 +1459,7 @@ public class TestDelegationTokenRenewer {
     DelegationTokenRenewer dtr = createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
     when(mockContext.getSystemCredentialsForApps()).thenReturn(
-        new ConcurrentHashMap<ApplicationId, ByteBuffer>());
+        new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     InetSocketAddress sockAddr =
@@ -1444,4 +1473,61 @@ public class TestDelegationTokenRenewer {
     delegationTokenRenewer.applicationFinished(
         BuilderUtils.newApplicationId(0, 1));
   }
+
+  @Test(timeout = 10000)
+  public void testTokenSequenceNoAfterNewTokenAndRenewal() throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+
+    final Credentials credsx = new Credentials();
+
+    DelegationTokenIdentifier dtId1 = new DelegationTokenIdentifier(
+        new Text("user1"), new Text("renewer"), new Text("user1"));
+    final Token<DelegationTokenIdentifier> expectedToken =
+        new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+            "password2".getBytes(), dtId1.getKind(), new Text("service2"));
+
+    // fire up the renewer
+    final DelegationTokenRenewer dtr = new DelegationTokenRenewer() {
+      @Override
+      protected Token<?>[] obtainSystemTokensForUser(String user,
+          final Credentials credentials) throws IOException {
+        credentials.addToken(expectedToken.getService(), expectedToken);
+        return new Token<?>[] {expectedToken};
+      }
+    };
+
+    RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+        new ConcurrentHashMap<ApplicationId, SystemCredentialsForAppsProto>());
+    ClientRMService mockClientRMService = mock(ClientRMService.class);
+    when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
+    InetSocketAddress sockAddr =
+        InetSocketAddress.createUnresolved("localhost", 1234);
+    when(mockClientRMService.getBindAddress()).thenReturn(sockAddr);
+    dtr.setRMContext(mockContext);
+    when(mockContext.getDelegationTokenRenewer()).thenReturn(dtr);
+    dtr.init(conf);
+    dtr.start();
+
+    final ApplicationId appId1 = ApplicationId.newInstance(1234, 1);
+
+    Collection<ApplicationId> appIds = new ArrayList<ApplicationId>(1);
+    appIds.add(appId1);
+
+    dtr.addApplicationSync(appId1, credsx, false, "user1");
+
+    // Ensure incrTokenSequenceNo has been called for new token request
+    Mockito.verify(mockContext, Mockito.times(1)).incrTokenSequenceNo();
+
+    DelegationTokenToRenew dttr = new DelegationTokenToRenew(appIds,
+        expectedToken, conf, 1000, false, "user1");
+
+    dtr.requestNewHdfsDelegationTokenIfNeeded(dttr);
+
+    // Ensure incrTokenSequenceNo has been called for token renewal as well.
+    Mockito.verify(mockContext, Mockito.times(2)).incrTokenSequenceNo();
+  }
 }