|
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
|
import org.apache.commons.io.FileUtils;
|
|
@@ -31,36 +30,34 @@ import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
|
|
|
+import org.apache.hadoop.yarn.server.records.Version;
|
|
|
import org.hamcrest.CoreMatchers;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
-import static org.junit.Assert.fail;
|
|
|
import static org.junit.Assert.assertNull;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
import static org.junit.Assert.assertThat;
|
|
|
|
|
|
|
|
|
/**
|
|
|
* Tests {@link FSSchedulerConfigurationStore}.
|
|
|
*/
|
|
|
-public class TestFSSchedulerConfigurationStore {
|
|
|
- private static final String TEST_USER = "test";
|
|
|
- private FSSchedulerConfigurationStore configurationStore;
|
|
|
- private Configuration conf;
|
|
|
+public class TestFSSchedulerConfigurationStore extends
|
|
|
+ PersistentConfigurationStoreBaseTest {
|
|
|
private File testSchedulerConfigurationDir;
|
|
|
|
|
|
@Before
|
|
|
+ @Override
|
|
|
public void setUp() throws Exception {
|
|
|
- configurationStore = new FSSchedulerConfigurationStore();
|
|
|
+ super.setUp();
|
|
|
testSchedulerConfigurationDir = new File(
|
|
|
TestFSSchedulerConfigurationStore.class.getResource("").getPath()
|
|
|
+ FSSchedulerConfigurationStore.class.getSimpleName());
|
|
|
testSchedulerConfigurationDir.mkdirs();
|
|
|
|
|
|
- conf = new Configuration();
|
|
|
conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
|
|
|
testSchedulerConfigurationDir.getAbsolutePath());
|
|
|
}
|
|
@@ -81,6 +78,15 @@ public class TestFSSchedulerConfigurationStore {
|
|
|
FileUtils.deleteDirectory(testSchedulerConfigurationDir);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void checkVersion() {
|
|
|
+ try {
|
|
|
+ confStore.checkVersion();
|
|
|
+ } catch (Exception e) {
|
|
|
+ fail("checkVersion throw exception");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void confirmMutationWithValid() throws Exception {
|
|
|
conf.setInt(
|
|
@@ -89,26 +95,26 @@ public class TestFSSchedulerConfigurationStore {
|
|
|
conf.set("b", "b");
|
|
|
conf.set("c", "c");
|
|
|
writeConf(conf);
|
|
|
- configurationStore.initialize(conf, conf, null);
|
|
|
- Configuration storeConf = configurationStore.retrieve();
|
|
|
+ confStore.initialize(conf, conf, null);
|
|
|
+ Configuration storeConf = confStore.retrieve();
|
|
|
compareConfig(conf, storeConf);
|
|
|
|
|
|
Configuration expectConfig = new Configuration(conf);
|
|
|
expectConfig.unset("a");
|
|
|
expectConfig.set("b", "bb");
|
|
|
|
|
|
- prepareParameterizedLogMutation(configurationStore, true,
|
|
|
- "a", null, "b", "bb");
|
|
|
- storeConf = configurationStore.retrieve();
|
|
|
+ prepareLogMutation("a", null, "b", "bb");
|
|
|
+ confStore.confirmMutation(true);
|
|
|
+ storeConf = confStore.retrieve();
|
|
|
assertNull(storeConf.get("a"));
|
|
|
assertEquals("bb", storeConf.get("b"));
|
|
|
assertEquals("c", storeConf.get("c"));
|
|
|
|
|
|
compareConfig(expectConfig, storeConf);
|
|
|
|
|
|
- prepareParameterizedLogMutation(configurationStore, true,
|
|
|
- "a", null, "b", "bbb");
|
|
|
- storeConf = configurationStore.retrieve();
|
|
|
+ prepareLogMutation("a", null, "b", "bbb");
|
|
|
+ confStore.confirmMutation(true);
|
|
|
+ storeConf = confStore.retrieve();
|
|
|
assertNull(storeConf.get("a"));
|
|
|
assertEquals("bbb", storeConf.get("b"));
|
|
|
assertEquals("c", storeConf.get("c"));
|
|
@@ -120,17 +126,53 @@ public class TestFSSchedulerConfigurationStore {
|
|
|
conf.set("b", "b");
|
|
|
conf.set("c", "c");
|
|
|
writeConf(conf);
|
|
|
- configurationStore.initialize(conf, conf, null);
|
|
|
- Configuration storeConf = configurationStore.retrieve();
|
|
|
+ confStore.initialize(conf, conf, null);
|
|
|
+ Configuration storeConf = confStore.retrieve();
|
|
|
compareConfig(conf, storeConf);
|
|
|
|
|
|
- prepareParameterizedLogMutation(configurationStore, false,
|
|
|
- "a", null, "b", "bb");
|
|
|
- storeConf = configurationStore.retrieve();
|
|
|
+ prepareLogMutation("a", null, "b", "bb");
|
|
|
+ confStore.confirmMutation(false);
|
|
|
+
|
|
|
+ storeConf = confStore.retrieve();
|
|
|
|
|
|
compareConfig(conf, storeConf);
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testConfigRetrieval() throws Exception {
|
|
|
+ Configuration schedulerConf = new Configuration();
|
|
|
+ schedulerConf.set("a", "a");
|
|
|
+ schedulerConf.setLong("long", 1L);
|
|
|
+ schedulerConf.setBoolean("boolean", true);
|
|
|
+ writeConf(schedulerConf);
|
|
|
+
|
|
|
+ confStore.initialize(conf, conf, null);
|
|
|
+ Configuration storedConfig = confStore.retrieve();
|
|
|
+
|
|
|
+ compareConfig(schedulerConf, storedConfig);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testFormatConfiguration() throws Exception {
|
|
|
+ Configuration persistedSchedConf = new Configuration();
|
|
|
+ persistedSchedConf.set("a", "a");
|
|
|
+ writeConf(persistedSchedConf);
|
|
|
+ confStore.initialize(conf, conf, null);
|
|
|
+ Configuration storedConfig = confStore.retrieve();
|
|
|
+ assertEquals("Retrieved config should match the stored one", "a",
|
|
|
+ storedConfig.get("a"));
|
|
|
+ confStore.format();
|
|
|
+ try {
|
|
|
+ confStore.retrieve();
|
|
|
+ fail("Expected an IOException with message containing \"no capacity " +
|
|
|
+ "scheduler file in\" to be thrown");
|
|
|
+ } catch (IOException e) {
|
|
|
+ assertThat("Exception message should contain the predefined string.",
|
|
|
+ e.getMessage(),
|
|
|
+ CoreMatchers.containsString("no capacity scheduler file in"));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test
|
|
|
public void testFileSystemClose() throws Exception {
|
|
|
MiniDFSCluster hdfsCluster = null;
|
|
@@ -146,18 +188,16 @@ public class TestFSSchedulerConfigurationStore {
|
|
|
fs.mkdirs(path);
|
|
|
}
|
|
|
|
|
|
- FSSchedulerConfigurationStore configStore =
|
|
|
- new FSSchedulerConfigurationStore();
|
|
|
hdfsConfig.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
|
|
|
path.toString());
|
|
|
- configStore.initialize(hdfsConfig, hdfsConfig, null);
|
|
|
+ confStore.initialize(hdfsConfig, hdfsConfig, null);
|
|
|
|
|
|
// Close the FileSystem object and validate
|
|
|
fs.close();
|
|
|
|
|
|
try {
|
|
|
- prepareParameterizedLogMutation(configStore, true,
|
|
|
- "testkey", "testvalue");
|
|
|
+ prepareLogMutation("key", "val");
|
|
|
+ confStore.confirmMutation(true);
|
|
|
} catch (IOException e) {
|
|
|
if (e.getMessage().contains("Filesystem closed")) {
|
|
|
fail("FSSchedulerConfigurationStore failed to handle " +
|
|
@@ -176,48 +216,6 @@ public class TestFSSchedulerConfigurationStore {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Test
|
|
|
- public void testFormatConfiguration() throws Exception {
|
|
|
- Configuration schedulerConf = new Configuration();
|
|
|
- schedulerConf.set("a", "a");
|
|
|
- writeConf(schedulerConf);
|
|
|
- configurationStore.initialize(conf, conf, null);
|
|
|
- Configuration storedConfig = configurationStore.retrieve();
|
|
|
- assertEquals("a", storedConfig.get("a"));
|
|
|
- configurationStore.format();
|
|
|
- try {
|
|
|
- configurationStore.retrieve();
|
|
|
- fail("Expected an IOException with message containing \"no capacity " +
|
|
|
- "scheduler file in\" to be thrown");
|
|
|
- } catch (IOException e) {
|
|
|
- assertThat(e.getMessage(),
|
|
|
- CoreMatchers.containsString("no capacity scheduler file in"));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Test
|
|
|
- public void retrieve() throws Exception {
|
|
|
- Configuration schedulerConf = new Configuration();
|
|
|
- schedulerConf.set("a", "a");
|
|
|
- schedulerConf.setLong("long", 1L);
|
|
|
- schedulerConf.setBoolean("boolean", true);
|
|
|
- writeConf(schedulerConf);
|
|
|
-
|
|
|
- configurationStore.initialize(conf, conf, null);
|
|
|
- Configuration storedConfig = configurationStore.retrieve();
|
|
|
-
|
|
|
- compareConfig(schedulerConf, storedConfig);
|
|
|
- }
|
|
|
-
|
|
|
- @Test
|
|
|
- public void checkVersion() {
|
|
|
- try {
|
|
|
- configurationStore.checkVersion();
|
|
|
- } catch (Exception e) {
|
|
|
- fail("checkVersion throw exception");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private void compareConfig(Configuration schedulerConf,
|
|
|
Configuration storedConfig) {
|
|
|
for (Map.Entry<String, String> entry : schedulerConf) {
|
|
@@ -231,26 +229,13 @@ public class TestFSSchedulerConfigurationStore {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void prepareParameterizedLogMutation(
|
|
|
- FSSchedulerConfigurationStore configStore,
|
|
|
- boolean validityFlag, String... values) throws Exception {
|
|
|
- Map<String, String> updates = new HashMap<>();
|
|
|
- String key;
|
|
|
- String value;
|
|
|
-
|
|
|
- if (values.length % 2 != 0) {
|
|
|
- throw new IllegalArgumentException("The number of parameters should be " +
|
|
|
- "even.");
|
|
|
- }
|
|
|
-
|
|
|
- for (int i = 1; i <= values.length; i += 2) {
|
|
|
- key = values[i - 1];
|
|
|
- value = values[i];
|
|
|
- updates.put(key, value);
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public YarnConfigurationStore createConfStore() {
|
|
|
+ return new FSSchedulerConfigurationStore();
|
|
|
+ }
|
|
|
|
|
|
- LogMutation logMutation = new LogMutation(updates, TEST_USER);
|
|
|
- configStore.logMutation(logMutation);
|
|
|
- configStore.confirmMutation(validityFlag);
|
|
|
+ @Override
|
|
|
+ Version getVersion() {
|
|
|
+ return null;
|
|
|
}
|
|
|
}
|