瀏覽代碼

YARN-5028. RMStateStore should trim down app state for completed applications. Contributed by Gergo Repas.

Yufei Gu 7 年之前
父節點
當前提交
92cbbfe79e

+ 33 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
@@ -257,6 +259,9 @@ public abstract class RMStateStore extends AbstractService {
           appState.getApplicationSubmissionContext().getApplicationId();
       LOG.info("Updating info for app: " + appId);
       try {
+        if (isAppStateFinal(appState)) {
+          pruneAppState(appState);
+        }
         store.updateApplicationStateInternal(appId, appState);
         if (((RMStateUpdateAppEvent) event).isNotifyApplication()) {
           store.notifyApplication(new RMAppEvent(appId,
@@ -276,7 +281,34 @@ public abstract class RMStateStore extends AbstractService {
         }
       }
       return finalState(isFenced);
-    };
+    }
+
+    private boolean isAppStateFinal(ApplicationStateData appState) {
+      RMAppState state = appState.getState();
+      return state == RMAppState.FINISHED || state == RMAppState.FAILED ||
+          state == RMAppState.KILLED;
+    }
+
+    private void pruneAppState(ApplicationStateData appState) {
+      ApplicationSubmissionContext srcCtx =
+          appState.getApplicationSubmissionContext();
+      ApplicationSubmissionContextPBImpl context =
+          new ApplicationSubmissionContextPBImpl();
+      // most fields in the ApplicationSubmissionContext are not needed,
+      // but the following few need to be present for recovery to succeed
+      context.setApplicationId(srcCtx.getApplicationId());
+      context.setResource(srcCtx.getResource());
+      context.setQueue(srcCtx.getQueue());
+      context.setAMContainerResourceRequests(
+          srcCtx.getAMContainerResourceRequests());
+      context.setApplicationType(srcCtx.getApplicationType());
+      ContainerLaunchContextPBImpl amContainerSpec =
+              new ContainerLaunchContextPBImpl();
+      amContainerSpec.setApplicationACLs(
+              srcCtx.getAMContainerSpec().getApplicationACLs());
+      context.setAMContainerSpec(amContainerSpec);
+      appState.setApplicationSubmissionContext(context);
+    }
   }
 
   private static class RemoveAppTransition implements

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -162,6 +163,7 @@ public class RMStateStoreTestBase {
     ApplicationSubmissionContext context =
         new ApplicationSubmissionContextPBImpl();
     context.setApplicationId(appId);
+    context.setAMContainerSpec(new ContainerLaunchContextPBImpl());
 
     RMApp mockApp = mock(RMApp.class);
     when(mockApp.getApplicationId()).thenReturn(appId);
@@ -378,6 +380,7 @@ public class RMStateStoreTestBase {
     ApplicationSubmissionContext dummyContext =
         new ApplicationSubmissionContextPBImpl();
     dummyContext.setApplicationId(dummyAppId);
+    dummyContext.setAMContainerSpec(new ContainerLaunchContextPBImpl());
     ApplicationStateData dummyApp =
         ApplicationStateData.newInstance(appState.getSubmitTime(),
             appState.getStartTime(), appState.getUser(), dummyContext,

+ 66 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

@@ -35,7 +35,9 @@ import org.apache.hadoop.service.Service;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Event;
@@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -83,6 +86,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -845,6 +849,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
       ApplicationSubmissionContext context =
           new ApplicationSubmissionContextPBImpl();
       context.setApplicationId(appId);
+      context.setAMContainerSpec(new ContainerLaunchContextPBImpl());
       appStateNew = createAppState(context, submitTime, startTime, finishTime,
           true);
     } else {
@@ -1488,4 +1493,65 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
         tokensWithIndex, sequenceNumber, 3);
     store.close();
   }
+
+  @Test
+  public void testAppSubmissionContextIsPrunedInFinalApplicationState()
+      throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    ApplicationId appId = ApplicationId.fromString("application_1234_0010");
+
+    Configuration conf = createConfForDelegationTokenNodeSplit(1);
+    RMStateStore store = zkTester.getRMStateStore(conf);
+    ApplicationSubmissionContext ctx =
+        new ApplicationSubmissionContextPBImpl();
+    ctx.setApplicationId(appId);
+    ctx.setQueue("a_queue");
+    ContainerLaunchContextPBImpl containerLaunchCtx =
+        new ContainerLaunchContextPBImpl();
+    containerLaunchCtx.setCommands(Collections.singletonList("a_command"));
+    ctx.setAMContainerSpec(containerLaunchCtx);
+    Resource resource = new ResourcePBImpl();
+    resource.setMemorySize(17L);
+    ctx.setResource(resource);
+    Map<String, String> schedulingPropertiesMap =
+        Collections.singletonMap("a_key", "a_value");
+    ctx.setApplicationSchedulingPropertiesMap(schedulingPropertiesMap);
+    ApplicationStateDataPBImpl appState = new ApplicationStateDataPBImpl();
+    appState.setState(RMAppState.RUNNING);
+    appState.setApplicationSubmissionContext(ctx);
+    store.storeApplicationStateInternal(appId, appState);
+
+    RMState rmState = store.loadState();
+    assertEquals(1, rmState.getApplicationState().size());
+    ctx = rmState.getApplicationState().get(appId)
+        .getApplicationSubmissionContext();
+
+    appState.setState(RMAppState.RUNNING);
+    store.handleStoreEvent(new RMStateUpdateAppEvent(appState, false, null));
+
+    rmState = store.loadState();
+    ctx = rmState.getApplicationState().get(appId)
+        .getApplicationSubmissionContext();
+
+    assertEquals("ApplicationSchedulingPropertiesMap should not have been "
+        + "pruned from the application submission context before the "
+        + "FINISHED state",
+        schedulingPropertiesMap, ctx.getApplicationSchedulingPropertiesMap());
+
+    appState.setState(RMAppState.FINISHED);
+    store.handleStoreEvent(new RMStateUpdateAppEvent(appState, false, null));
+
+    rmState = store.loadState();
+    ctx = rmState.getApplicationState().get(appId)
+        .getApplicationSubmissionContext();
+
+    assertEquals(appId, ctx.getApplicationId());
+    assertEquals("a_queue", ctx.getQueue());
+    assertNotNull(ctx.getAMContainerSpec());
+    assertEquals(17L, ctx.getResource().getMemorySize());
+    assertEquals("ApplicationSchedulingPropertiesMap should have been pruned"
+        + " from the application submission context when in FINISHED STATE",
+        Collections.emptyMap(), ctx.getApplicationSchedulingPropertiesMap());
+    store.close();
+  }
 }