|
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
|
|
+import org.apache.hadoop.hdfs.server.journalservice.GetJournalEditServlet;
|
|
|
import org.apache.hadoop.hdfs.server.journalservice.JournalHttpServer;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
|
@@ -50,6 +51,7 @@ import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.log4j.Level;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.Mockito;
|
|
|
|
|
|
public class TestJournalHttpServer {
|
|
|
public static final Log LOG = LogFactory.getLog(TestJournalHttpServer.class);
|
|
@@ -146,8 +148,8 @@ public class TestJournalHttpServer {
|
|
|
private void copyNNFiles(MiniDFSCluster cluster, File dstDir)
|
|
|
throws IOException {
|
|
|
Collection<URI> editURIs = cluster.getNameEditsDirs(0);
|
|
|
- String firstURI = editURIs.iterator().next().getPath().toString();
|
|
|
- File nnDir = new File(new String(firstURI + "/current"));
|
|
|
+ String firstURI = editURIs.iterator().next().getPath();
|
|
|
+ File nnDir = new File(firstURI + "/current");
|
|
|
|
|
|
File allFiles[] = FileUtil.listFiles(nnDir);
|
|
|
for (File f : allFiles) {
|
|
@@ -194,7 +196,7 @@ public class TestJournalHttpServer {
|
|
|
cluster.restartNameNode();
|
|
|
|
|
|
// TODO: remove file copy when NN can work with journal auto-machine
|
|
|
- copyNNFiles(cluster, new File(new String(path1.toString() + "/current")));
|
|
|
+ copyNNFiles(cluster, new File(path1 + "/current"));
|
|
|
|
|
|
// start another journal service that will do the sync
|
|
|
conf.set(DFSConfigKeys.DFS_JOURNAL_ADDRESS_KEY, "localhost:50900");
|
|
@@ -228,4 +230,82 @@ public class TestJournalHttpServer {
|
|
|
service2.stop();
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test lagging Journal service fails to copy edit segments from another
|
|
|
+ * Journal service with injected error:
|
|
|
+ * 1. start one journal service
|
|
|
+ * 2. reboot namenode so more segments are created
|
|
|
+ * 3. add another journal service and this new journal service should sync
|
|
|
+ * with the first journal service. Use injected error to fail the sync.
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testCopyEditsFailure() throws Exception {
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ JournalService service1 = null, service2 = null;
|
|
|
+ GetJournalEditServlet.FaultInjector faultInjector;
|
|
|
+ try {
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
|
|
+
|
|
|
+ // start journal service
|
|
|
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
|
|
|
+ InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
|
|
|
+ InetSocketAddress serverAddr = NetUtils
|
|
|
+ .createSocketAddr("localhost:50900");
|
|
|
+ Journal j1 = new Journal(conf);
|
|
|
+ JournalListener listener1 = new JournalDiskWriter(j1);
|
|
|
+ service1 = new JournalService(conf, nnAddr, serverAddr,
|
|
|
+ new InetSocketAddress(50901), listener1, j1);
|
|
|
+ service1.start();
|
|
|
+
|
|
|
+ // get namenode clusterID/layoutVersion/namespaceID
|
|
|
+ StorageInfo si = service1.getJournal().getStorage();
|
|
|
+ JournalInfo journalInfo = new JournalInfo(si.layoutVersion, si.clusterID,
|
|
|
+ si.namespaceID);
|
|
|
+
|
|
|
+ // restart namenode, so there will be one more journal segments
|
|
|
+ cluster.restartNameNode();
|
|
|
+
|
|
|
+ // TODO: remove file copy when NN can work with journal auto-machine
|
|
|
+ copyNNFiles(cluster, new File(path1 + "/current"));
|
|
|
+
|
|
|
+ faultInjector = Mockito.mock(GetJournalEditServlet.FaultInjector.class);
|
|
|
+ GetJournalEditServlet.FaultInjector.instance = faultInjector;
|
|
|
+ Mockito
|
|
|
+ .doThrow(new IOException("Injecting failure before sending edits"))
|
|
|
+ .when(faultInjector).beforeSendEdits();
|
|
|
+ // start another journal service that will do the sync
|
|
|
+ conf.set(DFSConfigKeys.DFS_JOURNAL_ADDRESS_KEY, "localhost:50900");
|
|
|
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path2.getPath());
|
|
|
+ conf.set(DFSConfigKeys.DFS_JOURNAL_HTTP_ADDRESS_KEY,
|
|
|
+ "localhost:50902, localhost:50901");
|
|
|
+ Journal j2 = new Journal(conf);
|
|
|
+ JournalListener listener2 = new JournalDiskWriter(j2);
|
|
|
+ service2 = new JournalService(conf, nnAddr, new InetSocketAddress(50800),
|
|
|
+ NetUtils.createSocketAddr("localhost:50902"), listener2, j2);
|
|
|
+ service2.start();
|
|
|
+
|
|
|
+ // give service2 sometime to sync
|
|
|
+ Thread.sleep(5000);
|
|
|
+
|
|
|
+ // TODO: change to sinceTxid to 1 after NN is modified to use journal
|
|
|
+ // service to start
|
|
|
+ RemoteEditLogManifest manifest2 = service2.getEditLogManifest(
|
|
|
+ journalInfo, 3);
|
|
|
+ assertTrue(manifest2.getLogs().size() == 0);
|
|
|
+
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Error in TestCopyEdits:", e);
|
|
|
+ assertTrue(e.getLocalizedMessage(), false);
|
|
|
+ } finally {
|
|
|
+ if (cluster != null)
|
|
|
+ cluster.shutdown();
|
|
|
+ if (service1 != null)
|
|
|
+ service1.stop();
|
|
|
+ if (service2 != null)
|
|
|
+ service2.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|