Kaynağa Gözat

YARN-5049. Extend NMStateStore to save queued container information. (Konstantinos Karanasos via asuresh)

Arun Suresh 9 yıl önce
ebeveyn
işleme
0101973dbf

+ 17 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -376,7 +376,6 @@ public class ContainerManagerImpl extends CompositeService implements
     app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
   }
 
-  @SuppressWarnings("unchecked")
   private void recoverContainer(RecoveredContainerState rcs)
       throws IOException {
     StartContainerRequest req = rcs.getStartRequest();
@@ -405,6 +404,7 @@ public class ContainerManagerImpl extends CompositeService implements
                 "Due to invalid StateStore info container was killed"
                     + " during recovery"));
       }
+      recoverActiveContainer(launchContext, token, rcs);
     } else {
       if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
         LOG.warn(containerId + " has no corresponding application!");
@@ -414,6 +414,22 @@ public class ContainerManagerImpl extends CompositeService implements
     }
   }
 
+  /**
+   * Recover a running container.
+   */
+  @SuppressWarnings("unchecked")
+  protected void recoverActiveContainer(
+      ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
+      RecoveredContainerState rcs) throws IOException {
+    Credentials credentials = YarnServerSecurityUtils.parseCredentials(
+        launchContext);
+    Container container = new ContainerImpl(getConfig(), dispatcher,
+        launchContext, credentials, metrics, token, context, rcs);
+    context.getContainers().put(token.getContainerID(), container);
+    dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent(
+        container));
+  }
+
   private void waitForRecoveredContainers() throws InterruptedException {
     final int sleepMsec = 100;
     int waitIterations = 100;

+ 10 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -263,15 +264,15 @@ public class ContainerScheduler extends AbstractService implements
               "Opportunistic container queue is full.");
         }
       }
-//      if (isQueued) {
-//        try {
-//          this.context.getNMStateStore().storeContainerQueued(
-//              container.getContainerId());
-//        } catch (IOException e) {
-//          LOG.warn("Could not store container [" + container.getContainerId()
-//              + "] state. The Container has been queued.", e);
-//        }
-//      }
+      if (isQueued) {
+        try {
+          this.context.getNMStateStore().storeContainerQueued(
+              container.getContainerId());
+        } catch (IOException e) {
+          LOG.warn("Could not store container [" + container.getContainerId()
+              + "] state. The Container has been queued.", e);
+        }
+      }
     }
   }
 

+ 27 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java

@@ -85,7 +85,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private static final String DB_NAME = "yarn-nm-state";
   private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
 
-  private static final Version CURRENT_VERSION_INFO = Version.newInstance(3, 0);
+  // Set to 1.1 by YARN-5049
+  // Set to 1.2 by YARN-6127
+  private static final Version CURRENT_VERSION_INFO = Version
+      .newInstance(1, 2);
 
   private static final String DELETION_TASK_KEY_PREFIX =
       "DeletionService/deltask_";
@@ -112,6 +115,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private static final String CONTAINER_VERSION_KEY_SUFFIX = "/version";
   private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
   private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
+  private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
   private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
       "/resourceChanged";
   private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
@@ -256,8 +260,13 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         rcs.version = Integer.parseInt(asString(entry.getValue()));
       } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) {
         rcs.diagnostics = asString(entry.getValue());
-      } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
+      } else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) {
         if (rcs.status == RecoveredContainerStatus.REQUESTED) {
+          rcs.status = RecoveredContainerStatus.QUEUED;
+        }
+      } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
+        if ((rcs.status == RecoveredContainerStatus.REQUESTED)
+            || (rcs.status == RecoveredContainerStatus.QUEUED)) {
           rcs.status = RecoveredContainerStatus.LAUNCHED;
         }
       } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
@@ -321,6 +330,21 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     return CONTAINERS_KEY_PREFIX + containerId + CONTAINER_VERSION_KEY_SUFFIX;
   }
 
+  @Override
+  public void storeContainerQueued(ContainerId containerId) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("storeContainerQueued: containerId=" + containerId);
+    }
+
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_QUEUED_KEY_SUFFIX;
+    try {
+      db.put(bytes(key), EMPTY_VALUE);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
   @Override
   public void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
@@ -464,6 +488,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
+        batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
         List<String> unknownKeysForContainer =

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java

@@ -74,6 +74,10 @@ public class NMNullStateStoreService extends NMStateStoreService {
       StartContainerRequest startRequest) throws IOException {
   }
 
+  @Override
+  public void storeContainerQueued(ContainerId containerId) throws IOException {
+  }
+
   @Override
   public void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java

@@ -69,6 +69,7 @@ public abstract class NMStateStoreService extends AbstractService {
 
   public enum RecoveredContainerStatus {
     REQUESTED,
+    QUEUED,
     LAUNCHED,
     COMPLETED
   }
@@ -371,6 +372,14 @@ public abstract class NMStateStoreService extends AbstractService {
       int containerVersion, StartContainerRequest startRequest)
       throws IOException;
 
+  /**
+   * Record that a container has been queued at the NM
+   * @param containerId the container ID
+   * @throws IOException
+   */
+  public abstract void storeContainerQueued(ContainerId containerId)
+      throws IOException;
+
   /**
    * Record that a container has been launched
    * @param containerId the container ID

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java

@@ -133,6 +133,12 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     containerStates.put(containerId, rcs);
   }
 
+  @Override
+  public void storeContainerQueued(ContainerId containerId) throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.status = RecoveredContainerStatus.QUEUED;
+  }
+
   @Override
   public synchronized void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java

@@ -259,6 +259,18 @@ public class TestNMLeveldbStateStoreService {
     // check whether the new container record is discarded
     assertEquals(1, recoveredContainers.size());
 
+    // queue the container, and verify recovered
+    stateStore.storeContainerQueued(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertTrue(rcs.getDiagnostics().isEmpty());
+
     // launch the container, add some diagnostics, and verify recovered
     StringBuilder diags = new StringBuilder();
     stateStore.storeContainerLaunched(containerId);