Переглянути джерело

YARN-1239. Modified ResourceManager state-store implementations to start storing version numbers. Contributed by Jian He.
svn merge --ignore-ancestry -c 1546229 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1546230 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 11 роки тому
батько
коміт
db027316e6
14 змінених файлів з 459 додано та 17 видалено
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 8 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
  3. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  4. 37 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
  5. 20 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
  6. 22 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
  7. 51 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  8. 42 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java
  9. 37 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  10. 80 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMStateVersion.java
  11. 76 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/RMStateVersionPBImpl.java
  12. 36 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
  13. 23 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
  14. 23 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -105,6 +105,9 @@ Release 2.3.0 - UNRELEASED
     YARN-1314. Fixed DistributedShell to not fail with multiple arguments for a
     shell command separated by spaces. (Xuan Gong via vinodkv)
 
+    YARN-1239. Modified ResourceManager state-store implementations to start
+    storing version numbers. (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 8 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto

@@ -103,9 +103,9 @@ enum RMAppStateProto {
 
 message ApplicationStateDataProto {
   optional int64 submit_time = 1;
-  optional int64 start_time = 2;
-  optional ApplicationSubmissionContextProto application_submission_context = 3;
-  optional string user = 4;
+  optional ApplicationSubmissionContextProto application_submission_context = 2;
+  optional string user = 3;
+  optional int64 start_time = 4;
   optional RMAppStateProto application_state = 5;
   optional string diagnostics = 6 [default = "N/A"];
   optional int64 finish_time = 7;
@@ -121,3 +121,8 @@ message ApplicationAttemptStateDataProto {
   optional int64 start_time = 7;
   optional FinalApplicationStatusProto final_application_status = 8;
 }
+
+// Version stamp of the persisted RM state, expressed as a major.minor pair.
+message RMStateVersionProto {
+  optional int32 major_version = 1;
+  optional int32 minor_version = 2;
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -457,6 +457,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
 
       if(recoveryEnabled) {
         try {
+          rmStore.checkVersion();
           RMState state = rmStore.loadState();
           recover(state);
         } catch (Exception e) {

+ 37 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java

@@ -44,9 +44,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -63,7 +66,9 @@ public class FileSystemRMStateStore extends RMStateStore {
 
   public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
 
-  private static final String ROOT_DIR_NAME = "FSRMStateRoot";
+  protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
+  protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
+    .newInstance(1, 0);
 
   protected FileSystem fs;
 
@@ -78,7 +83,6 @@ public class FileSystemRMStateStore extends RMStateStore {
   @Override
   public synchronized void initInternal(Configuration conf)
       throws Exception{
-
     fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI));
     rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
     rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
@@ -100,6 +104,36 @@ public class FileSystemRMStateStore extends RMStateStore {
     fs.close();
   }
 
+  /**
+   * Returns the state version this store writes, i.e.
+   * {@link #CURRENT_VERSION_INFO}.
+   */
+  @Override
+  protected RMStateVersion getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+
+  /**
+   * Reads the version record kept under the root directory of the store.
+   *
+   * @return the parsed version, or {@code null} when no version file exists
+   * @throws Exception if the file cannot be read or parsed
+   */
+  @Override
+  protected synchronized RMStateVersion loadVersion() throws Exception {
+    Path versionFile = getNodePath(rootDirPath, VERSION_NODE);
+    if (!fs.exists(versionFile)) {
+      return null;
+    }
+    FileStatus versionFileStatus = fs.getFileStatus(versionFile);
+    byte[] rawVersion = readFile(versionFile, versionFileStatus.getLen());
+    return new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(rawVersion));
+  }
+
+  /**
+   * Persists {@link #CURRENT_VERSION_INFO} under the root directory, replacing
+   * the version file when one is already present.
+   *
+   * @throws Exception if the file cannot be written
+   */
+  @Override
+  protected synchronized void storeVersion() throws Exception {
+    Path versionFile = getNodePath(rootDirPath, VERSION_NODE);
+    RMStateVersionPBImpl current = (RMStateVersionPBImpl) CURRENT_VERSION_INFO;
+    byte[] serialized = current.getProto().toByteArray();
+    if (!fs.exists(versionFile)) {
+      writeFile(versionFile, serialized);
+      return;
+    }
+    updateFile(versionFile, serialized);
+  }
+
   @Override
   public synchronized RMState loadState() throws Exception {
     RMState rmState = new RMState();
@@ -430,7 +464,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     fs.rename(tempPath, outputPath);
   }
 
-  private void updateFile(Path outputPath, byte[] data) throws Exception {
+  protected void updateFile(Path outputPath, byte[] data) throws Exception {
     if (fs.exists(outputPath)) {
       deleteFile(outputPath);
     }

+ 20 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -43,12 +44,15 @@ import com.google.common.annotations.VisibleForTesting;
 public class MemoryRMStateStore extends RMStateStore {
   
   RMState state = new RMState();
-  
   @VisibleForTesting
   public RMState getState() {
     return state;
   }
-  
+
+  /**
+   * No-op override: the in-memory store performs no version check.
+   */
+  @Override
+  public void checkVersion() throws Exception {
+  }
+
   @Override
   public synchronized RMState loadState() throws Exception {
     // return a copy of the state to allow for modification of the real state
@@ -224,4 +228,18 @@ public class MemoryRMStateStore extends RMStateStore {
         state.rmSecretManagerState.getMasterKeyState();
     rmDTMasterKeyState.remove(delegationKey);
   }
+
+  /**
+   * Always returns {@code null}; this store keeps no version record.
+   */
+  @Override
+  protected RMStateVersion loadVersion() throws Exception {
+    return null;
+  }
+
+  /**
+   * No-op: this store does not write a version record.
+   */
+  @Override
+  protected void storeVersion() throws Exception {
+  }
+
+  /**
+   * Always returns {@code null}; this store declares no current version.
+   */
+  @Override
+  protected RMStateVersion getCurrentVersion() {
+    return null;
+  }
 }

+ 22 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 
@@ -99,6 +100,27 @@ public class NullRMStateStore extends RMStateStore {
   @Override
   protected void updateApplicationAttemptStateInternal(String attemptId,
       ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
+  }
+
+  /**
+   * No-op override: the null store performs no version check.
+   */
+  @Override
+  public void checkVersion() throws Exception {
+    // Do nothing
+  }
+
+  /**
+   * Always returns {@code null}; the null store keeps no version record.
+   */
+  @Override
+  protected RMStateVersion loadVersion() throws Exception {
+    // Do nothing
+    return null;
+  }
+
+  /**
+   * No-op: the null store does not write a version record.
+   */
+  @Override
+  protected void storeVersion() throws Exception {
+    // Do nothing
+  }
+
+  @Override
+  protected RMStateVersion getCurrentVersion() {
     // Do nothing
+    return null;
   }
 }

+ 51 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -43,18 +43,18 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
-
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -78,6 +78,7 @@ public abstract class RMStateStore extends AbstractService {
   protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
   protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
       "RMDTSequenceNumber_";
+  protected static final String VERSION_NODE = "RMVersionNode";
 
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
@@ -304,7 +305,54 @@ public abstract class RMStateStore extends AbstractService {
    * after this
    */
   protected abstract void closeInternal() throws Exception;
-  
+
+  /**
+   * Verifies that the version recorded in the state store can be handled by
+   * the version this store implementation writes.
+   * <ol>
+   * <li>Versioning scheme: major.minor, e.g. 1.0, 1.1, 1.2...1.25, 2.0.</li>
+   * <li>Any incompatible change of the state store is a major upgrade; any
+   *     compatible change is a minor upgrade.</li>
+   * <li>If there's no stored version, it is treated as 1.0.</li>
+   * <li>Within a minor upgrade, say 1.1 to 1.2: the stored version info is
+   *     overwritten and processing continues as normal.</li>
+   * <li>Within a major upgrade, say 1.2 to 2.0: an exception is thrown and
+   *     the user must run a separate upgrade tool on the RM state.</li>
+   * </ol>
+   *
+   * @throws RMStateVersionIncompatibleException if the stored major version
+   *         differs from the current one
+   * @throws Exception if loading or storing the version fails
+   */
+  public void checkVersion() throws Exception {
+    RMStateVersion loadedVersion = loadVersion();
+    LOG.info("Loaded RM state version info " + loadedVersion);
+    if (loadedVersion == null) {
+      // if there is no version info, treat it as 1.0;
+      loadedVersion = RMStateVersion.newInstance(1, 0);
+    } else if (loadedVersion.equals(getCurrentVersion())) {
+      // Stored version already matches; nothing to rewrite.
+      return;
+    }
+    if (!loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      throw new RMStateVersionIncompatibleException(
+        "Expecting RM state version " + getCurrentVersion()
+            + ", but loading version " + loadedVersion);
+    }
+    LOG.info("Storing RM state version info " + getCurrentVersion());
+    storeVersion();
+  }
+
+  /**
+   * Derived classes use this method to load the version information from the
+   * state store.
+   *
+   * @return the stored version, or {@code null} if the store holds none
+   * @throws Exception if the version cannot be read
+   */
+  protected abstract RMStateVersion loadVersion() throws Exception;
+
+  /**
+   * Derived classes use this method to store the version information.
+   *
+   * @throws Exception if the version cannot be written
+   */
+  protected abstract void storeVersion() throws Exception;
+
+  /**
+   * Get the current version of the underlying state store.
+   *
+   * @return the version this store implementation writes
+   */
+  protected abstract RMStateVersion getCurrentVersion();
+
   /**
    * Blocking API
    * The derived class must recover state from the store and return a new 

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateVersionIncompatibleException.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * This exception is thrown by ResourceManager if it's loading an incompatible
+ * version of state from state store on recovery.
+ */
+public class RMStateVersionIncompatibleException extends YarnException {
+
+  private static final long serialVersionUID = 1364408L;
+
+  /**
+   * @param cause the underlying failure
+   */
+  public RMStateVersionIncompatibleException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message description of the version mismatch
+   */
+  public RMStateVersionIncompatibleException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of the version mismatch
+   * @param cause the underlying failure
+   */
+  public RMStateVersionIncompatibleException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

+ 37 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

@@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -41,16 +40,18 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ZKUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.client.RMHAServiceTarget;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -64,9 +65,9 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 
 @Private
 @Unstable
@@ -74,7 +75,9 @@ public class ZKRMStateStore extends RMStateStore {
 
   public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
 
-  private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
+  protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
+  protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
+      .newInstance(1, 0);
   private int numRetries;
 
   private String zkHostPort = null;
@@ -301,6 +304,36 @@ public class ZKRMStateStore extends RMStateStore {
     closeZkClients();
   }
 
+  /**
+   * Returns the state version this store writes, i.e.
+   * {@link #CURRENT_VERSION_INFO}.
+   */
+  @Override
+  protected RMStateVersion getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+
+  /**
+   * Writes {@link #CURRENT_VERSION_INFO} to the version znode, creating the
+   * znode when it does not exist yet.
+   *
+   * @throws Exception if the ZooKeeper operation fails
+   */
+  @Override
+  protected synchronized void storeVersion() throws Exception {
+    String versionZnode = getNodePath(zkRootNodePath, VERSION_NODE);
+    RMStateVersionPBImpl current = (RMStateVersionPBImpl) CURRENT_VERSION_INFO;
+    byte[] serialized = current.getProto().toByteArray();
+    boolean alreadyStored = zkClient.exists(versionZnode, true) != null;
+    if (alreadyStored) {
+      setDataWithRetries(versionZnode, serialized, -1);
+    } else {
+      createWithRetries(versionZnode, serialized, zkAcl,
+          CreateMode.PERSISTENT);
+    }
+  }
+
+  /**
+   * Reads the version record from the version znode.
+   *
+   * @return the parsed version, or {@code null} when the znode does not exist
+   * @throws Exception if the ZooKeeper read or the parsing fails
+   */
+  @Override
+  protected synchronized RMStateVersion loadVersion() throws Exception {
+    String versionZnode = getNodePath(zkRootNodePath, VERSION_NODE);
+    if (zkClient.exists(versionZnode, true) == null) {
+      return null;
+    }
+    byte[] rawVersion = getDataWithRetries(versionZnode, true);
+    return new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(rawVersion));
+  }
+
   @Override
   public synchronized RMState loadState() throws Exception {
     RMState rmState = new RMState();

+ 80 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/RMStateVersion.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * A major.minor version stamp describing the RM state.
+ */
+@Private
+@Unstable
+public abstract class RMStateVersion {
+
+  /**
+   * Creates a version record populated with the given numbers.
+   *
+   * @param majorVersion the major component
+   * @param minorVersion the minor component
+   * @return a new record obtained through {@link Records#newRecord(Class)}
+   */
+  public static RMStateVersion newInstance(int majorVersion, int minorVersion) {
+    RMStateVersion record = Records.newRecord(RMStateVersion.class);
+    record.setMajorVersion(majorVersion);
+    record.setMinorVersion(minorVersion);
+    return record;
+  }
+
+  public abstract int getMajorVersion();
+
+  public abstract void setMajorVersion(int majorVersion);
+
+  public abstract int getMinorVersion();
+
+  public abstract void setMinorVersion(int minorVersion);
+
+  /** Renders the version as {@code major.minor}. */
+  @Override
+  public String toString() {
+    return getMajorVersion() + "." + getMinorVersion();
+  }
+
+  /**
+   * Two versions are compatible when their major components are equal.
+   *
+   * @param version the version to compare against
+   * @return {@code true} if both major versions match
+   */
+  public boolean isCompatibleTo(RMStateVersion version) {
+    return getMajorVersion() == version.getMajorVersion();
+  }
+
+  @Override
+  public int hashCode() {
+    // Same 31-based combination of (major, minor) seeded with 1.
+    int hash = 31 + getMajorVersion();
+    return 31 * hash + getMinorVersion();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    RMStateVersion that = (RMStateVersion) obj;
+    return getMajorVersion() == that.getMajorVersion()
+        && getMinorVersion() == that.getMinorVersion();
+  }
+}

+ 76 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/RMStateVersionPBImpl.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProtoOrBuilder;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+
+/**
+ * Implementation of {@link RMStateVersion} backed by
+ * {@link RMStateVersionProto}. Reads go to the built proto or to the builder,
+ * depending on which one currently holds the latest values.
+ */
+public class RMStateVersionPBImpl extends RMStateVersion {
+
+  RMStateVersionProto proto = RMStateVersionProto.getDefaultInstance();
+  RMStateVersionProto.Builder builder = null;
+  boolean viaProto = false;
+
+  public RMStateVersionPBImpl() {
+    builder = RMStateVersionProto.newBuilder();
+  }
+
+  public RMStateVersionPBImpl(RMStateVersionProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Returns the proto form of this record, building it from the builder if
+   * the builder holds the latest values.
+   */
+  public RMStateVersionProto getProto() {
+    if (!viaProto) {
+      proto = builder.build();
+      viaProto = true;
+    }
+    return proto;
+  }
+
+  /** Makes the builder the source of truth before a mutation. */
+  private void maybeInitBuilder() {
+    boolean needsFreshBuilder = viaProto || builder == null;
+    if (needsFreshBuilder) {
+      builder = RMStateVersionProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** Picks whichever of proto/builder currently holds the values. */
+  private RMStateVersionProtoOrBuilder readView() {
+    if (viaProto) {
+      return proto;
+    }
+    return builder;
+  }
+
+  @Override
+  public int getMajorVersion() {
+    return readView().getMajorVersion();
+  }
+
+  @Override
+  public void setMajorVersion(int major) {
+    maybeInitBuilder();
+    builder.setMajorVersion(major);
+  }
+
+  @Override
+  public int getMinorVersion() {
+    return readView().getMinorVersion();
+  }
+
+  @Override
+  public void setMinorVersion(int minor) {
+    maybeInitBuilder();
+    builder.setMinorVersion(minor);
+  }
+}

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.Appli
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -106,6 +107,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
   interface RMStateStoreHelper {
     RMStateStore getRMStateStore() throws Exception;
     boolean isFinalStateValid() throws Exception;
+    void writeVersion(RMStateVersion version) throws Exception;
+    RMStateVersion getCurrentVersion() throws Exception;
   }
 
   void waitNotify(TestDispatcher dispatcher) {
@@ -379,4 +382,37 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
     appToken.setService(new Text("appToken service"));
     return appToken;
   }
+
+  /**
+   * Exercises {@link RMStateStore#checkVersion()} against three stored
+   * versions: none (the default is written), a compatible one (same major,
+   * different minor; it is overwritten with the current version) and an
+   * incompatible one (different major; the check must throw).
+   *
+   * @param stateStoreHelper supplies the store under test and lets the test
+   *        write an arbitrary version directly into the backing storage
+   * @throws Exception on any unexpected store failure
+   */
+  public void testCheckVersion(RMStateStoreHelper stateStoreHelper)
+      throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    store.setRMDispatcher(new TestDispatcher());
+
+    // default version
+    RMStateVersion defaultVersion = stateStoreHelper.getCurrentVersion();
+    store.checkVersion();
+    Assert.assertEquals(defaultVersion, store.loadVersion());
+
+    // compatible version
+    RMStateVersion compatibleVersion =
+        RMStateVersion.newInstance(defaultVersion.getMajorVersion(),
+          defaultVersion.getMinorVersion() + 2);
+    stateStoreHelper.writeVersion(compatibleVersion);
+    Assert.assertEquals(compatibleVersion, store.loadVersion());
+    store.checkVersion();
+    // overwrite the compatible version
+    Assert.assertEquals(defaultVersion, store.loadVersion());
+
+    // incompatible version
+    RMStateVersion incompatibleVersion =
+        RMStateVersion.newInstance(defaultVersion.getMajorVersion() + 2,
+          defaultVersion.getMinorVersion());
+    stateStoreHelper.writeVersion(incompatibleVersion);
+    try {
+      store.checkVersion();
+      Assert.fail("Invalid version, should fail.");
+    } catch (RMStateVersionIncompatibleException expected) {
+      // Expected. Catching only this type lets the AssertionError from
+      // Assert.fail and any unrelated failure propagate with its own message.
+    }
+  }
 }

+ 23 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java

@@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
 
@@ -42,7 +44,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
   class TestFSRMStateStoreTester implements RMStateStoreHelper {
 
     Path workingDirPathURI;
-    FileSystemRMStateStore store;
+    TestFileSystemRMStore store;
     MiniDFSCluster cluster;
 
     class TestFileSystemRMStore extends FileSystemRMStateStore {
@@ -54,6 +56,14 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
         start();
         Assert.assertNotNull(fs);
       }
+
+      /** Builds the version file path under the working dir's root. */
+      public Path getVersionNode() {
+        return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE);
+      }
+
+      /** Public override so the tester can read the store's version. */
+      public RMStateVersion getCurrentVersion() {
+        return CURRENT_VERSION_INFO;
+      }
     }
 
     public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
@@ -81,6 +91,17 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
       FileStatus[] files = fs.listStatus(workingDirPathURI);
       return files.length == 1;
     }
+
+    /** Writes the serialized version to the store's version file. */
+    @Override
+    public void writeVersion(RMStateVersion version) throws Exception {
+      store.updateFile(store.getVersionNode(), ((RMStateVersionPBImpl) version)
+        .getProto().toByteArray());
+    }
+
+    /** Returns the current version reported by the store under test. */
+    @Override
+    public RMStateVersion getCurrentVersion() throws Exception {
+      return store.getCurrentVersion();
+    }
   }
 
   @Test
@@ -113,6 +134,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
       Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
           .getFileSystem(conf).exists(tempAppAttemptFile));
       testRMDTSecretManagerStateStore(fsTester);
+      testCheckVersion(fsTester);
     } finally {
       cluster.shutdown();
     }

+ 23 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

@@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
 import org.apache.zookeeper.ZooKeeper;
 import org.junit.Test;
 
@@ -54,7 +56,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
   class TestZKRMStateStoreTester implements RMStateStoreHelper {
 
     ZooKeeper client;
-    ZKRMStateStore store;
+    TestZKRMStateStoreInternal store;
 
     class TestZKRMStateStoreInternal extends ZKRMStateStore {
 
@@ -69,6 +71,14 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
       public ZooKeeper getNewZooKeeper() throws IOException {
         return client;
       }
+
+      /** Builds the version znode path under the working path's root. */
+      public String getVersionNode() {
+        return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
+      }
+
+      /** Public override so the tester can read the store's version. */
+      public RMStateVersion getCurrentVersion() {
+        return CURRENT_VERSION_INFO;
+      }
     }
 
     public RMStateStore getRMStateStore() throws Exception {
@@ -86,6 +96,17 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
       List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
       return nodes.size() == 1;
     }
+
+    /**
+     * Sets the version znode's data to the serialized version; -1 is passed
+     * as the expected znode version.
+     */
+    @Override
+    public void writeVersion(RMStateVersion version) throws Exception {
+      client.setData(store.getVersionNode(), ((RMStateVersionPBImpl) version)
+        .getProto().toByteArray(), -1);
+    }
+
+    /** Returns the current version reported by the store under test. */
+    @Override
+    public RMStateVersion getCurrentVersion() throws Exception {
+      return store.getCurrentVersion();
+    }
   }
 
   @Test
@@ -93,6 +114,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
     TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
     testRMAppStateStore(zkTester);
     testRMDTSecretManagerStateStore(zkTester);
+    testCheckVersion(zkTester);
   }
 
   private Configuration createHARMConf(