Browse Source

YARN-9997. Code cleanup in ZKConfigurationStore. Contributed by Andras Gyori

Szilard Nemeth 5 years ago
parent
commit
764fa92c9f

+ 95 - 35
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java

@@ -56,8 +56,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
   private Configuration conf;
   private Configuration conf;
   private LogMutation pendingMutation;
   private LogMutation pendingMutation;
 
 
-  private String znodeParentPath;
-
   private static final String ZK_VERSION_PATH = "VERSION";
   private static final String ZK_VERSION_PATH = "VERSION";
   private static final String LOGS_PATH = "LOGS";
   private static final String LOGS_PATH = "LOGS";
   private static final String CONF_STORE_PATH = "CONF_STORE";
   private static final String CONF_STORE_PATH = "CONF_STORE";
@@ -70,19 +68,20 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
   private String fencingNodePath;
   private String fencingNodePath;
   private String confVersionPath;
   private String confVersionPath;
 
 
-  @VisibleForTesting
-  protected ZKCuratorManager zkManager;
+  private ZKCuratorManager zkManager;
   private List<ACL> zkAcl;
   private List<ACL> zkAcl;
 
 
   @Override
   @Override
   public void initialize(Configuration config, Configuration schedConf,
   public void initialize(Configuration config, Configuration schedConf,
       RMContext rmContext) throws Exception {
       RMContext rmContext) throws Exception {
     this.conf = config;
     this.conf = config;
+
+    String znodeParentPath = conf.get(
+        YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
+        YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
+
     this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
     this.maxLogs = conf.getLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
         YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS);
         YarnConfiguration.DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS);
-    this.znodeParentPath =
-        conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
-            YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
     this.zkManager =
     this.zkManager =
         rmContext.getResourceManager().createAndStartZKManager(conf);
         rmContext.getResourceManager().createAndStartZKManager(conf);
     this.zkAcl = ZKCuratorManager.getZKAcls(conf);
     this.zkAcl = ZKCuratorManager.getZKAcls(conf);
@@ -96,37 +95,31 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
     zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
     zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
     zkManager.delete(fencingNodePath);
     zkManager.delete(fencingNodePath);
 
 
-    if (!zkManager.exists(logsPath)) {
-      zkManager.create(logsPath);
-      zkManager.setData(logsPath,
-          serializeObject(new LinkedList<LogMutation>()), -1);
+    if (createNewZkPath(logsPath)) {
+      setZkData(logsPath, new LinkedList<LogMutation>());
     }
     }
 
 
-    if (!zkManager.exists(confVersionPath)) {
-      zkManager.create(confVersionPath);
-      zkManager.setData(confVersionPath, String.valueOf(0), -1);
+    if (createNewZkPath(confVersionPath)) {
+      setZkData(confVersionPath, String.valueOf(0));
     }
     }
 
 
-    if (!zkManager.exists(confStorePath)) {
-      zkManager.create(confStorePath);
+    if (createNewZkPath(confStorePath)) {
       HashMap<String, String> mapSchedConf = new HashMap<>();
       HashMap<String, String> mapSchedConf = new HashMap<>();
       for (Map.Entry<String, String> entry : schedConf) {
       for (Map.Entry<String, String> entry : schedConf) {
         mapSchedConf.put(entry.getKey(), entry.getValue());
         mapSchedConf.put(entry.getKey(), entry.getValue());
       }
       }
-      zkManager.setData(confStorePath, serializeObject(mapSchedConf), -1);
+      setZkData(confStorePath, mapSchedConf);
       long configVersion = getConfigVersion() + 1L;
       long configVersion = getConfigVersion() + 1L;
-      zkManager.setData(confVersionPath, String.valueOf(configVersion), -1);
+      setZkData(confVersionPath, String.valueOf(configVersion));
     }
     }
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting
   @Override
   @Override
   protected LinkedList<LogMutation> getLogs() throws Exception {
   protected LinkedList<LogMutation> getLogs() throws Exception {
-    return (LinkedList<LogMutation>)
-        deserializeObject(zkManager.getData(logsPath));
+    return unsafeCast(deserializeObject(getZkData(logsPath)));
   }
   }
 
 
-  // TODO: following version-related code is taken from ZKRMStateStore
   @Override
   @Override
   public Version getCurrentVersion() {
   public Version getCurrentVersion() {
     return CURRENT_VERSION_INFO;
     return CURRENT_VERSION_INFO;
@@ -135,7 +128,7 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
   @Override
   @Override
   public Version getConfStoreVersion() throws Exception {
   public Version getConfStoreVersion() throws Exception {
     if (zkManager.exists(zkVersionPath)) {
     if (zkManager.exists(zkVersionPath)) {
-      byte[] data = zkManager.getData(zkVersionPath);
+      byte[] data = getZkData(zkVersionPath);
       return new VersionPBImpl(YarnServerCommonProtos.VersionProto
       return new VersionPBImpl(YarnServerCommonProtos.VersionProto
           .parseFrom(data));
           .parseFrom(data));
     }
     }
@@ -154,26 +147,24 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
         ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
         ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
 
 
     if (zkManager.exists(zkVersionPath)) {
     if (zkManager.exists(zkVersionPath)) {
-      zkManager.safeSetData(zkVersionPath, data, -1, zkAcl, fencingNodePath);
+      safeSetZkData(zkVersionPath, data);
     } else {
     } else {
-      zkManager.safeCreate(zkVersionPath, data, zkAcl, CreateMode.PERSISTENT,
-          zkAcl, fencingNodePath);
+      safeCreateZkData(zkVersionPath, data);
     }
     }
   }
   }
 
 
   @Override
   @Override
   public void logMutation(LogMutation logMutation) throws Exception {
   public void logMutation(LogMutation logMutation) throws Exception {
-    byte[] storedLogs = zkManager.getData(logsPath);
+    byte[] storedLogs = getZkData(logsPath);
     LinkedList<LogMutation> logs = new LinkedList<>();
     LinkedList<LogMutation> logs = new LinkedList<>();
     if (storedLogs != null) {
     if (storedLogs != null) {
-      logs = (LinkedList<LogMutation>) deserializeObject(storedLogs);
+      logs = unsafeCast(deserializeObject(storedLogs));
     }
     }
     logs.add(logMutation);
     logs.add(logMutation);
     if (logs.size() > maxLogs) {
     if (logs.size() > maxLogs) {
       logs.remove(logs.removeFirst());
       logs.remove(logs.removeFirst());
     }
     }
-    zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
-        fencingNodePath);
+    safeSetZkData(logsPath, logs);
     pendingMutation = logMutation;
     pendingMutation = logMutation;
   }
   }
 
 
@@ -194,10 +185,9 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
           mapConf.put(confChange.getKey(), confChange.getValue());
           mapConf.put(confChange.getKey(), confChange.getValue());
         }
         }
       }
       }
-      zkManager.safeSetData(confStorePath, serializeObject(mapConf), -1,
-          zkAcl, fencingNodePath);
+      safeSetZkData(confStorePath, mapConf);
       long configVersion = getConfigVersion() + 1L;
       long configVersion = getConfigVersion() + 1L;
-      zkManager.setData(confVersionPath, String.valueOf(configVersion), -1);
+      setZkData(confVersionPath, String.valueOf(configVersion));
 
 
     }
     }
     pendingMutation = null;
     pendingMutation = null;
@@ -207,14 +197,14 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
   public synchronized Configuration retrieve() {
   public synchronized Configuration retrieve() {
     byte[] serializedSchedConf;
     byte[] serializedSchedConf;
     try {
     try {
-      serializedSchedConf = zkManager.getData(confStorePath);
+      serializedSchedConf = getZkData(confStorePath);
     } catch (Exception e) {
     } catch (Exception e) {
       LOG.error("Failed to retrieve configuration from zookeeper store", e);
       LOG.error("Failed to retrieve configuration from zookeeper store", e);
       return null;
       return null;
     }
     }
     try {
     try {
       Map<String, String> map =
       Map<String, String> map =
-          (HashMap<String, String>) deserializeObject(serializedSchedConf);
+          unsafeCast(deserializeObject(serializedSchedConf));
       Configuration c = new Configuration(false);
       Configuration c = new Configuration(false);
       for (Map.Entry<String, String> e : map.entrySet()) {
       for (Map.Entry<String, String> e : map.entrySet()) {
         c.set(e.getKey(), e.getValue());
         c.set(e.getKey(), e.getValue());
@@ -229,7 +219,14 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
 
 
   @Override
   @Override
   public long getConfigVersion() throws Exception {
   public long getConfigVersion() throws Exception {
-    return Long.parseLong(zkManager.getStringData(confVersionPath));
+    String version = zkManager.getStringData(confVersionPath);
+    if (version == null) {
+      throw new IllegalStateException("Config version can not be properly " +
+          "serialized. Check Zookeeper config version path to locate " +
+          "the error!");
+    }
+
+    return Long.parseLong(version);
   }
   }
 
 
   @Override
   @Override
@@ -237,6 +234,55 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
     return null; // unimplemented
     return null; // unimplemented
   }
   }
 
 
+  /**
+   * Creates a new path in Zookeeper only, if it does not already exist.
+   *
+   * @param path Value of the Zookeeper path
+   * @return <code>true</code>if the creation executed; <code>false</code>
+   * otherwise.
+   * @throws Exception
+   */
+  private boolean createNewZkPath(String path) throws Exception {
+    if (!zkManager.exists(path)) {
+      zkManager.create(path);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @VisibleForTesting
+  protected byte[] getZkData(String path) throws Exception {
+    return zkManager.getData(path);
+  }
+
+  @VisibleForTesting
+  protected void setZkData(String path, byte[] data) throws Exception {
+    zkManager.setData(path, data, -1);
+  }
+
+  private void setZkData(String path, Object data) throws Exception {
+    setZkData(path, serializeObject(data));
+  }
+
+  private void setZkData(String path, String data) throws Exception {
+    zkManager.setData(path, data, -1);
+  }
+
+  private void safeSetZkData(String path, byte[] data) throws Exception {
+    zkManager.safeSetData(path, data, -1, zkAcl, fencingNodePath);
+  }
+
+  private void safeSetZkData(String path, Object data) throws Exception {
+    safeSetZkData(path, serializeObject(data));
+  }
+
+  @VisibleForTesting
+  protected void safeCreateZkData(String path, byte[] data) throws Exception {
+    zkManager.safeCreate(path, data, zkAcl, CreateMode.PERSISTENT,
+        zkAcl, fencingNodePath);
+  }
+
   private static String getNodePath(String root, String nodeName) {
   private static String getNodePath(String root, String nodeName) {
     return ZKCuratorManager.getNodePath(root, nodeName);
     return ZKCuratorManager.getNodePath(root, nodeName);
   }
   }
@@ -257,4 +303,18 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
       return ois.readObject();
       return ois.readObject();
     }
     }
   }
   }
+
+  /**
+   * Casts an object of type Object to type T. It is essential to emphasize,
+   * that it is an unsafe operation.
+   *
+   * @param o Object to be cast from
+   * @param <T> Type to cast to
+   * @return casted object of type T
+   * @throws ClassCastException
+   */
+  @SuppressWarnings("unchecked")
+  private static <T> T unsafeCast(Object o) throws ClassCastException {
+    return (T)o;
+  }
 }
 }

+ 56 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java

@@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
 import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.ACL;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
@@ -49,9 +47,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map;
 
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
@@ -113,18 +112,11 @@ public class TestZKConfigurationStore extends
     confStore.initialize(conf, schedConf, rmContext);
     confStore.initialize(conf, schedConf, rmContext);
 
 
     Version otherVersion = Version.newInstance(1, 1);
     Version otherVersion = Version.newInstance(1, 1);
-    String znodeParentPath = conf.get(YarnConfiguration.
-            RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
-        YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
-    String zkVersionPath = ZKCuratorManager.getNodePath(znodeParentPath,
-        "VERSION");
-    String fencingNodePath = ZKCuratorManager.getNodePath(znodeParentPath,
-        "FENCING");
+    String zkVersionPath = getZkPath("VERSION");
     byte[] versionData =
     byte[] versionData =
         ((VersionPBImpl) otherVersion).getProto().toByteArray();
         ((VersionPBImpl) otherVersion).getProto().toByteArray();
-    List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
-    ((ZKConfigurationStore) confStore).zkManager.safeCreate(zkVersionPath,
-        versionData, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath);
+    ((ZKConfigurationStore) confStore).safeCreateZkData(zkVersionPath,
+        versionData);
 
 
     assertEquals("The configuration store should have stored the new" +
     assertEquals("The configuration store should have stored the new" +
         "version.", otherVersion, confStore.getConfStoreVersion());
         "version.", otherVersion, confStore.getConfStoreVersion());
@@ -140,20 +132,58 @@ public class TestZKConfigurationStore extends
     assertNull(confStore.retrieve());
     assertNull(confStore.retrieve());
   }
   }
 
 
+  @Test(expected = IllegalStateException.class)
+  public void testGetConfigurationVersionOnSerializedNullData()
+      throws Exception {
+    confStore.initialize(conf, schedConf, rmContext);
+    String confVersionPath = getZkPath("CONF_VERSION");
+    ((ZKConfigurationStore) confStore).setZkData(confVersionPath, null);
+    confStore.getConfigVersion();
+  }
+
+  /**
+   * The correct behavior of logMutation should be, that even though an
+   * Exception is thrown during serialization, the log data must not be
+   * overridden.
+   *
+   * @throws Exception
+   */
+  @Test(expected = ClassCastException.class)
+  public void testLogMutationAfterSerializationError() throws Exception {
+    byte[] data = null;
+    String logs = "NOT_LINKED_LIST";
+    confStore.initialize(conf, schedConf, rmContext);
+
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+      oos.writeObject(logs);
+      oos.flush();
+      baos.flush();
+      data = baos.toByteArray();
+    }
+
+    String logsPath = getZkPath("LOGS");
+    ((ZKConfigurationStore)confStore).setZkData(logsPath, data);
+
+    Map<String, String> update = new HashMap<>();
+    update.put("valid_key", "valid_value");
+
+    confStore.logMutation(new YarnConfigurationStore.LogMutation(update, TEST_USER));
+
+    assertEquals(data, ((ZKConfigurationStore)confStore).getZkData(logsPath));
+  }
+
   @Test
   @Test
   public void testDisableAuditLogs() throws Exception {
   public void testDisableAuditLogs() throws Exception {
     conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
     conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
     confStore.initialize(conf, schedConf, rmContext);
     confStore.initialize(conf, schedConf, rmContext);
-    String znodeParentPath = conf.get(YarnConfiguration.
-        RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
-        YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
-    String logsPath = ZKCuratorManager.getNodePath(znodeParentPath, "LOGS");
+    String logsPath = getZkPath("LOGS");
     byte[] data = null;
     byte[] data = null;
-    ((ZKConfigurationStore) confStore).zkManager.setData(logsPath, data, -1);
+    ((ZKConfigurationStore) confStore).setZkData(logsPath, data);
 
 
     prepareLogMutation("key1", "val1");
     prepareLogMutation("key1", "val1");
 
 
-    data = ((ZKConfigurationStore) confStore).zkManager.getData(logsPath);
+    data = ((ZKConfigurationStore) confStore).getZkData(logsPath);
     assertNull("Failed to Disable Audit Logs", data);
     assertNull("Failed to Disable Audit Logs", data);
   }
   }
 
 
@@ -373,6 +403,13 @@ public class TestZKConfigurationStore extends
     return new ZKConfigurationStore();
     return new ZKConfigurationStore();
   }
   }
 
 
+  private String getZkPath(String nodeName) {
+    String znodeParentPath = conf.get(YarnConfiguration.
+            RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
+        YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
+    return ZKCuratorManager.getNodePath(znodeParentPath, nodeName);
+  }
+
   @Override
   @Override
   Version getVersion() {
   Version getVersion() {
     return ZKConfigurationStore.CURRENT_VERSION_INFO;
     return ZKConfigurationStore.CURRENT_VERSION_INFO;