
YARN-10333. YarnClient obtain Delegation Token for Log Aggregation Path. Contributed by Prabhu Joseph.

Sunil G, 4 years ago
parent
commit 5dd270e208
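
For context, a minimal sketch (not part of the patch) of the kind of client configuration this change targets: fs.defaultFS points at one filesystem while the log aggregation path lives on a secured HDFS, so the submitter's credentials need a separate delegation token for the log store. The namenode host, path, and principal below are illustrative placeholders, not values taken from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class LogAggregationConfSketch {
  // Build a configuration where the default filesystem and the
  // log aggregation filesystem are different stores.
  public static Configuration build() {
    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "kerberos");
    // Default filesystem (could equally be an object store).
    conf.set("fs.defaultFS", "file:///");
    // Log aggregation path on a separate, Kerberized HDFS (placeholder URI).
    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
        "hdfs://logs-nn.example.com:8020/tmp/app-logs");
    // Renewer principal used when requesting the delegation token.
    conf.set(YarnConfiguration.RM_PRINCIPAL, "yarn-rm@EXAMPLE.COM");
    return conf;
  }
}

With such a configuration, the change below lets YarnClientImpl fetch the token for the log aggregation filesystem during submitApplication instead of leaving that to each client.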

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml

@@ -38,6 +38,19 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>

+ 56 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java

@@ -30,9 +30,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -131,6 +134,8 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -314,6 +319,16 @@ public class YarnClientImpl extends YarnClient {
       addTimelineDelegationToken(appContext.getAMContainerSpec());
     }
 
+    // Automatically add a delegation token for the Log Aggregation path.
+    // This is useful when a separate storage is used for log aggregation.
+    try {
+      if (isSecurityEnabled()) {
+        addLogAggregationDelegationToken(appContext.getAMContainerSpec());
+      }
+    } catch (Exception e) {
+      LOG.warn("Failed to obtain delegation token for Log Aggregation Path", e);
+    }
+
     //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
     rmClient.submitApplication(request);
 
@@ -373,6 +388,47 @@ public class YarnClientImpl extends YarnClient {
     return applicationId;
   }
 
+  private void addLogAggregationDelegationToken(
+      ContainerLaunchContext clc) throws YarnException, IOException {
+    Credentials credentials = new Credentials();
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    ByteBuffer tokens = clc.getTokens();
+    if (tokens != null) {
+      dibb.reset(tokens);
+      credentials.readTokenStorageStream(dibb);
+      tokens.rewind();
+    }
+
+    Configuration conf = getConfig();
+    String masterPrincipal = YarnClientUtils.getRmPrincipal(conf);
+    if (StringUtils.isEmpty(masterPrincipal)) {
+      throw new IOException(
+          "Can't get Master Kerberos principal for use as renewer");
+    }
+    LOG.debug("Delegation Token Renewer: " + masterPrincipal);
+
+    LogAggregationFileControllerFactory factory =
+        new LogAggregationFileControllerFactory(conf);
+    LogAggregationFileController fileController =
+        factory.getFileControllerForWrite();
+    Path remoteRootLogDir = fileController.getRemoteRootLogDir();
+    FileSystem fs = remoteRootLogDir.getFileSystem(conf);
+
+    final org.apache.hadoop.security.token.Token<?>[] finalTokens =
+        fs.addDelegationTokens(masterPrincipal, credentials);
+    if (finalTokens != null) {
+      for (org.apache.hadoop.security.token.Token<?> token : finalTokens) {
+        LOG.info("Added delegation token for log aggregation path "
+            + remoteRootLogDir + "; "+token);
+      }
+    }
+
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    clc.setTokens(tokens);
+  }
+
   private void addTimelineDelegationToken(
       ContainerLaunchContext clc) throws YarnException, IOException {
     Credentials credentials = new Credentials();

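Before this change, a submitter whose log aggregation directory sat on a different secured filesystem had to fetch that token by hand and pack it into the ContainerLaunchContext credentials. A hedged sketch of that manual step, using only stock Hadoop APIs; the renewer principal is a placeholder and the class name is illustrative:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ManualLogDirTokenSketch {
  // Fetch a delegation token for the remote log dir and add it to the
  // credentials that will later be serialized into the AM container spec.
  public static void addLogDirToken(Configuration conf, Credentials creds)
      throws IOException {
    Path remoteLogDir = new Path(conf.get(
        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
    FileSystem fs = remoteLogDir.getFileSystem(conf);
    Token<?>[] tokens = fs.addDelegationTokens("yarn-rm@EXAMPLE.COM", creds);
    if (tokens != null) {
      for (Token<?> token : tokens) {
        System.out.println("Fetched token for " + remoteLogDir + ": " + token);
      }
    }
  }
}

The new addLogAggregationDelegationToken method performs the same steps inside submitApplication, resolving the filesystem via LogAggregationFileControllerFactory so that custom log aggregation file controllers are honored; the test below exercises that path against a MiniDFSCluster.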
+ 127 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClientImpl.java

@@ -20,6 +20,12 @@ package org.apache.hadoop.yarn.client.api.impl;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -46,12 +52,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -63,6 +72,8 @@ import static org.mockito.Mockito.when;
  */
 public class TestYarnClientImpl extends ParameterizedSchedulerTestBase {
 
+  protected static final String YARN_RM = "yarn-rm@EXAMPLE.COM";
+
   public TestYarnClientImpl(SchedulerType type) throws IOException {
     super(type);
   }
@@ -145,6 +156,122 @@ public class TestYarnClientImpl extends ParameterizedSchedulerTestBase {
     }
   }
 
+  // Validates that YarnClientImpl automatically adds an HDFS delegation
+  // token for the Log Aggregation Path in a cluster setup where fs.defaultFS
+  // is set to LocalFileSystem and the Log Aggregation Path is on HDFS.
+  @Test
+  public void testAutomaticLogAggregationDelegationToken()
+      throws Exception {
+    Configuration conf = getConf();
+    SecurityUtil.setAuthenticationMethod(
+        UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+    conf.set(YarnConfiguration.RM_PRINCIPAL, YARN_RM);
+    String remoteRootLogPath = "/tmp/app-logs";
+
+    MiniDFSCluster hdfsCluster = null;
+    try {
+      // Step 1: Start a MiniDFSCluster for Log Aggregation Path
+      HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+      hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
+          .numDataNodes(1).build();
+
+      Path remoteRootLogDir = new Path(remoteRootLogPath);
+
+      FileSystem fs = hdfsCluster.getFileSystem();
+      fs.mkdirs(remoteRootLogDir);
+      conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+          fs.getFileStatus(remoteRootLogDir).getPath().toString());
+
+      // Step 2: Prepare a mock FileSystem which returns a Delegation Token
+      // when YarnClientImpl invokes addDelegationTokens on it
+      DelegationTokenIdentifier hdfsDT = new DelegationTokenIdentifier(new Text(
+          "test"), new Text(YARN_RM), null);
+      final Token<DelegationTokenIdentifier> dToken =
+          new Token<>(hdfsDT.getBytes(), new byte[0], hdfsDT.getKind(),
+          new Text());
+
+      FileSystem mockFs = mock(FileSystem.class);
+      doAnswer(new Answer<Token<?>[]>() {
+        @Override
+        public Token<?>[] answer(InvocationOnMock invocation) {
+          Object[] args = invocation.getArguments();
+          ((Credentials) args[1]).addToken(hdfsDT.getKind(), dToken);
+          return new Token[]{dToken};
+        }
+      }).when(mockFs).addDelegationTokens(any(), any());
+
+      FileSystemTestHelper.addFileSystemForTesting(fs.getUri(),
+          hdfsConfig, mockFs);
+
+      // Step 3: Prepare a Mock YarnClientImpl
+      YarnClientImpl client = spy(new YarnClientImpl() {
+
+        @Override
+        protected void serviceStart() {
+          rmClient = mock(ApplicationClientProtocol.class);
+        }
+
+        @Override
+        protected void serviceStop() {
+        }
+
+        @Override
+        public ApplicationReport getApplicationReport(ApplicationId appId) {
+          ApplicationReport report = mock(ApplicationReport.class);
+          when(report.getYarnApplicationState())
+              .thenReturn(YarnApplicationState.RUNNING);
+          return report;
+        }
+
+        @Override
+        public boolean isSecurityEnabled() {
+          return true;
+        }
+      });
+
+      client.init(conf);
+      client.start();
+
+      // Step 4: Prepare an ApplicationSubmissionContext and submit the app
+      ApplicationSubmissionContext context =
+          mock(ApplicationSubmissionContext.class);
+      ApplicationId applicationId = ApplicationId.newInstance(0, 1);
+      when(context.getApplicationId()).thenReturn(applicationId);
+
+      DataOutputBuffer dob = new DataOutputBuffer();
+      Credentials credentials = new Credentials();
+      credentials.writeTokenStorageToStream(dob);
+      ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+      ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+          null, null, null, null, tokens, null);
+      when(context.getAMContainerSpec()).thenReturn(clc);
+
+      client.submitApplication(context);
+
+      // Step 5: Verify automatic addition of HDFS DT for log aggregation path
+      credentials = new Credentials();
+      DataInputByteBuffer dibb = new DataInputByteBuffer();
+      tokens = clc.getTokens();
+      if (tokens != null) {
+        dibb.reset(tokens);
+        credentials.readTokenStorageStream(dibb);
+        tokens.rewind();
+      }
+      Collection<Token<? extends TokenIdentifier>> dTokens =
+           credentials.getAllTokens();
+      Assert.assertEquals("Failed to place token for Log Aggregation Path",
+          1, dTokens.size());
+      Assert.assertEquals("Wrong Token for Log Aggregation",
+          hdfsDT.getKind(), dTokens.iterator().next().getKind());
+
+    } finally {
+      if (hdfsCluster != null) {
+        hdfsCluster.shutdown();
+      }
+    }
+  }
+
   @Test
   public void testAutomaticTimelineDelegationTokenLoading()
           throws Exception {