1
0
Kaynağa Gözat

YARN-5547. NMLeveldbStateStore should be more tolerant of unknown keys. Contributed by Ajith S

Jason Lowe 8 yıl önce
ebeveyn
işleme
a33ce45e35

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -144,6 +144,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -403,6 +404,12 @@ public class ContainerManagerImpl extends CompositeService implements
 
     if (context.getApplications().containsKey(appId)) {
       recoverActiveContainer(launchContext, token, rcs);
+      if (rcs.getRecoveryType() == RecoveredContainerType.KILL) {
+        dispatcher.getEventHandler().handle(
+            new ContainerKillEvent(containerId, ContainerExitStatus.ABORTED,
+                "Due to invalid StateStore info container was killed"
+                    + " during recovery"));
+      }
     } else {
       if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
         LOG.warn(containerId + " has no corresponding application!");

+ 18 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java

@@ -70,6 +70,8 @@ import org.iq80.leveldb.Options;
 import org.iq80.leveldb.WriteBatch;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 
 public class NMLeveldbStateStoreService extends NMStateStoreService {
 
@@ -139,6 +141,12 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private boolean isNewlyCreated;
   private Timer compactionTimer;
 
+  /**
+   * Map of containerID vs List of unknown key suffixes.
+   */
+  private ListMultimap<ContainerId, String> containerUnknownKeySuffixes =
+      ArrayListMultimap.create();
+
   public NMLeveldbStateStoreService() {
     super(NMLeveldbStateStoreService.class.getName());
   }
@@ -268,7 +276,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
       } else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
         rcs.setLogDir(asString(entry.getValue()));
       } else {
-        throw new IOException("Unexpected container state key: " + key);
+        LOG.warn("the container " + containerId
+            + " will be killed because of the unknown key " + key
+            + " during recovery.");
+        containerUnknownKeySuffixes.put(containerId, suffix);
+        rcs.setRecoveryType(RecoveredContainerType.KILL);
       }
     }
     return rcs;
@@ -470,6 +482,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
+        List<String> unknownKeysForContainer = containerUnknownKeySuffixes
+            .removeAll(containerId);
+        for (String unknownKeySuffix : unknownKeysForContainer) {
+          batch.delete(bytes(keyPrefix + unknownKeySuffix));
+        }
         db.write(batch);
       } finally {
         batch.close();

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java

@@ -60,6 +60,13 @@ public abstract class NMStateStoreService extends AbstractService {
 
   }
 
+  /**
+   * Type of post recovery action.
+   */
+  public enum RecoveredContainerType {
+    KILL, RECOVER
+  }
+
   public enum RecoveredContainerStatus {
     REQUESTED,
     QUEUED,
@@ -78,6 +85,8 @@ public abstract class NMStateStoreService extends AbstractService {
     private String workDir;
     private String logDir;
     int version;
+    private RecoveredContainerType recoveryType =
+        RecoveredContainerType.RECOVER;
 
     public RecoveredContainerStatus getStatus() {
       return status;
@@ -145,6 +154,14 @@ public abstract class NMStateStoreService extends AbstractService {
           .append(", LogDir: ").append(logDir)
           .toString();
     }
+
+    public RecoveredContainerType getRecoveryType() {
+      return recoveryType;
+    }
+
+    public void setRecoveryType(RecoveredContainerType recoveryType) {
+      this.recoveryType = recoveryType;
+    }
   }
 
   public static class LocalResourceTrackerState {

+ 69 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java

@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
@@ -947,6 +948,74 @@ public class TestNMLeveldbStateStoreService {
     store.close();
   }
 
+  @Test
+  public void testUnexpectedKeyDoesntThrowException() throws IOException {
+    // test empty when no state
+    List<RecoveredContainerState> recoveredContainers = stateStore
+        .loadContainersState();
+    assertTrue(recoveredContainers.isEmpty());
+
+    // create a container request
+    ApplicationId appId = ApplicationId.newInstance(1234, 3);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        4);
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
+    LocalResource lrsrc = LocalResource.newInstance(
+        URL.newInstance("hdfs", "somehost", 12345, "/some/path/to/rsrc"),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 123L,
+        1234567890L);
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put("rsrc", lrsrc);
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("somevar", "someval");
+    List<String> containerCmds = new ArrayList<String>();
+    containerCmds.add("somecmd");
+    containerCmds.add("somearg");
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+    serviceData.put("someservice",
+        ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 }));
+    ByteBuffer containerTokens = ByteBuffer
+        .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa });
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>();
+    acls.put(ApplicationAccessType.VIEW_APP, "viewuser");
+    acls.put(ApplicationAccessType.MODIFY_APP, "moduser");
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, env, containerCmds,
+        serviceData, containerTokens, acls);
+    Resource containerRsrc = Resource.newInstance(1357, 3);
+    ContainerTokenIdentifier containerTokenId = new ContainerTokenIdentifier(
+        containerId, "host", "user", containerRsrc, 9876543210L, 42, 2468,
+        Priority.newInstance(7), 13579);
+    Token containerToken = Token.newInstance(containerTokenId.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), "password".getBytes(),
+        "tokenservice");
+    StartContainerRequest containerReq = StartContainerRequest.newInstance(clc,
+        containerToken);
+
+    stateStore.storeContainer(containerId, 0, containerReq);
+
+    // add a invalid key
+    byte[] invalidKey = ("ContainerManager/containers/"
+    + containerId.toString() + "/invalidKey1234").getBytes();
+    stateStore.getDB().put(invalidKey, new byte[1]);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    RecoveredContainerState rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+    assertTrue(rcs.getDiagnostics().isEmpty());
+    assertEquals(RecoveredContainerType.KILL, rcs.getRecoveryType());
+    // assert unknown keys are cleaned up finally
+    assertNotNull(stateStore.getDB().get(invalidKey));
+    stateStore.removeContainer(containerId);
+    assertNull(stateStore.getDB().get(invalidKey));
+  }
+
   private static class NMTokenSecretManagerForTest extends
       BaseNMTokenSecretManager {
     public MasterKey generateKey() {