|
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import static org.junit.Assert.fail;
|
|
|
|
|
|
+import java.io.FileNotFoundException;
|
|
|
import java.io.IOException;
|
|
|
import java.util.Collections;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -30,9 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
+import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|
|
+import org.apache.hadoop.ha.HAServiceStatus;
|
|
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
|
@@ -108,7 +112,8 @@ public class TestConsistentReadsObserver {
|
|
|
final int observerIdx = 2;
|
|
|
NameNode nn = dfsCluster.getNameNode(observerIdx);
|
|
|
int port = nn.getNameNodeAddress().getPort();
|
|
|
- Configuration configuration = dfsCluster.getConfiguration(observerIdx);
|
|
|
+ Configuration originalConf = dfsCluster.getConfiguration(observerIdx);
|
|
|
+ Configuration configuration = new Configuration(originalConf);
|
|
|
String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + ".";
|
|
|
configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
|
|
|
TestRpcScheduler.class.getName());
|
|
@@ -125,6 +130,8 @@ public class TestConsistentReadsObserver {
|
|
|
// be triggered and client should retry active NN.
|
|
|
dfs.getFileStatus(testPath);
|
|
|
assertSentTo(0);
|
|
|
+ // reset the original call queue
|
|
|
+ NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
|
|
|
}
|
|
|
|
|
|
@Test
|
|
@@ -194,7 +201,7 @@ public class TestConsistentReadsObserver {
|
|
|
// Therefore, the subsequent getFileStatus call should succeed.
|
|
|
if (!autoMsync) {
|
|
|
// If not testing auto-msync, perform an explicit one here
|
|
|
- dfs2.getClient().msync();
|
|
|
+ dfs2.msync();
|
|
|
} else if (autoMsyncPeriodMs > 0) {
|
|
|
Thread.sleep(autoMsyncPeriodMs);
|
|
|
}
|
|
@@ -383,6 +390,35 @@ public class TestConsistentReadsObserver {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout=10000)
|
|
|
+ public void testMsyncFileContext() throws Exception {
|
|
|
+ NameNode nn0 = dfsCluster.getNameNode(0);
|
|
|
+ NameNode nn2 = dfsCluster.getNameNode(2);
|
|
|
+ HAServiceStatus st = nn0.getRpcServer().getServiceStatus();
|
|
|
+ assertEquals("nn0 is not active", HAServiceState.ACTIVE, st.getState());
|
|
|
+ st = nn2.getRpcServer().getServiceStatus();
|
|
|
+ assertEquals("nn2 is not observer", HAServiceState.OBSERVER, st.getState());
|
|
|
+
|
|
|
+ FileContext fc = FileContext.getFileContext(conf);
|
|
|
+ // initialize observer proxy for FileContext
|
|
|
+ fc.getFsStatus(testPath);
|
|
|
+
|
|
|
+ Path p = new Path(testPath, "testMsyncFileContext");
|
|
|
+ fc.mkdir(p, FsPermission.getDefault(), true);
|
|
|
+ fc.msync();
|
|
|
+ dfsCluster.rollEditLogAndTail(0);
|
|
|
+ LOG.info("State id active = {}, Stat id observer = {}",
|
|
|
+ nn0.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId(),
|
|
|
+ nn2.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId());
|
|
|
+ try {
|
|
|
+ // if getFileStatus is taking too long due to server requeueing
|
|
|
+ // the test will time out
|
|
|
+ fc.getFileStatus(p);
|
|
|
+ } catch (FileNotFoundException e) {
|
|
|
+ fail("File should exist on Observer after msync");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private void assertSentTo(int nnIdx) throws IOException {
|
|
|
assertTrue("Request was not sent to the expected namenode " + nnIdx,
|
|
|
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
|