Browse Source

YARN-355. Fixes a bug where RM app submission could jam under load. Contributed by Daryn Sharp.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1443136 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 12 năm trước cách đây
mục cha
commit
a0f9692082

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -32,6 +32,9 @@ Release 0.23.7 - UNRELEASED
 
     YARN-40. Provide support for missing yarn commands (Devaraj K via tgraves)
 
+    YARN-355. Fixes a bug where RM app submission could jam under load.
+    (Daryn Sharp via sseth)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 0 - 106
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/RMDelegationTokenRenewer.java

@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.security;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenRenewer;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-public class RMDelegationTokenRenewer extends TokenRenewer {
-
-  private static final Log LOG = LogFactory
-      .getLog(RMDelegationTokenRenewer.class);
-
-  @Override
-  public boolean handleKind(Text kind) {
-    return RMDelegationTokenIdentifier.KIND_NAME.equals(kind);
-  }
-
-  @Override
-  public boolean isManaged(Token<?> token) throws IOException {
-    return true;
-  }
-
-  @Override
-  public long renew(Token<?> token, Configuration conf) throws IOException,
-      InterruptedException {
-
-    ClientRMProtocol rm =
-        getRMClient(conf, SecurityUtil.getTokenServiceAddr(token));
-
-    try {
-      DelegationToken dToken = BuilderUtils.newDelegationToken(
-          token.getIdentifier(), token.getKind().toString(),
-          token.getPassword(), token.getService().toString());
-      RenewDelegationTokenRequest request =
-          Records.newRecord(RenewDelegationTokenRequest.class);
-      request.setDelegationToken(dToken);
-      return rm.renewDelegationToken(request).getNextExpirationTime();
-    } finally {
-      RPC.stopProxy(rm);
-    }
-  }
-
-  @Override
-  public void cancel(Token<?> token, Configuration conf) throws IOException,
-      InterruptedException {
-    ClientRMProtocol rm =
-        getRMClient(conf, SecurityUtil.getTokenServiceAddr(token));
-
-    try {
-      DelegationToken dToken = BuilderUtils.newDelegationToken(
-          token.getIdentifier(), token.getKind().toString(),
-          token.getPassword(), token.getService().toString());
-      CancelDelegationTokenRequest request =
-          Records.newRecord(CancelDelegationTokenRequest.class);
-      request.setDelegationToken(dToken);
-      rm.cancelDelegationToken(request);
-    } finally {
-      RPC.stopProxy(rm);
-    }
-  }
-
-  private ClientRMProtocol getRMClient(Configuration conf,
-      InetSocketAddress rmAddress) {
-    YarnRPC rpc = YarnRPC.create(conf);
-    LOG.debug("Connecting to ResourceManager at " + rmAddress);
-    ClientRMProtocol rm =
-        (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
-            rmAddress, conf);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Connected to ResourceManager at " + rmAddress);
-    }
-    return rm;
-  }
-}

+ 118 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/RMDelegationTokenIdentifier.java

@@ -19,10 +19,30 @@
 package org.apache.hadoop.yarn.security.client;
 
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 
 /**
  * Delegation Token Identifier that identifies the delegation tokens from the 
@@ -51,4 +71,102 @@ public class RMDelegationTokenIdentifier extends AbstractDelegationTokenIdentifi
   public Text getKind() {
     return KIND_NAME;
   }
+  
+  public static class Renewer extends TokenRenewer {
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return KIND_NAME.equals(kind);
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+
+    private static
+    AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> localSecretManager;
+    private static InetSocketAddress localServiceAddress;
+    
+    @Private
+    public static void setSecretManager(
+        AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> secretManager,
+        InetSocketAddress serviceAddress) {
+      localSecretManager = secretManager;
+      localServiceAddress = serviceAddress;
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public long renew(Token<?> token, Configuration conf) throws IOException,
+        InterruptedException {
+      final ClientRMProtocol rmClient = getRmClient(token, conf);
+      if (rmClient != null) {
+        try {
+          RenewDelegationTokenRequest request =
+              Records.newRecord(RenewDelegationTokenRequest.class);
+          request.setDelegationToken(convertToProtoToken(token));
+          return rmClient.renewDelegationToken(request).getNextExpirationTime();
+        } finally {
+          RPC.stopProxy(rmClient);
+        }
+      } else {
+        return localSecretManager.renewToken(
+            (Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cancel(Token<?> token, Configuration conf) throws IOException,
+        InterruptedException {
+      final ClientRMProtocol rmClient = getRmClient(token, conf);
+      if (rmClient != null) {
+        try {
+          CancelDelegationTokenRequest request =
+              Records.newRecord(CancelDelegationTokenRequest.class);
+          request.setDelegationToken(convertToProtoToken(token));
+          rmClient.cancelDelegationToken(request);
+        } finally {
+          RPC.stopProxy(rmClient);
+        }
+      } else {
+        localSecretManager.cancelToken(
+            (Token<RMDelegationTokenIdentifier>)token, getRenewer(token));
+      }
+    }
+    
+    private static ClientRMProtocol getRmClient(Token<?> token,
+        Configuration conf) {
+      InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
+      if (localSecretManager != null) {
+        // return null if it's our token
+        if (localServiceAddress.getAddress().isAnyLocalAddress()) {
+            if (NetUtils.isLocalAddress(addr.getAddress()) &&
+                addr.getPort() == localServiceAddress.getPort()) {
+              return null;
+            }
+        } else if (addr.equals(localServiceAddress)) {
+          return null;
+        }
+      }
+      final YarnRPC rpc = YarnRPC.create(conf);
+      return (ClientRMProtocol)rpc.getProxy(ClientRMProtocol.class, addr, conf);        
+    }
+    
+    // get renewer so we can always renew our own tokens
+    private static String getRenewer(Token<?> token) throws IOException {
+      RMDelegationTokenIdentifier id = new RMDelegationTokenIdentifier();
+      DataInputStream in = new DataInputStream(
+          new ByteArrayInputStream(token.getIdentifier()));
+      id.readFields(in);
+      return id.getRenewer().toString();
+    }
+
+    private static DelegationToken convertToProtoToken(Token<?> token) {
+      return BuilderUtils.newDelegationToken(
+          token.getIdentifier(), token.getKind().toString(),
+          token.getPassword(), token.getService().toString());
+    }
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer

@@ -13,4 +13,4 @@
 
 org.apache.hadoop.yarn.security.ApplicationTokenIdentifier$Renewer
 org.apache.hadoop.yarn.security.ContainerTokenIdentifier$Renewer
-org.apache.hadoop.yarn.security.RMDelegationTokenRenewer
+org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier$Renewer

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -157,6 +157,10 @@ public class ClientRMService extends AbstractService implements
     this.server.start();
     clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
                                                server.getListenerAddress());
+    // enable RM to short-circuit token operations directly to itself
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(
+        rmDTSecretManager, clientBindAddress);
+    
     super.start();
   }
 

+ 130 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java

@@ -17,13 +17,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
@@ -34,9 +33,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ClientRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
@@ -46,12 +51,14 @@ import org.apache.hadoop.yarn.api.records.DelegationToken;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.junit.Before;
 import org.junit.Test;
 
 
@@ -59,6 +66,10 @@ public class TestClientRMTokens {
 
   private static final Log LOG = LogFactory.getLog(TestClientRMTokens.class);
   
+  @Before
+  public void resetSecretManager() {
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(null, null);
+  }
   
   @Test
   public void testDelegationToken() throws IOException, InterruptedException {
@@ -202,7 +213,122 @@ public class TestClientRMTokens {
         RPC.stopProxy(clientRMWithDT);
       }
     }
+  }
+  
+  @Test
+  public void testShortCircuitRenewCancel()
+      throws IOException, InterruptedException {
+    InetSocketAddress addr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(addr, addr, true);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelWildcardAddress()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr = new InetSocketAddress(123);
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress(InetAddress.getLocalHost(), rmAddr.getPort()),
+        true);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelSameHostDifferentPort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress(rmAddr.getAddress(), rmAddr.getPort()+1),
+        false);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelDifferentHostSamePort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress("1.1.1.1", rmAddr.getPort()),
+        false);
+  }
+
+  @Test
+  public void testShortCircuitRenewCancelDifferentHostDifferentPort()
+      throws IOException, InterruptedException {
+    InetSocketAddress rmAddr =
+        new InetSocketAddress(InetAddress.getLocalHost(), 123);    
+    checkShortCircuitRenewCancel(
+        rmAddr,
+        new InetSocketAddress("1.1.1.1", rmAddr.getPort()+1),
+        false);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void checkShortCircuitRenewCancel(InetSocketAddress rmAddr,
+                                            InetSocketAddress serviceAddr,
+                                            boolean shouldShortCircuit
+      ) throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.IPC_RPC_IMPL,
+        YarnBadRPC.class, YarnRPC.class);
     
+    RMDelegationTokenSecretManager secretManager =
+        mock(RMDelegationTokenSecretManager.class);
+    RMDelegationTokenIdentifier.Renewer.setSecretManager(secretManager, rmAddr);
+
+    RMDelegationTokenIdentifier ident = new RMDelegationTokenIdentifier(
+        new Text("owner"), new Text("renewer"), null);
+    Token<RMDelegationTokenIdentifier> token =
+        new Token<RMDelegationTokenIdentifier>(ident, secretManager);
+
+    SecurityUtil.setTokenService(token, serviceAddr);
+    if (shouldShortCircuit) {
+      token.renew(conf);
+      verify(secretManager).renewToken(eq(token), eq("renewer"));
+      reset(secretManager);
+      token.cancel(conf);
+      verify(secretManager).cancelToken(eq(token), eq("renewer"));
+    } else {      
+      try { 
+        token.renew(conf);
+        fail();
+      } catch (RuntimeException e) {
+        assertEquals("getProxy", e.getMessage());
+      }
+      verify(secretManager, never()).renewToken(any(Token.class), anyString());
+      try { 
+        token.cancel(conf);
+        fail();
+      } catch (RuntimeException e) {
+        assertEquals("getProxy", e.getMessage());
+      }
+      verify(secretManager, never()).cancelToken(any(Token.class), anyString());
+    }
+  }
+  
+  @SuppressWarnings("rawtypes")
+  public static class YarnBadRPC extends YarnRPC {
+    @Override
+    public Object getProxy(Class protocol, InetSocketAddress addr,
+        Configuration conf) {
+      throw new RuntimeException("getProxy");
+    }
+
+    @Override
+    public void stopProxy(Object proxy, Configuration conf) {
+      throw new RuntimeException("stopProxy");
+    }
+
+    @Override
+    public Server getServer(Class protocol, Object instance,
+        InetSocketAddress addr, Configuration conf,
+        SecretManager<? extends TokenIdentifier> secretManager,
+        int numHandlers, String portRangeConfig) {
+      throw new RuntimeException("getServer");
+    }
   }
   
   // Get the delegation token directly as it is a little difficult to setup