Parcourir la source

YARN-5008. LeveldbRMStateStore database can grow substantially leading to long recovery times. Contributed by Jason Lowe

Jian He il y a 9 ans
Parent
commit
dd80042c42

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -644,6 +644,13 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
       + "leveldb-state-store.path";
 
+  /** The time in seconds between full compactions of the leveldb database.
+   *  Setting the interval to zero disables the full compaction cycles.
+   */
+  public static final String RM_LEVELDB_COMPACTION_INTERVAL_SECS = RM_PREFIX
+      + "leveldb-state-store.compaction-interval-secs";
+  public static final long DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS = 3600;
+
   /** The maximum number of completed applications RM keeps. */ 
   public static final String RM_MAX_COMPLETED_APPLICATIONS =
     RM_PREFIX + "max-completed-applications";

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -554,6 +554,14 @@
     <value>${hadoop.tmp.dir}/yarn/system/rmstore</value>
   </property>
 
+  <property>
+    <description>The time in seconds between full compactions of the leveldb
+    database. Setting the interval to zero disables the full compaction
+    cycles.</description>
+    <name>yarn.resourcemanager.leveldb-state-store.compaction-interval-secs</name>
+    <value>3600</value>
+  </property>
+
   <property>
     <description>Enable RM high-availability. When enabled,
       (1) The RM starts in the Standby mode by default, and transitions to

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java

@@ -29,6 +29,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -97,6 +100,8 @@ public class LeveldbRMStateStore extends RMStateStore {
       .newInstance(1, 1);
 
   private DB db;
+  private Timer compactionTimer;
+  private long compactionIntervalMsec;
 
   private String getApplicationNodeKey(ApplicationId appId) {
     return RM_APP_ROOT + SEPARATOR + appId;
@@ -128,6 +133,9 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   @Override
   protected void initInternal(Configuration conf) throws Exception {
+    compactionIntervalMsec = conf.getLong(
+        YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS,
+        YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
   }
 
   private Path getStorageDir() throws IOException {
@@ -149,6 +157,11 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   @Override
   protected void startInternal() throws Exception {
+    db = openDatabase();
+    startCompactionTimer();
+  }
+
+  protected DB openDatabase() throws Exception {
     Path storeRoot = createStorageDir();
     Options options = new Options();
     options.createIfMissing(false);
@@ -172,10 +185,24 @@ public class LeveldbRMStateStore extends RMStateStore {
         throw e;
       }
     }
+    return db;
+  }
+
+  private void startCompactionTimer() {
+    if (compactionIntervalMsec > 0) {
+      compactionTimer = new Timer(
+          this.getClass().getSimpleName() + " compaction timer", true);
+      compactionTimer.schedule(new CompactionTimerTask(),
+          compactionIntervalMsec, compactionIntervalMsec);
+    }
   }
 
   @Override
   protected void closeInternal() throws Exception {
+    if (compactionTimer != null) {
+      compactionTimer.cancel();
+      compactionTimer = null;
+    }
     if (db != null) {
       db.close();
       db = null;
@@ -825,6 +852,21 @@ public class LeveldbRMStateStore extends RMStateStore {
     return numEntries;
   }
 
+  private class CompactionTimerTask extends TimerTask {
+    @Override
+    public void run() {
+      long start = Time.monotonicNow();
+      LOG.info("Starting full compaction cycle");
+      try {
+        db.compactRange(null, null);
+      } catch (DBException e) {
+        LOG.error("Error compacting database", e);
+      }
+      long duration = Time.monotonicNow() - start;
+      LOG.info("Full compaction cycle completed in " + duration + " msec");
+    }
+  }
+
   private static class LeveldbLogger implements Logger {
     private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
 

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java

@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import static org.mockito.Mockito.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
 import java.io.File;
 import java.io.IOException;
 
@@ -26,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.iq80.leveldb.DB;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -115,6 +121,23 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
     testReservationStateStore(tester);
   }
 
+  @Test(timeout = 60000)
+  public void testCompactionCycle() throws Exception {
+    final DB mockdb = mock(DB.class);
+    conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1);
+    LeveldbRMStateStore store = new LeveldbRMStateStore() {
+      @Override
+      protected DB openDatabase() throws Exception {
+        return mockdb;
+      }
+    };
+    store.init(conf);
+    store.start();
+    verify(mockdb, timeout(10000)).compactRange(
+        (byte[]) isNull(), (byte[]) isNull());
+    store.close();
+  }
+
   class LeveldbStateStoreTester implements RMStateStoreHelper {
 
     @Override