Browse Source

YARN-6059. Update paused container state in the NM state store. (Hitesh Sharma via asuresh)

Arun Suresh 7 years ago
parent
commit
a97487ed01
11 changed files with 273 additions and 18 deletions
  1. 12 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  2. 22 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
  3. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
  4. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
  5. 124 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
  6. 1 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
  7. 41 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
  8. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
  9. 22 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
  10. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
  11. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java

+ 12 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -836,10 +836,18 @@ public class ContainerImpl implements Container {
 
   @SuppressWarnings("unchecked") // dispatcher not typed
   private void sendScheduleEvent() {
-    dispatcher.getEventHandler().handle(
-        new ContainerSchedulerEvent(this,
-            ContainerSchedulerEventType.SCHEDULE_CONTAINER)
-    );
+    if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
+      // Recovering a paused container is not supported, so instead of the
+      // schedule event we raise a launcher event whose handling kills the
+      // paused container.
+      ContainersLauncherEventType launcherEvent;
+      launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
+      dispatcher.getEventHandler()
+          .handle(new ContainersLauncherEvent(this, launcherEvent));
+    } else {
+      dispatcher.getEventHandler().handle(new ContainerSchedulerEvent(this,
+          ContainerSchedulerEventType.SCHEDULE_CONTAINER));
+    }
   }
 
   @SuppressWarnings("unchecked") // dispatcher not typed

+ 22 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -837,6 +837,14 @@ public class ContainerLaunch implements Callable<Integer> {
       dispatcher.getEventHandler().handle(new ContainerEvent(
           containerId,
           ContainerEventType.CONTAINER_PAUSED));
+
+      try {
+        this.context.getNMStateStore().storeContainerPaused(
+            container.getContainerId());
+      } catch (IOException e) {
+        LOG.warn("Could not store container [" + container.getContainerId()
+            + "] state. The Container has been paused.", e);
+      }
     } catch (Exception e) {
       String message =
           "Exception when trying to pause container " + containerIdStr
@@ -873,12 +881,20 @@ public class ContainerLaunch implements Callable<Integer> {
 
     // If the container has already started
     try {
-        exec.resumeContainer(container);
-        // ResumeContainer is a blocking call. We are here almost means the
-        // container is resumed, so send out the event.
-        dispatcher.getEventHandler().handle(new ContainerEvent(
-            containerId,
-            ContainerEventType.CONTAINER_RESUMED));
+      exec.resumeContainer(container);
+      // ResumeContainer is a blocking call. We are here almost means the
+      // container is resumed, so send out the event.
+      dispatcher.getEventHandler().handle(new ContainerEvent(
+          containerId,
+          ContainerEventType.CONTAINER_RESUMED));
+
+      try {
+        this.context.getNMStateStore().removeContainerPaused(
+            container.getContainerId());
+      } catch (IOException e) {
+        LOG.warn("Could not store container [" + container.getContainerId()
+            + "] state. The Container has been resumed.", e);
+      }
     } catch (Exception e) {
       String message =
           "Exception when trying to resume container " + containerIdStr

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java

@@ -139,6 +139,16 @@ public class ContainersLauncher extends AbstractService
         containerLauncher.submit(launch);
         running.put(containerId, launch);
         break;
+      case RECOVER_PAUSED_CONTAINER:
+        // Recovery for paused containers is not supported, thus here
+        // we locate any paused containers, and terminate them.
+        app = context.getApplications().get(
+            containerId.getApplicationAttemptId().getApplicationId());
+        launch = new RecoverPausedContainerLaunch(context, getConfig(),
+            dispatcher, exec, app, event.getContainer(), dirsHandler,
+            containerManager);
+        containerLauncher.submit(launch);
+        break;
       case CLEANUP_CONTAINER:
       case CLEANUP_CONTAINER_FOR_REINIT:
         ContainerLaunch launcher = running.remove(containerId);

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java

@@ -26,6 +26,7 @@ public enum ContainersLauncherEventType {
   CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
   SIGNAL_CONTAINER,
   PAUSE_CONTAINER,
-  RESUME_CONTAINER
+  RESUME_CONTAINER,
+  RECOVER_PAUSED_CONTAINER
 
 }

+ 124 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java

@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.*;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+/**
+ * This is a ContainerLaunch which has been recovered after an NM restart for
+ * pause containers (for rolling upgrades)
+ */
+public class RecoverPausedContainerLaunch extends ContainerLaunch {
+
+  private static final Log LOG = LogFactory.getLog(
+      RecoveredContainerLaunch.class);
+
+  public RecoverPausedContainerLaunch(Context context,
+      Configuration configuration, Dispatcher dispatcher,
+      ContainerExecutor exec, Application app, Container container,
+      LocalDirsHandlerService dirsHandler,
+      ContainerManagerImpl containerManager) {
+    super(context, configuration, dispatcher, exec, app, container, dirsHandler,
+        containerManager);
+  }
+
+  /**
+   * Cleanup the paused container by issuing a kill on it.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Integer call() {
+    int retCode = ContainerExecutor.ExitCode.LOST.getExitCode();
+    ContainerId containerId = container.getContainerId();
+    String appIdStr =
+        containerId.getApplicationAttemptId().getApplicationId().toString();
+    String containerIdStr = containerId.toString();
+
+    boolean notInterrupted = true;
+    try {
+      File pidFile = locatePidFile(appIdStr, containerIdStr);
+      if (pidFile != null) {
+        String pidPathStr = pidFile.getPath();
+        pidFilePath = new Path(pidPathStr);
+        exec.activateContainer(containerId, pidFilePath);
+        exec.signalContainer(new ContainerSignalContext.Builder()
+            .setContainer(container)
+            .setUser(container.getUser())
+            .setSignal(ContainerExecutor.Signal.KILL)
+            .build());
+      } else {
+        LOG.warn("Unable to locate pid file for container " + containerIdStr);
+      }
+
+    } catch (InterruptedIOException e) {
+      LOG.warn("Interrupted while waiting for exit code from " + containerId);
+      notInterrupted = false;
+    } catch (IOException e) {
+      LOG.error("Unable to kill the paused container " + containerIdStr, e);
+    } finally {
+      if (notInterrupted) {
+        this.completed.set(true);
+        exec.deactivateContainer(containerId);
+        try {
+          getContext().getNMStateStore()
+              .storeContainerCompleted(containerId, retCode);
+        } catch (IOException e) {
+          LOG.error("Unable to set exit code for container " + containerId);
+        }
+      }
+    }
+
+    LOG.warn("Recovered container exited with a non-zero exit code "
+        + retCode);
+    this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
+        containerId,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode,
+        "Container exited with a non-zero exit code " + retCode));
+
+    return retCode;
+  }
+
+  private File locatePidFile(String appIdStr, String containerIdStr) {
+    String pidSubpath= getPidFileSubpath(appIdStr, containerIdStr);
+    for (String dir : getContext().getLocalDirsHandler().
+        getLocalDirsForRead()) {
+      File pidFile = new File(dir, pidSubpath);
+      if (pidFile.exists()) {
+        return pidFile;
+      }
+    }
+    return null;
+  }
+}

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java

@@ -40,10 +40,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
 
-
 /**
  * This is a ContainerLaunch which has been recovered after an NM restart (for
- * rolling upgrades)
+ * rolling upgrades).
  */
 public class RecoveredContainerLaunch extends ContainerLaunch {
 

+ 41 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java

@@ -119,6 +119,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
   private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
   private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
+  private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused";
   private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
       "/resourceChanged";
   private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
@@ -272,9 +273,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         if (rcs.status == RecoveredContainerStatus.REQUESTED) {
           rcs.status = RecoveredContainerStatus.QUEUED;
         }
+      } else if (suffix.equals(CONTAINER_PAUSED_KEY_SUFFIX)) {
+        if ((rcs.status == RecoveredContainerStatus.LAUNCHED)
+            ||(rcs.status == RecoveredContainerStatus.QUEUED)
+            ||(rcs.status == RecoveredContainerStatus.REQUESTED)) {
+          rcs.status = RecoveredContainerStatus.PAUSED;
+        }
       } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
         if ((rcs.status == RecoveredContainerStatus.REQUESTED)
-            || (rcs.status == RecoveredContainerStatus.QUEUED)) {
+            || (rcs.status == RecoveredContainerStatus.QUEUED)
+            ||(rcs.status == RecoveredContainerStatus.PAUSED)) {
           rcs.status = RecoveredContainerStatus.LAUNCHED;
         }
       } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
@@ -366,6 +374,37 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
     }
   }
 
+  /**
+   * Records that the container is paused by writing an empty value under
+   * the container's "/paused" key.
+   */
+  @Override
+  public void storeContainerPaused(ContainerId containerId) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("storeContainerPaused: containerId=" + containerId);
+    }
+
+    final String pausedKey = CONTAINERS_KEY_PREFIX
+        + containerId.toString() + CONTAINER_PAUSED_KEY_SUFFIX;
+    try {
+      db.put(bytes(pausedKey), EMPTY_VALUE);
+    } catch (DBException dbFailure) {
+      // Surface store failures through the IOException contract.
+      throw new IOException(dbFailure);
+    }
+  }
+
+  /**
+   * Records that the container is no longer paused by deleting the
+   * container's "/paused" key.
+   */
+  @Override
+  public void removeContainerPaused(ContainerId containerId)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("removeContainerPaused: containerId=" + containerId);
+    }
+
+    final String pausedKey = CONTAINERS_KEY_PREFIX
+        + containerId.toString() + CONTAINER_PAUSED_KEY_SUFFIX;
+    try {
+      db.delete(bytes(pausedKey));
+    } catch (DBException dbFailure) {
+      // Surface store failures through the IOException contract.
+      throw new IOException(dbFailure);
+    }
+  }
+
   @Override
   public void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
@@ -510,6 +549,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
+        batch.delete(bytes(keyPrefix + CONTAINER_PAUSED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
         List<String> unknownKeysForContainer = containerUnknownKeySuffixes

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java

@@ -79,6 +79,15 @@ public class NMNullStateStoreService extends NMStateStoreService {
   public void storeContainerQueued(ContainerId containerId) throws IOException {
   }
 
+  @Override
+  public void storeContainerPaused(ContainerId containerId) throws IOException {
+    // Intentionally empty: this implementation records nothing.
+  }
+
+  @Override
+  public void removeContainerPaused(ContainerId containerId)
+      throws IOException {
+    // Intentionally empty: this implementation has nothing to remove.
+  }
+
   @Override
   public void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {

+ 22 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java

@@ -73,7 +73,8 @@ public abstract class NMStateStoreService extends AbstractService {
     REQUESTED,
     QUEUED,
     LAUNCHED,
-    COMPLETED
+    COMPLETED,
+    PAUSED
   }
 
   public static class RecoveredContainerState {
@@ -349,9 +350,9 @@ public abstract class NMStateStoreService extends AbstractService {
   }
 
   /**
-   * Load the state of applications
-   * @return recovered state for applications
-   * @throws IOException
+   * Load the state of applications.
+   * @return recovered state for applications.
+   * @throws IOException IO Exception.
    */
   public abstract RecoveredApplicationsState loadApplicationsState()
       throws IOException;
@@ -402,6 +403,23 @@ public abstract class NMStateStoreService extends AbstractService {
   public abstract void storeContainerQueued(ContainerId containerId)
       throws IOException;
 
+  /**
+   * Record that a container has been paused at the NM.
+   * @param containerId the container ID.
+   * @throws IOException if the paused state could not be recorded.
+   */
+  public abstract void storeContainerPaused(ContainerId containerId)
+      throws IOException;
+
+  /**
+   * Record that a container has been resumed at the NM by removing the
+   * fact that it has been paused.
+   * @param containerId the container ID.
+   * @throws IOException if the paused state could not be removed.
+   */
+  public abstract void removeContainerPaused(ContainerId containerId)
+      throws IOException;
+
   /**
    * Record that a container has been launched
    * @param containerId the container ID

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java

@@ -144,6 +144,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     rcs.status = RecoveredContainerStatus.QUEUED;
   }
 
+  /**
+   * Marks the in-memory record of the container as PAUSED. Synchronized
+   * like the other store methods of this class, since it mutates the shared
+   * container state.
+   */
+  @Override
+  public synchronized void storeContainerPaused(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.status = RecoveredContainerStatus.PAUSED;
+  }
+
+  /**
+   * Clears the paused state by setting the in-memory record of the
+   * container back to LAUNCHED. Synchronized like the other store methods
+   * of this class, since it mutates the shared container state.
+   */
+  @Override
+  public synchronized void removeContainerPaused(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.status = RecoveredContainerStatus.LAUNCHED;
+  }
+
   @Override
   public synchronized void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java

@@ -289,6 +289,23 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(containerReq, rcs.getStartRequest());
     assertEquals(diags.toString(), rcs.getDiagnostics());
 
+    // pause the container, and verify recovered
+    stateStore.storeContainerPaused(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+
+    // Resume the container
+    stateStore.removeContainerPaused(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+
     // increase the container size, and verify recovered
     stateStore.storeContainerResourceChanged(containerId, 2,
         Resource.newInstance(2468, 4));