|
@@ -25,6 +25,7 @@ import java.net.URI;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@@ -32,7 +33,12 @@ import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
|
import com.google.common.util.concurrent.RateLimiter;
|
|
|
+import org.apache.hadoop.ha.HAServiceProtocol;
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.HAUtil;
|
|
|
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
|
+import org.apache.hadoop.ipc.RPC;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -100,6 +106,32 @@ public class NameNodeConnector implements Closeable {
|
|
|
return connectors;
|
|
|
}
|
|
|
|
|
|
+ public static List<NameNodeConnector> newNameNodeConnectors(
|
|
|
+ Collection<URI> namenodes, Collection<String> nsIds, String name,
|
|
|
+ Path idPath, Configuration conf, int maxIdleIterations)
|
|
|
+ throws IOException {
|
|
|
+ final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
|
|
|
+ namenodes.size());
|
|
|
+ Map<URI, String> uriToNsId = new HashMap<>();
|
|
|
+ if (nsIds != null) {
|
|
|
+ for (URI uri : namenodes) {
|
|
|
+ for (String nsId : nsIds) {
|
|
|
+ if (uri.getAuthority().equals(nsId)) {
|
|
|
+ uriToNsId.put(uri, nsId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for (URI uri : namenodes) {
|
|
|
+ String nsId = uriToNsId.get(uri);
|
|
|
+ NameNodeConnector nnc = new NameNodeConnector(name, uri, nsId, idPath,
|
|
|
+ null, conf, maxIdleIterations);
|
|
|
+ nnc.getKeyManager().startBlockKeyUpdater();
|
|
|
+ connectors.add(nnc);
|
|
|
+ }
|
|
|
+ return connectors;
|
|
|
+ }
|
|
|
+
|
|
|
@VisibleForTesting
|
|
|
public static void setWrite2IdFile(boolean write2IdFile) {
|
|
|
NameNodeConnector.write2IdFile = write2IdFile;
|
|
@@ -114,6 +146,13 @@ public class NameNodeConnector implements Closeable {
|
|
|
private final String blockpoolID;
|
|
|
|
|
|
private final BalancerProtocols namenode;
|
|
|
+ /**
|
|
|
+ * If set balancerShouldRequestStandby true, Balancer will getBlocks from
|
|
|
+ * Standby NameNode only and it can reduce the performance impact of Active
|
|
|
+ * NameNode, especially in a busy HA mode cluster.
|
|
|
+ */
|
|
|
+ private boolean balancerShouldRequestStandby;
|
|
|
+ private NamenodeProtocol standbyNameNode;
|
|
|
private final KeyManager keyManager;
|
|
|
final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
|
|
|
|
|
@@ -149,6 +188,11 @@ public class NameNodeConnector implements Closeable {
|
|
|
|
|
|
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
|
|
|
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
|
|
|
+ this.balancerShouldRequestStandby = conf.getBoolean(
|
|
|
+ DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_KEY,
|
|
|
+ DFSConfigKeys.DFS_HA_ALLOW_STALE_READ_DEFAULT);
|
|
|
+ this.standbyNameNode = null;
|
|
|
+
|
|
|
this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf);
|
|
|
|
|
|
final NamespaceInfo namespaceinfo = namenode.versionRequest();
|
|
@@ -167,6 +211,31 @@ public class NameNodeConnector implements Closeable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public NameNodeConnector(String name, URI nameNodeUri, String nsId,
|
|
|
+ Path idPath, List<Path> targetPaths,
|
|
|
+ Configuration conf, int maxNotChangedIterations)
|
|
|
+ throws IOException {
|
|
|
+ this(name, nameNodeUri, idPath, targetPaths, conf, maxNotChangedIterations);
|
|
|
+ if (nsId != null && HAUtil.isHAEnabled(conf, nsId)) {
|
|
|
+ List<ClientProtocol> namenodes =
|
|
|
+ HAUtil.getProxiesForAllNameNodesInNameservice(conf, nsId);
|
|
|
+ for (ClientProtocol proxy : namenodes) {
|
|
|
+ try {
|
|
|
+ if (proxy.getHAServiceState().equals(
|
|
|
+ HAServiceProtocol.HAServiceState.STANDBY)) {
|
|
|
+ this.standbyNameNode = NameNodeProxies.createNonHAProxy(
|
|
|
+ conf, RPC.getServerAddress(proxy), NamenodeProtocol.class,
|
|
|
+ UserGroupInformation.getCurrentUser(), false).getProxy();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ //Ignore the exception while connecting to a namenode.
|
|
|
+ LOG.debug("Error while connecting to namenode", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public DistributedFileSystem getDistributedFileSystem() {
|
|
|
return fs;
|
|
|
}
|
|
@@ -186,6 +255,22 @@ public class NameNodeConnector implements Closeable {
|
|
|
if (getBlocksRateLimiter != null) {
|
|
|
getBlocksRateLimiter.acquire();
|
|
|
}
|
|
|
+ boolean isRequestStandby = true;
|
|
|
+ try {
|
|
|
+ if (balancerShouldRequestStandby && standbyNameNode != null) {
|
|
|
+ return standbyNameNode.getBlocks(datanode, size, minBlockSize);
|
|
|
+ } else {
|
|
|
+ isRequestStandby = false;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Request #getBlocks to Standby NameNode but meet exception, " +
|
|
|
+ "will fallback to normal way", e);
|
|
|
+ isRequestStandby = false;
|
|
|
+ } finally {
|
|
|
+ if (isRequestStandby) {
|
|
|
+ LOG.info("Request #getBlocks to Standby NameNode success.");
|
|
|
+ }
|
|
|
+ }
|
|
|
return namenode.getBlocks(datanode, size, minBlockSize);
|
|
|
}
|
|
|
|