|
@@ -28,6 +28,7 @@ import java.net.BindException;
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
|
|
+import java.util.Random;
|
|
import java.util.concurrent.Callable;
|
|
import java.util.concurrent.Callable;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.TimeoutException;
|
|
import java.util.concurrent.TimeoutException;
|
|
@@ -46,7 +47,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
-import org.apache.hadoop.net.ServerSocketUtil;
|
|
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.hadoop.test.GenericTestUtils;
|
|
import org.apache.log4j.Level;
|
|
import org.apache.log4j.Level;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
@@ -168,21 +168,7 @@ public class TestEditLogTailer {
|
|
MiniDFSCluster cluster = null;
|
|
MiniDFSCluster cluster = null;
|
|
for (int i = 0; i < 5; i++) {
|
|
for (int i = 0; i < 5; i++) {
|
|
try {
|
|
try {
|
|
- // Have to specify IPC ports so the NNs can talk to each other.
|
|
|
|
- int[] ports = ServerSocketUtil.getPorts(3);
|
|
|
|
- MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
|
|
|
- .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
|
|
|
|
- .addNN(new MiniDFSNNTopology.NNConf("nn1")
|
|
|
|
- .setIpcPort(ports[0]))
|
|
|
|
- .addNN(new MiniDFSNNTopology.NNConf("nn2")
|
|
|
|
- .setIpcPort(ports[1]))
|
|
|
|
- .addNN(new MiniDFSNNTopology.NNConf("nn3")
|
|
|
|
- .setIpcPort(ports[2])));
|
|
|
|
-
|
|
|
|
- cluster = new MiniDFSCluster.Builder(conf)
|
|
|
|
- .nnTopology(topology)
|
|
|
|
- .numDataNodes(0)
|
|
|
|
- .build();
|
|
|
|
|
|
+ cluster = createMiniDFSCluster(conf, 3);
|
|
break;
|
|
break;
|
|
} catch (BindException e) {
|
|
} catch (BindException e) {
|
|
// retry if race on ports given by ServerSocketUtil#getPorts
|
|
// retry if race on ports given by ServerSocketUtil#getPorts
|
|
@@ -213,21 +199,9 @@ public class TestEditLogTailer {
|
|
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
|
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
|
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);
|
|
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);
|
|
|
|
|
|
- // Have to specify IPC ports so the NNs can talk to each other.
|
|
|
|
- MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
|
|
|
- .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
|
|
|
|
- .addNN(new MiniDFSNNTopology.NNConf("nn1")
|
|
|
|
- .setIpcPort(ServerSocketUtil.getPort(0, 100)))
|
|
|
|
- .addNN(new MiniDFSNNTopology.NNConf("nn2")
|
|
|
|
- .setIpcPort(ServerSocketUtil.getPort(0, 100)))
|
|
|
|
- .addNN(new MiniDFSNNTopology.NNConf("nn3")
|
|
|
|
- .setIpcPort(ServerSocketUtil.getPort(0, 100))));
|
|
|
|
-
|
|
|
|
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
|
|
- .nnTopology(topology)
|
|
|
|
- .numDataNodes(0)
|
|
|
|
- .build();
|
|
|
|
|
|
+ MiniDFSCluster cluster = null;
|
|
try {
|
|
try {
|
|
|
|
+ cluster = createMiniDFSCluster(conf, 3);
|
|
cluster.transitionToStandby(0);
|
|
cluster.transitionToStandby(0);
|
|
cluster.transitionToStandby(1);
|
|
cluster.transitionToStandby(1);
|
|
cluster.transitionToStandby(2);
|
|
cluster.transitionToStandby(2);
|
|
@@ -240,7 +214,9 @@ public class TestEditLogTailer {
|
|
cluster.transitionToActive(0);
|
|
cluster.transitionToActive(0);
|
|
waitForLogRollInSharedDir(cluster, 3);
|
|
waitForLogRollInSharedDir(cluster, 3);
|
|
} finally {
|
|
} finally {
|
|
- cluster.shutdown();
|
|
|
|
|
|
+ if (cluster != null) {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -306,4 +282,64 @@ public class TestEditLogTailer {
|
|
cluster.shutdown();
|
|
cluster.shutdown();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testRollEditLogIOExceptionForRemoteNN() throws IOException {
|
|
|
|
+ Configuration conf = getConf();
|
|
|
|
+
|
|
|
|
+ // Roll every 1s
|
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
|
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
|
|
|
+
|
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
|
+ try {
|
|
|
|
+ cluster = createMiniDFSCluster(conf, 3);
|
|
|
|
+ cluster.transitionToActive(0);
|
|
|
|
+ EditLogTailer tailer = Mockito.spy(
|
|
|
|
+ cluster.getNamesystem(1).getEditLogTailer());
|
|
|
|
+
|
|
|
|
+ final AtomicInteger invokedTimes = new AtomicInteger(0);
|
|
|
|
+
|
|
|
|
+ // It should go on to next name node when IOException happens.
|
|
|
|
+ when(tailer.getNameNodeProxy()).thenReturn(
|
|
|
|
+ tailer.new MultipleNameNodeProxy<Void>() {
|
|
|
|
+ @Override
|
|
|
|
+ protected Void doWork() throws IOException {
|
|
|
|
+ invokedTimes.getAndIncrement();
|
|
|
|
+ throw new IOException("It is an IO Exception.");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ tailer.triggerActiveLogRoll();
|
|
|
|
+
|
|
|
|
+ // MultipleNameNodeProxy uses Round-robin to look for active NN
|
|
|
|
+ // to do RollEditLog. If doWork() fails, then IOException throws,
|
|
|
|
+ // it continues to try next NN. triggerActiveLogRoll finishes
|
|
|
|
+ // either due to success, or using up retries.
|
|
|
|
+ // In this test case, there are 2 remote name nodes, default retry is 3.
|
|
|
|
+ // For test purpose, doWork() always returns IOException,
|
|
|
|
+ // so the total invoked times will be default retry 3 * remote NNs 2 = 6
|
|
|
|
+ assertEquals(6, invokedTimes.get());
|
|
|
|
+ } finally {
|
|
|
|
+ if (cluster != null) {
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
|
|
|
|
+ int nnCount) throws IOException {
|
|
|
|
+ int basePort = 10060 + new Random().nextInt(100) * 2;
|
|
|
|
+
|
|
|
|
+ // By passing in basePort, name node will have IPC port set,
|
|
|
|
+ // which is needed for enabling roll log.
|
|
|
|
+ MiniDFSNNTopology topology =
|
|
|
|
+ MiniDFSNNTopology.simpleHATopology(nnCount, basePort);
|
|
|
|
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
|
|
|
+ .nnTopology(topology)
|
|
|
|
+ .numDataNodes(0)
|
|
|
|
+ .build();
|
|
|
|
+ return cluster;
|
|
|
|
+ }
|
|
}
|
|
}
|