@@ -17,13 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSUtil.createUri;
-
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Proxy;
@@ -36,11 +29,12 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.LongAccumulator;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.ClientGSIContext;
@@ -49,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -59,47 +54,52 @@ import org.apache.hadoop.io.retry.RetryInvocationHandler;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 
-import java.util.function.Supplier;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSUtil.createUri;
 
 /**
  * Static utility functions useful for testing HA.
  */
 public abstract class HATestUtil {
   private static final Logger LOG = LoggerFactory.getLogger(HATestUtil.class);
-  
+
   private static final String LOGICAL_HOSTNAME = "ha-nn-uri-%d";
-  
+
   /**
    * Trigger an edits log roll on the active and then wait for the standby to
    * catch up to all the edits done by the active. This method will check
    * repeatedly for up to NN_LAG_TIMEOUT milliseconds, and then fail throwing
    * {@link CouldNotCatchUpException}
-   * 
+   *
    * @param active active NN
    * @param standby standby NN which should catch up to active
    * @throws IOException if an error occurs rolling the edit log
    * @throws CouldNotCatchUpException if the standby doesn't catch up to the
    *         active in NN_LAG_TIMEOUT milliseconds
    */
-  public static void waitForStandbyToCatchUp(NameNode active,
-      NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException {
-    long activeTxId = active.getNamesystem().getFSImage().getEditLog()
-        .getLastWrittenTxId();
+  public static void waitForStandbyToCatchUp(NameNode active, NameNode standby)
+      throws InterruptedException, IOException, CouldNotCatchUpException {
+    long activeTxId =
+        active.getNamesystem().getFSImage().getEditLog().getLastWrittenTxId();
 
     active.getRpcServer().rollEditLog();
 
     long start = Time.now();
     while (Time.now() - start < TestEditLogTailer.NN_LAG_TIMEOUT) {
-      long nn2HighestTxId = standby.getNamesystem().getFSImage()
-          .getLastAppliedTxId();
+      long nn2HighestTxId =
+          standby.getNamesystem().getFSImage().getLastAppliedTxId();
       if (nn2HighestTxId >= activeTxId) {
         return;
       }
       Thread.sleep(TestEditLogTailer.SLEEP_TIME);
     }
-    throw new CouldNotCatchUpException("Standby did not catch up to txid " +
-        activeTxId + " (currently at " +
-        standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")");
+    throw new CouldNotCatchUpException(
+        "Standby did not catch up to txid " + activeTxId + " (currently at "
+            + standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")");
   }
 
   /**
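
[review note] For context, a minimal sketch of how a test typically drives waitForStandbyToCatchUp, assuming a two-NameNode MiniDFSCluster is already running; `cluster`, `fs`, and the path are illustrative names, not part of this patch:

    // Hypothetical usage sketch (names assumed, not from this patch):
    NameNode active = cluster.getNameNode(0);
    NameNode standby = cluster.getNameNode(1);
    fs.mkdirs(new Path("/catch-up-test"));               // write an edit on the active
    HATestUtil.waitForStandbyToCatchUp(active, standby); // roll the log, poll applied txid
    // On return, the standby has applied at least the txid recorded before the roll.
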
@@ -119,7 +119,7 @@ public abstract class HATestUtil {
         return true;
       }
     }, 1000, 10000);
-    
+
   }
 
   /**
@@ -144,16 +144,18 @@
       super(message);
     }
   }
-  
-  /** Gets the filesystem instance by setting the failover configurations */
+
+  /**
+   * Gets the filesystem instance by setting the failover configurations.
+   */
   public static DistributedFileSystem configureFailoverFs(
       MiniDFSCluster cluster, Configuration conf)
       throws IOException, URISyntaxException {
     return configureFailoverFs(cluster, conf, 0);
   }
 
-  /** 
-   * Gets the filesystem instance by setting the failover configurations
+  /**
+   * Gets the filesystem instance by setting the failover configurations.
    * @param cluster the single process DFS cluster
    * @param conf cluster configuration
    * @param nsIndex namespace index starting with zero
@@ -164,13 +166,13 @@
       int nsIndex) throws IOException, URISyntaxException {
     conf = new Configuration(conf);
     String logicalName = getLogicalHostname(cluster);
-    setFailoverConfigurations(cluster, conf, logicalName, nsIndex);
+    setFailoverConfigurations(cluster, conf, logicalName, null, nsIndex);
     FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
     return (DistributedFileSystem)fs;
   }
 
   public static <P extends ObserverReadProxyProvider<?>>
-  DistributedFileSystem configureObserverReadFs(
+      DistributedFileSystem configureObserverReadFs(
       MiniDFSCluster cluster, Configuration conf,
       Class<P> classFPP, boolean isObserverReadEnabled)
       throws IOException, URISyntaxException {
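
[review note] A small sketch of configureFailoverFs after this change, assuming an HA MiniDFSCluster is up; passing null down to setFailoverConfigurations keeps the ConfiguredFailoverProxyProvider default:

    // Hypothetical usage sketch (names assumed, not from this patch):
    Configuration clientConf = new Configuration();
    DistributedFileSystem fs =
        HATestUtil.configureFailoverFs(cluster, clientConf);
    fs.mkdirs(new Path("/failover-test")); // resolved via the logical nameservice URI
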
@@ -246,8 +248,8 @@
     return qjmhaCluster;
   }
 
-  public static <P extends FailoverProxyProvider<?>>
-  void setupHAConfiguration(MiniDFSCluster cluster,
+  public static <P extends FailoverProxyProvider<?>> void
+      setupHAConfiguration(MiniDFSCluster cluster,
       Configuration conf, int nsIndex, Class<P> classFPP) {
     MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
     List<String> nnAddresses = new ArrayList<String>();
@@ -264,18 +266,23 @@
       Configuration conf) {
     setFailoverConfigurations(cluster, conf, getLogicalHostname(cluster));
   }
-  
+
   /** Sets the required configurations for performing failover of default namespace. */
   public static void setFailoverConfigurations(MiniDFSCluster cluster,
       Configuration conf, String logicalName) {
-    setFailoverConfigurations(cluster, conf, logicalName, 0);
+    setFailoverConfigurations(cluster, conf, logicalName, null, 0);
   }
-  
+
   /** Sets the required configurations for performing failover. */
   public static void setFailoverConfigurations(MiniDFSCluster cluster,
-      Configuration conf, String logicalName, int nsIndex) {
-    setFailoverConfigurations(cluster, conf, logicalName, nsIndex,
-        ConfiguredFailoverProxyProvider.class);
+      Configuration conf, String logicalName, String proxyProvider,
+      int nsIndex) {
+    MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
+    List<InetSocketAddress> nnAddresses = new ArrayList<InetSocketAddress>(3);
+    for (MiniDFSCluster.NameNodeInfo nn : nns) {
+      nnAddresses.add(nn.nameNode.getNameNodeAddress());
+    }
+    setFailoverConfigurations(conf, logicalName, proxyProvider, nnAddresses);
   }
 
   /** Sets the required configurations for performing failover. */
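
[review note] The new String proxyProvider parameter lets a test select the provider by class name instead of the compile-time Class<P> overloads; a sketch, with ObserverReadProxyProvider chosen purely as an example:

    // Hypothetical usage sketch (names assumed, not from this patch):
    HATestUtil.setFailoverConfigurations(cluster, conf, logicalName,
        ObserverReadProxyProvider.class.getName(), 0); // null keeps the default provider
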
@@ -290,19 +297,56 @@
     setFailoverConfigurations(conf, logicalName, nnAddresses, classFPP);
   }
 
-  public static void setFailoverConfigurations(Configuration conf, String logicalName,
-      InetSocketAddress ... nnAddresses){
-    setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses),
-        ConfiguredFailoverProxyProvider.class);
+  public static void setFailoverConfigurations(Configuration conf,
+      String logicalName, String proxyProvider,
+      InetSocketAddress... nnAddresses) {
+    setFailoverConfigurations(conf, logicalName, proxyProvider,
+        Arrays.asList(nnAddresses));
+  }
+
+  /**
+   * Sets the required configurations for performing failover.
+   */
+  public static void setFailoverConfigurations(
+      Configuration conf, String logicalName,
+      String proxyProvider, List<InetSocketAddress> nnAddresses) {
+    final List<String> addresses = new ArrayList<>();
+    nnAddresses.forEach(addr ->
+        addresses.add("hdfs://" + addr.getHostName() + ":" + addr.getPort()));
+    setFailoverConfigurations(conf, logicalName, proxyProvider, addresses);
+  }
+
+  public static void setFailoverConfigurations(
+      Configuration conf, String logicalName,
+      String proxyProvider, Iterable<String> nnAddresses) {
+    List<String> nnids = new ArrayList<String>();
+    int i = 0;
+    for (String address : nnAddresses) {
+      String nnId = "nn" + (i + 1);
+      nnids.add(nnId);
+      conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, nnId), address);
+      i++;
+    }
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);
+    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName),
+        Joiner.on(',').join(nnids));
+    if (proxyProvider == null) {
+      conf.set(Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
+          ConfiguredFailoverProxyProvider.class.getName());
+    } else {
+      conf.set(Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
+          proxyProvider);
+    }
+    conf.set("fs.defaultFS", "hdfs://" + logicalName);
   }
 
   /**
-   * Sets the required configurations for performing failover
+   * Sets the required configurations for performing failover.
    */
   public static <P extends FailoverProxyProvider<?>> void
       setFailoverConfigurations(Configuration conf, String logicalName,
       List<InetSocketAddress> nnAddresses, Class<P> classFPP) {
-    final List<String> addresses = new ArrayList();
+    final List<String> addresses = new ArrayList<>();
     nnAddresses.forEach(
         addr -> addresses.add(
             "hdfs://" + addr.getHostName() + ":" + addr.getPort()));
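
[review note] For reference, the Iterable<String> overload above ends up writing configuration along these lines (illustrative values for a two-NameNode nameservice named ha-nn-uri-0):

    // Illustrative resulting configuration (example values):
    // dfs.nameservices                               = ha-nn-uri-0
    // dfs.ha.namenodes.ha-nn-uri-0                   = nn1,nn2
    // dfs.namenode.rpc-address.ha-nn-uri-0.nn1       = hdfs://host1:8020
    // dfs.namenode.rpc-address.ha-nn-uri-0.nn2       = hdfs://host2:8020
    // dfs.client.failover.proxy.provider.ha-nn-uri-0 = proxyProvider, or
    //     ConfiguredFailoverProxyProvider when proxyProvider is null
    // fs.defaultFS                                   = hdfs://ha-nn-uri-0
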
@@ -310,7 +354,7 @@
   }
 
   public static <P extends FailoverProxyProvider<?>>
-    void setFailoverConfigurations(
+      void setFailoverConfigurations(
       Configuration conf, String logicalName,
       Iterable<String> nnAddresses, Class<P> classFPP) {
     List<String> nnids = new ArrayList<String>();
@@ -332,13 +376,13 @@
   public static String getLogicalHostname(MiniDFSCluster cluster) {
     return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId());
   }
-  
+
   public static URI getLogicalUri(MiniDFSCluster cluster)
       throws URISyntaxException {
     return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" +
         getLogicalHostname(cluster));
   }
-  
+
   public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx,
       List<Integer> txids) throws InterruptedException {
     long start = Time.now();
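
[review note] Putting the pieces together, a hedged end-to-end sketch of exercising the string-based provider parameter from a test; the cluster setup is the stock MiniDFSCluster HA topology and all names are illustrative, not taken from this patch:

    // Hypothetical end-to-end sketch (assumes the imports already present in
    // this file, plus MiniDFSNNTopology and org.apache.hadoop.fs.Path):
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology())
        .numDataNodes(1)
        .build();
    try {
      cluster.waitActive();
      cluster.transitionToActive(0);
      HATestUtil.setFailoverConfigurations(cluster, conf,
          HATestUtil.getLogicalHostname(cluster),
          ObserverReadProxyProvider.class.getName(), 0);
      try (FileSystem fs = FileSystem.get(conf)) {        // fs.defaultFS was set above
        fs.mkdirs(new Path("/provider-by-name"));         // writes go to the active NN
      }
    } finally {
      cluster.shutdown();
    }
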