Browse Source

YARN-5008. LeveldbRMStateStore database can grow substantially leading to long recovery times. Contributed by Jason Lowe

(cherry picked from commit a9707dceaf181ea0f6d3c4291d1f3ccc2681f273)
Jian He 9 years ago
parent
commit
d7b2da6518

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -558,6 +558,13 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
       + "leveldb-state-store.path";
 
+  /** The time in seconds between full compactions of the leveldb database.
+   *  Setting the interval to zero (or a negative value) disables the full
+   *  compaction cycles.
+   */
+  public static final String RM_LEVELDB_COMPACTION_INTERVAL_SECS = RM_PREFIX
+      + "leveldb-state-store.compaction-interval-secs";
+  public static final long DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS = 3600;
+
   /** The maximum number of completed applications RM keeps. */ 
   public static final String RM_MAX_COMPLETED_APPLICATIONS =
     RM_PREFIX + "max-completed-applications";

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -490,6 +490,14 @@
     <value>${hadoop.tmp.dir}/yarn/system/rmstore</value>
   </property>
 
+  <property>
+    <description>The time in seconds between full compactions of the leveldb
+    database. Setting the interval to zero (or a negative value) disables
+    the full compaction cycles.</description>
+    <name>yarn.resourcemanager.leveldb-state-store.compaction-interval-secs</name>
+    <value>3600</value>
+  </property>
+
   <property>
     <description>Enable RM high-availability. When enabled,
       (1) The RM starts in the Standby mode by default, and transitions to

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java

@@ -28,6 +28,8 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -89,6 +92,8 @@ public class LeveldbRMStateStore extends RMStateStore {
       .newInstance(1, 0);
 
   private DB db;
+  private Timer compactionTimer;
+  private long compactionIntervalMsec;
 
   private String getApplicationNodeKey(ApplicationId appId) {
     return RM_APP_ROOT + SEPARATOR + appId;
@@ -114,6 +119,9 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   @Override
   protected void initInternal(Configuration conf) throws Exception {
+    compactionIntervalMsec = conf.getLong(
+        YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS,
+        YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
   }
 
   private Path getStorageDir() throws IOException {
@@ -135,6 +143,11 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   @Override
   protected void startInternal() throws Exception {
+    // Open the store before scheduling compaction so the periodic task has
+    // a database handle to operate on when it fires.
+    this.db = openDatabase();
+    startCompactionTimer();
+  }
+
+  protected DB openDatabase() throws Exception {
     Path storeRoot = createStorageDir();
     Options options = new Options();
     options.createIfMissing(false);
@@ -158,10 +171,24 @@ public class LeveldbRMStateStore extends RMStateStore {
         throw e;
       }
     }
+    return db;
+  }
+
+  /**
+   * Schedules the periodic full-compaction task on a daemon timer thread.
+   * Nothing is scheduled when the configured interval is not positive.
+   */
+  private void startCompactionTimer() {
+    if (compactionIntervalMsec <= 0) {
+      return;
+    }
+    String timerName =
+        this.getClass().getSimpleName() + " compaction timer";
+    compactionTimer = new Timer(timerName, true);
+    // First run happens one full interval after start, then repeats.
+    compactionTimer.schedule(new CompactionTimerTask(),
+        compactionIntervalMsec, compactionIntervalMsec);
   }
 
   @Override
   protected void closeInternal() throws Exception {
+    if (compactionTimer != null) {
+      compactionTimer.cancel();
+      compactionTimer = null;
+    }
     if (db != null) {
       db.close();
       db = null;
@@ -681,6 +708,21 @@ public class LeveldbRMStateStore extends RMStateStore {
     return numEntries;
   }
 
+  /**
+   * Timer task that runs a full compaction of the leveldb database and logs
+   * how long the cycle took.
+   */
+  private class CompactionTimerTask extends TimerTask {
+    @Override
+    public void run() {
+      // Snapshot the handle: closeInternal() cancels the timer and then
+      // clears db, but Timer.cancel() does not wait for a task that is
+      // already running, so the field may be null by the time we get here.
+      final DB database = db;
+      if (database == null) {
+        return;
+      }
+      long start = Time.monotonicNow();
+      LOG.info("Starting full compaction cycle");
+      try {
+        database.compactRange(null, null);
+      } catch (RuntimeException e) {
+        // An exception escaping run() would terminate the Timer thread and
+        // silently stop every future compaction cycle, so log and carry on.
+        LOG.error("Error compacting database", e);
+      }
+      long duration = Time.monotonicNow() - start;
+      LOG.info("Full compaction cycle completed in " + duration + " msec");
+    }
+  }
+
   private static class LeveldbLogger implements Logger {
     private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
 

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java

@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
 import java.io.File;
 import java.io.IOException;
 
@@ -25,6 +30,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.iq80.leveldb.DB;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -96,6 +102,23 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
     testAMRMTokenSecretManagerStateStore(tester);
   }
 
+  /**
+   * Verifies that a started store with a one-second compaction interval
+   * triggers a full-range compaction on the underlying database.
+   */
+  @Test(timeout = 60000)
+  public void testCompactionCycle() throws Exception {
+    final DB mockdb = mock(DB.class);
+    conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1);
+    // Substitute a mock for the real leveldb handle so the test can observe
+    // the compaction call without touching disk.
+    LeveldbRMStateStore store = new LeveldbRMStateStore() {
+      @Override
+      protected DB openDatabase() throws Exception {
+        return mockdb;
+      }
+    };
+    store.init(conf);
+    store.start();
+    try {
+      verify(mockdb, timeout(10000)).compactRange(
+          (byte[]) isNull(), (byte[]) isNull());
+    } finally {
+      // Always stop the store so a failed verification does not leave the
+      // compaction timer thread running.
+      store.close();
+    }
+  }
+
   class LeveldbStateStoreTester implements RMStateStoreHelper {
 
     @Override