Bladeren bron

YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the sake of localization and log-aggregation for long-running services. Contributed by Jian He.

(cherry picked from commit a16d022ca4313a41425c8e97841c841a2d6f2f54)
Vinod Kumar Vavilapalli 10 jaren geleden
bovenliggende
commit
0ad33e1483
24 gewijzigde bestanden met toevoegingen van 701 en 124 verwijderingen
  1. 4 0
      hadoop-yarn-project/CHANGES.txt
  2. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  4. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
  5. 54 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
  6. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  7. 40 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
  8. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  9. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  10. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  11. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  12. 30 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
  13. 15 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
  14. 2 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
  15. 20 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  16. 2 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
  17. 10 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
  18. 23 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
  19. 4 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
  20. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
  21. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  22. 10 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  23. 198 68
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
  24. 190 18
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -370,6 +370,10 @@ Release 2.6.0 - UNRELEASED
     YARN-2703. Added logUploadedTime into LogValue for better display. (Xuan Gong
     via zjshen)
 
+    YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the
+    sake of localization and log-aggregation for long-running services. (Jian He
+    via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -695,6 +695,10 @@ public class YarnConfiguration extends Configuration {
       RM_PREFIX + "delegation-token-renewer.thread-count";
   public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
 
+  public static final String RM_PROXY_USER_PRIVILEGES_ENABLED = RM_PREFIX
+      + "proxy-user-privileges.enabled";
+  public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
+
   /** Whether to enable log aggregation */
   public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
       + "log-aggregation-enable";

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -553,6 +553,21 @@
     <value>30000</value>
   </property>
 
+  <property>
+  <description>If true, ResourceManager will have proxy-user privileges.
+    Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to
+    do localization and log-aggregation on behalf of the user. If this is set to true,
+    ResourceManager is able to request new hdfs delegation tokens on behalf of
+    the user. This is needed by long-running-service, because the hdfs tokens
+    will eventually expire and YARN requires new valid tokens to do localization
+    and log-aggregation. Note that to enable this use case, the corresponding
+    HDFS NameNode has to configure ResourceManager as the proxy-user so that
+    ResourceManager can itself ask for new tokens on behalf of the user when
+    tokens are past their max-life-time.</description>
+    <name>yarn.resourcemanager.proxy-user-privileges.enabled</name>
+    <value>false</value>
+  </property>
+
   <property>
     <description>Interval for the roll over for the master key used to generate
         application tokens

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java

@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,4 +60,11 @@ public interface NodeHeartbeatResponse {
   String getDiagnosticsMessage();
 
   void setDiagnosticsMessage(String diagnosticsMessage);
+
+  // Credentials (i.e. hdfs tokens) needed by NodeManagers for application
+  // localizations and logAggreations.
+  Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
+
+  void setSystemCredentialsForApps(
+      Map<ApplicationId, ByteBuffer> systemCredentials);
 }

+ 54 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java

@@ -18,21 +18,26 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -49,6 +54,8 @@ public class NodeHeartbeatResponsePBImpl extends
   private List<ContainerId> containersToCleanup = null;
   private List<ContainerId> containersToBeRemovedFromNM = null;
   private List<ApplicationId> applicationsToCleanup = null;
+  private Map<ApplicationId, ByteBuffer> systemCredentials = null;
+
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
   
@@ -62,7 +69,7 @@ public class NodeHeartbeatResponsePBImpl extends
   }
   
   public NodeHeartbeatResponseProto getProto() {
-      mergeLocalToProto();
+    mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
     return proto;
@@ -86,6 +93,19 @@ public class NodeHeartbeatResponsePBImpl extends
       builder.setNmTokenMasterKey(
           convertToProtoFormat(this.nmTokenMasterKey));
     }
+    if (this.systemCredentials != null) {
+      addSystemCredentialsToProto();
+    }
+  }
+
+  private void addSystemCredentialsToProto() {
+    maybeInitBuilder();
+    builder.clearSystemCredentialsForApps();
+    for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
+      builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder()
+        .setAppId(convertToProtoFormat(entry.getKey()))
+        .setCredentialsForApp(ProtoUtils.convertToProtoFormat(entry.getValue())));
+    }
   }
 
   private void mergeLocalToProto() {
@@ -387,6 +407,38 @@ public class NodeHeartbeatResponsePBImpl extends
     builder.addAllApplicationsToCleanup(iterable);
   }
 
+
+  @Override
+  public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
+    if (this.systemCredentials != null) {
+      return this.systemCredentials;
+    }
+    initSystemCredentials();
+    return systemCredentials;
+  }
+
+  private void initSystemCredentials() {
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<SystemCredentialsForAppsProto> list = p.getSystemCredentialsForAppsList();
+    this.systemCredentials = new HashMap<ApplicationId, ByteBuffer> ();
+    for (SystemCredentialsForAppsProto c : list) {
+      ApplicationId appId = convertFromProtoFormat(c.getAppId());
+      ByteBuffer byteBuffer = ProtoUtils.convertFromProtoFormat(c.getCredentialsForApp());
+      this.systemCredentials.put(appId, byteBuffer);
+    }
+  }
+
+  @Override
+  public void setSystemCredentialsForApps(
+      Map<ApplicationId, ByteBuffer> systemCredentials) {
+    if (systemCredentials == null || systemCredentials.isEmpty()) {
+      return;
+    }
+    maybeInitBuilder();
+    this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>();
+    this.systemCredentials.putAll(systemCredentials);
+  }
+
   @Override
   public long getNextHeartBeatInterval() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -59,6 +59,12 @@ message NodeHeartbeatResponseProto {
   optional int64 nextHeartBeatInterval = 7;
   optional string diagnostics_message = 8;
   repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
+  repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
+}
+
+message SystemCredentialsForAppsProto {
+  optional ApplicationIdProto appId = 1;
+  optional bytes credentialsForApp = 2;
 }
 
 message NMContainerStatusProto {

+ 40 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java

@@ -18,9 +18,18 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -29,10 +38,10 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -93,4 +102,33 @@ public class TestProtocolRecords {
     Assert.assertEquals(1, requestProto.getRunningApplications().size());
     Assert.assertEquals(appId, requestProto.getRunningApplications().get(0)); 
   }
+
+  @Test
+  public void testNodeHeartBeatResponse() throws IOException {
+    NodeHeartbeatResponse record =
+        Records.newRecord(NodeHeartbeatResponse.class);
+    Map<ApplicationId, ByteBuffer> appCredentials =
+        new HashMap<ApplicationId, ByteBuffer>();
+    Credentials app1Cred = new Credentials();
+
+    Token<DelegationTokenIdentifier> token1 =
+        new Token<DelegationTokenIdentifier>();
+    token1.setKind(new Text("kind1"));
+    app1Cred.addToken(new Text("token1"), token1);
+    Token<DelegationTokenIdentifier> token2 =
+        new Token<DelegationTokenIdentifier>();
+    token2.setKind(new Text("kind2"));
+    app1Cred.addToken(new Text("token2"), token2);
+
+    DataOutputBuffer dob = new DataOutputBuffer();
+    app1Cred.writeTokenStorageToStream(dob);
+    ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
+    record.setSystemCredentialsForApps(appCredentials);
+
+    NodeHeartbeatResponse proto =
+        new NodeHeartbeatResponsePBImpl(
+          ((NodeHeartbeatResponsePBImpl) record).getProto());
+    Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
+  }
 }

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -54,6 +56,8 @@ public interface Context {
 
   ConcurrentMap<ApplicationId, Application> getApplications();
 
+  Map<ApplicationId, Credentials> getSystemCredentialsForApps();
+
   ConcurrentMap<ContainerId, Container> getContainers();
 
   NMContainerTokenSecretManager getContainerTokenSecretManager();

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -32,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -313,6 +316,10 @@ public class NodeManager extends CompositeService
     private NodeId nodeId = null;
     protected final ConcurrentMap<ApplicationId, Application> applications =
         new ConcurrentHashMap<ApplicationId, Application>();
+
+    private Map<ApplicationId, Credentials> systemCredentials =
+        new HashMap<ApplicationId, Credentials>();
+
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();
 
@@ -420,6 +427,16 @@ public class NodeManager extends CompositeService
     public void setDecommissioned(boolean isDecommissioned) {
       this.isDecommissioned = isDecommissioned;
     }
+
+    @Override
+    public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
+      return systemCredentials;
+    }
+
+    public void setSystemCrendentials(
+        Map<ApplicationId, Credentials> systemCredentials) {
+      this.systemCredentials = systemCredentials;
+    }
   }
 
 

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
 import java.net.ConnectException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,7 +37,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.VersionUtil;
@@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -525,6 +529,25 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return this.rmIdentifier;
   }
 
+  private static Map<ApplicationId, Credentials> parseCredentials(
+      Map<ApplicationId, ByteBuffer> systemCredentials) throws IOException {
+    Map<ApplicationId, Credentials> map =
+        new HashMap<ApplicationId, Credentials>();
+    for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
+      Credentials credentials = new Credentials();
+      DataInputByteBuffer buf = new DataInputByteBuffer();
+      ByteBuffer buffer = entry.getValue();
+      buffer.rewind();
+      buf.reset(buffer);
+      credentials.readTokenStorageStream(buf);
+      map.put(entry.getKey(), credentials);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Retrieved credentials form RM: " + map);
+    }
+    return map;
+  }
+
   protected void startStatusUpdater() {
 
     statusUpdaterRunnable = new Runnable() {
@@ -598,6 +621,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                   new CMgrCompletedAppsEvent(appsToCleanup,
                       CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
             }
+
+            Map<ApplicationId, ByteBuffer> systemCredentials =
+                response.getSystemCredentialsForApps();
+            if (systemCredentials != null && !systemCredentials.isEmpty()) {
+              ((NMContext) context)
+                .setSystemCrendentials(parseCredentials(systemCredentials));
+            }
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM
             dispatcher.getEventHandler().handle(

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -186,7 +186,7 @@ public class ContainerManagerImpl extends CompositeService implements
     this.metrics = metrics;
 
     rsrcLocalizationSrvc =
-        createResourceLocalizationService(exec, deletionContext);
+        createResourceLocalizationService(exec, deletionContext, context);
     addService(rsrcLocalizationSrvc);
 
     containersLauncher = createContainersLauncher(context, exec);
@@ -362,9 +362,9 @@ public class ContainerManagerImpl extends CompositeService implements
   }
 
   protected ResourceLocalizationService createResourceLocalizationService(
-      ContainerExecutor exec, DeletionService deletionContext) {
+      ContainerExecutor exec, DeletionService deletionContext, Context context) {
     return new ResourceLocalizationService(this.dispatcher, exec,
-        deletionContext, dirsHandler, context.getNMStateStore());
+        deletionContext, dirsHandler, context);
   }
 
   protected ContainersLauncher createContainersLauncher(Context context,

+ 30 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -83,11 +83,11 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -158,6 +158,7 @@ public class ResourceLocalizationService extends CompositeService
   private LocalResourcesTracker publicRsrc;
 
   private LocalDirsHandlerService dirsHandler;
+  private Context nmContext;
 
   /**
    * Map of LocalResourceTrackers keyed by username, for private
@@ -177,7 +178,7 @@ public class ResourceLocalizationService extends CompositeService
 
   public ResourceLocalizationService(Dispatcher dispatcher,
       ContainerExecutor exec, DeletionService delService,
-      LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
+      LocalDirsHandlerService dirsHandler, Context context) {
 
     super(ResourceLocalizationService.class.getName());
     this.exec = exec;
@@ -189,7 +190,8 @@ public class ResourceLocalizationService extends CompositeService
         new ThreadFactoryBuilder()
           .setNameFormat("ResourceLocalizationService Cache Cleanup")
           .build());
-    this.stateStore = stateStore;
+    this.stateStore = context.getNMStateStore();
+    this.nmContext = context;
   }
 
   FileContext getLocalFileContext(Configuration conf) {
@@ -1110,11 +1112,36 @@ public class ResourceLocalizationService extends CompositeService
       }
     }
 
+    private Credentials getSystemCredentialsSentFromRM(
+        LocalizerContext localizerContext) throws IOException {
+      ApplicationId appId =
+          localizerContext.getContainerId().getApplicationAttemptId()
+            .getApplicationId();
+      Credentials systemCredentials =
+          nmContext.getSystemCredentialsForApps().get(appId);
+      if (systemCredentials == null) {
+        return null;
+      }
+      LOG.info("Adding new framework tokens from RM for " + appId);
+      for (Token<?> token : systemCredentials.getAllTokens()) {
+        LOG.info("Adding new application-token for localization: " + token);
+      }
+      return systemCredentials;
+    }
+    
     private void writeCredentials(Path nmPrivateCTokensPath)
         throws IOException {
       DataOutputStream tokenOut = null;
       try {
         Credentials credentials = context.getCredentials();
+        if (UserGroupInformation.isSecurityEnabled()) {
+          Credentials systemCredentials =
+              getSystemCredentialsSentFromRM(context);
+          if (systemCredentials != null) {
+            credentials = systemCredentials;
+          }
+        }
+
         FileContext lfs = getLocalFileContext(getConfig());
         tokenOut =
             lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));

+ 15 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

@@ -39,9 +39,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -342,6 +344,18 @@ public class LogAggregationService extends AbstractService implements
       Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext) {
 
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Credentials systemCredentials =
+          context.getSystemCredentialsForApps().get(appId);
+      if (systemCredentials != null) {
+        LOG.info("Adding new framework tokens from RM for " + appId);
+        for (Token<?> token : systemCredentials.getAllTokens()) {
+          LOG.info("Adding new application-token for log-aggregation: " + token);
+        }
+        credentials = systemCredentials;
+      }
+    }
+
     // Get user's FileSystem credentials
     final UserGroupInformation userUgi =
         UserGroupInformation.createRemoteUser(user);

+ 2 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java

@@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 
 public class DummyContainerManager extends ContainerManagerImpl {
@@ -74,9 +73,9 @@ public class DummyContainerManager extends ContainerManagerImpl {
   @Override
   @SuppressWarnings("unchecked")
   protected ResourceLocalizationService createResourceLocalizationService(
-      ContainerExecutor exec, DeletionService deletionContext) {
+      ContainerExecutor exec, DeletionService deletionContext, Context context) {
     return new ResourceLocalizationService(super.dispatcher, exec,
-        deletionContext, super.dirsHandler, new NMNullStateStoreService()) {
+        deletionContext, super.dirsHandler, context) {
       @Override
       public void handle(LocalizationEvent event) {
         switch (event.getType()) {

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -44,10 +44,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.service.ServiceOperations;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -561,6 +565,7 @@ public class TestNodeStatusUpdater {
 
   // Test NodeStatusUpdater sends the right container statuses each time it
   // heart beats.
+  private Credentials expectedCredentials = new Credentials();
   private class MyResourceTracker4 implements ResourceTracker {
 
     public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -576,6 +581,11 @@ public class TestNodeStatusUpdater {
         createContainerStatus(5, ContainerState.COMPLETE);
 
     public MyResourceTracker4(Context context) {
+      // create app Credentials
+      org.apache.hadoop.security.token.Token<DelegationTokenIdentifier> token1 =
+          new org.apache.hadoop.security.token.Token<DelegationTokenIdentifier>();
+      token1.setKind(new Text("kind1"));
+      expectedCredentials.addToken(new Text("token1"), token1);
       this.context = context;
     }
 
@@ -694,6 +704,14 @@ public class TestNodeStatusUpdater {
           YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
             heartBeatNodeAction, null, null, null, null, 1000L);
       nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM);
+      Map<ApplicationId, ByteBuffer> appCredentials =
+          new HashMap<ApplicationId, ByteBuffer>();
+      DataOutputBuffer dob = new DataOutputBuffer();
+      expectedCredentials.writeTokenStorageToStream(dob);
+      ByteBuffer byteBuffer1 =
+          ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+      appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
+      nhResponse.setSystemCredentialsForApps(appCredentials);
       return nhResponse;
     }
   }
@@ -1293,6 +1311,8 @@ public class TestNodeStatusUpdater {
     if(assertionFailedInThread.get()) {
       Assert.fail("ContainerStatus Backup failed");
     }
+    Assert.assertNotNull(nm.getNMContext().getSystemCredentialsForApps()
+      .get(ApplicationId.newInstance(1234, 1)).getToken(new Text("token1")));
     nm.stop();
   }
 

+ 2 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java

@@ -278,8 +278,7 @@ public class TestContainerManagerRecovery {
   private ContainerManagerImpl createContainerManager(Context context) {
     final LogHandler logHandler = mock(LogHandler.class);
     final ResourceLocalizationService rsrcSrv =
-        new ResourceLocalizationService(null, null, null, null,
-            context.getNMStateStore()) {
+        new ResourceLocalizationService(null, null, null, null, context) {
           @Override
           public void serviceInit(Configuration conf) throws Exception {
           }
@@ -320,7 +319,7 @@ public class TestContainerManagerRecovery {
 
           @Override
           protected ResourceLocalizationService createResourceLocalizationService(
-              ContainerExecutor exec, DeletionService deletionContext) {
+              ContainerExecutor exec, DeletionService deletionContext, Context context) {
             return rsrcSrv;
           }
 

+ 10 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java

@@ -23,7 +23,12 @@ import org.junit.Assert;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.junit.Test;
 
 public class TestLocalCacheDirectoryManager {
@@ -73,8 +78,12 @@ public class TestLocalCacheDirectoryManager {
     YarnConfiguration conf = new YarnConfiguration();
     conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
     Exception e = null;
+    NMContext nmContext =
+        new NMContext(new NMContainerTokenSecretManager(conf),
+          new NMTokenSecretManagerInNM(), null,
+          new ApplicationACLsManager(conf), new NMNullStateStoreService());
     ResourceLocalizationService service =
-        new ResourceLocalizationService(null, null, null, null, null);
+        new ResourceLocalizationService(null, null, null, null, nmContext);
     try {
       service.init(conf);
     } catch (Exception e1) {

+ 23 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
@@ -138,6 +139,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
@@ -159,7 +163,7 @@ public class TestResourceLocalizationService {
   private Configuration conf;
   private AbstractFileSystem spylfs;
   private FileContext lfs;
-  
+  private NMContext nmContext;
   @BeforeClass
   public static void setupClass() {
     mockServer = mock(Server.class);
@@ -174,6 +178,9 @@ public class TestResourceLocalizationService {
 
     String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+    nmContext = new NMContext(new NMContainerTokenSecretManager(
+      conf), new NMTokenSecretManagerInNM(), null,
+      new ApplicationACLsManager(conf), new NMNullStateStoreService());
   }
 
   @After
@@ -206,8 +213,7 @@ public class TestResourceLocalizationService {
 
     ResourceLocalizationService locService =
       spy(new ResourceLocalizationService(dispatcher, exec, delService,
-                                          diskhandler,
-                                          new NMNullStateStoreService()));
+                                          diskhandler, nmContext));
     doReturn(lfs)
       .when(locService).getLocalFileContext(isA(Configuration.class));
     try {
@@ -268,8 +274,7 @@ public class TestResourceLocalizationService {
 
     ResourceLocalizationService locService =
         spy(new ResourceLocalizationService(dispatcher, exec, delService,
-            diskhandler,
-            nmStateStoreService));
+            diskhandler,nmContext));
     doReturn(lfs)
         .when(locService).getLocalFileContext(isA(Configuration.class));
     try {
@@ -340,8 +345,7 @@ public class TestResourceLocalizationService {
 
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler,
-                                      new NMNullStateStoreService());
+                                      dirsHandler, nmContext);
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@@ -751,8 +755,7 @@ public class TestResourceLocalizationService {
 
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler,
-                                      new NMNullStateStoreService());
+                                      dirsHandler, nmContext);
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@@ -965,8 +968,7 @@ public class TestResourceLocalizationService {
     try {
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher, exec, delService,
-                                        dirsHandler,
-                                        new NMNullStateStoreService());
+                                        dirsHandler, nmContext);
       ResourceLocalizationService spyService = spy(rawService);
       doReturn(mockServer).when(spyService).createServer();
       doReturn(lfs).when(spyService).getLocalFileContext(
@@ -1075,7 +1077,7 @@ public class TestResourceLocalizationService {
     try {
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher, exec, delService,
-            dirsHandlerSpy, new NMNullStateStoreService());
+            dirsHandlerSpy, nmContext);
       ResourceLocalizationService spyService = spy(rawService);
       doReturn(mockServer).when(spyService).createServer();
       doReturn(lfs).when(spyService).getLocalFileContext(
@@ -1188,7 +1190,7 @@ public class TestResourceLocalizationService {
 
       ResourceLocalizationService rls =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            localDirHandler, new NMNullStateStoreService());
+            localDirHandler, nmContext);
       dispatcher1.register(LocalizationEventType.class, rls);
       rls.init(conf);
 
@@ -1341,7 +1343,7 @@ public class TestResourceLocalizationService {
 
       ResourceLocalizationService rls =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            localDirHandler, new NMNullStateStoreService());
+            localDirHandler, nmContext);
       dispatcher1.register(LocalizationEventType.class, rls);
       rls.init(conf);
 
@@ -1507,7 +1509,7 @@ public class TestResourceLocalizationService {
       // it as otherwise it will remove requests from pending queue.
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            dirsHandler, new NMNullStateStoreService());
+            dirsHandler, nmContext);
       ResourceLocalizationService spyService = spy(rawService);
       dispatcher1.register(LocalizationEventType.class, spyService);
       spyService.init(conf);
@@ -1795,9 +1797,13 @@ public class TestResourceLocalizationService {
     ContainerExecutor exec = mock(ContainerExecutor.class);
     LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
     DeletionService delService = mock(DeletionService.class);
+    NMContext nmContext =
+        new NMContext(new NMContainerTokenSecretManager(conf),
+          new NMTokenSecretManagerInNM(), null,
+          new ApplicationACLsManager(conf), stateStore);
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler, stateStore);
+                                      dirsHandler, nmContext);
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
@@ -1861,7 +1867,7 @@ public class TestResourceLocalizationService {
     // setup mocks
     ResourceLocalizationService rawService =
         new ResourceLocalizationService(dispatcher, exec, delService,
-          mockDirsHandler, new NMNullStateStoreService());
+          mockDirsHandler, nmContext);
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -278,7 +278,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       try {
         credentials = parseCredentials(submissionContext);
         this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
-          credentials, submissionContext.getCancelTokensWhenComplete());
+          credentials, submissionContext.getCancelTokensWhenComplete(),
+          application.getUser());
       } catch (Exception e) {
         LOG.warn("Unable to parse credentials.", e);
         // Sending APP_REJECTED is fine, since we assume that the
@@ -325,7 +326,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
         credentials = parseCredentials(appContext);
         // synchronously renew delegation token on recovery.
         rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
-          credentials, appContext.getCancelTokensWhenComplete());
+          credentials, appContext.getCancelTokensWhenComplete(),
+          application.getUser());
         application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
       } catch (Exception e) {
         LOG.warn("Unable to parse and renew delegation tokens.", e);

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -57,6 +58,8 @@ public interface RMContext {
 
   ConcurrentMap<ApplicationId, RMApp> getRMApps();
   
+  ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
+
   ConcurrentMap<String, RMNode> getInactiveRMNodes();
 
   ConcurrentMap<NodeId, RMNode> getRMNodes();

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -67,6 +68,9 @@ public class RMContextImpl implements RMContext {
   private final ConcurrentMap<String, RMNode> inactiveNodes
     = new ConcurrentHashMap<String, RMNode>();
 
+  private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>();
+
   private boolean isHAEnabled;
   private boolean isWorkPreservingRecoveryEnabled;
   private HAServiceState haServiceState =
@@ -444,4 +448,8 @@ public class RMContextImpl implements RMContext {
   public void setSystemClock(Clock clock) {
     this.systemClock = clock;
   }
+
+  public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
+    return systemCredentials;
+  }
 }

+ 10 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -385,7 +388,7 @@ public class ResourceTrackerService extends AbstractService implements
     if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
         .getResponseId()) {
       LOG.info("Received duplicate heartbeat from node "
-          + rmNode.getNodeAddress());
+          + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId());
       return lastNodeHeartbeatResponse;
     } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
         .getResponseId()) {
@@ -410,6 +413,12 @@ public class ResourceTrackerService extends AbstractService implements
 
     populateKeys(request, nodeHeartBeatResponse);
 
+    ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
+        rmContext.getSystemCredentialsForApps();
+    if (!systemCredentials.isEmpty()) {
+      nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
+    }
+
     // 4. Send status to RMNode, saving the latest response.
     this.rmContext.getDispatcher().getEventHandler().handle(
         new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),

+ 198 - 68
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,14 +47,20 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -82,12 +90,10 @@ public class DelegationTokenRenewer extends AbstractService {
   private DelegationTokenCancelThread dtCancelThread =
     new DelegationTokenCancelThread();
   private ThreadPoolExecutor renewerService;
-  
-  // managing the list of tokens using Map
-  // appId=>List<tokens>
-  private Set<DelegationTokenToRenew> delegationTokens = 
-    Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
-  
+
+  private ConcurrentMap<ApplicationId, Set<DelegationTokenToRenew>> appTokens =
+      new ConcurrentHashMap<ApplicationId, Set<DelegationTokenToRenew>>();
+
   private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
       new ConcurrentHashMap<ApplicationId, Long>();
 
@@ -99,20 +105,33 @@ public class DelegationTokenRenewer extends AbstractService {
   private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
   
   private boolean tokenKeepAliveEnabled;
-  
+  private boolean hasProxyUserPrivileges;
+  private long credentialsValidTimeRemaining;
+
+  // this config is supposedly not used by end-users.
+  public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
+      YarnConfiguration.RM_PREFIX + "system-credentials.valid-time-remaining";
+  public static final long DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
+      10800000; // 3h
+
   public DelegationTokenRenewer() {
     super(DelegationTokenRenewer.class.getName());
   }
 
   @Override
-  protected synchronized void serviceInit(Configuration conf) throws Exception {
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.hasProxyUserPrivileges =
+        conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
+          YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED);
     this.tokenKeepAliveEnabled =
         conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
             YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
     this.tokenRemovalDelayMs =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
             YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
-
+    this.credentialsValidTimeRemaining =
+        conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING,
+          DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING);
     setLocalSecretManagerAndServiceAddr();
     renewerService = createNewThreadPoolService(conf);
     pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
@@ -182,7 +201,7 @@ public class DelegationTokenRenewer extends AbstractService {
     if (renewalTimer != null) {
       renewalTimer.cancel();
     }
-    delegationTokens.clear();
+    appTokens.clear();
     this.renewerService.shutdown();
     dtCancelThread.interrupt();
     try {
@@ -212,22 +231,28 @@ public class DelegationTokenRenewer extends AbstractService {
     public long expirationDate;
     public TimerTask timerTask;
     public final boolean shouldCancelAtEnd;
-    
-    public DelegationTokenToRenew(
-        ApplicationId jId, Token<?> token, 
-        Configuration conf, long expirationDate, boolean shouldCancelAtEnd) {
+    public long maxDate;
+    public String user;
+
+    public DelegationTokenToRenew(ApplicationId jId, Token<?> token,
+        Configuration conf, long expirationDate, boolean shouldCancelAtEnd,
+        String user) {
       this.token = token;
+      this.user = user;
+      if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+        try {
+          AbstractDelegationTokenIdentifier identifier =
+              (AbstractDelegationTokenIdentifier) token.decodeIdentifier();
+          maxDate = identifier.getMaxDate();
+        } catch (IOException e) {
+          throw new YarnRuntimeException(e);
+        }
+      }
       this.applicationId = jId;
       this.conf = conf;
       this.expirationDate = expirationDate;
       this.timerTask = null;
       this.shouldCancelAtEnd = shouldCancelAtEnd;
-      if (this.token==null || this.applicationId==null || this.conf==null) {
-        throw new IllegalArgumentException("Invalid params to renew token" +
-            ";token=" + this.token +
-            ";appId=" + this.applicationId +
-            ";conf=" + this.conf);
-      }
     }
     
     public void setTimerTask(TimerTask tTask) {
@@ -317,16 +342,14 @@ public class DelegationTokenRenewer extends AbstractService {
       }
     }
   }
-  //adding token
-  private void addTokenToList(DelegationTokenToRenew t) {
-    delegationTokens.add(t);
-  }
 
   @VisibleForTesting
   public Set<Token<?>> getDelegationTokens() {
     Set<Token<?>> tokens = new HashSet<Token<?>>();
-    for(DelegationTokenToRenew delegationToken : delegationTokens) {
-      tokens.add(delegationToken.token);
+    for (Set<DelegationTokenToRenew> tokenList : appTokens.values()) {
+      for (DelegationTokenToRenew token : tokenList) {
+        tokens.add(token.token);
+      }
     }
     return tokens;
   }
@@ -337,25 +360,28 @@ public class DelegationTokenRenewer extends AbstractService {
    * @param ts tokens
    * @param shouldCancelAtEnd true if tokens should be canceled when the app is
    * done else false. 
+   * @param user user
    * @throws IOException
    */
   public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
-      boolean shouldCancelAtEnd) {
+      boolean shouldCancelAtEnd, String user) {
     processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
-      applicationId, ts, shouldCancelAtEnd));
+      applicationId, ts, shouldCancelAtEnd, user));
   }
 
   /**
    * Synchronously renew delegation tokens.
+   * @param user user
    */
   public void addApplicationSync(ApplicationId applicationId, Credentials ts,
-      boolean shouldCancelAtEnd) throws IOException{
+      boolean shouldCancelAtEnd, String user) throws IOException,
+      InterruptedException {
     handleAppSubmitEvent(new DelegationTokenRenewerAppSubmitEvent(
-      applicationId, ts, shouldCancelAtEnd));
+      applicationId, ts, shouldCancelAtEnd, user));
   }
 
   private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
-      throws IOException {
+      throws IOException, InterruptedException {
     ApplicationId applicationId = evt.getApplicationId();
     Credentials ts = evt.getCredentials();
     boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
@@ -375,14 +401,21 @@ public class DelegationTokenRenewer extends AbstractService {
     // all renewable tokens are valid
     // At RM restart it is safe to assume that all the previously added tokens
     // are valid
-    List<DelegationTokenToRenew> tokenList =
-        new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
+    appTokens.put(applicationId,
+      Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>()));
+    Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>();
+    boolean hasHdfsToken = false;
     for (Token<?> token : tokens) {
       if (token.isManaged()) {
         tokenList.add(new DelegationTokenToRenew(applicationId,
-            token, getConfig(), now, shouldCancelAtEnd));
+            token, getConfig(), now, shouldCancelAtEnd, evt.getUser()));
+        if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+          LOG.info(applicationId + " found existing hdfs token " + token);
+          hasHdfsToken = true;
+        }
       }
     }
+
     if (!tokenList.isEmpty()) {
       // Renewing token and adding it to timer calls are separated purposefully
       // If user provides incorrect token then it should not be added for
@@ -395,14 +428,15 @@ public class DelegationTokenRenewer extends AbstractService {
         }
       }
       for (DelegationTokenToRenew dtr : tokenList) {
-        addTokenToList(dtr);
+        appTokens.get(applicationId).add(dtr);
         setTimerForTokenRenewal(dtr);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Registering token for renewal for:" + " service = "
-              + dtr.token.getService() + " for appId = " + dtr.applicationId);
-        }
       }
     }
+
+    if (!hasHdfsToken) {
+      requestNewHdfsDelegationToken(applicationId, evt.getUser(),
+        shouldCancelAtEnd);
+    }
   }
 
   /**
@@ -424,14 +458,16 @@ public class DelegationTokenRenewer extends AbstractService {
       }
 
       Token<?> token = dttr.token;
+
       try {
-        renewToken(dttr);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Renewing delegation-token for:" + token.getService() + 
-              "; new expiration;" + dttr.expirationDate);
+        requestNewHdfsDelegationTokenIfNeeded(dttr);
+        // if the token is not replaced by a new token, renew the token
+        if (appTokens.get(dttr.applicationId).contains(dttr)) {
+          renewToken(dttr);
+          setTimerForTokenRenewal(dttr);// set the next one
+        } else {
+          LOG.info("The token was removed already. Token = [" +dttr +"]");
         }
-        
-        setTimerForTokenRenewal(dttr);// set the next one
       } catch (Exception e) {
         LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
         removeFailedDelegationToken(dttr);
@@ -455,12 +491,14 @@ public class DelegationTokenRenewer extends AbstractService {
     // calculate timer time
     long expiresIn = token.expirationDate - System.currentTimeMillis();
     long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
-    
     // need to create new task every time
     TimerTask tTask = new RenewalTimerTask(token);
     token.setTimerTask(tTask); // keep reference to the timer
 
     renewalTimer.schedule(token.timerTask, new Date(renewIn));
+
+    LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
+        + token.applicationId);
   }
 
   // renew a token
@@ -470,16 +508,99 @@ public class DelegationTokenRenewer extends AbstractService {
     // need to use doAs so that http can find the kerberos tgt
     // NOTE: token renewers should be responsible for the correct UGI!
     try {
-      dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
-          new PrivilegedExceptionAction<Long>(){          
-            @Override
-            public Long run() throws Exception {
-              return dttr.token.renew(dttr.conf);
-            }
-          });
+      dttr.expirationDate =
+          UserGroupInformation.getLoginUser().doAs(
+            new PrivilegedExceptionAction<Long>() {
+              @Override
+              public Long run() throws Exception {
+                return dttr.token.renew(dttr.conf);
+              }
+            });
     } catch (InterruptedException e) {
       throw new IOException(e);
     }
+    LOG.info("Renewed delegation-token= [" + dttr + "], for "
+        + dttr.applicationId);
+  }
+
+  // Request new hdfs token if the token is about to expire, and remove the old
+  // token from the tokenToRenew list
+  private void requestNewHdfsDelegationTokenIfNeeded(
+      final DelegationTokenToRenew dttr) throws IOException,
+      InterruptedException {
+
+    if (hasProxyUserPrivileges
+        && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
+        && dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+
+      // remove all old expiring hdfs tokens for this application.
+      Set<DelegationTokenToRenew> tokenSet = appTokens.get(dttr.applicationId);
+      if (tokenSet != null && !tokenSet.isEmpty()) {
+        Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
+        synchronized (tokenSet) {
+          while (iter.hasNext()) {
+            DelegationTokenToRenew t = iter.next();
+            if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+              iter.remove();
+              if (t.timerTask != null) {
+                t.timerTask.cancel();
+              }
+              LOG.info("Removed expiring token " + t);
+            }
+          }
+        }
+      }
+      LOG.info("Token= (" + dttr + ") is expiring, request new token.");
+      requestNewHdfsDelegationToken(dttr.applicationId, dttr.user,
+        dttr.shouldCancelAtEnd);
+    }
+  }
+
+  private void requestNewHdfsDelegationToken(ApplicationId applicationId,
+      String user, boolean shouldCancelAtEnd) throws IOException,
+      InterruptedException {
+    // Get new hdfs tokens for this user
+    Credentials credentials = new Credentials();
+    Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
+
+    // Add new tokens to the toRenew list.
+    LOG.info("Received new tokens for " + applicationId + ". Received "
+        + newTokens.length + " tokens.");
+    if (newTokens.length > 0) {
+      for (Token<?> token : newTokens) {
+        if (token.isManaged()) {
+          DelegationTokenToRenew tokenToRenew =
+              new DelegationTokenToRenew(applicationId, token, getConfig(),
+                Time.now(), shouldCancelAtEnd, user);
+          // renew the token to get the next expiration date.
+          renewToken(tokenToRenew);
+          setTimerForTokenRenewal(tokenToRenew);
+          appTokens.get(applicationId).add(tokenToRenew);
+          LOG.info("Received new token " + token);
+        }
+      }
+    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
+  }
+
+  protected Token<?>[] obtainSystemTokensForUser(String user,
+      final Credentials credentials) throws IOException, InterruptedException {
+    // Get new hdfs tokens on behalf of this user
+    UserGroupInformation proxyUser =
+        UserGroupInformation.createProxyUser(user,
+          UserGroupInformation.getLoginUser());
+    Token<?>[] newTokens =
+        proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
+          @Override
+          public Token<?>[] run() throws Exception {
+            return FileSystem.get(getConfig()).addDelegationTokens(
+              UserGroupInformation.getLoginUser().getUserName(), credentials);
+          }
+        });
+    return newTokens;
   }
 
   // cancel a token
@@ -497,13 +618,13 @@ public class DelegationTokenRenewer extends AbstractService {
    */
   private void removeFailedDelegationToken(DelegationTokenToRenew t) {
     ApplicationId applicationId = t.applicationId;
-    if (LOG.isDebugEnabled())
-      LOG.debug("removing failed delegation token for appid=" + applicationId + 
-          ";t=" + t.token.getService());
-    delegationTokens.remove(t);
+    LOG.error("removing failed delegation token for appid=" + applicationId
+        + ";t=" + t.token.getService());
+    appTokens.get(applicationId).remove(t);
     // cancel the timer
-    if(t.timerTask!=null)
+    if (t.timerTask != null) {
       t.timerTask.cancel();
+    }
   }
 
   /**
@@ -543,18 +664,21 @@ public class DelegationTokenRenewer extends AbstractService {
   }
 
   private void removeApplicationFromRenewal(ApplicationId applicationId) {
-    synchronized (delegationTokens) {
-      Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
-      while(it.hasNext()) {
-        DelegationTokenToRenew dttr = it.next();
-        if (dttr.applicationId.equals(applicationId)) {
+    rmContext.getSystemCredentialsForApps().remove(applicationId);
+    Set<DelegationTokenToRenew> tokens = appTokens.get(applicationId);
+
+    if (tokens != null && !tokens.isEmpty()) {
+      synchronized (tokens) {
+        Iterator<DelegationTokenToRenew> it = tokens.iterator();
+        while (it.hasNext()) {
+          DelegationTokenToRenew dttr = it.next();
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Removing delegation token for appId=" + applicationId + 
-                "; token=" + dttr.token.getService());
+            LOG.debug("Removing delegation token for appId=" + applicationId
+                + "; token=" + dttr.token.getService());
           }
 
           // cancel the timer
-          if(dttr.timerTask!=null)
+          if (dttr.timerTask != null)
             dttr.timerTask.cancel();
 
           // cancel the token
@@ -670,17 +794,19 @@ public class DelegationTokenRenewer extends AbstractService {
     }
   }
   
-  private static class DelegationTokenRenewerAppSubmitEvent extends
+  static class DelegationTokenRenewerAppSubmitEvent extends
       DelegationTokenRenewerEvent {
 
     private Credentials credentials;
     private boolean shouldCancelAtEnd;
+    private String user;
 
     public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
-        Credentials credentails, boolean shouldCancelAtEnd) {
+        Credentials credentails, boolean shouldCancelAtEnd, String user) {
       super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
       this.credentials = credentails;
       this.shouldCancelAtEnd = shouldCancelAtEnd;
+      this.user = user;
     }
 
     public Credentials getCredentials() {
@@ -690,6 +816,10 @@ public class DelegationTokenRenewer extends AbstractService {
     public boolean shouldCancelAtEnd() {
       return shouldCancelAtEnd;
     }
+
+    public String getUser() {
+      return user;
+    }
   }
   
   enum DelegationTokenRenewerEventType {

+ 190 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java

@@ -38,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -61,6 +63,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -74,11 +77,16 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -88,16 +96,18 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.base.Supplier;
+
 /**
  * unit test - 
  * tests addition/deletion/cancellation of renewals of delegation tokens
  *
  */
-@SuppressWarnings("rawtypes")
+@SuppressWarnings({"rawtypes", "unchecked"})
 public class TestDelegationTokenRenewer {
   private static final Log LOG = 
       LogFactory.getLog(TestDelegationTokenRenewer.class);
-  private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
+  private static final Text KIND = new Text("HDFS_DELEGATION_TOKEN");
   
   private static BlockingQueue<Event> eventQueue;
   private static volatile AtomicInteger counter;
@@ -125,6 +135,9 @@ public class TestDelegationTokenRenewer {
 
     @Override
     public long renew(Token<?> t, Configuration conf) throws IOException {
+      if ( !(t instanceof MyToken)) {
+        return DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
+      }
       MyToken token = (MyToken)t;
       if(token.isCanceled()) {
         throw new InvalidToken("token has been canceled");
@@ -179,8 +192,10 @@ public class TestDelegationTokenRenewer {
     dispatcher = new AsyncDispatcher(eventQueue);
     Renewer.reset();
     delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
-    RMContext mockContext = mock(RMContext.class);
+    RMContext mockContext =  mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
         delegationTokenRenewer);
     when(mockContext.getDispatcher()).thenReturn(dispatcher);
@@ -290,9 +305,9 @@ public class TestDelegationTokenRenewer {
     Text user1= new Text("user1");
     
     MyDelegationTokenSecretManager sm = new MyDelegationTokenSecretManager(
-        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT,
         3600000, null);
     sm.startThreads();
     
@@ -353,7 +368,7 @@ public class TestDelegationTokenRenewer {
     // register the tokens for renewal
     ApplicationId applicationId_0 = 
         BuilderUtils.newApplicationId(0, 0);
-    delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true);
+    delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user");
     waitForEventsToGetProcessed(delegationTokenRenewer);
 
     // first 3 initial renewals + 1 real
@@ -393,7 +408,7 @@ public class TestDelegationTokenRenewer {
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true);
+    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user");
     waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
     waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -429,7 +444,7 @@ public class TestDelegationTokenRenewer {
     
     // register the tokens for renewal
     ApplicationId appId =  BuilderUtils.newApplicationId(0, 0);
-    delegationTokenRenewer.addApplicationAsync(appId, ts, true);
+    delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user");
     int waitCnt = 20;
     while (waitCnt-- >0) {
       if (!eventQueue.isEmpty()) {
@@ -473,7 +488,7 @@ public class TestDelegationTokenRenewer {
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false);
+    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user");
     waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
     waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -516,6 +531,8 @@ public class TestDelegationTokenRenewer {
     DelegationTokenRenewer localDtr =
         createNewDelegationTokenRenewer(lconf, counter);
     RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
@@ -540,7 +557,7 @@ public class TestDelegationTokenRenewer {
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplicationAsync(applicationId_0, ts, true);
+    localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
     waitForEventsToGetProcessed(localDtr);
     if (!eventQueue.isEmpty()){
       Event evt = eventQueue.take();
@@ -593,6 +610,8 @@ public class TestDelegationTokenRenewer {
     DelegationTokenRenewer localDtr =
         createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
@@ -617,7 +636,7 @@ public class TestDelegationTokenRenewer {
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplicationAsync(applicationId_0, ts, true);
+    localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
     localDtr.applicationFinished(applicationId_0);
     waitForEventsToGetProcessed(delegationTokenRenewer);
     //Send another keep alive.
@@ -640,7 +659,7 @@ public class TestDelegationTokenRenewer {
 
   private DelegationTokenRenewer createNewDelegationTokenRenewer(
       Configuration conf, final AtomicInteger counter) {
-    return new DelegationTokenRenewer() {
+    DelegationTokenRenewer renew =  new DelegationTokenRenewer() {
 
       @Override
       protected ThreadPoolExecutor
@@ -664,6 +683,8 @@ public class TestDelegationTokenRenewer {
         return pool;
       }
     };
+    renew.setRMContext(TestUtils.getMockRMContext());
+    return renew;
   }
 
   private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
@@ -679,7 +700,12 @@ public class TestDelegationTokenRenewer {
   public void testDTRonAppSubmission()
       throws IOException, InterruptedException, BrokenBarrierException {
     final Credentials credsx = new Credentials();
-    final Token<?> tokenx = mock(Token.class);
+    final Token<DelegationTokenIdentifier> tokenx = mock(Token.class);
+    when(tokenx.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+    DelegationTokenIdentifier dtId1 = 
+        new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
+          new Text("user1"));
+    when(tokenx.decodeIdentifier()).thenReturn(dtId1);
     credsx.addToken(new Text("token"), tokenx);
     doReturn(true).when(tokenx).isManaged();
     doThrow(new IOException("boom"))
@@ -688,6 +714,8 @@ public class TestDelegationTokenRenewer {
     final DelegationTokenRenewer dtr =
          createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     InetSocketAddress sockAddr =
@@ -699,7 +727,7 @@ public class TestDelegationTokenRenewer {
     dtr.start();
 
     try {
-      dtr.addApplicationSync(mock(ApplicationId.class), credsx, false);
+      dtr.addApplicationSync(mock(ApplicationId.class), credsx, false, "user");
       fail("Catch IOException on app submission");
     } catch (IOException e){
       Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
@@ -716,7 +744,12 @@ public class TestDelegationTokenRenewer {
                                                                                
     // this token uses barriers to block during renew                          
     final Credentials creds1 = new Credentials();                              
-    final Token<?> token1 = mock(Token.class);                                 
+    final Token<DelegationTokenIdentifier> token1 = mock(Token.class);    
+    when(token1.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+    DelegationTokenIdentifier dtId1 = 
+        new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
+          new Text("user1"));
+    when(token1.decodeIdentifier()).thenReturn(dtId1);
     creds1.addToken(new Text("token"), token1);                                
     doReturn(true).when(token1).isManaged();                                   
     doAnswer(new Answer<Long>() {                                              
@@ -729,7 +762,9 @@ public class TestDelegationTokenRenewer {
                                                                                
     // this dummy token fakes renewing                                         
     final Credentials creds2 = new Credentials();                              
-    final Token<?> token2 = mock(Token.class);                                 
+    final Token<DelegationTokenIdentifier> token2 = mock(Token.class);           
+    when(token2.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+    when(token2.decodeIdentifier()).thenReturn(dtId1);
     creds2.addToken(new Text("token"), token2);                                
     doReturn(true).when(token2).isManaged();                                   
     doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));     
@@ -737,7 +772,9 @@ public class TestDelegationTokenRenewer {
     // fire up the renewer                                                     
     final DelegationTokenRenewer dtr =
         createNewDelegationTokenRenewer(conf, counter);           
-    RMContext mockContext = mock(RMContext.class);                             
+    RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);         
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);    
     InetSocketAddress sockAddr =                                               
@@ -751,14 +788,14 @@ public class TestDelegationTokenRenewer {
     Thread submitThread = new Thread() {                                       
       @Override                                                                
       public void run() {
-        dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false);
+        dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user");
       }                                                                        
     };                                                                         
     submitThread.start();                                                      
                                                                                
     // wait till 1st submit blocks, then submit another
     startBarrier.await();                           
-    dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false);
+    dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user");
     // signal 1st to complete                                                  
     endBarrier.await();                                                        
     submitThread.join(); 
@@ -793,4 +830,139 @@ public class TestDelegationTokenRenewer {
           "Bad header found in token storage"));
     }
   }
+
+
+  @Test (timeout = 20000)
+  public void testReplaceExpiringDelegationToken() throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+      "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+
+    // create Token1:
+    Text userText1 = new Text("user1");
+    DelegationTokenIdentifier dtId1 =
+        new DelegationTokenIdentifier(userText1, new Text("renewer1"),
+          userText1);
+    // set max date to 0 to simulate an expiring token;
+    dtId1.setMaxDate(0);
+    final Token<DelegationTokenIdentifier> token1 =
+        new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+          "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+
+    // create token2
+    Text userText2 = new Text("user2");
+    DelegationTokenIdentifier dtId2 =
+        new DelegationTokenIdentifier(userText1, new Text("renewer2"),
+          userText2);
+    final Token<DelegationTokenIdentifier> expectedToken =
+        new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
+          "password2".getBytes(), dtId2.getKind(), new Text("service2"));
+
+    final MockRM rm = new TestSecurityMockRM(conf, null) {
+      @Override
+      protected DelegationTokenRenewer createDelegationTokenRenewer() {
+        return new DelegationTokenRenewer() {
+          @Override
+          protected Token<?>[] obtainSystemTokensForUser(String user,
+              final Credentials credentials) throws IOException {
+            credentials.addToken(expectedToken.getService(), expectedToken);
+            return new Token<?>[] { expectedToken };
+          }
+        };
+      }
+    };
+    rm.start();
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText1, token1);
+
+    RMApp app =
+        rm.submitApp(200, "name", "user",
+          new HashMap<ApplicationAccessType, String>(), false, "default", 1,
+          credentials);
+
+    // wait for the initial expiring hdfs token to be removed.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        return !rm.getRMContext().getDelegationTokenRenewer()
+          .getDelegationTokens().contains(token1);
+      }
+    }, 1000, 20000);
+
+    // wait for the new retrieved hdfs token.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        return rm.getRMContext().getDelegationTokenRenewer()
+          .getDelegationTokens().contains(expectedToken);
+      }
+    }, 1000, 20000);
+
+    // check nm can retrieve the token
+    final MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+    ByteBuffer tokenBuffer =
+        response.getSystemCredentialsForApps().get(app.getApplicationId());
+    Assert.assertNotNull(tokenBuffer);
+    Credentials appCredentials = new Credentials();
+    DataInputByteBuffer buf = new DataInputByteBuffer();
+    tokenBuffer.rewind();
+    buf.reset(tokenBuffer);
+    appCredentials.readTokenStorageStream(buf);
+    Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken));
+  }
+
+  // YARN will get the token for the app submitted without the delegation token.
+  @Test
+  public void testAppSubmissionWithoutDelegationToken() throws Exception {
+    // create token2
+    Text userText2 = new Text("user2");
+    DelegationTokenIdentifier dtId2 =
+        new DelegationTokenIdentifier(new Text("user2"), new Text("renewer2"),
+          userText2);
+    final Token<DelegationTokenIdentifier> token2 =
+        new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
+          "password2".getBytes(), dtId2.getKind(), new Text("service2"));
+    final MockRM rm = new TestSecurityMockRM(conf, null) {
+      @Override
+      protected DelegationTokenRenewer createDelegationTokenRenewer() {
+        return new DelegationTokenRenewer() {
+          @Override
+          protected Token<?>[] obtainSystemTokensForUser(String user,
+              final Credentials credentials) throws IOException {
+            credentials.addToken(token2.getService(), token2);
+            return new Token<?>[] { token2 };
+          }
+        };
+      }
+    };
+    rm.start();
+
+    // submit an app without delegationToken
+    RMApp app = rm.submitApp(200);
+
+    // wait for the new retrieved hdfs token.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        return rm.getRMContext().getDelegationTokenRenewer()
+          .getDelegationTokens().contains(token2);
+      }
+    }, 1000, 20000);
+
+    // check nm can retrieve the token
+    final MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+    ByteBuffer tokenBuffer =
+        response.getSystemCredentialsForApps().get(app.getApplicationId());
+    Assert.assertNotNull(tokenBuffer);
+    Credentials appCredentials = new Credentials();
+    DataInputByteBuffer buf = new DataInputByteBuffer();
+    tokenBuffer.rewind();
+    buf.reset(tokenBuffer);
+    appCredentials.readTokenStorageStream(buf);
+    Assert.assertTrue(appCredentials.getAllTokens().contains(token2));
+  }
 }