Browse Source

YARN-8512. ATSv2 entities are not published to HBase from second attempt onwards. Contributed by Rohith Sharma K S.

(cherry picked from commit 7f1d3d0e9dbe328fae0d43421665e0b6907b33fe)
Sunil G 6 years ago
parent
commit
9b4ead92c8

+ 51 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -1102,24 +1102,8 @@ public class ContainerManagerImpl extends CompositeService implements
           // Create the application
           // Create the application
           // populate the flow context from the launch context if the timeline
           // populate the flow context from the launch context if the timeline
           // service v.2 is enabled
           // service v.2 is enabled
-          FlowContext flowContext = null;
-          if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
-            String flowName = launchContext.getEnvironment()
-                .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
-            String flowVersion = launchContext.getEnvironment()
-                .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
-            String flowRunIdStr = launchContext.getEnvironment()
-                .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
-            long flowRunId = 0L;
-            if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
-              flowRunId = Long.parseLong(flowRunIdStr);
-            }
-            flowContext = new FlowContext(flowName, flowVersion, flowRunId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Flow context: " + flowContext
-                  + " created for an application " + applicationID);
-            }
-          }
+          FlowContext flowContext =
+              getFlowContext(launchContext, applicationID);
 
 
           Application application =
           Application application =
               new ApplicationImpl(dispatcher, user, flowContext,
               new ApplicationImpl(dispatcher, user, flowContext,
@@ -1138,6 +1122,31 @@ public class ContainerManagerImpl extends CompositeService implements
             dispatcher.getEventHandler().handle(new ApplicationInitEvent(
             dispatcher.getEventHandler().handle(new ApplicationInitEvent(
                 applicationID, appAcls, logAggregationContext));
                 applicationID, appAcls, logAggregationContext));
           }
           }
+        } else if (containerTokenIdentifier.getContainerType()
+            == ContainerType.APPLICATION_MASTER) {
+          FlowContext flowContext =
+              getFlowContext(launchContext, applicationID);
+          if (flowContext != null) {
+            ApplicationImpl application =
+                (ApplicationImpl) context.getApplications().get(applicationID);
+
+            // update flowContext reference in ApplicationImpl
+            application.setFlowContext(flowContext);
+
+            // Required to update state store for recovery.
+            context.getNMStateStore().storeApplication(applicationID,
+                buildAppProto(applicationID, user, credentials,
+                    container.getLaunchContext().getApplicationACLs(),
+                    containerTokenIdentifier.getLogAggregationContext(),
+                    flowContext));
+
+            LOG.info(
+                "Updated application reference with flowContext " + flowContext
+                    + " for app " + applicationID);
+          } else {
+            LOG.info("TimelineService V2.0 is not enabled. Skipping updating "
+                + "flowContext for application " + applicationID);
+          }
         }
         }
 
 
         this.context.getNMStateStore().storeContainer(containerId,
         this.context.getNMStateStore().storeContainer(containerId,
@@ -1163,6 +1172,30 @@ public class ContainerManagerImpl extends CompositeService implements
     }
     }
   }
   }
 
 
+  private FlowContext getFlowContext(ContainerLaunchContext launchContext,
+      ApplicationId applicationID) {
+    FlowContext flowContext = null;
+    if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+      String flowName = launchContext.getEnvironment()
+          .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
+      String flowVersion = launchContext.getEnvironment()
+          .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+      String flowRunIdStr = launchContext.getEnvironment()
+          .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+      long flowRunId = 0L;
+      if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
+        flowRunId = Long.parseLong(flowRunIdStr);
+      }
+      flowContext = new FlowContext(flowName, flowVersion, flowRunId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Flow context: " + flowContext + " created for an application "
+                + applicationID);
+      }
+    }
+    return flowContext;
+  }
+
   protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
   protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
       org.apache.hadoop.yarn.api.records.Token token,
       org.apache.hadoop.yarn.api.records.Token token,
       ContainerTokenIdentifier containerTokenIdentifier) throws YarnException,
       ContainerTokenIdentifier containerTokenIdentifier) throws YarnException,

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java

@@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
@@ -66,7 +68,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import com.google.common.annotations.VisibleForTesting;
 
 
 /**
 /**
  * The state machine for the representation of an Application
  * The state machine for the representation of an Application
@@ -688,4 +689,8 @@ public class ApplicationImpl implements Application {
   public long getFlowRunId() {
   public long getFlowRunId() {
     return flowContext == null ? 0L : flowContext.getFlowRunId();
     return flowContext == null ? 0L : flowContext.getFlowRunId();
   }
   }
+
+  public void setFlowContext(FlowContext fc) {
+    this.flowContext = fc;
+  }
 }
 }

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java

@@ -428,6 +428,16 @@ public abstract class BaseContainerManagerTest {
         containerTokenSecretManager, logAggregationContext);
         containerTokenSecretManager, logAggregationContext);
   }
   }
 
 
+  public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+      NodeId nodeId, String user,
+      NMContainerTokenSecretManager containerTokenSecretManager,
+      LogAggregationContext logAggregationContext, ContainerType containerType)
+      throws IOException {
+    Resource r = BuilderUtils.newResource(1024, 1);
+    return createContainerToken(cId, rmIdentifier, nodeId, user, r,
+        containerTokenSecretManager, logAggregationContext, containerType);
+  }
+
   public static Token createContainerToken(ContainerId cId, long rmIdentifier,
   public static Token createContainerToken(ContainerId cId, long rmIdentifier,
       NodeId nodeId, String user, Resource resource,
       NodeId nodeId, String user, Resource resource,
       NMContainerTokenSecretManager containerTokenSecretManager,
       NMContainerTokenSecretManager containerTokenSecretManager,
@@ -442,6 +452,21 @@ public abstract class BaseContainerManagerTest {
             containerTokenIdentifier);
             containerTokenIdentifier);
   }
   }
 
 
+  public static Token createContainerToken(ContainerId cId, long rmIdentifier,
+      NodeId nodeId, String user, Resource resource,
+      NMContainerTokenSecretManager containerTokenSecretManager,
+      LogAggregationContext logAggregationContext, ContainerType continerType)
+      throws IOException {
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
+            System.currentTimeMillis() + 100000L, 123, rmIdentifier,
+            Priority.newInstance(0), 0, logAggregationContext, null,
+            continerType);
+    return BuilderUtils.newContainerToken(nodeId,
+        containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
+        containerTokenIdentifier);
+  }
+
   public static Token createContainerToken(ContainerId cId, int version,
   public static Token createContainerToken(ContainerId cId, int version,
       long rmIdentifier, NodeId nodeId, String user, Resource resource,
       long rmIdentifier, NodeId nodeId, String user, Resource resource,
       NMContainerTokenSecretManager containerTokenSecretManager,
       NMContainerTokenSecretManager containerTokenSecretManager,

+ 98 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mock;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
@@ -205,7 +207,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
           "includePatternInRollingAggregation",
           "includePatternInRollingAggregation",
           "excludePatternInRollingAggregation");
           "excludePatternInRollingAggregation");
    StartContainersResponse startResponse = startContainer(context, cm, cid,
    StartContainersResponse startResponse = startContainer(context, cm, cid,
-        clc, logAggregationContext);
+        clc, logAggregationContext, ContainerType.TASK);
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertEquals(1, context.getApplications().size());
     assertEquals(1, context.getApplications().size());
     Application app = context.getApplications().get(appId);
     Application app = context.getApplications().get(appId);
@@ -342,7 +344,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
         null, null);
         null, null);
 
 
     StartContainersResponse startResponse = startContainer(context, cm, cid,
     StartContainersResponse startResponse = startContainer(context, cm, cid,
-        clc, null);
+        clc, null, ContainerType.TASK);
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertEquals(1, context.getApplications().size());
     assertEquals(1, context.getApplications().size());
     Application app = context.getApplications().get(appId);
     Application app = context.getApplications().get(appId);
@@ -579,7 +581,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     cm.init(conf);
     cm.init(conf);
     cm.start();
     cm.start();
     StartContainersResponse startResponse = startContainer(context, cm, cid,
     StartContainersResponse startResponse = startContainer(context, cm, cid,
-        clc, logAggregationContext);
+        clc, logAggregationContext, ContainerType.TASK);
     assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
     assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
     cm.stop();
     cm.stop();
     verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
     verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
@@ -595,7 +597,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     cm.init(conf);
     cm.init(conf);
     cm.start();
     cm.start();
     startResponse = startContainer(context, cm, cid,
     startResponse = startContainer(context, cm, cid,
-        clc, logAggregationContext);
+        clc, logAggregationContext, ContainerType.TASK);
     assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
     assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
     cm.stop();
     cm.stop();
     memStore.close();
     memStore.close();
@@ -612,7 +614,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     cm.init(conf);
     cm.init(conf);
     cm.start();
     cm.start();
     startResponse = startContainer(context, cm, cid,
     startResponse = startContainer(context, cm, cid,
-        clc, logAggregationContext);
+        clc, logAggregationContext, ContainerType.TASK);
     assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
     assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
     cm.stop();
     cm.stop();
     memStore.close();
     memStore.close();
@@ -661,7 +663,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
         localResources, containerEnv, commands, serviceData,
         localResources, containerEnv, commands, serviceData,
         containerTokens, acls);
         containerTokens, acls);
     StartContainersResponse startResponse = startContainer(
     StartContainersResponse startResponse = startContainer(
-        context, cm, cid, clc, null);
+        context, cm, cid, clc, null, ContainerType.TASK);
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertEquals(1, context.getApplications().size());
     assertEquals(1, context.getApplications().size());
     // make sure the container reaches RUNNING state
     // make sure the container reaches RUNNING state
@@ -736,14 +738,15 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
 
 
   private StartContainersResponse startContainer(Context context,
   private StartContainersResponse startContainer(Context context,
       final ContainerManagerImpl cm, ContainerId cid,
       final ContainerManagerImpl cm, ContainerId cid,
-      ContainerLaunchContext clc, LogAggregationContext logAggregationContext)
+      ContainerLaunchContext clc, LogAggregationContext logAggregationContext,
+      ContainerType containerType)
           throws Exception {
           throws Exception {
     UserGroupInformation user = UserGroupInformation.createRemoteUser(
     UserGroupInformation user = UserGroupInformation.createRemoteUser(
         cid.getApplicationAttemptId().toString());
         cid.getApplicationAttemptId().toString());
     StartContainerRequest scReq = StartContainerRequest.newInstance(
     StartContainerRequest scReq = StartContainerRequest.newInstance(
         clc, TestContainerManager.createContainerToken(cid, 0,
         clc, TestContainerManager.createContainerToken(cid, 0,
             context.getNodeId(), user.getShortUserName(),
             context.getNodeId(), user.getShortUserName(),
-            context.getContainerTokenSecretManager(), logAggregationContext));
+            context.getContainerTokenSecretManager(), logAggregationContext, containerType));
     final List<StartContainerRequest> scReqList =
     final List<StartContainerRequest> scReqList =
         new ArrayList<StartContainerRequest>();
         new ArrayList<StartContainerRequest>();
     scReqList.add(scReq);
     scReqList.add(scReq);
@@ -910,4 +913,91 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     }
     }
   }
   }
 
 
+  @Test
+  public void testApplicationRecoveryAfterFlowContextUpdated()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+    conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
+    conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
+    NMStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    Context context = createContext(conf, stateStore);
+    ContainerManagerImpl cm = createContainerManager(context);
+    cm.init(conf);
+    cm.start();
+
+    // add an application by starting a container
+    String appName = "app_name1";
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+
+    // create 1nd attempt container with containerId 2
+    ContainerId cid = ContainerId.newContainerId(attemptId, 2);
+    Map<String, LocalResource> localResources = Collections.emptyMap();
+    Map<String, String> containerEnv = new HashMap<>();
+
+    List<String> containerCmds = Collections.emptyList();
+    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+    Credentials containerCreds = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    containerCreds.writeTokenStorageToStream(dob);
+    ByteBuffer containerTokens =
+        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>();
+    ContainerLaunchContext clc = ContainerLaunchContext
+        .newInstance(localResources, containerEnv, containerCmds, serviceData,
+            containerTokens, acls);
+    // create the logAggregationContext
+    LogAggregationContext logAggregationContext = LogAggregationContext
+        .newInstance("includePattern", "excludePattern",
+            "includePatternInRollingAggregation",
+            "excludePatternInRollingAggregation");
+
+    StartContainersResponse startResponse =
+        startContainer(context, cm, cid, clc, logAggregationContext,
+            ContainerType.TASK);
+    assertTrue(startResponse.getFailedRequests().isEmpty());
+    assertEquals(1, context.getApplications().size());
+    ApplicationImpl app =
+        (ApplicationImpl) context.getApplications().get(appId);
+    assertNotNull(app);
+    waitForAppState(app, ApplicationState.INITING);
+    assertNull(app.getFlowName());
+
+    // 2nd attempt
+    ApplicationAttemptId attemptId2 =
+        ApplicationAttemptId.newInstance(appId, 2);
+    // create 2nd attempt master container
+    ContainerId cid2 = ContainerId.newContainerId(attemptId, 1);
+    setFlowContext(containerEnv, appName, appId);
+    // once again create for updating launch context
+    clc = ContainerLaunchContext
+        .newInstance(localResources, containerEnv, containerCmds, serviceData,
+            containerTokens, acls);
+    // start container with container type AM.
+    startResponse =
+        startContainer(context, cm, cid2, clc, logAggregationContext,
+            ContainerType.APPLICATION_MASTER);
+    assertTrue(startResponse.getFailedRequests().isEmpty());
+    assertEquals(1, context.getApplications().size());
+    waitForAppState(app, ApplicationState.INITING);
+    assertEquals(appName, app.getFlowName());
+
+    // reset container manager and verify flow context information
+    cm.stop();
+    context = createContext(conf, stateStore);
+    cm = createContainerManager(context);
+    cm.init(conf);
+    cm.start();
+    assertEquals(1, context.getApplications().size());
+    app = (ApplicationImpl) context.getApplications().get(appId);
+    assertNotNull(app);
+    assertEquals(appName, app.getFlowName());
+    waitForAppState(app, ApplicationState.INITING);
+
+    cm.stop();
+  }
 }
 }