@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 
 import com.google.common.base.Supplier;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,13 +36,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.ipc.ProcessingDetails;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RpcScheduler;
 import org.apache.hadoop.ipc.Schedulable;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -376,6 +380,48 @@ public class TestConsistentReadsObserver {
     reader.interrupt();
   }
 
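+  /**
+   * A request from a client that does not use ObserverReadProxyProvider
+   * (here, ConfiguredFailoverProxyProvider) should be rejected by the
+   * observer with a StandbyException instead of being served.
+   */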
+  @Test
+  public void testRequestFromNonObserverProxyProvider() throws Exception {
+    // Create another HDFS client using ConfiguredFailoverProxyProvider
+    Configuration conf2 = new Configuration(conf);
+
+    // Populate the above configuration with only a single observer in the
+    // namenode list. Also reduce retries to make the test finish faster.
+    HATestUtil.setFailoverConfigurations(
+        conf2,
+        HATestUtil.getLogicalHostname(dfsCluster),
+        Collections.singletonList(
+            dfsCluster.getNameNode(2).getNameNodeAddress()),
+        ConfiguredFailoverProxyProvider.class);
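+    // Disable the FileSystem cache so that FileSystem.get below builds a new
+    // client from conf2 instead of returning the cached default client.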
+    conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
+    conf2.setInt(HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, 1);
+    conf2.setInt(HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, 1);
+    FileSystem dfs2 = FileSystem.get(conf2);
+
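+    // Create the test directory via the active namenode, then roll its edit
+    // log and have the observer tail it so the new entry is visible there.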
+    dfs.mkdir(testPath, FsPermission.getDefault());
+    dfsCluster.rollEditLogAndTail(0);
+
+    try {
+      // Request should be rejected by the observer and throw StandbyException
+      dfs2.listStatus(testPath);
+      fail("listStatus should have thrown exception");
+    } catch (RemoteException re) {
+      IOException e = re.unwrapRemoteException();
+      assertTrue("should have thrown StandbyException but got "
+          + e.getClass().getSimpleName(),
+          e instanceof StandbyException);
+    }
+  }
+
   private void assertSentTo(int nnIdx) throws IOException {
     assertTrue("Request was not sent to the expected namenode " + nnIdx,
         HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));