|
@@ -17,26 +17,34 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.qjournal;
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
+
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.URI;
|
|
|
import java.net.URISyntaxException;
|
|
|
import java.util.List;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
+import com.google.common.base.Supplier;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
|
|
|
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
|
|
|
import com.google.common.base.Joiner;
|
|
|
import com.google.common.collect.Lists;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
|
|
|
public class MiniJournalCluster {
|
|
|
+ public static final String CLUSTER_WAITACTIVE_URI = "waitactive";
|
|
|
public static class Builder {
|
|
|
private String baseDir;
|
|
|
private int numJournalNodes = 3;
|
|
@@ -217,4 +225,34 @@ public class MiniJournalCluster {
|
|
|
return nodes.length;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Wait until all the journalnodes start.
|
|
|
+ */
|
|
|
+ public void waitActive() throws IOException {
|
|
|
+ for (int i = 0; i < nodes.length; i++) {
|
|
|
+ final int index = i;
|
|
|
+ try {
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ // wait until all JN's IPC server is running
|
|
|
+ @Override public Boolean get() {
|
|
|
+ try {
|
|
|
+ QuorumJournalManager qjm =
|
|
|
+ new QuorumJournalManager(nodes[index].node.getConf(),
|
|
|
+ getQuorumJournalURI(CLUSTER_WAITACTIVE_URI), FAKE_NSINFO);
|
|
|
+ qjm.hasSomeData();
|
|
|
+ qjm.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ // Exception from IPC call, likely due to server not ready yet.
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }, 50, 3000);
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ fail("Time out while waiting for journal node " + index + " to start.");
|
|
|
+ } catch (InterruptedException ite) {
|
|
|
+ LOG.warn("Thread interrupted when waiting for node start", ite);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|