|
@@ -18,14 +18,15 @@
|
|
|
|
|
|
package org.apache.hadoop.hdfs;
|
|
|
|
|
|
-import static org.hamcrest.CoreMatchers.is;
|
|
|
-import static org.junit.Assert.assertThat;
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
|
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
|
|
@@ -66,6 +67,7 @@ public class TestStateAlignmentContextWithHA {
|
|
|
private static final Configuration CONF = new HdfsConfiguration();
|
|
|
private static final List<ClientGSIContext> AC_LIST = new ArrayList<>();
|
|
|
|
|
|
+ private static MiniQJMHACluster qjmhaCluster;
|
|
|
private static MiniDFSCluster cluster;
|
|
|
private static List<Worker> clients;
|
|
|
|
|
@@ -87,33 +89,26 @@ public class TestStateAlignmentContextWithHA {
|
|
|
|
|
|
@BeforeClass
|
|
|
public static void startUpCluster() throws IOException {
|
|
|
- // disable block scanner
|
|
|
- CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
|
|
|
// Set short retry timeouts so this test runs faster
|
|
|
CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
|
|
|
- CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
|
|
|
+ CONF.setBoolean(String.format(
|
|
|
+ "fs.%s.impl.disable.cache", HdfsConstants.HDFS_URI_SCHEME), true);
|
|
|
+ CONF.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, NUMDATANODES);
|
|
|
|
|
|
- cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
|
|
|
- .nnTopology(MiniDFSNNTopology.simpleHATopology(3))
|
|
|
- .build();
|
|
|
- cluster.waitActive();
|
|
|
- cluster.transitionToActive(0);
|
|
|
- cluster.transitionToObserver(2);
|
|
|
-
|
|
|
- HATestUtil.setupHAConfiguration(
|
|
|
- cluster, CONF, 0, ORPPwithAlignmentContexts.class);
|
|
|
+ qjmhaCluster = HATestUtil.setUpObserverCluster(CONF, 1, NUMDATANODES, true);
|
|
|
+ cluster = qjmhaCluster.getDfsCluster();
|
|
|
}
|
|
|
|
|
|
@Before
|
|
|
public void before() throws IOException, URISyntaxException {
|
|
|
- dfs = (DistributedFileSystem) FileSystem.get(CONF);
|
|
|
+ dfs = HATestUtil.configureObserverReadFs(
|
|
|
+ cluster, CONF, ORPPwithAlignmentContexts.class, true);
|
|
|
}
|
|
|
|
|
|
@AfterClass
|
|
|
public static void shutDownCluster() throws IOException {
|
|
|
- if (cluster != null) {
|
|
|
- cluster.shutdown();
|
|
|
- cluster = null;
|
|
|
+ if (qjmhaCluster != null) {
|
|
|
+ qjmhaCluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -144,9 +139,9 @@ public class TestStateAlignmentContextWithHA {
|
|
|
long postWriteState =
|
|
|
cluster.getNamesystem(active).getLastWrittenTransactionId();
|
|
|
// Write(s) should have increased state. Check for greater than.
|
|
|
- assertThat(clientState > preWriteState, is(true));
|
|
|
+ assertTrue(clientState > preWriteState);
|
|
|
// Client and server state should be equal.
|
|
|
- assertThat(clientState, is(postWriteState));
|
|
|
+ assertEquals(clientState, postWriteState);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -161,7 +156,7 @@ public class TestStateAlignmentContextWithHA {
|
|
|
DFSTestUtil.readFile(dfs, new Path("/testFile2"));
|
|
|
// Read should catch client up to last written state.
|
|
|
long clientState = getContext(0).getLastSeenStateId();
|
|
|
- assertThat(clientState, is(lastWrittenId));
|
|
|
+ assertEquals(clientState, lastWrittenId);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -173,12 +168,12 @@ public class TestStateAlignmentContextWithHA {
|
|
|
DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz");
|
|
|
long lastWrittenId =
|
|
|
cluster.getNamesystem(active).getLastWrittenTransactionId();
|
|
|
- try (DistributedFileSystem clearDfs =
|
|
|
- (DistributedFileSystem) FileSystem.get(CONF)) {
|
|
|
+ try (DistributedFileSystem clearDfs = HATestUtil.configureObserverReadFs(
|
|
|
+ cluster, CONF, ORPPwithAlignmentContexts.class, true);) {
|
|
|
ClientGSIContext clientState = getContext(1);
|
|
|
- assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
|
|
|
+ assertEquals(clientState.getLastSeenStateId(), Long.MIN_VALUE);
|
|
|
DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
|
|
|
- assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
|
|
|
+ assertEquals(clientState.getLastSeenStateId(), lastWrittenId);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -196,9 +191,9 @@ public class TestStateAlignmentContextWithHA {
|
|
|
long postWriteState =
|
|
|
cluster.getNamesystem(active).getLastWrittenTransactionId();
|
|
|
// Write(s) should have increased state. Check for greater than.
|
|
|
- assertThat(clientState > preWriteState, is(true));
|
|
|
+ assertTrue(clientState > preWriteState);
|
|
|
// Client and server state should be equal.
|
|
|
- assertThat(clientState, is(postWriteState));
|
|
|
+ assertEquals(clientState, postWriteState);
|
|
|
|
|
|
// Failover NameNode.
|
|
|
failOver();
|
|
@@ -210,9 +205,9 @@ public class TestStateAlignmentContextWithHA {
|
|
|
cluster.getNamesystem(active).getLastWrittenTransactionId();
|
|
|
|
|
|
// Write(s) should have increased state. Check for greater than.
|
|
|
- assertThat(clientStateFO > postWriteState, is(true));
|
|
|
+ assertTrue(clientStateFO > postWriteState);
|
|
|
// Client and server state should be equal.
|
|
|
- assertThat(clientStateFO, is(writeStateFO));
|
|
|
+ assertEquals(clientStateFO, writeStateFO);
|
|
|
}
|
|
|
|
|
|
@Test(timeout=300000)
|
|
@@ -230,8 +225,8 @@ public class TestStateAlignmentContextWithHA {
|
|
|
ExecutorService execService = Executors.newFixedThreadPool(2);
|
|
|
clients = new ArrayList<>(numClients);
|
|
|
for (int i = clientStartId; i <= numClients; i++) {
|
|
|
- DistributedFileSystem haClient =
|
|
|
- (DistributedFileSystem) FileSystem.get(CONF);
|
|
|
+ DistributedFileSystem haClient = HATestUtil.configureObserverReadFs(
|
|
|
+ cluster, CONF, ORPPwithAlignmentContexts.class, true);
|
|
|
clients.add(new Worker(haClient, numFiles, "/testFile3FO_", i));
|
|
|
}
|
|
|
|
|
@@ -248,7 +243,7 @@ public class TestStateAlignmentContextWithHA {
|
|
|
|
|
|
// Validation.
|
|
|
for (Future<STATE> future : futures) {
|
|
|
- assertThat(future.get(), is(STATE.SUCCESS));
|
|
|
+ assertEquals(future.get(), STATE.SUCCESS);
|
|
|
}
|
|
|
|
|
|
clients.clear();
|