|
@@ -33,6 +33,8 @@ import javax.management.ObjectName;
|
|
|
import javax.management.ReflectionException;
|
|
|
import javax.management.openmbean.CompositeDataSupport;
|
|
|
|
|
|
+import org.junit.Rule;
|
|
|
+import org.junit.rules.TemporaryFolder;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -80,16 +82,38 @@ public class TestRollingUpgrade {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Rule
|
|
|
+ public TemporaryFolder folder = new TemporaryFolder();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a default HDFS configuration which has test-specific data directories. This is
|
|
|
+ * intended to protect against interactions between test runs that might corrupt results. Each
|
|
|
+ * test run's data is automatically cleaned-up by JUnit.
|
|
|
+ *
|
|
|
+ * @return a default configuration with test-specific data directories
|
|
|
+ */
|
|
|
+ public Configuration getHdfsConfiguration() throws IOException {
|
|
|
+ Configuration conf = new HdfsConfiguration();
|
|
|
+
|
|
|
+ // Override the file system locations with test-specific temporary folders
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
|
|
|
+ folder.newFolder("dfs/name").toString());
|
|
|
+ conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
|
|
|
+ folder.newFolder("dfs/namesecondary").toString());
|
|
|
+ conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
|
|
+ folder.newFolder("dfs/data").toString());
|
|
|
+
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Test DFSAdmin Upgrade Command.
|
|
|
*/
|
|
|
@Test
|
|
|
public void testDFSAdminRollingUpgradeCommands() throws Exception {
|
|
|
// start a cluster
|
|
|
- final Configuration conf = new HdfsConfiguration();
|
|
|
- MiniDFSCluster cluster = null;
|
|
|
- try {
|
|
|
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
|
|
+ final Configuration conf = getHdfsConfiguration();
|
|
|
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build()) {
|
|
|
cluster.waitActive();
|
|
|
|
|
|
final Path foo = new Path("/foo");
|
|
@@ -149,8 +173,6 @@ public class TestRollingUpgrade {
|
|
|
Assert.assertTrue(dfs.exists(bar));
|
|
|
Assert.assertTrue(dfs.exists(baz));
|
|
|
}
|
|
|
- } finally {
|
|
|
- if(cluster != null) cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -172,115 +194,116 @@ public class TestRollingUpgrade {
|
|
|
LOG.info("nn1Dir=" + nn1Dir);
|
|
|
LOG.info("nn2Dir=" + nn2Dir);
|
|
|
|
|
|
- final Configuration conf = new HdfsConfiguration();
|
|
|
- final MiniJournalCluster mjc = new MiniJournalCluster.Builder(conf).build();
|
|
|
- mjc.waitActive();
|
|
|
- setConf(conf, nn1Dir, mjc);
|
|
|
-
|
|
|
- {
|
|
|
- // Start the cluster once to generate the dfs dirs
|
|
|
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
|
- .numDataNodes(0)
|
|
|
- .manageNameDfsDirs(false)
|
|
|
- .checkExitOnShutdown(false)
|
|
|
- .build();
|
|
|
- // Shutdown the cluster before making a copy of the namenode dir to release
|
|
|
- // all file locks, otherwise, the copy will fail on some platforms.
|
|
|
- cluster.shutdown();
|
|
|
- }
|
|
|
-
|
|
|
- MiniDFSCluster cluster2 = null;
|
|
|
- try {
|
|
|
- // Start a second NN pointed to the same quorum.
|
|
|
- // We need to copy the image dir from the first NN -- or else
|
|
|
- // the new NN will just be rejected because of Namespace mismatch.
|
|
|
- FileUtil.fullyDelete(nn2Dir);
|
|
|
- FileUtil.copy(nn1Dir, FileSystem.getLocal(conf).getRaw(),
|
|
|
- new Path(nn2Dir.getAbsolutePath()), false, conf);
|
|
|
-
|
|
|
- // Start the cluster again
|
|
|
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
|
- .numDataNodes(0)
|
|
|
- .format(false)
|
|
|
- .manageNameDfsDirs(false)
|
|
|
- .checkExitOnShutdown(false)
|
|
|
- .build();
|
|
|
+ final Configuration conf = getHdfsConfiguration();
|
|
|
+ try (MiniJournalCluster mjc = new MiniJournalCluster.Builder(conf).build()) {
|
|
|
+ mjc.waitActive();
|
|
|
+ setConf(conf, nn1Dir, mjc);
|
|
|
|
|
|
- final Path foo = new Path("/foo");
|
|
|
- final Path bar = new Path("/bar");
|
|
|
- final Path baz = new Path("/baz");
|
|
|
-
|
|
|
- final RollingUpgradeInfo info1;
|
|
|
{
|
|
|
- final DistributedFileSystem dfs = cluster.getFileSystem();
|
|
|
- dfs.mkdirs(foo);
|
|
|
-
|
|
|
- //start rolling upgrade
|
|
|
- dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
|
|
- info1 = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
|
|
- dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
|
|
- LOG.info("START\n" + info1);
|
|
|
-
|
|
|
- //query rolling upgrade
|
|
|
- assertEquals(info1, dfs.rollingUpgrade(RollingUpgradeAction.QUERY));
|
|
|
-
|
|
|
- dfs.mkdirs(bar);
|
|
|
+ // Start the cluster once to generate the dfs dirs
|
|
|
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
|
+ .numDataNodes(0)
|
|
|
+ .manageNameDfsDirs(false)
|
|
|
+ .checkExitOnShutdown(false)
|
|
|
+ .build();
|
|
|
+ // Shutdown the cluster before making a copy of the namenode dir to release
|
|
|
+ // all file locks, otherwise, the copy will fail on some platforms.
|
|
|
cluster.shutdown();
|
|
|
}
|
|
|
|
|
|
- // cluster2 takes over QJM
|
|
|
- final Configuration conf2 = setConf(new Configuration(), nn2Dir, mjc);
|
|
|
- cluster2 = new MiniDFSCluster.Builder(conf2)
|
|
|
- .numDataNodes(0)
|
|
|
- .format(false)
|
|
|
- .manageNameDfsDirs(false)
|
|
|
- .build();
|
|
|
- final DistributedFileSystem dfs2 = cluster2.getFileSystem();
|
|
|
-
|
|
|
- // Check that cluster2 sees the edits made on cluster1
|
|
|
- Assert.assertTrue(dfs2.exists(foo));
|
|
|
- Assert.assertTrue(dfs2.exists(bar));
|
|
|
- Assert.assertFalse(dfs2.exists(baz));
|
|
|
-
|
|
|
- //query rolling upgrade in cluster2
|
|
|
- assertEquals(info1, dfs2.rollingUpgrade(RollingUpgradeAction.QUERY));
|
|
|
-
|
|
|
- dfs2.mkdirs(baz);
|
|
|
-
|
|
|
- LOG.info("RESTART cluster 2");
|
|
|
- cluster2.restartNameNode();
|
|
|
- assertEquals(info1, dfs2.rollingUpgrade(RollingUpgradeAction.QUERY));
|
|
|
- Assert.assertTrue(dfs2.exists(foo));
|
|
|
- Assert.assertTrue(dfs2.exists(bar));
|
|
|
- Assert.assertTrue(dfs2.exists(baz));
|
|
|
-
|
|
|
- //restart cluster with -upgrade should fail.
|
|
|
+ MiniDFSCluster cluster2 = null;
|
|
|
try {
|
|
|
- cluster2.restartNameNode("-upgrade");
|
|
|
- } catch(IOException e) {
|
|
|
- LOG.info("The exception is expected.", e);
|
|
|
- }
|
|
|
+ // Start a second NN pointed to the same quorum.
|
|
|
+ // We need to copy the image dir from the first NN -- or else
|
|
|
+ // the new NN will just be rejected because of Namespace mismatch.
|
|
|
+ FileUtil.fullyDelete(nn2Dir);
|
|
|
+ FileUtil.copy(nn1Dir, FileSystem.getLocal(conf).getRaw(),
|
|
|
+ new Path(nn2Dir.getAbsolutePath()), false, conf);
|
|
|
+
|
|
|
+ // Start the cluster again
|
|
|
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
|
+ .numDataNodes(0)
|
|
|
+ .format(false)
|
|
|
+ .manageNameDfsDirs(false)
|
|
|
+ .checkExitOnShutdown(false)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ final Path foo = new Path("/foo");
|
|
|
+ final Path bar = new Path("/bar");
|
|
|
+ final Path baz = new Path("/baz");
|
|
|
+
|
|
|
+ final RollingUpgradeInfo info1;
|
|
|
+ {
|
|
|
+ final DistributedFileSystem dfs = cluster.getFileSystem();
|
|
|
+ dfs.mkdirs(foo);
|
|
|
+
|
|
|
+ //start rolling upgrade
|
|
|
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
|
|
+ info1 = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
|
|
+ dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
|
|
|
+ LOG.info("START\n" + info1);
|
|
|
+
|
|
|
+ //query rolling upgrade
|
|
|
+ assertEquals(info1, dfs.rollingUpgrade(RollingUpgradeAction.QUERY));
|
|
|
+
|
|
|
+ dfs.mkdirs(bar);
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
|
|
|
- LOG.info("RESTART cluster 2 again");
|
|
|
- cluster2.restartNameNode();
|
|
|
- assertEquals(info1, dfs2.rollingUpgrade(RollingUpgradeAction.QUERY));
|
|
|
- Assert.assertTrue(dfs2.exists(foo));
|
|
|
- Assert.assertTrue(dfs2.exists(bar));
|
|
|
- Assert.assertTrue(dfs2.exists(baz));
|
|
|
-
|
|
|
- //finalize rolling upgrade
|
|
|
- final RollingUpgradeInfo finalize = dfs2.rollingUpgrade(
|
|
|
- RollingUpgradeAction.FINALIZE);
|
|
|
- Assert.assertTrue(finalize.isFinalized());
|
|
|
-
|
|
|
- LOG.info("RESTART cluster 2 with regular startup option");
|
|
|
- cluster2.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
|
|
|
- cluster2.restartNameNode();
|
|
|
- Assert.assertTrue(dfs2.exists(foo));
|
|
|
- Assert.assertTrue(dfs2.exists(bar));
|
|
|
- Assert.assertTrue(dfs2.exists(baz));
|
|
|
- } finally {
|
|
|
- if (cluster2 != null) cluster2.shutdown();
|
|
|
+ // cluster2 takes over QJM
|
|
|
+ final Configuration conf2 = setConf(new Configuration(), nn2Dir, mjc);
|
|
|
+ cluster2 = new MiniDFSCluster.Builder(conf2)
|
|
|
+ .numDataNodes(0)
|
|
|
+ .format(false)
|
|
|
+ .manageNameDfsDirs(false)
|
|
|
+ .build();
|
|
|
+ final DistributedFileSystem dfs2 = cluster2.getFileSystem();
|
|
|
+
|
|
|
+ // Check that cluster2 sees the edits made on cluster1
|
|
|
+ Assert.assertTrue(dfs2.exists(foo));
|
|
|
+ Assert.assertTrue(dfs2.exists(bar));
|
|
|
+ Assert.assertFalse(dfs2.exists(baz));
|
|
|
+
|
|
|
+ //query rolling upgrade in cluster2
|
|
|
+ assertEquals(info1, dfs2.rollingUpgrade(RollingUpgradeAction.QUERY));
|
|
|
+
|
|
|
+ dfs2.mkdirs(baz);
|
|
|
+
|
|
|
+ LOG.info("RESTART cluster 2");
|
|
|
+ cluster2.restartNameNode();
|
|
|
+ assertEquals(info1, dfs2.rollingUpgrade(RollingUpgradeAction.QUERY));
|
|
|
+ Assert.assertTrue(dfs2.exists(foo));
|
|
|
+ Assert.assertTrue(dfs2.exists(bar));
|
|
|
+ Assert.assertTrue(dfs2.exists(baz));
|
|
|
+
|
|
|
+ //restart cluster with -upgrade should fail.
|
|
|
+ try {
|
|
|
+ cluster2.restartNameNode("-upgrade");
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.info("The exception is expected.", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("RESTART cluster 2 again");
|
|
|
+ cluster2.restartNameNode();
|
|
|
+ assertEquals(info1, dfs2.rollingUpgrade(RollingUpgradeAction.QUERY));
|
|
|
+ Assert.assertTrue(dfs2.exists(foo));
|
|
|
+ Assert.assertTrue(dfs2.exists(bar));
|
|
|
+ Assert.assertTrue(dfs2.exists(baz));
|
|
|
+
|
|
|
+ //finalize rolling upgrade
|
|
|
+ final RollingUpgradeInfo finalize = dfs2.rollingUpgrade(
|
|
|
+ RollingUpgradeAction.FINALIZE);
|
|
|
+ Assert.assertTrue(finalize.isFinalized());
|
|
|
+
|
|
|
+ LOG.info("RESTART cluster 2 with regular startup option");
|
|
|
+ cluster2.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
|
|
|
+ cluster2.restartNameNode();
|
|
|
+ Assert.assertTrue(dfs2.exists(foo));
|
|
|
+ Assert.assertTrue(dfs2.exists(bar));
|
|
|
+ Assert.assertTrue(dfs2.exists(baz));
|
|
|
+ } finally {
|
|
|
+ if (cluster2 != null) cluster2.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -309,10 +332,8 @@ public class TestRollingUpgrade {
|
|
|
@Test
|
|
|
public void testRollback() throws Exception {
|
|
|
// start a cluster
|
|
|
- final Configuration conf = new HdfsConfiguration();
|
|
|
- MiniDFSCluster cluster = null;
|
|
|
- try {
|
|
|
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
+ final Configuration conf = getHdfsConfiguration();
|
|
|
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build()) {
|
|
|
cluster.waitActive();
|
|
|
|
|
|
final Path foo = new Path("/foo");
|
|
@@ -352,8 +373,6 @@ public class TestRollingUpgrade {
|
|
|
|
|
|
startRollingUpgrade(foo, bar, file, data, cluster);
|
|
|
rollbackRollingUpgrade(foo, bar, file, data, cluster);
|
|
|
- } finally {
|
|
|
- if(cluster != null) cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -407,10 +426,8 @@ public class TestRollingUpgrade {
|
|
|
@Test
|
|
|
public void testDFSAdminDatanodeUpgradeControlCommands() throws Exception {
|
|
|
// start a cluster
|
|
|
- final Configuration conf = new HdfsConfiguration();
|
|
|
- MiniDFSCluster cluster = null;
|
|
|
- try {
|
|
|
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
+ final Configuration conf = getHdfsConfiguration();
|
|
|
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build()) {
|
|
|
cluster.waitActive();
|
|
|
final DFSAdmin dfsadmin = new DFSAdmin(conf);
|
|
|
DataNode dn = cluster.getDataNodes().get(0);
|
|
@@ -431,8 +448,6 @@ public class TestRollingUpgrade {
|
|
|
|
|
|
// ping should fail.
|
|
|
assertEquals(-1, dfsadmin.run(args1));
|
|
|
- } finally {
|
|
|
- if (cluster != null) cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -462,7 +477,7 @@ public class TestRollingUpgrade {
|
|
|
|
|
|
private void testFinalize(int nnCount, boolean skipImageDeltaCheck)
|
|
|
throws Exception {
|
|
|
- final Configuration conf = new HdfsConfiguration();
|
|
|
+ final Configuration conf = getHdfsConfiguration();
|
|
|
MiniQJMHACluster cluster = null;
|
|
|
final Path foo = new Path("/foo");
|
|
|
final Path bar = new Path("/bar");
|
|
@@ -528,10 +543,8 @@ public class TestRollingUpgrade {
|
|
|
}
|
|
|
|
|
|
private void testQuery(int nnCount) throws Exception{
|
|
|
- final Configuration conf = new Configuration();
|
|
|
- MiniQJMHACluster cluster = null;
|
|
|
- try {
|
|
|
- cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
|
|
|
+ final Configuration conf = getHdfsConfiguration();
|
|
|
+ try (MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build()) {
|
|
|
MiniDFSCluster dfsCluster = cluster.getDfsCluster();
|
|
|
dfsCluster.waitActive();
|
|
|
|
|
@@ -561,19 +574,13 @@ public class TestRollingUpgrade {
|
|
|
// The NN should have a copy of the fsimage in case of rollbacks.
|
|
|
Assert.assertTrue(dfsCluster.getNamesystem(0).getFSImage()
|
|
|
.hasRollbackFSImage());
|
|
|
- } finally {
|
|
|
- if (cluster != null) {
|
|
|
- cluster.shutdown();
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 300000)
|
|
|
public void testQueryAfterRestart() throws IOException, InterruptedException {
|
|
|
- final Configuration conf = new Configuration();
|
|
|
- MiniDFSCluster cluster = null;
|
|
|
- try {
|
|
|
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
|
|
+ final Configuration conf = getHdfsConfiguration();
|
|
|
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build()) {
|
|
|
cluster.waitActive();
|
|
|
DistributedFileSystem dfs = cluster.getFileSystem();
|
|
|
|
|
@@ -587,10 +594,6 @@ public class TestRollingUpgrade {
|
|
|
|
|
|
cluster.restartNameNodes();
|
|
|
dfs.rollingUpgrade(RollingUpgradeAction.QUERY);
|
|
|
- } finally {
|
|
|
- if (cluster != null) {
|
|
|
- cluster.shutdown();
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -606,7 +609,7 @@ public class TestRollingUpgrade {
|
|
|
|
|
|
@Test(timeout = 60000)
|
|
|
public void testRollBackImage() throws Exception {
|
|
|
- final Configuration conf = new Configuration();
|
|
|
+ final Configuration conf = getHdfsConfiguration();
|
|
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 10);
|
|
|
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
|
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 2);
|
|
@@ -651,15 +654,14 @@ public class TestRollingUpgrade {
|
|
|
}
|
|
|
|
|
|
public void testCheckpoint(int nnCount) throws IOException, InterruptedException {
|
|
|
- final Configuration conf = new Configuration();
|
|
|
+ final Configuration conf = getHdfsConfiguration();
|
|
|
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
|
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1);
|
|
|
|
|
|
- MiniQJMHACluster cluster = null;
|
|
|
final Path foo = new Path("/foo");
|
|
|
|
|
|
- try {
|
|
|
- cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
|
|
|
+ try (MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount)
|
|
|
+ .build()) {
|
|
|
MiniDFSCluster dfsCluster = cluster.getDfsCluster();
|
|
|
dfsCluster.waitActive();
|
|
|
|
|
@@ -681,17 +683,14 @@ public class TestRollingUpgrade {
|
|
|
verifyNNCheckpoint(dfsCluster, txid, i);
|
|
|
}
|
|
|
|
|
|
- } finally {
|
|
|
- if (cluster != null) {
|
|
|
- cluster.shutdown();
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Verify that the namenode at the given index has an FSImage with a TxId up to txid-1
|
|
|
*/
|
|
|
- private void verifyNNCheckpoint(MiniDFSCluster dfsCluster, long txid, int nnIndex) throws InterruptedException {
|
|
|
+ private void verifyNNCheckpoint(MiniDFSCluster dfsCluster, long txid, int nnIndex)
|
|
|
+ throws InterruptedException {
|
|
|
int retries = 0;
|
|
|
while (++retries < 5) {
|
|
|
NNStorage storage = dfsCluster.getNamesystem(nnIndex).getFSImage()
|
|
@@ -732,7 +731,7 @@ public class TestRollingUpgrade {
|
|
|
SecondaryNameNode snn = null;
|
|
|
|
|
|
try {
|
|
|
- Configuration conf = new HdfsConfiguration();
|
|
|
+ Configuration conf = getHdfsConfiguration();
|
|
|
cluster = new MiniDFSCluster.Builder(conf).build();
|
|
|
cluster.waitActive();
|
|
|
|