Browse Source

MAPREDUCE-6324. Fixed MapReduce uber jobs to not fail the udpate of AM-RM tokens when they roll-over. Contributed by Jason Lowe.

(cherry picked from commit 9fc32c5c4d1d5f50c605bdb0e3b13f44c86660c8)
(cherry picked from commit 32dc13d907a416049bdb7deff429725bd6dbcb49)
(cherry picked from commit aad56fe3a2f29e73a013b9afa9b44b151a34e0f3)
Vinod Kumar Vavilapalli 10 năm trước cách đây
mục cha
commit
262275d561

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -32,6 +32,9 @@ Release 2.6.1 - UNRELEASED
     MAPREDUCE-6238. MR2 can't run local jobs with -libjars command options
     which is a regression from MR1 (zxu via rkanter)
 
+    MAPREDUCE-6324. Fixed MapReduce uber jobs to not fail the udpate of AM-RM
+    tokens when they roll-over. (Jason Lowe via vinodkv)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 27 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java

@@ -18,11 +18,13 @@
 
 package org.apache.hadoop.mapreduce.v2.app.local;
 
+import java.io.IOException;
 import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -35,17 +37,22 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssigned
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 
 /**
  * Allocates containers locally. Doesn't allocate a real container;
@@ -99,8 +106,9 @@ public class LocalContainerAllocator extends RMCommunicator
         AllocateRequest.newInstance(this.lastResponseID,
           super.getApplicationProgress(), new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>(), null);
+    AllocateResponse allocateResponse = null;
     try {
-      scheduler.allocate(allocateRequest);
+      allocateResponse = scheduler.allocate(allocateRequest);
       // Reset retry count if no exception occurred.
       retrystartTime = System.currentTimeMillis();
     } catch (ApplicationAttemptNotFoundException e) {
@@ -131,6 +139,24 @@ public class LocalContainerAllocator extends RMCommunicator
       // continue to attempt to contact the RM.
       throw e;
     }
+
+    if (allocateResponse != null) {
+      this.lastResponseID = allocateResponse.getResponseId();
+      Token token = allocateResponse.getAMRMToken();
+      if (token != null) {
+        updateAMRMToken(token);
+      }
+    }
+  }
+
+  private void updateAMRMToken(Token token) throws IOException {
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
+        new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
+          .getIdentifier().array(), token.getPassword().array(), new Text(
+          token.getKind()), new Text(token.getService()));
+    UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
+    currentUGI.addToken(amrmToken);
+    amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
   }
 
   @SuppressWarnings("unchecked")

+ 142 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java

@@ -22,23 +22,43 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -48,8 +68,13 @@ public class TestLocalContainerAllocator {
   public void testRMConnectionRetry() throws Exception {
     // verify the connection exception is thrown
     // if we haven't exhausted the retry interval
+    ApplicationMasterProtocol mockScheduler =
+        mock(ApplicationMasterProtocol.class);
+    when(mockScheduler.allocate(isA(AllocateRequest.class)))
+      .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
     Configuration conf = new Configuration();
-    LocalContainerAllocator lca = new StubbedLocalContainerAllocator();
+    LocalContainerAllocator lca =
+        new StubbedLocalContainerAllocator(mockScheduler);
     lca.init(conf);
     lca.start();
     try {
@@ -63,7 +88,7 @@ public class TestLocalContainerAllocator {
 
     // verify YarnRuntimeException is thrown when the retry interval has expired
     conf.setLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
-    lca = new StubbedLocalContainerAllocator();
+    lca = new StubbedLocalContainerAllocator(mockScheduler);
     lca.init(conf);
     lca.start();
     try {
@@ -76,12 +101,84 @@ public class TestLocalContainerAllocator {
     }
   }
 
+  @Test
+  public void testAllocResponseId() throws Exception {
+    ApplicationMasterProtocol scheduler = new MockScheduler();
+    Configuration conf = new Configuration();
+    LocalContainerAllocator lca =
+        new StubbedLocalContainerAllocator(scheduler);
+    lca.init(conf);
+    lca.start();
+
+    // do two heartbeats to verify the response ID is being tracked
+    lca.heartbeat();
+    lca.heartbeat();
+    lca.close();
+  }
+
+  @Test
+  public void testAMRMTokenUpdate() throws Exception {
+    Configuration conf = new Configuration();
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(1, 1), 1);
+    AMRMTokenIdentifier oldTokenId = new AMRMTokenIdentifier(attemptId, 1);
+    AMRMTokenIdentifier newTokenId = new AMRMTokenIdentifier(attemptId, 2);
+    Token<AMRMTokenIdentifier> oldToken = new Token<AMRMTokenIdentifier>(
+        oldTokenId.getBytes(), "oldpassword".getBytes(), oldTokenId.getKind(),
+        new Text());
+    Token<AMRMTokenIdentifier> newToken = new Token<AMRMTokenIdentifier>(
+        newTokenId.getBytes(), "newpassword".getBytes(), newTokenId.getKind(),
+        new Text());
+
+    MockScheduler scheduler = new MockScheduler();
+    scheduler.amToken = newToken;
+
+    final LocalContainerAllocator lca =
+        new StubbedLocalContainerAllocator(scheduler);
+    lca.init(conf);
+    lca.start();
+
+    UserGroupInformation testUgi = UserGroupInformation.createUserForTesting(
+        "someuser", new String[0]);
+    testUgi.addToken(oldToken);
+    testUgi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            lca.heartbeat();
+            return null;
+          }
+    });
+    lca.close();
+
+    // verify there is only one AMRM token in the UGI and it matches the
+    // updated token from the RM
+    int tokenCount = 0;
+    Token<? extends TokenIdentifier> ugiToken = null;
+    for (Token<? extends TokenIdentifier> token : testUgi.getTokens()) {
+      if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())) {
+        ugiToken = token;
+        ++tokenCount;
+      }
+    }
+
+    Assert.assertEquals("too many AMRM tokens", 1, tokenCount);
+    Assert.assertArrayEquals("token identifier not updated",
+        newToken.getIdentifier(), ugiToken.getIdentifier());
+    Assert.assertArrayEquals("token password not updated",
+        newToken.getPassword(), ugiToken.getPassword());
+    Assert.assertEquals("AMRM token service not updated",
+        new Text(ClientRMProxy.getAMRMTokenService(conf)),
+        ugiToken.getService());
+  }
+
   private static class StubbedLocalContainerAllocator
     extends LocalContainerAllocator {
+    private ApplicationMasterProtocol scheduler;
 
-    public StubbedLocalContainerAllocator() {
+    public StubbedLocalContainerAllocator(ApplicationMasterProtocol scheduler) {
       super(mock(ClientService.class), createAppContext(),
           "nmhost", 1, 2, null);
+      this.scheduler = scheduler;
     }
 
     @Override
@@ -99,13 +196,6 @@ public class TestLocalContainerAllocator {
 
     @Override
     protected ApplicationMasterProtocol createSchedulerProxy() {
-      ApplicationMasterProtocol scheduler = mock(ApplicationMasterProtocol.class);
-      try {
-        when(scheduler.allocate(isA(AllocateRequest.class)))
-          .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
-      } catch (YarnException e) {
-      } catch (IOException e) {
-      }
       return scheduler;
     }
 
@@ -126,4 +216,46 @@ public class TestLocalContainerAllocator {
       return ctx;
     }
   }
+
+  private static class MockScheduler implements ApplicationMasterProtocol {
+    int responseId = 0;
+    Token<AMRMTokenIdentifier> amToken = null;
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnException, IOException {
+      Assert.assertEquals("response ID mismatch",
+          responseId, request.getResponseId());
+      ++responseId;
+      org.apache.hadoop.yarn.api.records.Token yarnToken = null;
+      if (amToken != null) {
+        yarnToken = org.apache.hadoop.yarn.api.records.Token.newInstance(
+            amToken.getIdentifier(), amToken.getKind().toString(),
+            amToken.getPassword(), amToken.getService().toString());
+      }
+      return AllocateResponse.newInstance(responseId,
+          Collections.<ContainerStatus>emptyList(),
+          Collections.<Container>emptyList(),
+          Collections.<NodeReport>emptyList(),
+          Resources.none(), null, 1, null,
+          Collections.<NMToken>emptyList(),
+          yarnToken,
+          Collections.<ContainerResourceIncrease>emptyList(),
+          Collections.<ContainerResourceDecrease>emptyList());
+    }
+  }
 }