Selaa lähdekoodia

YARN-9788. Queue Management API does not support parallel updates. Contributed by Prabhu Joseph

Szilard Nemeth 5 vuotta sitten
vanhempi
commit
1c51f36be7
14 muutettua tiedostoa jossa 116 lisäystä ja 71 poistoa
  1. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java
  2. 7 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java
  3. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
  4. 6 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java
  5. 2 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
  6. 5 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
  7. 4 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
  8. 2 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java
  9. 5 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
  10. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
  11. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
  12. 9 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java
  13. 45 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
  14. 16 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestSchedConfCLI.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
@@ -247,10 +248,10 @@ public class TestSchedConfCLI extends JerseyTestBase {
       globalUpdates.put("schedKey1", "schedVal1");
       schedUpdateInfo.setGlobalParams(globalUpdates);
 
-      provider.logAndApplyMutation(UserGroupInformation.getCurrentUser(),
-          schedUpdateInfo);
+      LogMutation log = provider.logAndApplyMutation(
+          UserGroupInformation.getCurrentUser(), schedUpdateInfo);
       rm.getRMContext().getRMAdminService().refreshQueues();
-      provider.confirmPendingMutation(true);
+      provider.confirmPendingMutation(log, true);
 
       Configuration schedulerConf = provider.getConfiguration();
       assertEquals("schedVal1", schedulerConf.get("schedKey1"));

+ 7 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 
 import java.io.IOException;
@@ -46,18 +47,21 @@ public interface MutableConfigurationProvider {
    * Log user's requested configuration mutation, and applies it in-memory.
    * @param user User who requested the change
    * @param confUpdate User's requested configuration change
+   * @return LogMutation with update info from given SchedConfUpdateInfo
    * @throws Exception if logging the mutation fails
    */
-  void logAndApplyMutation(UserGroupInformation user, SchedConfUpdateInfo
-      confUpdate) throws Exception;
+  LogMutation logAndApplyMutation(UserGroupInformation user,
+      SchedConfUpdateInfo confUpdate) throws Exception;
 
   /**
    * Confirm last logged mutation.
+   * @param pendingMutation the log mutation to apply
    * @param isValid if the last logged mutation is applied to scheduler
    *                properly.
    * @throws Exception if confirming mutation fails
    */
-  void confirmPendingMutation(boolean isValid) throws Exception;
+  void confirmPendingMutation(LogMutation pendingMutation,
+      boolean isValid) throws Exception;
 
   /**
    * Returns scheduler configuration cached in this provider.

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java

@@ -58,7 +58,6 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
   private int maxVersion;
   private Path schedulerConfDir;
   private FileSystem fileSystem;
-  private LogMutation pendingMutation;
   private PathFilter configFilePathFilter;
   private volatile Configuration schedConf;
   private volatile Configuration oldConf;
@@ -134,10 +133,9 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
    */
   @Override
   public void logMutation(LogMutation logMutation) throws IOException {
-    pendingMutation = logMutation;
     LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation));
     oldConf = new Configuration(schedConf);
-    Map<String, String> mutations = pendingMutation.getUpdates();
+    Map<String, String> mutations = logMutation.getUpdates();
     for (Map.Entry<String, String> kv : mutations.entrySet()) {
       if (kv.getValue() == null) {
         this.schedConf.unset(kv.getKey());
@@ -149,12 +147,14 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
   }
 
   /**
+   * @param pendingMutation the log mutation to apply
    * @param isValid if true, finalize temp configuration file
    *                if false, remove temp configuration file and rollback
    * @throws Exception throw IOE when write temp configuration file fail
    */
   @Override
-  public void confirmMutation(boolean isValid) throws Exception {
+  public void confirmMutation(LogMutation pendingMutation,
+      boolean isValid) throws Exception {
     if (pendingMutation == null || tempConfigPath == null) {
       LOG.warn("pendingMutation or tempConfigPath is null, do nothing");
       return;

+ 6 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/InMemoryConfigurationStore.java

@@ -32,7 +32,6 @@ import java.util.Map;
 public class InMemoryConfigurationStore extends YarnConfigurationStore {
 
   private Configuration schedConf;
-  private LogMutation pendingMutation;
   private long configVersion;
 
   @Override
@@ -42,13 +41,17 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
     this.configVersion = 1L;
   }
 
+  /**
+   * This method does not log as it does not support backing store.
+   * The mutation to be applied on top of schedConf will be directly passed
+   * in confirmMutation.
+   */
   @Override
   public void logMutation(LogMutation logMutation) {
-    pendingMutation = logMutation;
   }
 
   @Override
-  public void confirmMutation(boolean isValid) {
+  public void confirmMutation(LogMutation pendingMutation, boolean isValid) {
     if (isValid) {
       for (Map.Entry<String, String> kv : pendingMutation.getUpdates()
           .entrySet()) {
@@ -60,7 +63,6 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
       }
       this.configVersion = this.configVersion + 1L;
     }
-    pendingMutation = null;
   }
 
   @Override

+ 2 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java

@@ -75,7 +75,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
   private DB versiondb;
   private long maxLogs;
   private Configuration conf;
-  private LogMutation pendingMutation;
   @VisibleForTesting
   protected static final Version CURRENT_VERSION_INFO = Version
       .newInstance(0, 1);
@@ -232,11 +231,11 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
       }
       db.put(bytes(LOG_KEY), serLogMutations(logs));
     }
-    pendingMutation = logMutation;
   }
 
   @Override
-  public void confirmMutation(boolean isValid) throws IOException {
+  public void confirmMutation(LogMutation pendingMutation,
+      boolean isValid) throws IOException {
     WriteBatch updateBatch = db.createWriteBatch();
     if (isValid) {
       for (Map.Entry<String, String> changes :
@@ -252,7 +251,6 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
           bytes(String.valueOf(configVersion)));
     }
     db.write(updateBatch);
-    pendingMutation = null;
   }
 
   private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws

+ 5 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java

@@ -128,7 +128,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
   }
 
   @Override
-  public void logAndApplyMutation(UserGroupInformation user,
+  public LogMutation logAndApplyMutation(UserGroupInformation user,
       SchedConfUpdateInfo confUpdate) throws Exception {
     oldConf = new Configuration(schedConf);
     Map<String, String> kvUpdate = constructKeyValueConfUpdate(confUpdate);
@@ -141,6 +141,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
         schedConf.set(kv.getKey(), kv.getValue());
       }
     }
+    return log;
   }
 
   @Override
@@ -184,10 +185,11 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
   }
 
   @Override
-  public void confirmPendingMutation(boolean isValid) throws Exception {
+  public void confirmPendingMutation(LogMutation pendingMutation,
+      boolean isValid) throws Exception {
     formatLock.readLock().lock();
     try {
-      confStore.confirmMutation(isValid);
+      confStore.confirmMutation(pendingMutation, isValid);
       if (!isValid) {
         schedConf = oldConf;
       }

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java

@@ -52,7 +52,7 @@ public abstract class YarnConfigurationStore {
    * LogMutation encapsulates the fields needed for configuration mutation
    * audit logging and recovery.
    */
-  static class LogMutation implements Serializable {
+  public static class LogMutation implements Serializable {
     private Map<String, String> updates;
     private String user;
 
@@ -113,11 +113,13 @@ public abstract class YarnConfigurationStore {
    * last logged by {@code logMutation} and marks the mutation as persisted (no
    * longer pending). If isValid is true, merge the mutation with the persisted
    * configuration.
+   * @param pendingMutation the log mutation to apply
    * @param isValid if true, update persisted configuration with pending
    *                mutation.
    * @throws Exception if mutation confirmation fails
    */
-  public abstract void confirmMutation(boolean isValid) throws Exception;
+  public abstract void confirmMutation(LogMutation pendingMutation,
+      boolean isValid) throws Exception;
 
   /**
    * Retrieve the persisted configuration.

+ 2 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ZKConfigurationStore.java

@@ -54,7 +54,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
   protected static final Version CURRENT_VERSION_INFO = Version
       .newInstance(0, 1);
   private Configuration conf;
-  private LogMutation pendingMutation;
 
   private String znodeParentPath;
 
@@ -175,12 +174,11 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
       zkManager.safeSetData(logsPath, serializeObject(logs), -1, zkAcl,
               fencingNodePath);
     }
-    pendingMutation = logMutation;
   }
 
   @Override
-  public void confirmMutation(boolean isValid)
-      throws Exception {
+  public void confirmMutation(LogMutation pendingMutation,
+      boolean isValid) throws Exception {
     if (isValid) {
       Configuration storedConfigs = retrieve();
       Map<String, String> mapConf = new HashMap<>();
@@ -201,7 +199,6 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
       zkManager.setData(confVersionPath, String.valueOf(configVersion), -1);
 
     }
-    pendingMutation = null;
   }
 
   @Override

+ 5 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java

@@ -148,6 +148,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
@@ -2643,14 +2644,15 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
               throw new org.apache.hadoop.security.AccessControlException("User"
                   + " is not admin of all modified queues.");
             }
-            provider.logAndApplyMutation(callerUGI, mutationInfo);
+            LogMutation logMutation = provider.logAndApplyMutation(callerUGI,
+                mutationInfo);
             try {
               rm.getRMContext().getRMAdminService().refreshQueues();
             } catch (IOException | YarnException e) {
-              provider.confirmPendingMutation(false);
+              provider.confirmPendingMutation(logMutation, false);
               throw e;
             }
-            provider.confirmPendingMutation(true);
+            provider.confirmPendingMutation(logMutation, true);
             return null;
           }
         });

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java

@@ -64,7 +64,7 @@ public abstract class ConfigurationStoreBaseTest {
     YarnConfigurationStore.LogMutation mutation1 =
         new YarnConfigurationStore.LogMutation(update1, TEST_USER);
     confStore.logMutation(mutation1);
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation1, true);
     assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
 
     Map<String, String> update2 = new HashMap<>();
@@ -72,7 +72,7 @@ public abstract class ConfigurationStoreBaseTest {
     YarnConfigurationStore.LogMutation mutation2 =
         new YarnConfigurationStore.LogMutation(update2, TEST_USER);
     confStore.logMutation(mutation2);
-    confStore.confirmMutation(false);
+    confStore.confirmMutation(mutation2, false);
     assertNull("Configuration should not be updated",
         confStore.retrieve().get("keyUpdate2"));
     confStore.close();
@@ -89,7 +89,7 @@ public abstract class ConfigurationStoreBaseTest {
     YarnConfigurationStore.LogMutation mutation =
         new YarnConfigurationStore.LogMutation(update, TEST_USER);
     confStore.logMutation(mutation);
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertNull(confStore.retrieve().get("key"));
     confStore.close();
   }

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java

@@ -100,7 +100,7 @@ public class TestFSSchedulerConfigurationStore {
 
     LogMutation logMutation = new LogMutation(updates, "test");
     configurationStore.logMutation(logMutation);
-    configurationStore.confirmMutation(true);
+    configurationStore.confirmMutation(logMutation, true);
     storeConf = configurationStore.retrieve();
     assertEquals(null, storeConf.get("a"));
     assertEquals("bb", storeConf.get("b"));
@@ -110,7 +110,7 @@ public class TestFSSchedulerConfigurationStore {
 
     updates.put("b", "bbb");
     configurationStore.logMutation(logMutation);
-    configurationStore.confirmMutation(true);
+    configurationStore.confirmMutation(logMutation, true);
     storeConf = configurationStore.retrieve();
     assertEquals(null, storeConf.get("a"));
     assertEquals("bbb", storeConf.get("b"));
@@ -133,7 +133,7 @@ public class TestFSSchedulerConfigurationStore {
 
     LogMutation logMutation = new LogMutation(updates, "test");
     configurationStore.logMutation(logMutation);
-    configurationStore.confirmMutation(false);
+    configurationStore.confirmMutation(logMutation, false);
     storeConf = configurationStore.retrieve();
 
     compareConfig(conf, storeConf);
@@ -168,7 +168,7 @@ public class TestFSSchedulerConfigurationStore {
         updates.put("testkey", "testvalue");
         LogMutation logMutation = new LogMutation(updates, "test");
         configStore.logMutation(logMutation);
-        configStore.confirmMutation(true);
+        configStore.confirmMutation(logMutation, true);
       } catch (IOException e) {
         if (e.getMessage().contains("Filesystem closed")) {
           fail("FSSchedulerConfigurationStore failed to handle " +

+ 9 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestLeveldbConfigurationStore.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.junit.Before;
 import org.junit.Test;
@@ -103,7 +104,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
     YarnConfigurationStore.LogMutation mutation =
         new YarnConfigurationStore.LogMutation(update, TEST_USER);
     confStore.logMutation(mutation);
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals("val", confStore.retrieve().get("key"));
     confStore.close();
 
@@ -159,7 +160,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
     logs = ((LeveldbConfigurationStore) confStore).getLogs();
     assertEquals(1, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(1, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 
@@ -171,7 +172,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
     assertEquals(2, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
     assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(2, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
     assertEquals("val2", logs.get(1).getUpdates().get("key2"));
@@ -185,7 +186,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
     assertEquals(2, logs.size());
     assertEquals("val2", logs.get(0).getUpdates().get("key2"));
     assertEquals("val3", logs.get(1).getUpdates().get("key3"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(2, logs.size());
     assertEquals("val2", logs.get(0).getUpdates().get("key2"));
     assertEquals("val3", logs.get(1).getUpdates().get("key3"));
@@ -211,16 +212,17 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
         rm1.getResourceScheduler()).getMutableConfProvider();
     UserGroupInformation user = UserGroupInformation
         .createUserForTesting(TEST_USER, new String[0]);
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    LogMutation log = confProvider.logAndApplyMutation(user,
+        schedConfUpdateInfo);
     rm1.getResourceScheduler().reinitialize(conf, rm1.getRMContext());
     assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
         .getConfiguration().get("key"));
-    confProvider.confirmPendingMutation(true);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
         .getConfStore().retrieve().get("key"));
     // Next update is not persisted, it should not be recovered
     schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
     rm1.close();
 
     // Start RM2 and verifies it starts with updated configuration

+ 45 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.junit.Before;
@@ -93,15 +94,15 @@ public class TestMutableCSConfigurationProvider {
     assertNull(confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
-    confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
-    confProvider.confirmPendingMutation(true);
+    LogMutation log = confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("goodVal", confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
-    confProvider.logAndApplyMutation(TEST_USER, badUpdate);
-    confProvider.confirmPendingMutation(false);
+    log = confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+    confProvider.confirmPendingMutation(log, false);
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
 
@@ -125,8 +126,8 @@ public class TestMutableCSConfigurationProvider {
         QueueConfigInfo("root.a", updateMap);
     updateInfo.getUpdateQueueInfo().add(queueConfigInfo);
 
-    confProvider.logAndApplyMutation(TEST_USER, updateInfo);
-    confProvider.confirmPendingMutation(true);
+    LogMutation log = confProvider.logAndApplyMutation(TEST_USER, updateInfo);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("testval1", confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.testkey1"));
     assertEquals("testval2", confProvider.loadConfiguration(conf)
@@ -138,8 +139,8 @@ public class TestMutableCSConfigurationProvider {
     queueConfigInfo = new QueueConfigInfo("root.a", updateMap);
     updateInfo.getUpdateQueueInfo().add(queueConfigInfo);
 
-    confProvider.logAndApplyMutation(TEST_USER, updateInfo);
-    confProvider.confirmPendingMutation(true);
+    log = confProvider.logAndApplyMutation(TEST_USER, updateInfo);
+    confProvider.confirmPendingMutation(log, true);
     assertNull("Failed to remove config",
         confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.testkey1"));
@@ -147,6 +148,38 @@ public class TestMutableCSConfigurationProvider {
         .get("yarn.scheduler.capacity.root.a.testkey2"));
   }
 
+  @Test
+  public void testMultipleUpdatesNotLost() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+        YarnConfiguration.MEMORY_CONFIGURATION_STORE);
+    confProvider.init(conf);
+
+    SchedConfUpdateInfo updateInfo1 = new SchedConfUpdateInfo();
+    Map<String, String> updateMap1 = new HashMap<>();
+    updateMap1.put("key1", "val1");
+    QueueConfigInfo queueConfigInfo1 = new
+        QueueConfigInfo("root.a", updateMap1);
+    updateInfo1.getUpdateQueueInfo().add(queueConfigInfo1);
+    LogMutation log1 = confProvider.logAndApplyMutation(TEST_USER, updateInfo1);
+
+    SchedConfUpdateInfo updateInfo2 = new SchedConfUpdateInfo();
+    Map<String, String> updateMap2 = new HashMap<>();
+    updateMap2.put("key2", "val2");
+    QueueConfigInfo queueConfigInfo2 = new
+        QueueConfigInfo("root.a", updateMap2);
+    updateInfo2.getUpdateQueueInfo().add(queueConfigInfo2);
+    LogMutation log2 = confProvider.logAndApplyMutation(TEST_USER, updateInfo2);
+
+    confProvider.confirmPendingMutation(log1, true);
+    confProvider.confirmPendingMutation(log2, true);
+
+    assertEquals("val1", confProvider.loadConfiguration(conf)
+        .get("yarn.scheduler.capacity.root.a.key1"));
+    assertEquals("val2", confProvider.loadConfiguration(conf)
+        .get("yarn.scheduler.capacity.root.a.key2"));
+  }
+
   @Test
   public void testHDFSBackedProvider() throws Exception {
     File testSchedulerConfigurationDir = new File(
@@ -166,15 +199,15 @@ public class TestMutableCSConfigurationProvider {
     assertNull(confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
-    confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
-    confProvider.confirmPendingMutation(true);
+    LogMutation log = confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("goodVal", confProvider.loadConfiguration(conf)
         .get("yarn.scheduler.capacity.root.a.goodKey"));
 
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
-    confProvider.logAndApplyMutation(TEST_USER, badUpdate);
-    confProvider.confirmPendingMutation(false);
+    log = confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+    confProvider.confirmPendingMutation(log, false);
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
 

+ 16 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestZKConfigurationStore.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
 import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
 import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.junit.After;
@@ -148,7 +149,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     YarnConfigurationStore.LogMutation mutation =
         new YarnConfigurationStore.LogMutation(update, TEST_USER);
     confStore.logMutation(mutation);
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     long v2 = confStore.getConfigVersion();
     assertEquals(2, v2);
   }
@@ -160,10 +161,9 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
 
     Map<String, String> update = new HashMap<>();
     update.put("key", "val");
-    YarnConfigurationStore.LogMutation mutation =
-        new YarnConfigurationStore.LogMutation(update, TEST_USER);
+    LogMutation mutation = new LogMutation(update, TEST_USER);
     confStore.logMutation(mutation);
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals("val", confStore.retrieve().get("key"));
 
     // Create a new configuration store, and check for updated configuration
@@ -190,7 +190,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     logs = ((ZKConfigurationStore) confStore).getLogs();
     assertEquals(1, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(1, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
 
@@ -202,7 +202,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     assertEquals(2, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
     assertEquals("val2", logs.get(1).getUpdates().get("key2"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(2, logs.size());
     assertEquals("val1", logs.get(0).getUpdates().get("key1"));
     assertEquals("val2", logs.get(1).getUpdates().get("key2"));
@@ -216,7 +216,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     assertEquals(2, logs.size());
     assertEquals("val2", logs.get(0).getUpdates().get("key2"));
     assertEquals("val3", logs.get(1).getUpdates().get("key3"));
-    confStore.confirmMutation(true);
+    confStore.confirmMutation(mutation, true);
     assertEquals(2, logs.size());
     assertEquals("val2", logs.get(0).getUpdates().get("key2"));
     assertEquals("val3", logs.get(1).getUpdates().get("key3"));
@@ -308,16 +308,17 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
         rm1.getResourceScheduler()).getMutableConfProvider();
     UserGroupInformation user = UserGroupInformation
         .createUserForTesting(TEST_USER, new String[0]);
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    LogMutation log = confProvider.logAndApplyMutation(user,
+        schedConfUpdateInfo);
     rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
     assertEquals("val", ((MutableConfScheduler) rm1.getResourceScheduler())
         .getConfiguration().get("key"));
-    confProvider.confirmPendingMutation(true);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("val", ((MutableCSConfigurationProvider) confProvider)
         .getConfStore().retrieve().get("key"));
     // Next update is not persisted, it should not be recovered
     schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
 
     // Start RM2 and verifies it starts with updated configuration
     rm2.getRMContext().getRMAdminService().transitionToActive(req);
@@ -400,9 +401,10 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     stopParams.put("capacity", "0");
     QueueConfigInfo stopInfo = new QueueConfigInfo("root.default", stopParams);
     schedConfUpdateInfo.getUpdateQueueInfo().add(stopInfo);
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    LogMutation log = confProvider.logAndApplyMutation(user,
+        schedConfUpdateInfo);
     rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
-    confProvider.confirmPendingMutation(true);
+    confProvider.confirmPendingMutation(log, true);
     assertTrue(Arrays.asList(((MutableConfScheduler) rm1.getResourceScheduler())
         .getConfiguration().get("yarn.scheduler.capacity.root.queues").split
             (",")).contains("a"));
@@ -411,9 +413,9 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
     schedConfUpdateInfo.getUpdateQueueInfo().clear();
     schedConfUpdateInfo.getAddQueueInfo().clear();
     schedConfUpdateInfo.getRemoveQueueInfo().add("root.default");
-    confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
+    log =  confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
     rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
-    confProvider.confirmPendingMutation(true);
+    confProvider.confirmPendingMutation(log, true);
     assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler())
         .getConfiguration().get("yarn.scheduler.capacity.root.queues"));