瀏覽代碼

YARN-11438. [Federation] ZookeeperFederationStateStore Support Version. (#5537)

slfan1989 2 年之前
父節點
當前提交
635521db4c

+ 41 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java

@@ -30,7 +30,6 @@ import java.util.TimeZone;
 import java.util.Comparator;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.curator.framework.recipes.shared.VersionedValue;
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -43,8 +42,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
 import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto;
 import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateVersionIncompatibleException;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@@ -104,6 +105,7 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservatio
 import org.apache.hadoop.yarn.server.federation.store.utils.FederationRouterRMTokenInputValidator;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.SystemClock;
@@ -154,6 +156,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
   private final static String ROOT_ZNODE_NAME_POLICY = "policies";
   private final static String ROOT_ZNODE_NAME_RESERVATION = "reservation";
 
+  protected static final String ROOT_ZNODE_NAME_VERSION = "version";
+
   /** Store Delegation Token Node. */
   private final static String ROUTER_RM_DT_SECRET_MANAGER_ROOT = "router_rm_dt_secret_manager_root";
   private static final String ROUTER_RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
@@ -184,6 +188,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
   private String membershipZNode;
   private String policiesZNode;
   private String reservationsZNode;
+  private String versionNode;
   private int maxAppsInStateStore;
 
   /** Directory to store the delegation token data. **/
@@ -195,6 +200,8 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
 
   private volatile Clock clock = SystemClock.getInstance();
 
+  protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 1);
+
   @VisibleForTesting
   private ZKFederationStateStoreOpDurations opDurations =
       ZKFederationStateStoreOpDurations.getInstance();
@@ -223,6 +230,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
     appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
     policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
     reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION);
+    versionNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_VERSION);
 
     // delegation token znodes
     routerRMDTSecretManagerRoot = getNodePath(baseZNode, ROUTER_RM_DT_SECRET_MANAGER_ROOT);
@@ -245,6 +253,7 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
       zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, zkAcl);
       zkManager.createRootDirRecursively(routerRMDTMasterKeysRootPath, zkAcl);
       zkManager.createRootDirRecursively(routerRMDelegationTokensRootPath, zkAcl);
+      zkManager.createRootDirRecursively(versionNode, zkAcl);
     } catch (Exception e) {
       String errMsg = "Cannot create base directories: " + e.getMessage();
       FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
@@ -643,22 +652,49 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
 
   @Override
   public Version getCurrentVersion() {
-    throw new NotImplementedException("Code is not implemented");
+    return CURRENT_VERSION_INFO;
   }
 
   @Override
   public Version loadVersion() throws Exception {
-    throw new NotImplementedException("Code is not implemented");
+    if (exists(versionNode)) {
+      byte[] data = get(versionNode);
+      if (data != null) {
+        return new VersionPBImpl(VersionProto.parseFrom(data));
+      }
+    }
+    return null;
   }
 
   @Override
   public void storeVersion() throws Exception {
-    throw new NotImplementedException("Code is not implemented");
+    byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+    boolean isUpdate = exists(versionNode);
+    put(versionNode, data, isUpdate);
   }
 
   @Override
   public void checkVersion() throws Exception {
-    throw new NotImplementedException("Code is not implemented");
+    Version loadedVersion = loadVersion();
+    LOG.info("Loaded Router State Version Info = {}.", loadedVersion);
+    Version currentVersion = getCurrentVersion();
+    if (loadedVersion != null && loadedVersion.equals(currentVersion)) {
+      return;
+    }
+
+    // if there is no version info, treat it as CURRENT_VERSION_INFO;
+    if (loadedVersion == null) {
+      loadedVersion = currentVersion;
+    }
+
+    if (loadedVersion.isCompatibleTo(currentVersion)) {
+      LOG.info("Storing Router State Version Info {}.", currentVersion);
+      storeVersion();
+    } else {
+      throw new FederationStateVersionIncompatibleException(
+          "Expecting Router state version " + currentVersion +
+          ", but loading version " + loadedVersion);
+    }
   }
 
   /**

+ 46 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
+import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
 import org.junit.Before;
@@ -52,6 +53,7 @@ import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 /**
  * Unit tests for ZookeeperFederationStateStore.
@@ -276,4 +278,48 @@ public class TestZookeeperFederationStateStore extends FederationStateStoreBaseT
     assertNotNull(zkRouterStoreToken);
     assertEquals(token, zkRouterStoreToken);
   }
+
+  @Test
+  public void testGetCurrentVersion() {
+    ZookeeperFederationStateStore zkFederationStateStore =
+        ZookeeperFederationStateStore.class.cast(this.getStateStore());
+    Version version = zkFederationStateStore.getCurrentVersion();
+    assertEquals(1, version.getMajorVersion());
+    assertEquals(1, version.getMinorVersion());
+  }
+
+  @Test
+  public void testStoreVersion() throws Exception {
+    ZookeeperFederationStateStore zkFederationStateStore =
+        ZookeeperFederationStateStore.class.cast(this.getStateStore());
+    zkFederationStateStore.storeVersion();
+    Version version = zkFederationStateStore.loadVersion();
+    assertEquals(1, version.getMajorVersion());
+    assertEquals(1, version.getMinorVersion());
+  }
+
+  @Test
+  public void testLoadVersion() throws Exception {
+    ZookeeperFederationStateStore zkFederationStateStore =
+        ZookeeperFederationStateStore.class.cast(this.getStateStore());
+    // We don't store version, loadversion directly will get a null value.
+    Version version = zkFederationStateStore.loadVersion();
+    assertNull(version);
+
+    // After storing the version information, we will get the accurate version information.
+    zkFederationStateStore.storeVersion();
+    Version version1 = zkFederationStateStore.loadVersion();
+    assertEquals(1, version1.getMajorVersion());
+    assertEquals(1, version1.getMinorVersion());
+  }
+
+  @Test
+  public void testCheckVersion() throws Exception {
+    ZookeeperFederationStateStore zkFederationStateStore =
+        ZookeeperFederationStateStore.class.cast(this.getStateStore());
+    zkFederationStateStore.checkVersion();
+    Version version = zkFederationStateStore.loadVersion();
+    assertEquals(1, version.getMajorVersion());
+    assertEquals(1, version.getMinorVersion());
+  }
 }