فهرست منبع

YARN-5601. Make the RM epoch base value configurable. Contributed by Subru Krishnan

(cherry picked from commit 9ca2aba9cc65090162b3517b194b5e655ee4a157)
(cherry picked from commit 2797507d51566ab3b8328e5fb1d0beb9fbce5bae)
Jian He 8 سال پیش
والد
کامیت
aac8755125
13فایلهای تغییر یافته به همراه32 افزوده شده و 7 حذف شده
  1. 4 1
      hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
  2. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
  4. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  5. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
  6. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
  7. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
  8. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  9. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  10. 5 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
  11. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
  12. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
  13. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -276,7 +276,10 @@
   </Match>
   <Match>
     <Class name="org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore" />
-    <Field name="resourceManager"/>
+    <Or>
+      <Field name="resourceManager"/>
+      <Field name="baseEpoch"/>
+    </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
   <Match>

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -149,6 +149,9 @@ public class YarnConfiguration extends Configuration {
 
   public static final String RM_HOSTNAME = RM_PREFIX + "hostname";
 
+  public static final String RM_EPOCH = RM_PREFIX + "epoch";
+  public static final long DEFAULT_RM_EPOCH = 0L;
+
   /** The address of the applications manager interface in the RM.*/
   public static final String RM_ADDRESS = 
     RM_PREFIX + "address";

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java

@@ -77,6 +77,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
         .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
     configurationPropsToSkipCompare
         .add(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.RM_EPOCH);
 
     // Ignore blacklisting nodes for AM failures feature since it is still a
     // "work in progress"

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -718,6 +718,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
           LOG.error("Failed to load/recover state", e);
           throw e;
         }
+      } else {
+        if (HAUtil.isFederationEnabled(conf)) {
+          long epoch = conf.getLong(YarnConfiguration.RM_EPOCH,
+              YarnConfiguration.DEFAULT_RM_EPOCH);
+          rmContext.setEpoch(epoch);
+          LOG.info("Epoch set for Federation: " + epoch);
+        }
       }
 
       super.serviceStart();

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java

@@ -202,7 +202,7 @@ public class FileSystemRMStateStore extends RMStateStore {
   @Override
   public synchronized long getAndIncrementEpoch() throws Exception {
     Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
-    long currentEpoch = 0;
+    long currentEpoch = baseEpoch;
     FileStatus status = getFileStatusWithRetries(epochNodePath);
     if (status != null) {
       // load current epoch

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java

@@ -255,7 +255,7 @@ public class LeveldbRMStateStore extends RMStateStore {
 
   @Override
   public synchronized long getAndIncrementEpoch() throws Exception {
-    long currentEpoch = 0;
+    long currentEpoch = baseEpoch;
     byte[] dbKeyBytes = bytes(EPOCH_NODE);
     try {
       byte[] data = db.get(dbKeyBytes);

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java

@@ -83,6 +83,7 @@ public class MemoryRMStateStore extends RMStateStore {
   
   @Override
   public synchronized void initInternal(Configuration conf) {
+    epoch = baseEpoch;
   }
 
   @Override

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -98,6 +98,7 @@ public abstract class RMStateStore extends AbstractService {
       "ReservationSystemRoot";
   protected static final String VERSION_NODE = "RMVersionNode";
   protected static final String EPOCH_NODE = "EpochNode";
+  protected long baseEpoch;
   protected ResourceManager resourceManager;
   private final ReadLock readLock;
   private final WriteLock writeLock;
@@ -690,6 +691,9 @@ public abstract class RMStateStore extends AbstractService {
     dispatcher.register(RMStateStoreEventType.class, 
                         rmStateStoreEventHandler);
     dispatcher.setDrainEventsOnStop();
+    // read the base epoch value from conf
+    baseEpoch = conf.getLong(YarnConfiguration.RM_EPOCH,
+        YarnConfiguration.DEFAULT_RM_EPOCH);
     initInternal(conf);
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

@@ -437,7 +437,7 @@ public class ZKRMStateStore extends RMStateStore {
   @Override
   public synchronized long getAndIncrementEpoch() throws Exception {
     String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
-    long currentEpoch = 0;
+    long currentEpoch = baseEpoch;
 
     if (exists(epochNodePath)) {
       // load current epoch

+ 5 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java

@@ -91,6 +91,8 @@ public class RMStateStoreTestBase {
 
   public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
 
+  protected final long epoch = 10L;
+
   static class TestDispatcher implements Dispatcher, EventHandler<Event> {
 
     ApplicationAttemptId attemptId;
@@ -564,13 +566,13 @@ public class RMStateStoreTestBase {
     store.setRMDispatcher(new TestDispatcher());
     
     long firstTimeEpoch = store.getAndIncrementEpoch();
-    Assert.assertEquals(0, firstTimeEpoch);
+    Assert.assertEquals(epoch, firstTimeEpoch);
     
     long secondTimeEpoch = store.getAndIncrementEpoch();
-    Assert.assertEquals(1, secondTimeEpoch);
+    Assert.assertEquals(epoch + 1, secondTimeEpoch);
     
     long thirdTimeEpoch = store.getAndIncrementEpoch();
-    Assert.assertEquals(2, thirdTimeEpoch);
+    Assert.assertEquals(epoch + 2, thirdTimeEpoch);
   }
 
   public void testAppDeletion(RMStateStoreHelper stateStoreHelper)

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java

@@ -118,6 +118,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
       conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8);
       conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
               900L);
+      conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
       if (adminCheckEnable) {
         conf.setBoolean(
           YarnConfiguration.YARN_INTERMEDIATE_DATA_ENCRYPTION, true);

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java

@@ -82,6 +82,7 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
 
   @Test(timeout = 60000)
   public void testEpoch() throws Exception {
+    conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
     LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
     testEpoch(tester);
   }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

@@ -188,6 +188,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
       conf.set(YarnConfiguration.RM_ZK_ADDRESS,
           curatorTestingServer.getConnectString());
       conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
+      conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
       this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
       return this.store;
     }