소스 검색

merge YARN-355 from trunk. Fixes a bug where RM app submission could jam under load. Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1443134 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 12 년 전
부모
커밋
b887c9e8f5

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -216,6 +216,9 @@ Release 2.0.3-alpha - 2013-02-06
     YARN-370. Fix SchedulerUtils to correctly round up the resource for
     containers. (Zhijie Shen via acmurthy) 
 
+    YARN-355. Fixes a bug where RM app submission could jam under load.
+    (Daryn Sharp via sseth)
+
 Release 2.0.2-alpha - 2012-09-07 
 
     YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy)

+ 0 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java

@@ -25,13 +25,11 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -47,8 +45,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -199,30 +195,6 @@ public class YarnClientImpl extends AbstractService implements YarnClient {
   }
 
 
-  // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
-  // are part of ClientRMProtocol.
-  @Private
-  public long renewRMDelegationToken(DelegationToken rmToken)
-      throws YarnRemoteException {
-    RenewDelegationTokenRequest request = Records
-        .newRecord(RenewDelegationTokenRequest.class);
-    request.setDelegationToken(rmToken);
-    RenewDelegationTokenResponse response = rmClient
-        .renewDelegationToken(request);
-    return response.getNextExpirationTime();
-  }
-
-  // Not part of YarnClient itself. Placed in YarnClientImpl while renew/cancel
-  // are part of ClietnRMProtocol
-  @Private
-  public void cancelRMDelegationToken(DelegationToken rmToken)
-      throws YarnRemoteException {
-    CancelDelegationTokenRequest request = Records
-        .newRecord(CancelDelegationTokenRequest.class);
-    request.setDelegationToken(rmToken);
-    rmClient.cancelDelegationToken(request);
-  }
-
   private GetQueueInfoRequest
       getQueueInfoRequest(String queueName, boolean includeApplications,
           boolean includeChildQueues, boolean recursive) {

+ 0 - 83
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java

@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.security;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenRenewer;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-
-public class RMDelegationTokenRenewer extends TokenRenewer {
-
-  @Override
-  public boolean handleKind(Text kind) {
-    return RMDelegationTokenIdentifier.KIND_NAME.equals(kind);
-  }
-
-  @Override
-  public boolean isManaged(Token<?> token) throws IOException {
-    return true;
-  }
-
-  @Override
-  public long renew(Token<?> token, Configuration conf) throws IOException,
-      InterruptedException {
-    YarnClientImpl yarnClient = getYarnClient(conf,
-        SecurityUtil.getTokenServiceAddr(token));
-    try {
-      DelegationToken dToken = BuilderUtils.newDelegationToken(
-          token.getIdentifier(), token.getKind().toString(),
-          token.getPassword(), token.getService().toString());
-      return yarnClient.renewRMDelegationToken(dToken);
-    } finally {
-      yarnClient.stop();
-    }
-  }
-
-  @Override
-  public void cancel(Token<?> token, Configuration conf) throws IOException,
-      InterruptedException {
-    YarnClientImpl yarnClient = getYarnClient(conf,
-        SecurityUtil.getTokenServiceAddr(token));
-    try {
-      DelegationToken dToken = BuilderUtils.newDelegationToken(
-          token.getIdentifier(), token.getKind().toString(),
-          token.getPassword(), token.getService().toString());
-      yarnClient.cancelRMDelegationToken(dToken);
-      return;
-    } finally {
-      yarnClient.stop();
-    }
-  }
-
-  private YarnClientImpl getYarnClient(Configuration conf,
-      InetSocketAddress rmAddress) {
-    YarnClientImpl yarnClient = new YarnClientImpl(rmAddress);
-    yarnClient.init(conf);
-    yarnClient.start();
-    return yarnClient;
-  }
-}

+ 0 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer

@@ -1,14 +0,0 @@
-#
-#   Licensed under the Apache License, Version 2.0 (the "License");
-#   you may not use this file except in compliance with the License.
-#   You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-#
-org.apache.hadoop.yarn.security.RMDelegationTokenRenewer;

+ 114 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java

@@ -19,10 +19,28 @@
 package org.apache.hadoop.yarn.security.client;
 
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 
 /**
  * Delegation Token Identifier that identifies the delegation tokens from the 
@@ -51,4 +69,100 @@ public class RMDelegationTokenIdentifier extends AbstractDelegationTokenIdentifi
   public Text getKind() {
     return KIND_NAME;
   }
+  
+  public static class Renewer extends TokenRenewer {
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return KIND_NAME.equals(kind);
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+
+    private static
+    AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> localSecretManager;
+    private static InetSocketAddress localServiceAddress;
+    
+    @Private
+    public static void setSecretManager(
+        AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> secretManager,
+        InetSocketAddress serviceAddress) {
+      localSecretManager = secretManager;
+      localServiceAddress = serviceAddress;
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public long renew(Token<?> token, Configuration conf) throws IOException,
+        InterruptedException {
+      final ClientRMProtocol rmClient = getRmClient(token, conf);
+      if (rmClient != null) {
+        try {
+          RenewDelegationTokenRequest request =
+              Records.newRecord(RenewDelegationTokenRequest.class);
+          request.setDelegationToken(convertToProtoToken(token));
+          return rmClient.renewDelegationToken(request).getNextExpirationTime();
+        } finally {
+          RPC.stopProxy(rmClient);
+        }
+      } else {
+        return localSecretManager.renewToken(
+            (Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cancel(Token<?> token, Configuration conf) throws IOException,
+        InterruptedException {
+      final ClientRMProtocol rmClient = getRmClient(token, conf);
+      if (rmClient != null) {
+        try {
+          CancelDelegationTokenRequest request =
+              Records.newRecord(CancelDelegationTokenRequest.class);
+          request.setDelegationToken(convertToProtoToken(token));
+          rmClient.cancelDelegationToken(request);
+        } finally {
+          RPC.stopProxy(rmClient);
+        }
+      } else {
+        localSecretManager.cancelToken(
+            (Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
+      }
+    }
+    
+    private static ClientRMProtocol getRmClient(Token<?> token,
+        Configuration conf) {
+      InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
+      if (localSecretManager != null) {
+        // return null if it's our token
+        if (localServiceAddress.getAddress().isAnyLocalAddress()) {
+            if (NetUtils.isLocalAddress(addr.getAddress()) &&
+                addr.getPort() == localServiceAddress.getPort()) {
+              return null;
+            }
+        } else if (addr.equals(localServiceAddress)) {
+          return null;
+        }
+      }
+      final YarnRPC rpc = YarnRPC.create(conf);
+      return (ClientRMProtocol)rpc.getProxy(ClientRMProtocol.class, addr, conf);        
+    }
+
+    // get renewer so we can always renew our own tokens
+    @SuppressWarnings("unchecked")
+    private static String getRenewer(Token<?> token) throws IOException {
+      return ((Token<RMDelegationTokenIdentifier>)token).decodeIdentifier()
+          .getRenewer().toString();
+    }
+    
+    private static DelegationToken convertToProtoToken(Token<?> token) {
+      return BuilderUtils.newDelegationToken(
+          token.getIdentifier(), token.getKind().toString(),
+          token.getPassword(), token.getService().toString());
+    }
+  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer

@@ -13,3 +13,4 @@
 #
 org.apache.hadoop.yarn.security.ApplicationTokenIdentifier$Renewer
 org.apache.hadoop.yarn.security.ContainerTokenIdentifier$Renewer
+org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier$Renewer

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -157,6 +157,10 @@ public class ClientRMService extends AbstractService implements
     this.server.start();
     clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
                                                server.getListenerAddress());
+    // enable RM to short-circuit token operations directly to itself
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(
+        rmDTSecretManager, clientBindAddress);
+    
     super.start();
   }
 

+ 130 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java

@@ -17,13 +17,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
@@ -34,9 +33,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
@@ -46,12 +51,14 @@ import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.junit.Before;
 import org.junit.Test;
 
 
@@ -59,6 +66,10 @@ public class TestClientRMTokens {
 
   private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class);
   
+  @Before
+  public void resetSecretManager() {
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
+  }
   
   @Test
   public void testDelegationToken() throws IOException, InterruptedException {
@@ -200,7 +211,122 @@ public class TestClientRMTokens {
         RPC.stopProxy(clientRMWithDT);
       }
     }
+  }
+  
+  @Test
+  public void testShortCircuitRenewCancel()
+      throws IOException, InterruptedException {
+    InetSocketAddress addr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(addr, addr, true);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelWildcardAddress()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr = new InetSocketAddress(123);
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()),
+        true);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelSameHostDifferentPort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1),
+        false);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelDifferentHostSamePort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress("1.1.1.1", rmAddr.getPort()),
+        false);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelDifferentHostDifferentPort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1),
+        false);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr,
+                                            InetSocketAddress serviceAddr,
+                                            boolean shouldShortCircuit
+      ) throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.IPC_RPC_IMPL,
+        YarnBadRPC.class, YarnRPC.class);
     
+    RMDelegationTokenSecretManager secretManager =
+        mock(RMDelegationTokenSecretManager.class);
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(secretManager, rmAddr);
+
+    RMDelegationTokenIdentifier ident = new RMDelegationTokenIdentifier(
+        new Text("owner"), new Text("renewer"), null);
+    Token<RMDelegationTokenIdentifier> token =
+        new Token<RMDelegationTokenIdentifier>(ident, secretManager);
+
+    SecurityUtil.setTokenService(token, serviceAddr);
+    if (shouldShortCircuit) {
+      token.renew(conf);
+      verify(secretManager).renewToken(eq(token), eq("renewer"));
+      reset(secretManager);
+      token.cancel(conf);
+      verify(secretManager).cancelToken(eq(token), eq("renewer"));
+    } else {      
+      try { 
+        token.renew(conf);
+        fail();
+      } catch (RuntimeException e) {
+        assertEquals("getProxy", e.getMessage());
+      }
+      verify(secretManager, never()).renewToken(any(Token.class), anyString());
+      try { 
+        token.cancel(conf);
+        fail();
+      } catch (RuntimeException e) {
+        assertEquals("getProxy", e.getMessage());
+      }
+      verify(secretManager, never()).cancelToken(any(Token.class), anyString());
+    }
+  }
+  
+  @SuppressWarnings("rawtypes")
+  public static class YarnBadRPC extends YarnRPC {
+    @Override
+    public Object getProxy(Class protocol, InetSocketAddress addr,
+        Configuration conf) {
+      throw new RuntimeException("getProxy");
+    }
+
+    @Override
+    public void stopProxy(Object proxy, Configuration conf) {
+      throw new RuntimeException("stopProxy");
+    }
+
+    @Override
+    public Server getServer(Class protocol, Object instance,
+        InetSocketAddress addr, Configuration conf,
+        SecretManager<? extends TokenIdentifier> secretManager,
+        int numHandlers, String portRangeConfig) {
+      throw new RuntimeException("getServer");
+    }
   }
   
   // Get the delegation token directly as it is a little difficult to setup