Преглед на файлове

YARN-11225. [Federation] Add postDelegationToken postDelegationTokenExpiration cancelDelegationToken REST APIs for Router. (#5185)

slfan1989 преди 2 години
родител
ревизия
0926fa5a2c
променени са 10 файла, в които са добавени 602 реда и са изтрити 26 реда
  1. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
  2. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java
  3. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java
  4. 211 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
  5. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RESTRequestInterceptor.java
  6. 114 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
  7. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java
  8. 9 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
  9. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
  10. 210 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java

@@ -611,4 +611,16 @@ public class RouterClientRMService extends AbstractService
   public RouterDelegationTokenSecretManager getRouterDTSecretManager() {
     return routerDTSecretManager;
   }
+
+  @VisibleForTesting
+  public void setRouterDTSecretManager(RouterDelegationTokenSecretManager routerDTSecretManager) {
+    this.routerDTSecretManager = routerDTSecretManager;
+  }
+
+  @VisibleForTesting
+  public void initUserPipelineMap(Configuration conf) {
+    int maxCacheSize = conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+        YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
+    this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));
+  }
 }

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java

@@ -252,6 +252,16 @@ public class RouterDelegationTokenSecretManager
     return allTokens;
   }
 
+  public long getRenewDate(RMDelegationTokenIdentifier ident)
+      throws InvalidToken {
+    DelegationTokenInformation info = currentTokens.get(ident);
+    if (info == null) {
+      throw new InvalidToken("token (" + ident.toString()
+          + ") can't be found in cache");
+    }
+    return info.getRenewDate();
+  }
+
   @Override
   protected synchronized int incrementDelegationTokenSeqNum() {
     return federationFacade.incrementDelegationTokenSeqNum();

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java

@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.router.webapp;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
+
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 
 /**
@@ -32,6 +34,7 @@ public abstract class AbstractRESTRequestInterceptor
   private Configuration conf;
   private RESTRequestInterceptor nextInterceptor;
   private UserGroupInformation user = null;
+  private RouterClientRMService routerClientRMService = null;
 
   /**
    * Sets the {@link RESTRequestInterceptor} in the chain.
@@ -93,4 +96,14 @@ public abstract class AbstractRESTRequestInterceptor
   public UserGroupInformation getUser() {
     return user;
   }
+
+  @Override
+  public RouterClientRMService getRouterClientRMService() {
+    return routerClientRMService;
+  }
+
+  @Override
+  public void setRouterClientRMService(RouterClientRMService routerClientRMService) {
+    this.routerClientRMService = routerClientRMService;
+  }
 }

+ 211 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.router.webapp;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.security.Principal;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -46,11 +47,20 @@ import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -61,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
@@ -107,8 +118,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefin
 import org.apache.hadoop.yarn.server.router.RouterMetrics;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
+import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
+import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
 import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
@@ -124,6 +138,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.extractToken;
+import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.getKerberosUserGroupInformation;
+
 /**
  * Extends the {@code AbstractRESTRequestInterceptor} class and provides an
  * implementation for federation of YARN RM and scaling an application across
@@ -1567,25 +1584,209 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     throw new RuntimeException("updateAppQueue Failed.");
   }
 
+  /**
+   * This method posts a delegation token from the client.
+   *
+   * @param tokenData the token to delegate. It is a content param.
+   * @param hsr the servlet request.
+   * @return Response containing the status code.
+   * @throws AuthorizationException if Kerberos auth failed.
+   * @throws IOException if the delegation failed.
+   * @throws InterruptedException if interrupted.
+   * @throws Exception in case of bad request.
+   */
   @Override
-  public Response postDelegationToken(DelegationToken tokenData,
-      HttpServletRequest hsr) throws AuthorizationException, IOException,
-      InterruptedException, Exception {
-    throw new NotImplementedException("Code is not implemented");
+  public Response postDelegationToken(DelegationToken tokenData, HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException, Exception {
+
+    if (tokenData == null || hsr == null) {
+      throw new IllegalArgumentException("Parameter error, the tokenData or hsr is null.");
+    }
+
+    try {
+      // get Caller UserGroupInformation
+      Configuration conf = federationFacade.getConf();
+      UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);
+
+      // create a delegation token
+      return createDelegationToken(tokenData, callerUGI);
+    } catch (YarnException e) {
+      LOG.error("Create delegation token request failed.", e);
+      return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
+    }
+  }
+
+  /**
+   * Create DelegationToken.
+   *
+   * @param dtoken DelegationToken Data.
+   * @param callerUGI UserGroupInformation.
+   * @return Response.
+   * @throws Exception An exception occurred when creating a delegationToken.
+   */
+  private Response createDelegationToken(DelegationToken dtoken, UserGroupInformation callerUGI)
+      throws IOException, InterruptedException {
+
+    String renewer = dtoken.getRenewer();
+
+    GetDelegationTokenResponse resp = callerUGI.doAs(
+        (PrivilegedExceptionAction<GetDelegationTokenResponse>) () -> {
+        GetDelegationTokenRequest createReq = GetDelegationTokenRequest.newInstance(renewer);
+        return this.getRouterClientRMService().getDelegationToken(createReq);
+      });
+
+    DelegationToken respToken = getDelegationToken(renewer, resp);
+    return Response.status(Status.OK).entity(respToken).build();
+  }
+
+  /**
+   * Get DelegationToken.
+   *
+   * @param renewer renewer.
+   * @param resp GetDelegationTokenResponse.
+   * @return DelegationToken.
+   * @throws IOException if there are I/O errors.
+   */
+  private DelegationToken getDelegationToken(String renewer, GetDelegationTokenResponse resp)
+      throws IOException {
+    // Step1. Parse token from GetDelegationTokenResponse.
+    Token<RMDelegationTokenIdentifier> tk = getToken(resp);
+    String tokenKind = tk.getKind().toString();
+    RMDelegationTokenIdentifier tokenIdentifier = tk.decodeIdentifier();
+    String owner = tokenIdentifier.getOwner().toString();
+    long maxDate = tokenIdentifier.getMaxDate();
+
+    // Step2. Call the interface to get the expiration time of Token.
+    RouterClientRMService clientRMService = this.getRouterClientRMService();
+    RouterDelegationTokenSecretManager tokenSecretManager =
+        clientRMService.getRouterDTSecretManager();
+    long currentExpiration = tokenSecretManager.getRenewDate(tokenIdentifier);
+
+    // Step3. Generate Delegation token.
+    DelegationToken delegationToken = new DelegationToken(tk.encodeToUrlString(),
+        renewer, owner, tokenKind, currentExpiration, maxDate);
+
+    return delegationToken;
   }
 
+  /**
+   * GetToken.
+   * We convert RMDelegationToken in GetDelegationTokenResponse to Token.
+   *
+   * @param resp GetDelegationTokenResponse.
+   * @return Token.
+   */
+  private static Token<RMDelegationTokenIdentifier> getToken(GetDelegationTokenResponse resp) {
+    org.apache.hadoop.yarn.api.records.Token token = resp.getRMDelegationToken();
+    byte[] identifier = token.getIdentifier().array();
+    byte[] password = token.getPassword().array();
+    Text kind = new Text(token.getKind());
+    Text service = new Text(token.getService());
+    Token<RMDelegationTokenIdentifier> tk = new Token<>(identifier, password, kind, service);
+    return tk;
+  }
+
+  /**
+   * This method updates the expiration for a delegation token from the client.
+   *
+   * @param hsr the servlet request
+   * @return Response containing the status code.
+   * @throws AuthorizationException if Kerberos auth failed.
+   * @throws IOException if the delegation failed.
+   * @throws InterruptedException  if interrupted.
+   * @throws Exception in case of bad request.
+   */
   @Override
   public Response postDelegationTokenExpiration(HttpServletRequest hsr)
-      throws AuthorizationException, IOException, InterruptedException,
-      Exception {
-    throw new NotImplementedException("Code is not implemented");
+      throws AuthorizationException, IOException, InterruptedException, Exception {
+
+    if (hsr == null) {
+      throw new IllegalArgumentException("Parameter error, the hsr is null.");
+    }
+
+    try {
+      // get Caller UserGroupInformation
+      Configuration conf = federationFacade.getConf();
+      UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);
+      return renewDelegationToken(hsr, callerUGI);
+    } catch (YarnException e) {
+      LOG.error("Renew delegation token request failed.", e);
+      return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
+    }
   }
 
+  /**
+   * Renew DelegationToken.
+   *
+   * @param hsr HttpServletRequest.
+   * @param callerUGI UserGroupInformation.
+   * @return Response
+   * @throws IOException if there are I/O errors.
+   * @throws InterruptedException if any thread has interrupted.
+   */
+  private Response renewDelegationToken(HttpServletRequest hsr, UserGroupInformation callerUGI)
+      throws IOException, InterruptedException {
+
+    // renew Delegation Token
+    DelegationToken tokenData = new DelegationToken();
+    String encodeToken = extractToken(hsr).encodeToUrlString();
+    tokenData.setToken(encodeToken);
+
+    // Parse token data
+    Token<RMDelegationTokenIdentifier> token = extractToken(tokenData.getToken());
+    org.apache.hadoop.yarn.api.records.Token dToken =
+        BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind().toString(),
+        token.getPassword(), token.getService().toString());
+
+    // Renew token
+    RenewDelegationTokenResponse resp = callerUGI.doAs(
+        (PrivilegedExceptionAction<RenewDelegationTokenResponse>) () -> {
+        RenewDelegationTokenRequest req = RenewDelegationTokenRequest.newInstance(dToken);
+        return this.getRouterClientRMService().renewDelegationToken(req);
+      });
+
+    // return DelegationToken
+    long renewTime = resp.getNextExpirationTime();
+    DelegationToken respToken = new DelegationToken();
+    respToken.setNextExpirationTime(renewTime);
+    return Response.status(Status.OK).entity(respToken).build();
+  }
+
+  /**
+   * Cancel DelegationToken.
+   *
+   * @param hsr the servlet request
+   * @return  Response containing the status code.
+   * @throws AuthorizationException if Kerberos auth failed.
+   * @throws IOException if the delegation failed.
+   * @throws InterruptedException if interrupted.
+   * @throws Exception in case of bad request.
+   */
   @Override
   public Response cancelDelegationToken(HttpServletRequest hsr)
-      throws AuthorizationException, IOException, InterruptedException,
-      Exception {
-    throw new NotImplementedException("Code is not implemented");
+      throws AuthorizationException, IOException, InterruptedException, Exception {
+    try {
+      // get Caller UserGroupInformation
+      Configuration conf = federationFacade.getConf();
+      UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr);
+
+      // parse Token Data
+      Token<RMDelegationTokenIdentifier> token = extractToken(hsr);
+      org.apache.hadoop.yarn.api.records.Token dToken = BuilderUtils
+          .newDelegationToken(token.getIdentifier(), token.getKind().toString(),
+          token.getPassword(), token.getService().toString());
+
+      // cancelDelegationToken
+      callerUGI.doAs((PrivilegedExceptionAction<CancelDelegationTokenResponse>) () -> {
+        CancelDelegationTokenRequest req = CancelDelegationTokenRequest.newInstance(dToken);
+        return this.getRouterClientRMService().cancelDelegationToken(req);
+      });
+
+      return Response.status(Status.OK).build();
+    } catch (YarnException e) {
+      LOG.error("Cancel delegation token request failed.", e);
+      return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build();
+    }
   }
 
   @Override

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RESTRequestInterceptor.java

@@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServiceProtocol;
+import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
 import org.apache.hadoop.yarn.server.webapp.WebServices;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@@ -122,4 +123,18 @@ public interface RESTRequestInterceptor
    */
   ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res,
       String appId, String appAttemptId, String containerId);
+
+  /**
+   * Set RouterClientRMService.
+   *
+   * @param routerClientRMService routerClientRMService.
+   */
+  void setRouterClientRMService(RouterClientRMService routerClientRMService);
+
+  /**
+   * Get RouterClientRMService.
+   *
+   * @return RouterClientRMService
+   */
+  RouterClientRMService getRouterClientRMService();
 }

+ 114 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.router.webapp;
 
 import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
 import static javax.servlet.http.HttpServletResponse.SC_OK;
+import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices.DELEGATION_TOKEN_HEADER;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -43,11 +44,18 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@@ -614,4 +622,110 @@ public final class RouterWebServiceUtil {
     resultNodeLabelInfo.setPartitionInfo(newPartitionInfo);
     return resultNodeLabelInfo;
   }
+
+  /**
+   * initForWritableEndpoints does the init and acls verification for all
+   * writable REST end points.
+   *
+   * @param conf Configuration.
+   * @param callerUGI remote caller who initiated the request.
+   * @throws AuthorizationException in case of no access to perfom this op.
+   */
+  public static void initForWritableEndpoints(Configuration conf, UserGroupInformation callerUGI)
+          throws AuthorizationException {
+    if (callerUGI == null) {
+      String msg = "Unable to obtain user name, user not authenticated";
+      throw new AuthorizationException(msg);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled() && isStaticUser(conf, callerUGI)) {
+      String msg = "The default static user cannot carry out this operation.";
+      throw new ForbiddenException(msg);
+    }
+  }
+
+  /**
+   * Determine whether the user is a static user.
+   *
+   * @param conf Configuration.
+   * @param callerUGI remote caller who initiated the request.
+   * @return true, static user; false, not static user;
+   */
+  private static boolean isStaticUser(Configuration conf, UserGroupInformation callerUGI) {
+    String staticUser = conf.get(CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER,
+            CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER);
+    return staticUser.equals(callerUGI.getUserName());
+  }
+
+  public static void createKerberosUserGroupInformation(HttpServletRequest hsr)
+          throws YarnException {
+    String authType = hsr.getAuthType();
+
+    if (!KerberosAuthenticationHandler.TYPE.equalsIgnoreCase(authType)) {
+      String msg = "Delegation token operations can only be carried out on a "
+              + "Kerberos authenticated channel. Expected auth type is "
+              + KerberosAuthenticationHandler.TYPE + ", got type " + authType;
+      throw new YarnException(msg);
+    }
+
+    Object ugiAttr =
+            hsr.getAttribute(DelegationTokenAuthenticationHandler.DELEGATION_TOKEN_UGI_ATTRIBUTE);
+    if (ugiAttr != null) {
+      String msg = "Delegation token operations cannot be carried out using "
+              + "delegation token authentication.";
+      throw new YarnException(msg);
+    }
+  }
+
+  /**
+   * Parse Token data.
+   *
+   * @param encodedToken tokenData
+   * @return RMDelegationTokenIdentifier.
+   */
+  public static Token<RMDelegationTokenIdentifier> extractToken(String encodedToken) {
+    Token<RMDelegationTokenIdentifier> token = new Token<>();
+    try {
+      token.decodeFromUrlString(encodedToken);
+    } catch (Exception ie) {
+      throw new BadRequestException("Could not decode encoded token");
+    }
+    return token;
+  }
+
+  public static Token<RMDelegationTokenIdentifier> extractToken(HttpServletRequest request) {
+    String encodedToken = request.getHeader(DELEGATION_TOKEN_HEADER);
+    if (encodedToken == null) {
+      String msg = "Header '" + DELEGATION_TOKEN_HEADER
+              + "' containing encoded token not found";
+      throw new BadRequestException(msg);
+    }
+    return extractToken(encodedToken);
+  }
+
+  /**
+   * Get Kerberos UserGroupInformation.
+   *
+   * Parse ugi from hsr and set kerberos authentication attributes.
+   *
+   * @param conf Configuration.
+   * @param request the servlet request.
+   * @return UserGroupInformation.
+   * @throws AuthorizationException if Kerberos auth failed.
+   * @throws YarnException If Authentication Type verification fails.
+   */
+  public static UserGroupInformation getKerberosUserGroupInformation(Configuration conf,
+      HttpServletRequest request) throws AuthorizationException, YarnException {
+    // Parse ugi from hsr And Check ugi as expected.
+    // If ugi is empty or user is a static user, an exception will be thrown.
+    UserGroupInformation callerUGI = RMWebAppUtil.getCallerUserGroupInformation(request, true);
+    initForWritableEndpoints(conf, callerUGI);
+
+    // Set AuthenticationMethod Kerberos for ugi.
+    createKerberosUserGroupInformation(request);
+    callerUGI.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS);
+
+    // return caller UGI
+    return callerUGI;
+  }
 }

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java

@@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesIn
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
 import org.apache.hadoop.yarn.server.router.Router;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.util.LRUCacheHashMap;
@@ -208,6 +209,8 @@ public class RouterWebServices implements RMWebServiceProtocol {
         RESTRequestInterceptor interceptorChain =
             this.createRequestInterceptorChain();
         interceptorChain.init(user);
+        RouterClientRMService routerClientRMService = router.getClientRMProxyService();
+        interceptorChain.setRouterClientRMService(routerClientRMService);
         chainWrapper.init(interceptorChain);
       } catch (Exception e) {
         LOG.error("Init RESTRequestInterceptor error for user: {}", user, e);
@@ -954,4 +957,8 @@ public class RouterWebServices implements RMWebServiceProtocol {
     RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
     return pipeline.getRootInterceptor().getRMNodeLabels(hsr);
   }
+
+  public Router getRouter() {
+    return router;
+  }
 }

+ 9 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java

@@ -224,17 +224,20 @@ public class TestableFederationClientInterceptor
   public RouterDelegationTokenSecretManager createRouterRMDelegationTokenSecretManager(
       Configuration conf) {
 
-    long secretKeyInterval = conf.getLong(
+    long secretKeyInterval = conf.getTimeDuration(
         YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
-        YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+        YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
 
-    long tokenMaxLifetime = conf.getLong(
+    long tokenMaxLifetime = conf.getTimeDuration(
         YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
-        YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+        YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+        TimeUnit.MILLISECONDS);
 
-    long tokenRenewInterval = conf.getLong(
+    long tokenRenewInterval = conf.getTimeDuration(
         YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
-        YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
 
     long removeScanInterval = conf.getTimeDuration(
         YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY,

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java

@@ -86,6 +86,7 @@ public abstract class BaseRouterWebServicesTest {
 
   @Before
   public void setUp() throws YarnException, IOException {
+
     this.conf = createConfiguration();
 
     router = spy(new Router());

+ 210 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java

@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
@@ -92,7 +93,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListI
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo;
+import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
+import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService.RequestInterceptorChainWrapper;
+import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
+import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
@@ -108,12 +114,23 @@ import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
 import org.apache.hadoop.yarn.util.LRUCacheHashMap;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY;
+
 import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.DURATION;
 import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.NUM_CONTAINERS;
 import static org.mockito.Mockito.mock;
@@ -137,9 +154,10 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
   private MemoryFederationStateStore stateStore;
   private FederationStateStoreTestUtil stateStoreUtil;
   private List<SubClusterId> subClusters;
+  private static final String TEST_RENEWER = "test-renewer";
 
-  @Override
   public void setUp() throws YarnException, IOException {
+
     super.setUpConfig();
     interceptor = new TestableFederationInterceptorREST();
 
@@ -154,17 +172,38 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
 
     subClusters = new ArrayList<>();
 
-    try {
-      for (int i = 0; i < NUM_SUBCLUSTER; i++) {
-        SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
-        stateStoreUtil.registerSubCluster(sc);
-        subClusters.add(sc);
-      }
-    } catch (YarnException e) {
-      LOG.error(e.getMessage());
-      Assert.fail();
+    for (int i = 0; i < NUM_SUBCLUSTER; i++) {
+      SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
+      stateStoreUtil.registerSubCluster(sc);
+      subClusters.add(sc);
     }
 
+    RouterClientRMService routerClientRMService = new RouterClientRMService();
+    routerClientRMService.initUserPipelineMap(getConf());
+    long secretKeyInterval = this.getConf().getLong(
+        RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY, RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+    long tokenMaxLifetime = this.getConf().getLong(
+        RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY, RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+    long tokenRenewInterval = this.getConf().getLong(
+        RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+    long removeScanInterval = this.getConf().getTimeDuration(
+        RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY,
+        RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+    RouterDelegationTokenSecretManager tokenSecretManager = new RouterDelegationTokenSecretManager(
+        secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, removeScanInterval);
+    tokenSecretManager.startThreads();
+    routerClientRMService.setRouterDTSecretManager(tokenSecretManager);
+
+    TestableFederationClientInterceptor clientInterceptor =
+        new TestableFederationClientInterceptor();
+    clientInterceptor.setConf(this.getConf());
+    clientInterceptor.init(TEST_RENEWER);
+    clientInterceptor.setTokenSecretManager(tokenSecretManager);
+    RequestInterceptorChainWrapper wrapper = new RequestInterceptorChainWrapper();
+    wrapper.init(clientInterceptor);
+    routerClientRMService.getUserPipelineMap().put(TEST_RENEWER, wrapper);
+    interceptor.setRouterClientRMService(routerClientRMService);
+
     for (SubClusterId subCluster : subClusters) {
       SubClusterInfo subClusterInfo = stateStoreUtil.querySubClusterInfo(subCluster);
       interceptor.getOrCreateInterceptorForSubCluster(
@@ -172,6 +211,7 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     }
 
     interceptor.setupResourceManager();
+
   }
 
   @Override
@@ -1485,4 +1525,164 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertNotNull(interceptorREST.getClient());
     Assert.assertEquals(webAppAddress, interceptorREST.getWebAppAddress());
   }
+
+  @Test
+  public void testPostDelegationTokenErrorHsr() throws Exception {
+    // Prepare delegationToken data
+    DelegationToken token = new DelegationToken();
+    token.setRenewer(TEST_RENEWER);
+
+    HttpServletRequest request = mock(HttpServletRequest.class);
+
+    // If we don't set token
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Parameter error, the tokenData or hsr is null.",
+        () -> interceptor.postDelegationToken(null, request));
+
+    // If we don't set hsr
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Parameter error, the tokenData or hsr is null.",
+        () -> interceptor.postDelegationToken(token, null));
+
+    // If we don't set renewUser, we will get error message.
+    LambdaTestUtils.intercept(AuthorizationException.class,
+        "Unable to obtain user name, user not authenticated",
+        () -> interceptor.postDelegationToken(token, request));
+
+    Principal principal = mock(Principal.class);
+    when(principal.getName()).thenReturn(TEST_RENEWER);
+    when(request.getRemoteUser()).thenReturn(TEST_RENEWER);
+    when(request.getUserPrincipal()).thenReturn(principal);
+
+    // If we don't set the authentication type, we will get error message.
+    Response response = interceptor.postDelegationToken(token, request);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(response.getStatus(), Status.FORBIDDEN.getStatusCode());
+    String errMsg = "Delegation token operations can only be carried out on a " +
+        "Kerberos authenticated channel. Expected auth type is kerberos, got type null";
+    Object entity = response.getEntity();
+    Assert.assertNotNull(entity);
+    Assert.assertTrue(entity instanceof String);
+    String entityMsg = String.valueOf(entity);
+    Assert.assertTrue(errMsg.contains(entityMsg));
+  }
+
+  @Test
+  public void testPostDelegationToken() throws Exception {
+    Long now = Time.now();
+
+    DelegationToken token = new DelegationToken();
+    token.setRenewer(TEST_RENEWER);
+
+    Principal principal = mock(Principal.class);
+    when(principal.getName()).thenReturn(TEST_RENEWER);
+
+    HttpServletRequest request = mock(HttpServletRequest.class);
+    when(request.getRemoteUser()).thenReturn(TEST_RENEWER);
+    when(request.getUserPrincipal()).thenReturn(principal);
+    when(request.getAuthType()).thenReturn("kerberos");
+
+    Response response = interceptor.postDelegationToken(token, request);
+    Assert.assertNotNull(response);
+
+    Object entity = response.getEntity();
+    Assert.assertNotNull(entity);
+    Assert.assertTrue(entity instanceof DelegationToken);
+
+    DelegationToken dtoken = DelegationToken.class.cast(entity);
+    Assert.assertEquals(TEST_RENEWER, dtoken.getRenewer());
+    Assert.assertEquals(TEST_RENEWER, dtoken.getOwner());
+    Assert.assertEquals("RM_DELEGATION_TOKEN", dtoken.getKind());
+    Assert.assertNotNull(dtoken.getToken());
+    Assert.assertTrue(dtoken.getNextExpirationTime() > now);
+  }
+
+  @Test
+  public void testPostDelegationTokenExpirationError() throws Exception {
+
+    // If we don't set hsr
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Parameter error, the hsr is null.",
+        () -> interceptor.postDelegationTokenExpiration(null));
+
+    Principal principal = mock(Principal.class);
+    when(principal.getName()).thenReturn(TEST_RENEWER);
+
+    HttpServletRequest request = mock(HttpServletRequest.class);
+    when(request.getRemoteUser()).thenReturn(TEST_RENEWER);
+    when(request.getUserPrincipal()).thenReturn(principal);
+    when(request.getAuthType()).thenReturn("kerberos");
+
+    // If we don't set the header.
+    String errorMsg = "Header 'Hadoop-YARN-RM-Delegation-Token' containing encoded token not found";
+    LambdaTestUtils.intercept(BadRequestException.class, errorMsg,
+        () -> interceptor.postDelegationTokenExpiration(request));
+  }
+
+  @Test
+  public void testPostDelegationTokenExpiration() throws Exception {
+
+    DelegationToken token = new DelegationToken();
+    token.setRenewer(TEST_RENEWER);
+
+    Principal principal = mock(Principal.class);
+    when(principal.getName()).thenReturn(TEST_RENEWER);
+
+    HttpServletRequest request = mock(HttpServletRequest.class);
+    when(request.getRemoteUser()).thenReturn(TEST_RENEWER);
+    when(request.getUserPrincipal()).thenReturn(principal);
+    when(request.getAuthType()).thenReturn("kerberos");
+
+    Response response = interceptor.postDelegationToken(token, request);
+    Assert.assertNotNull(response);
+    Object entity = response.getEntity();
+    Assert.assertNotNull(entity);
+    Assert.assertTrue(entity instanceof DelegationToken);
+    DelegationToken dtoken = DelegationToken.class.cast(entity);
+
+    final String yarnTokenHeader = "Hadoop-YARN-RM-Delegation-Token";
+    when(request.getHeader(yarnTokenHeader)).thenReturn(dtoken.getToken());
+
+    Response renewResponse = interceptor.postDelegationTokenExpiration(request);
+    Assert.assertNotNull(renewResponse);
+
+    Object renewEntity = renewResponse.getEntity();
+    Assert.assertNotNull(renewEntity);
+    Assert.assertTrue(renewEntity instanceof DelegationToken);
+
+    // renewDelegation, we only return renewDate, other values are NULL.
+    DelegationToken renewDToken = DelegationToken.class.cast(renewEntity);
+    Assert.assertNull(renewDToken.getRenewer());
+    Assert.assertNull(renewDToken.getOwner());
+    Assert.assertNull(renewDToken.getKind());
+    Assert.assertTrue(renewDToken.getNextExpirationTime() > dtoken.getNextExpirationTime());
+  }
+
+  @Test
+  public void testCancelDelegationToken() throws Exception {
+    DelegationToken token = new DelegationToken();
+    token.setRenewer(TEST_RENEWER);
+
+    Principal principal = mock(Principal.class);
+    when(principal.getName()).thenReturn(TEST_RENEWER);
+
+    HttpServletRequest request = mock(HttpServletRequest.class);
+    when(request.getRemoteUser()).thenReturn(TEST_RENEWER);
+    when(request.getUserPrincipal()).thenReturn(principal);
+    when(request.getAuthType()).thenReturn("kerberos");
+
+    Response response = interceptor.postDelegationToken(token, request);
+    Assert.assertNotNull(response);
+    Object entity = response.getEntity();
+    Assert.assertNotNull(entity);
+    Assert.assertTrue(entity instanceof DelegationToken);
+    DelegationToken dtoken = DelegationToken.class.cast(entity);
+
+    final String yarnTokenHeader = "Hadoop-YARN-RM-Delegation-Token";
+    when(request.getHeader(yarnTokenHeader)).thenReturn(dtoken.getToken());
+
+    Response cancelResponse = interceptor.cancelDelegationToken(request);
+    Assert.assertNotNull(cancelResponse);
+    Assert.assertEquals(response.getStatus(), Status.OK.getStatusCode());
+  }
 }