|
@@ -37,6 +37,8 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
|
|
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
|
|
import org.apache.hadoop.hdfs.server.journalservice.JournalHttpServer;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
@@ -68,9 +70,9 @@ public class TestJournalHttpServer {
|
|
|
}
|
|
|
|
|
|
hdfsDir.mkdirs();
|
|
|
- // TODO: remove the manual setting storage when JN is fully implemented
|
|
|
- path1 = new File(hdfsDir, "jn1dir");
|
|
|
- path2 = new File(hdfsDir, "jn2dir");
|
|
|
+
|
|
|
+ path1 = new File(hdfsDir, "j1dir");
|
|
|
+ path2 = new File(hdfsDir, "j2dir");
|
|
|
path1.mkdir();
|
|
|
path2.mkdir();
|
|
|
if (!path1.exists() || !path2.exists()) {
|
|
@@ -79,7 +81,7 @@ public class TestJournalHttpServer {
|
|
|
}
|
|
|
|
|
|
System.out.println("configuring hdfsdir is " + hdfsDir.getAbsolutePath()
|
|
|
- + "; jn1Dir = " + path1.getPath() + "; jn2Dir = " + path2.getPath());
|
|
|
+ + "; j1Dir = " + path1.getPath() + "; j2Dir = " + path2.getPath());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -90,15 +92,15 @@ public class TestJournalHttpServer {
|
|
|
@Test
|
|
|
public void testHttpServer() throws Exception {
|
|
|
MiniDFSCluster cluster = null;
|
|
|
- JournalHttpServer jns1 = null;
|
|
|
+ JournalHttpServer jh1 = null;
|
|
|
|
|
|
try {
|
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
|
|
|
|
|
conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
|
|
|
- jns1 = new JournalHttpServer(conf, new Journal(conf),
|
|
|
+ jh1 = new JournalHttpServer(conf, new Journal(conf),
|
|
|
NetUtils.createSocketAddr("localhost:50200"));
|
|
|
- jns1.start();
|
|
|
+ jh1.start();
|
|
|
|
|
|
String pageContents = DFSTestUtil.urlGet(new URL(
|
|
|
"http://localhost:50200/journalstatus.jsp"));
|
|
@@ -108,26 +110,12 @@ public class TestJournalHttpServer {
|
|
|
LOG.error("Error in TestHttpServer:", e);
|
|
|
assertTrue(e.getLocalizedMessage(), false);
|
|
|
} finally {
|
|
|
- if (jns1 != null)
|
|
|
- jns1.stop();
|
|
|
+ if (jh1 != null)
|
|
|
+ jh1.stop();
|
|
|
if (cluster != null)
|
|
|
cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- //TODO: remove this method when the same rpc is supported by journal service
|
|
|
- private RemoteEditLogManifest editsToDownload(InetSocketAddress srcRpcAddr,
|
|
|
- long txid) throws IOException {
|
|
|
-
|
|
|
- NamenodeProtocol namenode = NameNodeProxies.createNonHAProxy(conf,
|
|
|
- srcRpcAddr, NamenodeProtocol.class,
|
|
|
- UserGroupInformation.getCurrentUser(), true).getProxy();
|
|
|
-
|
|
|
- // get all edit segments
|
|
|
- RemoteEditLogManifest manifest = namenode.getEditLogManifest(txid);
|
|
|
-
|
|
|
- return manifest;
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
* Test lagging Journal service copies edit segments from another Journal
|
|
@@ -138,58 +126,69 @@ public class TestJournalHttpServer {
|
|
|
@Test
|
|
|
public void testCopyEdits() throws Exception {
|
|
|
MiniDFSCluster cluster = null;
|
|
|
- JournalHttpServer jns1 = null, jns2 = null;
|
|
|
+ JournalService service = null;
|
|
|
+ JournalHttpServer jhs1 = null, jhs2 = null;
|
|
|
|
|
|
try {
|
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
|
|
|
|
|
- // restart namenode, so it will have one finalized edit segment
|
|
|
+ // restart namenode, so it will have finalized edit segments
|
|
|
cluster.restartNameNode();
|
|
|
|
|
|
- // get namenode clusterID/layoutVersion/namespaceID
|
|
|
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
|
|
|
InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
|
|
|
- InetSocketAddress serverAddr = new InetSocketAddress(0);
|
|
|
+ InetSocketAddress serverAddr = new InetSocketAddress(50900);
|
|
|
JournalListener listener = Mockito.mock(JournalListener.class);
|
|
|
- JournalService service = new JournalService(conf, nnAddr, serverAddr,
|
|
|
- listener);
|
|
|
+ service = new JournalService(conf, nnAddr, serverAddr, listener);
|
|
|
service.start();
|
|
|
+
|
|
|
+ // get namenode clusterID/layoutVersion/namespaceID
|
|
|
StorageInfo si = service.getJournal().getStorage();
|
|
|
- service.stop();
|
|
|
-
|
|
|
+ JournalInfo journalInfo = new JournalInfo(si.layoutVersion, si.clusterID,
|
|
|
+ si.namespaceID);
|
|
|
+
|
|
|
// start jns1 with path1
|
|
|
- conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
|
|
|
- Journal journal1 = new Journal(conf);
|
|
|
- journal1.format(si.namespaceID, si.clusterID);
|
|
|
- jns1 = new JournalHttpServer(conf, journal1,
|
|
|
+ jhs1 = new JournalHttpServer(conf, service.getJournal(),
|
|
|
NetUtils.createSocketAddr("localhost:50200"));
|
|
|
- jns1.start();
|
|
|
+ jhs1.start();
|
|
|
|
|
|
+ // get all edit segments
|
|
|
InetSocketAddress srcRpcAddr = NameNode.getServiceAddress(conf, true);
|
|
|
- RemoteEditLogManifest manifest = editsToDownload(srcRpcAddr, 1);
|
|
|
+ NamenodeProtocol namenode = NameNodeProxies.createNonHAProxy(conf,
|
|
|
+ srcRpcAddr, NamenodeProtocol.class,
|
|
|
+ UserGroupInformation.getCurrentUser(), true).getProxy();
|
|
|
|
|
|
- String addr = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
|
|
- jns1.downloadEditFiles(addr, manifest);
|
|
|
+ RemoteEditLogManifest manifest = namenode.getEditLogManifest(1);
|
|
|
+ jhs1.downloadEditFiles(
|
|
|
+ conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY), manifest);
|
|
|
|
|
|
// start jns2 with path2
|
|
|
conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path2.getPath());
|
|
|
Journal journal2 = new Journal(conf);
|
|
|
journal2.format(si.namespaceID, si.clusterID);
|
|
|
- jns2 = new JournalHttpServer(conf, journal2,
|
|
|
+ jhs2 = new JournalHttpServer(conf, journal2,
|
|
|
NetUtils.createSocketAddr("localhost:50300"));
|
|
|
- jns2.start();
|
|
|
+ jhs2.start();
|
|
|
|
|
|
- jns2.downloadEditFiles("localhost:50200", manifest);
|
|
|
+ // transfer edit logs from j1 to j2
|
|
|
+ JournalSyncProtocol journalp = JournalService.createProxyWithJournalSyncProtocol(
|
|
|
+ NetUtils.createSocketAddr("localhost:50900"), conf,
|
|
|
+ UserGroupInformation.getCurrentUser());
|
|
|
+ RemoteEditLogManifest manifest1 = journalp.getEditLogManifest(journalInfo, 1);
|
|
|
+ jhs2.downloadEditFiles("localhost:50200", manifest1);
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Error in TestCopyEdits:", e);
|
|
|
assertTrue(e.getLocalizedMessage(), false);
|
|
|
} finally {
|
|
|
- if (jns1 != null)
|
|
|
- jns1.stop();
|
|
|
- if (jns2 != null)
|
|
|
- jns2.stop();
|
|
|
+ if (jhs1 != null)
|
|
|
+ jhs1.stop();
|
|
|
+ if (jhs2 != null)
|
|
|
+ jhs2.stop();
|
|
|
if (cluster != null)
|
|
|
cluster.shutdown();
|
|
|
+ if (service != null)
|
|
|
+ service.stop();
|
|
|
}
|
|
|
}
|
|
|
}
|