Bladeren bron

YARN-2952. Fixed incorrect version check in StateStore. Contributed by Rohith Sharmaks
(cherry picked from commit 808cba3821d5bc4267f69d14220757f01cd55715)

(cherry picked from commit 9180d11b3bbb2a49127d5d25f53b38c5113bf7ea)

Jian He 10 jaren geleden
bovenliggende
commit
6dfdcf094d

+ 2 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java

@@ -493,9 +493,9 @@ public class ShuffleHandler extends AuxiliaryService {
   @VisibleForTesting
   @VisibleForTesting
   Version loadVersion() throws IOException {
   Version loadVersion() throws IOException {
     byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
     byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
-    // if version is not stored previously, treat it as 1.0.
+    // if version is not stored previously, treat it as CURRENT_VERSION_INFO.
     if (data == null || data.length == 0) {
     if (data == null || data.length == 0) {
-      return Version.newInstance(1, 0);
+      return getCurrentVersion();
     }
     }
     Version version =
     Version version =
         new VersionPBImpl(VersionProto.parseFrom(data));
         new VersionPBImpl(VersionProto.parseFrom(data));

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -51,6 +51,9 @@ Release 2.6.1 - UNRELEASED
     YARN-1984. LeveldbTimelineStore does not handle db exceptions properly
     YARN-1984. LeveldbTimelineStore does not handle db exceptions properly
     (Varun Saxena via jlowe)
     (Varun Saxena via jlowe)
 
 
+    YARN-2952. Fixed incorrect version check in StateStore. (Rohith Sharmaks
+    via jianhe)
+
 Release 2.6.0 - 2014-11-18
 Release 2.6.0 - 2014-11-18
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java

@@ -1610,9 +1610,9 @@ public class LeveldbTimelineStore extends AbstractService
   Version loadVersion() throws IOException {
   Version loadVersion() throws IOException {
     try {
     try {
       byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
       byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
-      // if version is not stored previously, treat it as 1.0.
+      // if version is not stored previously, treat it as CURRENT_VERSION_INFO.
       if (data == null || data.length == 0) {
       if (data == null || data.length == 0) {
-        return Version.newInstance(1, 0);
+        return getCurrentVersion();
       }
       }
       Version version =
       Version version =
           new VersionPBImpl(VersionProto.parseFrom(data));
           new VersionPBImpl(VersionProto.parseFrom(data));

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java

@@ -907,9 +907,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
 
 
   Version loadVersion() throws IOException {
   Version loadVersion() throws IOException {
     byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
     byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
-    // if version is not stored previously, treat it as 1.0.
+    // if version is not stored previously, treat it as CURRENT_VERSION_INFO.
     if (data == null || data.length == 0) {
     if (data == null || data.length == 0) {
-      return Version.newInstance(1, 0);
+      return getCurrentVersion();
     }
     }
     Version version =
     Version version =
         new VersionPBImpl(VersionProto.parseFrom(data));
         new VersionPBImpl(VersionProto.parseFrom(data));

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -507,7 +507,7 @@ public abstract class RMStateStore extends AbstractService {
    * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
    * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
    * 2) Any incompatible change of state-store is a major upgrade, and any
    * 2) Any incompatible change of state-store is a major upgrade, and any
    *    compatible change of state-store is a minor upgrade.
    *    compatible change of state-store is a minor upgrade.
-   * 3) If theres's no version, treat it as 1.0.
+   * 3) If theres's no version, treat it as CURRENT_VERSION_INFO.
    * 4) Within a minor upgrade, say 1.1 to 1.2:
    * 4) Within a minor upgrade, say 1.1 to 1.2:
    *    overwrite the version info and proceed as normal.
    *    overwrite the version info and proceed as normal.
    * 5) Within a major upgrade, say 1.2 to 2.0:
    * 5) Within a major upgrade, say 1.2 to 2.0:
@@ -520,9 +520,9 @@ public abstract class RMStateStore extends AbstractService {
     if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
     if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
       return;
       return;
     }
     }
-    // if there is no version info, treat it as 1.0;
+    // if there is no version info, treat it as CURRENT_VERSION_INFO;
     if (loadedVersion == null) {
     if (loadedVersion == null) {
-      loadedVersion = Version.newInstance(1, 0);
+      loadedVersion = getCurrentVersion();
     }
     }
     if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
     if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
       LOG.info("Storing RM state version info " + getCurrentVersion());
       LOG.info("Storing RM state version info " + getCurrentVersion());

+ 54 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStore.TestZKRMStateStoreTester;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -166,6 +167,59 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
     }
     }
   }
   }
 
 
+  @Test(timeout = 60000)
+  public void testCheckMajorVersionChange() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      fsTester = new TestFSRMStateStoreTester(cluster) {
+        Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0);
+
+        @Override
+        public Version getCurrentVersion() throws Exception {
+          return VERSION_INFO;
+        }
+
+        @Override
+        public RMStateStore getRMStateStore() throws Exception {
+          YarnConfiguration conf = new YarnConfiguration();
+          conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
+              workingDirPathURI.toString());
+          conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
+              "100,6000");
+          this.store = new TestFileSystemRMStore(conf) {
+            Version storedVersion = null;
+
+            @Override
+            public Version getCurrentVersion() {
+              return VERSION_INFO;
+            }
+
+            @Override
+            protected synchronized Version loadVersion() throws Exception {
+              return storedVersion;
+            }
+
+            @Override
+            protected synchronized void storeVersion() throws Exception {
+              storedVersion = VERSION_INFO;
+            }
+          };
+          return store;
+        }
+      };
+
+      // default version
+      RMStateStore store = fsTester.getRMStateStore();
+      Version defaultVersion = fsTester.getCurrentVersion();
+      store.checkVersion();
+      Assert.assertEquals(defaultVersion, store.loadVersion());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Override
   @Override
   protected void modifyAppState() throws Exception {
   protected void modifyAppState() throws Exception {
     // imitate appAttemptFile1 is still .new, but old one is deleted
     // imitate appAttemptFile1 is still .new, but old one is deleted

+ 47 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.Stat;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 
 
 public class TestZKRMStateStore extends RMStateStoreTestBase {
 public class TestZKRMStateStore extends RMStateStoreTestBase {
@@ -126,6 +127,52 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     testAMRMTokenSecretManagerStateStore(zkTester);
     testAMRMTokenSecretManagerStateStore(zkTester);
   }
   }
 
 
+  @Test (timeout = 60000)
+  public void testCheckMajorVersionChange() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() {
+      Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0);
+
+      @Override
+      public Version getCurrentVersion() throws Exception {
+        return VERSION_INFO;
+      }
+
+      @Override
+      public RMStateStore getRMStateStore() throws Exception {
+        YarnConfiguration conf = new YarnConfiguration();
+        workingZnode = "/Test";
+        conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+        conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
+        this.client = createClient();
+        this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
+          Version storedVersion = null;
+
+          @Override
+          public Version getCurrentVersion() {
+            return VERSION_INFO;
+          }
+
+          @Override
+          protected synchronized Version loadVersion() throws Exception {
+            return storedVersion;
+          }
+
+          @Override
+          protected synchronized void storeVersion() throws Exception {
+            storedVersion = VERSION_INFO;
+          }
+        };
+        return this.store;
+      }
+
+    };
+    // default version
+    RMStateStore store = zkTester.getRMStateStore();
+    Version defaultVersion = zkTester.getCurrentVersion();
+    store.checkVersion();
+    Assert.assertEquals(defaultVersion, store.loadVersion());
+  }
+
   private Configuration createHARMConf(
   private Configuration createHARMConf(
       String rmIds, String rmId, int adminPort) {
       String rmIds, String rmId, int adminPort) {
     Configuration conf = new YarnConfiguration();
     Configuration conf = new YarnConfiguration();