|
@@ -21,10 +21,8 @@ import static org.junit.Assert.*;
|
|
|
|
|
|
import java.io.File;
|
|
import java.io.File;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
-import java.net.URI;
|
|
|
|
|
|
+import java.net.InetSocketAddress;
|
|
import java.net.URL;
|
|
import java.net.URL;
|
|
-import java.util.ArrayList;
|
|
|
|
-import java.util.List;
|
|
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -35,16 +33,21 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
|
+import org.apache.hadoop.hdfs.NameNodeProxies;
|
|
|
|
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
|
import org.apache.hadoop.hdfs.server.journalservice.JournalHttpServer;
|
|
import org.apache.hadoop.hdfs.server.journalservice.JournalHttpServer;
|
|
-import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
|
|
|
|
|
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.log4j.Level;
|
|
import org.apache.log4j.Level;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
|
|
+import org.mockito.Mockito;
|
|
|
|
|
|
public class TestJournalHttpServer {
|
|
public class TestJournalHttpServer {
|
|
- public static final Log LOG = LogFactory
|
|
|
|
- .getLog(TestJournalHttpServer.class);
|
|
|
|
|
|
+ public static final Log LOG = LogFactory.getLog(TestJournalHttpServer.class);
|
|
|
|
|
|
static {
|
|
static {
|
|
((Log4JLogger) JournalHttpServer.LOG).getLogger().setLevel(Level.ALL);
|
|
((Log4JLogger) JournalHttpServer.LOG).getLogger().setLevel(Level.ALL);
|
|
@@ -52,7 +55,7 @@ public class TestJournalHttpServer {
|
|
|
|
|
|
private Configuration conf;
|
|
private Configuration conf;
|
|
private File hdfsDir = null;
|
|
private File hdfsDir = null;
|
|
- private File path1;
|
|
|
|
|
|
+ private File path1, path2;
|
|
|
|
|
|
@Before
|
|
@Before
|
|
public void setUp() throws Exception {
|
|
public void setUp() throws Exception {
|
|
@@ -67,24 +70,20 @@ public class TestJournalHttpServer {
|
|
hdfsDir.mkdirs();
|
|
hdfsDir.mkdirs();
|
|
// TODO: remove the manual setting storage when JN is fully implemented
|
|
// TODO: remove the manual setting storage when JN is fully implemented
|
|
path1 = new File(hdfsDir, "jn1dir");
|
|
path1 = new File(hdfsDir, "jn1dir");
|
|
|
|
+ path2 = new File(hdfsDir, "jn2dir");
|
|
path1.mkdir();
|
|
path1.mkdir();
|
|
- if (!path1.exists()) {
|
|
|
|
|
|
+ path2.mkdir();
|
|
|
|
+ if (!path1.exists() || !path2.exists()) {
|
|
throw new IOException("Couldn't create path in "
|
|
throw new IOException("Couldn't create path in "
|
|
+ hdfsDir.getAbsolutePath());
|
|
+ hdfsDir.getAbsolutePath());
|
|
}
|
|
}
|
|
|
|
|
|
System.out.println("configuring hdfsdir is " + hdfsDir.getAbsolutePath()
|
|
System.out.println("configuring hdfsdir is " + hdfsDir.getAbsolutePath()
|
|
- + "; jn1Dir = " + path1.getPath());
|
|
|
|
-
|
|
|
|
- File path1current = new File(path1, "current");
|
|
|
|
- path1current.mkdir();
|
|
|
|
- if (!path1current.exists()) {
|
|
|
|
- throw new IOException("Couldn't create path " + path1current);
|
|
|
|
- }
|
|
|
|
|
|
+ + "; jn1Dir = " + path1.getPath() + "; jn2Dir = " + path2.getPath());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Test JN Http Server
|
|
|
|
|
|
+ * Test Journal service Http Server
|
|
*
|
|
*
|
|
* @throws Exception
|
|
* @throws Exception
|
|
*/
|
|
*/
|
|
@@ -97,12 +96,7 @@ public class TestJournalHttpServer {
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
|
|
|
|
|
conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
|
|
conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
|
|
- // TODO: remove the manual setting storage when JN is fully implemented
|
|
|
|
- URI uri = new URI(new String("file:" + path1.getPath()));
|
|
|
|
- List<URI> editsDirs = new ArrayList<URI>();
|
|
|
|
- editsDirs.add(uri);
|
|
|
|
- NNStorage storage = new NNStorage(conf, new ArrayList<URI>(), editsDirs);
|
|
|
|
- jns1 = new JournalHttpServer(conf, storage,
|
|
|
|
|
|
+ jns1 = new JournalHttpServer(conf, new Journal(conf),
|
|
NetUtils.createSocketAddr("localhost:50200"));
|
|
NetUtils.createSocketAddr("localhost:50200"));
|
|
jns1.start();
|
|
jns1.start();
|
|
|
|
|
|
@@ -120,4 +114,82 @@ public class TestJournalHttpServer {
|
|
cluster.shutdown();
|
|
cluster.shutdown();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ //TODO: remove this method when the same rpc is supported by journal service
|
|
|
|
+ private RemoteEditLogManifest editsToDownload(InetSocketAddress srcRpcAddr,
|
|
|
|
+ long txid) throws IOException {
|
|
|
|
+
|
|
|
|
+ NamenodeProtocol namenode = NameNodeProxies.createNonHAProxy(conf,
|
|
|
|
+ srcRpcAddr, NamenodeProtocol.class,
|
|
|
|
+ UserGroupInformation.getCurrentUser(), true).getProxy();
|
|
|
|
+
|
|
|
|
+ // get all edit segments
|
|
|
|
+ RemoteEditLogManifest manifest = namenode.getEditLogManifest(txid);
|
|
|
|
+
|
|
|
|
+ return manifest;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Test lagging Journal service copies edit segments from another Journal
|
|
|
|
+ * service
|
|
|
|
+ *
|
|
|
|
+ * @throws Exception
|
|
|
|
+ */
|
|
|
|
+ @Test
|
|
|
|
+ public void testCopyEdits() throws Exception {
|
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
|
+ JournalHttpServer jns1 = null, jns2 = null;
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
|
|
|
+
|
|
|
|
+ // restart namenode, so it will have one finalized edit segment
|
|
|
|
+ cluster.restartNameNode();
|
|
|
|
+
|
|
|
|
+ // get namenode clusterID/layoutVersion/namespaceID
|
|
|
|
+ InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
|
|
|
|
+ InetSocketAddress serverAddr = new InetSocketAddress(0);
|
|
|
|
+ JournalListener listener = Mockito.mock(JournalListener.class);
|
|
|
|
+ JournalService service = new JournalService(conf, nnAddr, serverAddr,
|
|
|
|
+ listener);
|
|
|
|
+ service.start();
|
|
|
|
+ StorageInfo si = service.getJournal().getStorage();
|
|
|
|
+ service.stop();
|
|
|
|
+
|
|
|
|
+ // start jns1 with path1
|
|
|
|
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
|
|
|
|
+ Journal journal1 = new Journal(conf);
|
|
|
|
+ journal1.format(si.namespaceID, si.clusterID);
|
|
|
|
+ jns1 = new JournalHttpServer(conf, journal1,
|
|
|
|
+ NetUtils.createSocketAddr("localhost:50200"));
|
|
|
|
+ jns1.start();
|
|
|
|
+
|
|
|
|
+ InetSocketAddress srcRpcAddr = NameNode.getServiceAddress(conf, true);
|
|
|
|
+ RemoteEditLogManifest manifest = editsToDownload(srcRpcAddr, 1);
|
|
|
|
+
|
|
|
|
+ String addr = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
|
|
|
+ jns1.downloadEditFiles(addr, manifest);
|
|
|
|
+
|
|
|
|
+ // start jns2 with path2
|
|
|
|
+ conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path2.getPath());
|
|
|
|
+ Journal journal2 = new Journal(conf);
|
|
|
|
+ journal2.format(si.namespaceID, si.clusterID);
|
|
|
|
+ jns2 = new JournalHttpServer(conf, journal2,
|
|
|
|
+ NetUtils.createSocketAddr("localhost:50300"));
|
|
|
|
+ jns2.start();
|
|
|
|
+
|
|
|
|
+ jns2.downloadEditFiles("localhost:50200", manifest);
|
|
|
|
+
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.error("Error in TestCopyEdits:", e);
|
|
|
|
+ assertTrue(e.getLocalizedMessage(), false);
|
|
|
|
+ } finally {
|
|
|
|
+ if (jns1 != null)
|
|
|
|
+ jns1.stop();
|
|
|
|
+ if (jns2 != null)
|
|
|
|
+ jns2.stop();
|
|
|
|
+ if (cluster != null)
|
|
|
|
+ cluster.shutdown();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|