Browse Source

YARN-2079. Recover NonAggregatingLogHandler state upon nodemanager restart. (Contributed by Jason Lowe)
(cherry picked from commit 04f5ef18f7877ce30b12b1a3c1e851c420531b72)

Junping Du 10 năm trước
mục cha
commit
380cc4dbed
10 tập tin đã thay đổi với 362 bổ sung và 39 xóa
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  3. 58 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
  4. 66 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
  5. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
  6. 35 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
  7. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
  8. 73 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
  9. 54 25
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
  10. 51 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -240,6 +240,9 @@ Release 2.7.0 - UNRELEASED
 
     YARN-3147. Clean up RM web proxy code. (Steve Loughran via xgong)
 
+    YARN-2079. Recover NonAggregatingLogHandler state upon nodemanager
+    restart. (Jason Lowe via junping_du) 
+
   OPTIMIZATIONS
 
     YARN-2990. FairScheduler's delay-scheduling always waits for node-local and 

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -135,7 +135,6 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -362,7 +361,8 @@ public class ContainerManagerImpl extends CompositeService implements
           deletionService, dirsHandler);
     } else {
       return new NonAggregatingLogHandler(this.dispatcher, deletionService,
-                                          dirsHandler);
+                                          dirsHandler,
+                                          context.getNMStateStore());
     }
   }
 

+ 58 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -45,6 +46,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -62,15 +65,18 @@ public class NonAggregatingLogHandler extends AbstractService implements
   private final Map<ApplicationId, String> appOwners;
 
   private final LocalDirsHandlerService dirsHandler;
+  private final NMStateStoreService stateStore;
   private long deleteDelaySeconds;
   private ScheduledThreadPoolExecutor sched;
 
   public NonAggregatingLogHandler(Dispatcher dispatcher,
-      DeletionService delService, LocalDirsHandlerService dirsHandler) {
+      DeletionService delService, LocalDirsHandlerService dirsHandler,
+      NMStateStoreService stateStore) {
     super(NonAggregatingLogHandler.class.getName());
     this.dispatcher = dispatcher;
     this.delService = delService;
     this.dirsHandler = dirsHandler;
+    this.stateStore = stateStore;
     this.appOwners = new ConcurrentHashMap<ApplicationId, String>();
   }
 
@@ -82,6 +88,7 @@ public class NonAggregatingLogHandler extends AbstractService implements
                 YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
     sched = createScheduledThreadPoolExecutor(conf);
     super.serviceInit(conf);
+    recover();
   }
 
   @Override
@@ -110,6 +117,31 @@ public class NonAggregatingLogHandler extends AbstractService implements
     }
   }
 
+  private void recover() throws IOException {
+    if (stateStore.canRecover()) {
+      RecoveredLogDeleterState state = stateStore.loadLogDeleterState();
+      long now = System.currentTimeMillis();
+      for (Map.Entry<ApplicationId, LogDeleterProto> entry :
+        state.getLogDeleterMap().entrySet()) {
+        ApplicationId appId = entry.getKey();
+        LogDeleterProto proto = entry.getValue();
+        long deleteDelayMsec = proto.getDeletionTime() - now;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Scheduling deletion of " + appId + " logs in "
+              + deleteDelayMsec + " msec");
+        }
+        LogDeleterRunnable logDeleter =
+            new LogDeleterRunnable(proto.getUser(), appId);
+        try {
+          sched.schedule(logDeleter, deleteDelayMsec, TimeUnit.MILLISECONDS);
+        } catch (RejectedExecutionException e) {
+          // Handling this event in local thread before starting threads
+          // or after calling sched.shutdownNow().
+          logDeleter.run();
+        }
+      }
+    }
+  }
 
   @SuppressWarnings("unchecked")
   @Override
@@ -130,13 +162,28 @@ public class NonAggregatingLogHandler extends AbstractService implements
       case APPLICATION_FINISHED:
         LogHandlerAppFinishedEvent appFinishedEvent =
             (LogHandlerAppFinishedEvent) event;
+        ApplicationId appId = appFinishedEvent.getApplicationId();
         // Schedule - so that logs are available on the UI till they're deleted.
         LOG.info("Scheduling Log Deletion for application: "
-            + appFinishedEvent.getApplicationId() + ", with delay of "
+            + appId + ", with delay of "
             + this.deleteDelaySeconds + " seconds");
-        LogDeleterRunnable logDeleter =
-            new LogDeleterRunnable(appOwners.remove(appFinishedEvent
-                  .getApplicationId()), appFinishedEvent.getApplicationId());
+        String user = appOwners.remove(appId);
+        if (user == null) {
+          LOG.error("Unable to locate user for " + appId);
+          break;
+        }
+        LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId);
+        long deletionTimestamp = System.currentTimeMillis()
+            + this.deleteDelaySeconds * 1000;
+        LogDeleterProto deleterProto = LogDeleterProto.newBuilder()
+            .setUser(user)
+            .setDeletionTime(deletionTimestamp)
+            .build();
+        try {
+          stateStore.storeLogDeleter(appId, deleterProto);
+        } catch (IOException e) {
+          LOG.error("Unable to record log deleter state", e);
+        }
         try {
           sched.schedule(logDeleter, this.deleteDelaySeconds,
               TimeUnit.SECONDS);
@@ -198,6 +245,12 @@ public class NonAggregatingLogHandler extends AbstractService implements
         NonAggregatingLogHandler.this.delService.delete(user, null,
           (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
       }
+      try {
+        NonAggregatingLogHandler.this.stateStore.removeLogDeleter(
+            this.applicationId);
+      } catch (IOException e) {
+        LOG.error("Error removing log deletion state", e);
+      }
     }
 
     @Override

+ 66 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -115,6 +116,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private static final String CONTAINER_TOKENS_PREV_MASTER_KEY =
       CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
 
+  private static final String LOG_DELETER_KEY_PREFIX = "LogDeleters/";
+
   private static final byte[] EMPTY_VALUE = new byte[0];
 
   private DB db;
@@ -851,6 +854,69 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   }
 
 
+  @Override
+  public RecoveredLogDeleterState loadLogDeleterState() throws IOException {
+    RecoveredLogDeleterState state = new RecoveredLogDeleterState();
+    state.logDeleterMap = new HashMap<ApplicationId, LogDeleterProto>();
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(LOG_DELETER_KEY_PREFIX));
+      final int logDeleterKeyPrefixLength = LOG_DELETER_KEY_PREFIX.length();
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.next();
+        String fullKey = asString(entry.getKey());
+        if (!fullKey.startsWith(LOG_DELETER_KEY_PREFIX)) {
+          break;
+        }
+
+        String appIdStr = fullKey.substring(logDeleterKeyPrefixLength);
+        ApplicationId appId = null;
+        try {
+          appId = ConverterUtils.toApplicationId(appIdStr);
+        } catch (IllegalArgumentException e) {
+          LOG.warn("Skipping unknown log deleter key " + fullKey);
+          continue;
+        }
+
+        LogDeleterProto proto = LogDeleterProto.parseFrom(entry.getValue());
+        state.logDeleterMap.put(appId, proto);
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    return state;
+  }
+
+  @Override
+  public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto)
+      throws IOException {
+    String key = getLogDeleterKey(appId);
+    try {
+      db.put(bytes(key), proto.toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeLogDeleter(ApplicationId appId) throws IOException {
+    String key = getLogDeleterKey(appId);
+    try {
+      db.delete(bytes(key));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private String getLogDeleterKey(ApplicationId appId) {
+    return LOG_DELETER_KEY_PREFIX + appId;
+  }
+
   @Override
   protected void initStorage(Configuration conf)
       throws IOException {
@@ -966,5 +1032,4 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
             + getCurrentVersion() + ", but loading version " + loadedVersion);
     }
   }
-  
 }

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 
 // The state store to use when state isn't being stored
@@ -191,6 +192,21 @@ public class NMNullStateStoreService extends NMStateStoreService {
       throws IOException {
   }
 
+  @Override
+  public RecoveredLogDeleterState loadLogDeleterState() throws IOException {
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto)
+      throws IOException {
+  }
+
+  @Override
+  public void removeLogDeleter(ApplicationId appId) throws IOException {
+  }
+
   @Override
   protected void initStorage(Configuration conf) throws IOException {
   }

+ 35 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 
 @Private
@@ -189,6 +190,14 @@ public abstract class NMStateStoreService extends AbstractService {
     }
   }
 
+  public static class RecoveredLogDeleterState {
+    Map<ApplicationId, LogDeleterProto> logDeleterMap;
+
+    public Map<ApplicationId, LogDeleterProto> getLogDeleterMap() {
+      return logDeleterMap;
+    }
+  }
+
   /** Initialize the state storage */
   @Override
   public void serviceInit(Configuration conf) throws IOException {
@@ -459,6 +468,32 @@ public abstract class NMStateStoreService extends AbstractService {
       throws IOException;
 
 
+  /**
+   * Load the state of log deleters
+   * @return recovered log deleter state
+   * @throws IOException
+   */
+  public abstract RecoveredLogDeleterState loadLogDeleterState()
+      throws IOException;
+
+  /**
+   * Store the state of a log deleter
+   * @param appId the application ID for the log deleter
+   * @param proto the serialized state of the log deleter
+   * @throws IOException
+   */
+  public abstract void storeLogDeleter(ApplicationId appId,
+      LogDeleterProto proto) throws IOException;
+
+  /**
+   * Remove the state of a log deleter
+   * @param appId the application ID for the log deleter
+   * @throws IOException
+   */
+  public abstract void removeLogDeleter(ApplicationId appId)
+      throws IOException;
+
+
   protected abstract void initStorage(Configuration conf) throws IOException;
 
   protected abstract void startStorage() throws IOException;

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto

@@ -47,3 +47,7 @@ message LocalizedResourceProto {
   optional int64 size = 3;
 }
 
+message LogDeleterProto {
+  optional string user = 1;
+  optional int64 deletionTime = 2;
+}

+ 73 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java

@@ -18,10 +18,12 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
@@ -65,10 +67,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.mockito.exceptions.verification.WantedButNotInvoked;
@@ -123,7 +129,8 @@ public class TestNonAggregatingLogHandler {
     dirsHandler.init(conf);
 
     NonAggregatingLogHandler rawLogHandler =
-        new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler);
+        new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler,
+            new NMNullStateStoreService());
     NonAggregatingLogHandler logHandler = spy(rawLogHandler);
     AbstractFileSystem spylfs =
         spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
@@ -209,7 +216,8 @@ public class TestNonAggregatingLogHandler {
   @Test
   public void testStop() throws Exception {
     NonAggregatingLogHandler aggregatingLogHandler = 
-        new NonAggregatingLogHandler(null, null, null);
+        new NonAggregatingLogHandler(null, null, null,
+            new NMNullStateStoreService());
 
     // It should not throw NullPointerException
     aggregatingLogHandler.stop();
@@ -232,7 +240,8 @@ public class TestNonAggregatingLogHandler {
     NonAggregatingLogHandler aggregatingLogHandler =
         new NonAggregatingLogHandler(new InlineDispatcher(),
             delService,
-            dirsHandler);
+            dirsHandler,
+            new NMNullStateStoreService());
 
     dirsHandler.init(conf);
     dirsHandler.start();
@@ -258,7 +267,13 @@ public class TestNonAggregatingLogHandler {
 
     public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher,
         DeletionService delService, LocalDirsHandlerService dirsHandler) {
-      super(dispatcher, delService, dirsHandler);
+      this(dispatcher, delService, dirsHandler, new NMNullStateStoreService());
+    }
+
+    public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher,
+        DeletionService delService, LocalDirsHandlerService dirsHandler,
+        NMStateStoreService stateStore) {
+      super(dispatcher, delService, dirsHandler, stateStore);
     }
 
     @Override
@@ -303,7 +318,8 @@ public class TestNonAggregatingLogHandler {
     LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class);
 
     NonAggregatingLogHandler rawLogHandler =
-        new NonAggregatingLogHandler(dispatcher, mockDelService, mockDirsHandler);
+        new NonAggregatingLogHandler(dispatcher, mockDelService,
+            mockDirsHandler, new NMNullStateStoreService());
     NonAggregatingLogHandler logHandler = spy(rawLogHandler);
     AbstractFileSystem spylfs =
         spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
@@ -316,7 +332,58 @@ public class TestNonAggregatingLogHandler {
       mockDirsHandler, conf, spylfs, lfs, localLogDirs);
     logHandler.close();
   }
-  
+
+  @Test
+  public void testRecovery() throws Exception {
+    File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2);
+    String localLogDirsString =
+        localLogDirs[0].getAbsolutePath() + ","
+            + localLogDirs[1].getAbsolutePath();
+
+    conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
+            YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
+
+    dirsHandler.init(conf);
+
+    NMStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    NonAggregatingLogHandlerWithMockExecutor logHandler =
+        new NonAggregatingLogHandlerWithMockExecutor(dispatcher, mockDelService,
+                                                     dirsHandler, stateStore);
+    logHandler.init(conf);
+    logHandler.start();
+
+    logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null,
+        ContainerLogsRetentionPolicy.ALL_CONTAINERS, null));
+    logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
+    logHandler.handle(new LogHandlerAppFinishedEvent(appId));
+
+    // simulate a restart and verify deletion is rescheduled
+    logHandler.close();
+    logHandler = new NonAggregatingLogHandlerWithMockExecutor(dispatcher,
+        mockDelService, dirsHandler, stateStore);
+    logHandler.init(conf);
+    logHandler.start();
+    ArgumentCaptor<Runnable> schedArg = ArgumentCaptor.forClass(Runnable.class);
+    verify(logHandler.mockSched).schedule(schedArg.capture(),
+        anyLong(), eq(TimeUnit.MILLISECONDS));
+
+    // execute the runnable and verify another restart has nothing scheduled
+    schedArg.getValue().run();
+    logHandler.close();
+    logHandler = new NonAggregatingLogHandlerWithMockExecutor(dispatcher,
+        mockDelService, dirsHandler, stateStore);
+    logHandler.init(conf);
+    logHandler.start();
+    verify(logHandler.mockSched, never()).schedule(any(Runnable.class),
+        anyLong(), any(TimeUnit.class));
+    logHandler.close();
+   }
+
   /**
    * Function to run a log handler with directories failing the getFileStatus
    * call. The function accepts the log handler, setup the mocks to fail with

+ 54 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
@@ -48,6 +49,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
   private RecoveredNMTokensState nmTokenState;
   private RecoveredContainerTokensState containerTokenState;
+  private Map<ApplicationId, LogDeleterProto> logDeleterState;
 
   public NMMemoryStateStoreService() {
     super(NMMemoryStateStoreService.class.getName());
@@ -65,6 +67,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
     containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
     trackerStates = new HashMap<TrackerKey, TrackerState>();
     deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
+    logDeleterState = new HashMap<ApplicationId, LogDeleterProto>();
   }
 
   @Override
@@ -77,7 +80,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
 
 
   @Override
-  public RecoveredApplicationsState loadApplicationsState()
+  public synchronized RecoveredApplicationsState loadApplicationsState()
       throws IOException {
     RecoveredApplicationsState state = new RecoveredApplicationsState();
     state.applications = new ArrayList<ContainerManagerApplicationProto>(
@@ -87,7 +90,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeApplication(ApplicationId appId,
+  public synchronized void storeApplication(ApplicationId appId,
       ContainerManagerApplicationProto proto) throws IOException {
     ContainerManagerApplicationProto protoCopy =
         ContainerManagerApplicationProto.parseFrom(proto.toByteString());
@@ -95,18 +98,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeFinishedApplication(ApplicationId appId) {
+  public synchronized void storeFinishedApplication(ApplicationId appId) {
     finishedApps.add(appId);
   }
 
   @Override
-  public void removeApplication(ApplicationId appId) throws IOException {
+  public synchronized void removeApplication(ApplicationId appId)
+      throws IOException {
     apps.remove(appId);
     finishedApps.remove(appId);
   }
 
   @Override
-  public List<RecoveredContainerState> loadContainersState()
+  public synchronized List<RecoveredContainerState> loadContainersState()
       throws IOException {
     // return a copy so caller can't modify our state
     List<RecoveredContainerState> result =
@@ -124,7 +128,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainer(ContainerId containerId,
+  public synchronized void storeContainer(ContainerId containerId,
       StartContainerRequest startRequest) throws IOException {
     RecoveredContainerState rcs = new RecoveredContainerState();
     rcs.startRequest = startRequest;
@@ -132,14 +136,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainerDiagnostics(ContainerId containerId,
+  public synchronized void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
     rcs.diagnostics = diagnostics.toString();
   }
 
   @Override
-  public void storeContainerLaunched(ContainerId containerId)
+  public synchronized void storeContainerLaunched(ContainerId containerId)
       throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
     if (rcs.exitCode != ContainerExitStatus.INVALID) {
@@ -149,22 +153,23 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainerKilled(ContainerId containerId)
+  public synchronized void storeContainerKilled(ContainerId containerId)
       throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
     rcs.killed = true;
   }
 
   @Override
-  public void storeContainerCompleted(ContainerId containerId, int exitCode)
-      throws IOException {
+  public synchronized void storeContainerCompleted(ContainerId containerId,
+      int exitCode) throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
     rcs.status = RecoveredContainerStatus.COMPLETED;
     rcs.exitCode = exitCode;
   }
 
   @Override
-  public void removeContainer(ContainerId containerId) throws IOException {
+  public synchronized void removeContainer(ContainerId containerId)
+      throws IOException {
     containerStates.remove(containerId);
   }
 
@@ -252,7 +257,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
 
 
   @Override
-  public RecoveredDeletionServiceState loadDeletionServiceState()
+  public synchronized RecoveredDeletionServiceState loadDeletionServiceState()
       throws IOException {
     RecoveredDeletionServiceState result =
         new RecoveredDeletionServiceState();
@@ -274,7 +279,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
 
 
   @Override
-  public RecoveredNMTokensState loadNMTokensState() throws IOException {
+  public synchronized RecoveredNMTokensState loadNMTokensState()
+      throws IOException {
     // return a copy so caller can't modify our state
     RecoveredNMTokensState result = new RecoveredNMTokensState();
     result.currentMasterKey = nmTokenState.currentMasterKey;
@@ -286,36 +292,36 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeNMTokenCurrentMasterKey(MasterKey key)
+  public synchronized void storeNMTokenCurrentMasterKey(MasterKey key)
       throws IOException {
     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
     nmTokenState.currentMasterKey = new MasterKeyPBImpl(keypb.getProto());
   }
 
   @Override
-  public void storeNMTokenPreviousMasterKey(MasterKey key)
+  public synchronized void storeNMTokenPreviousMasterKey(MasterKey key)
       throws IOException {
     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
     nmTokenState.previousMasterKey = new MasterKeyPBImpl(keypb.getProto());
   }
 
   @Override
-  public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt,
-      MasterKey key) throws IOException {
+  public synchronized void storeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt, MasterKey key) throws IOException {
     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
     nmTokenState.applicationMasterKeys.put(attempt,
         new MasterKeyPBImpl(keypb.getProto()));
   }
 
   @Override
-  public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt)
-      throws IOException {
+  public synchronized void removeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt) throws IOException {
     nmTokenState.applicationMasterKeys.remove(attempt);
   }
 
 
   @Override
-  public RecoveredContainerTokensState loadContainerTokensState()
+  public synchronized RecoveredContainerTokensState loadContainerTokensState()
       throws IOException {
     // return a copy so caller can't modify our state
     RecoveredContainerTokensState result =
@@ -328,7 +334,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainerTokenCurrentMasterKey(MasterKey key)
+  public synchronized void storeContainerTokenCurrentMasterKey(MasterKey key)
       throws IOException {
     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
     containerTokenState.currentMasterKey =
@@ -336,7 +342,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainerTokenPreviousMasterKey(MasterKey key)
+  public synchronized void storeContainerTokenPreviousMasterKey(MasterKey key)
       throws IOException {
     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
     containerTokenState.previousMasterKey =
@@ -344,18 +350,41 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
-  public void storeContainerToken(ContainerId containerId,
+  public synchronized void storeContainerToken(ContainerId containerId, // now synchronized; records containerId -> expirationTime in containerTokenState.activeTokens
       Long expirationTime) throws IOException {
     containerTokenState.activeTokens.put(containerId, expirationTime);
   }
 
   @Override
-  public void removeContainerToken(ContainerId containerId)
+  public synchronized void removeContainerToken(ContainerId containerId) // now synchronized; drops containerId from containerTokenState.activeTokens
       throws IOException {
     containerTokenState.activeTokens.remove(containerId);
   }
 
 
+  @Override
+  public synchronized RecoveredLogDeleterState loadLogDeleterState() // builds a RecoveredLogDeleterState from the in-memory logDeleterState map
+      throws IOException {
+    RecoveredLogDeleterState state = new RecoveredLogDeleterState();
+    state.logDeleterMap = new HashMap<ApplicationId,LogDeleterProto>( // hand out a new HashMap, never logDeleterState itself,
+        logDeleterState);                                              // so changes to the returned map do not alter the stored map
+    return state;
+  }
+
+  @Override
+  public synchronized void storeLogDeleter(ApplicationId appId, // records proto under appId in logDeleterState;
+      LogDeleterProto proto)                                     // the proto reference is stored as given (no copy is made here)
+      throws IOException {
+    logDeleterState.put(appId, proto);
+  }
+
+  @Override
+  public synchronized void removeLogDeleter(ApplicationId appId) // drops the entry for appId from logDeleterState
+      throws IOException {
+    logDeleterState.remove(appId);
+  }
+
+
   private static class TrackerState {
     Map<Path, LocalResourceProto> inProgressMap =
         new HashMap<Path, LocalResourceProto>();

+ 51 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
@@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
 import org.apache.hadoop.yarn.server.records.Version;
@@ -831,6 +833,55 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(expTime3, loadedActiveTokens.get(cid3));
   }
 
+  @Test // stores, reloads and removes log deleter entries, calling restartStateStore() before each reload
+  public void testLogDeleterStorage() throws IOException {
+    // with nothing stored yet, the loaded log deleter map must be empty
+    RecoveredLogDeleterState state = stateStore.loadLogDeleterState();
+    assertTrue(state.getLogDeleterMap().isEmpty());
+
+    // store a log deleter entry for a first application
+    final ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+    LogDeleterProto proto1 = LogDeleterProto.newBuilder()
+        .setUser("user1")
+        .setDeletionTime(1234)
+        .build();
+    stateStore.storeLogDeleter(appId1, proto1);
+
+    // restart the state store and verify the single entry is recovered unchanged
+    restartStateStore();
+    state = stateStore.loadLogDeleterState();
+    assertEquals(1, state.getLogDeleterMap().size());
+    assertEquals(proto1, state.getLogDeleterMap().get(appId1));
+
+    // store a second log deleter entry for a different application
+    final ApplicationId appId2 = ApplicationId.newInstance(2, 2);
+    LogDeleterProto proto2 = LogDeleterProto.newBuilder()
+        .setUser("user2")
+        .setDeletionTime(5678)
+        .build();
+    stateStore.storeLogDeleter(appId2, proto2);
+
+    // restart the state store and verify both entries are recovered
+    restartStateStore();
+    state = stateStore.loadLogDeleterState();
+    assertEquals(2, state.getLogDeleterMap().size());
+    assertEquals(proto1, state.getLogDeleterMap().get(appId1));
+    assertEquals(proto2, state.getLogDeleterMap().get(appId2));
+
+    // remove the first deleter and verify only the second remains after restart and recovery
+    stateStore.removeLogDeleter(appId1);
+    restartStateStore();
+    state = stateStore.loadLogDeleterState();
+    assertEquals(1, state.getLogDeleterMap().size());
+    assertEquals(proto2, state.getLogDeleterMap().get(appId2));
+
+    // remove the last deleter and verify the map is empty after restart and recovery
+    stateStore.removeLogDeleter(appId2);
+    restartStateStore();
+    state = stateStore.loadLogDeleterState();
+    assertTrue(state.getLogDeleterMap().isEmpty());
+  }
+
   private static class NMTokenSecretManagerForTest extends
       BaseNMTokenSecretManager {
     public MasterKey generateKey() {