|
@@ -17,11 +17,6 @@
|
|
*/
|
|
*/
|
|
package org.apache.hadoop.hdfs.server.federation.resolver.order;
|
|
package org.apache.hadoop.hdfs.server.federation.resolver.order;
|
|
|
|
|
|
-import static org.apache.hadoop.util.Time.monotonicNow;
|
|
|
|
-
|
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
|
|
-import com.google.common.net.HostAndPort;
|
|
|
|
-
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.net.InetAddress;
|
|
import java.net.InetAddress;
|
|
import java.net.UnknownHostException;
|
|
import java.net.UnknownHostException;
|
|
@@ -30,17 +25,14 @@ import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
|
|
-import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
|
|
|
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
|
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
|
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
|
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
|
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
|
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
|
-import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
|
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest;
|
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
|
|
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse;
|
|
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
|
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
|
|
@@ -50,40 +42,46 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
+import com.google.common.net.HostAndPort;
|
|
|
|
+
|
|
|
|
|
|
/**
|
|
/**
|
|
* The local subcluster (where the writer is) should be tried first. The writer
|
|
* The local subcluster (where the writer is) should be tried first. The writer
|
|
* is defined from the RPC query received in the RPC server.
|
|
* is defined from the RPC query received in the RPC server.
|
|
*/
|
|
*/
|
|
-public class LocalResolver implements OrderedResolver {
|
|
|
|
|
|
+public class LocalResolver extends RouterResolver<String, String> {
|
|
|
|
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
LoggerFactory.getLogger(LocalResolver.class);
|
|
LoggerFactory.getLogger(LocalResolver.class);
|
|
|
|
|
|
- /** Configuration key to set the minimum time to update the local cache.*/
|
|
|
|
- public static final String MIN_UPDATE_PERIOD_KEY =
|
|
|
|
- RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "local-resolver.update-period";
|
|
|
|
- /** 10 seconds by default. */
|
|
|
|
- private static final long MIN_UPDATE_PERIOD_DEFAULT =
|
|
|
|
- TimeUnit.SECONDS.toMillis(10);
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- /** Router service. */
|
|
|
|
- private final Router router;
|
|
|
|
- /** Minimum update time. */
|
|
|
|
- private final long minUpdateTime;
|
|
|
|
|
|
+ public LocalResolver(final Configuration conf, final Router routerService) {
|
|
|
|
+ super(conf, routerService);
|
|
|
|
+ }
|
|
|
|
|
|
- /** Node IP -> Subcluster. */
|
|
|
|
- private Map<String, String> nodeSubcluster = null;
|
|
|
|
- /** Last time the subcluster map was updated. */
|
|
|
|
- private long lastUpdated;
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Get the mapping from nodes to subcluster. It gets this mapping from the
|
|
|
|
+ * subclusters through expensive calls (e.g., RPC) and uses caching to avoid
|
|
|
|
+ * too many calls. The cache might be updated asynchronously to reduce
|
|
|
|
+ * latency.
|
|
|
|
+ *
|
|
|
|
+ * @return Node IP -> Subcluster.
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ protected Map<String, String> getSubclusterInfo(
|
|
|
|
+ MembershipStore membershipStore) {
|
|
|
|
+ Map<String, String> mapping = new HashMap<>();
|
|
|
|
|
|
|
|
+ Map<String, String> dnSubcluster = getDatanodesSubcluster();
|
|
|
|
+ if (dnSubcluster != null) {
|
|
|
|
+ mapping.putAll(dnSubcluster);
|
|
|
|
+ }
|
|
|
|
|
|
- public LocalResolver(final Configuration conf, final Router routerService) {
|
|
|
|
- this.minUpdateTime = conf.getTimeDuration(
|
|
|
|
- MIN_UPDATE_PERIOD_KEY, MIN_UPDATE_PERIOD_DEFAULT,
|
|
|
|
- TimeUnit.MILLISECONDS);
|
|
|
|
- this.router = routerService;
|
|
|
|
|
|
+ Map<String, String> nnSubcluster = getNamenodesSubcluster(membershipStore);
|
|
|
|
+ if (nnSubcluster != null) {
|
|
|
|
+ mapping.putAll(nnSubcluster);
|
|
|
|
+ }
|
|
|
|
+ return mapping;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -98,12 +96,12 @@ public class LocalResolver implements OrderedResolver {
|
|
* @return Local name space. Null if we don't know about this machine.
|
|
* @return Local name space. Null if we don't know about this machine.
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
- public String getFirstNamespace(final String path, final PathLocation loc) {
|
|
|
|
|
|
+ protected String chooseFirstNamespace(String path, PathLocation loc) {
|
|
String localSubcluster = null;
|
|
String localSubcluster = null;
|
|
String clientAddr = getClientAddr();
|
|
String clientAddr = getClientAddr();
|
|
- Map<String, String> nodeToSubcluster = getSubclusterMappings();
|
|
|
|
- if (nodeToSubcluster != null) {
|
|
|
|
- localSubcluster = nodeToSubcluster.get(clientAddr);
|
|
|
|
|
|
+ Map<String, String> subclusterInfo = getSubclusterMapping();
|
|
|
|
+ if (subclusterInfo != null) {
|
|
|
|
+ localSubcluster = subclusterInfo.get(clientAddr);
|
|
if (localSubcluster != null) {
|
|
if (localSubcluster != null) {
|
|
LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster);
|
|
LOG.debug("Local namespace for {} is {}", clientAddr, localSubcluster);
|
|
} else {
|
|
} else {
|
|
@@ -121,52 +119,6 @@ public class LocalResolver implements OrderedResolver {
|
|
return Server.getRemoteAddress();
|
|
return Server.getRemoteAddress();
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Get the mapping from nodes to subcluster. It gets this mapping from the
|
|
|
|
- * subclusters through expensive calls (e.g., RPC) and uses caching to avoid
|
|
|
|
- * too many calls. The cache might be updated asynchronously to reduce
|
|
|
|
- * latency.
|
|
|
|
- *
|
|
|
|
- * @return Node IP -> Subcluster.
|
|
|
|
- */
|
|
|
|
- @VisibleForTesting
|
|
|
|
- synchronized Map<String, String> getSubclusterMappings() {
|
|
|
|
- if (nodeSubcluster == null ||
|
|
|
|
- (monotonicNow() - lastUpdated) > minUpdateTime) {
|
|
|
|
- // Fetch the mapping asynchronously
|
|
|
|
- Thread updater = new Thread(new Runnable() {
|
|
|
|
- @Override
|
|
|
|
- public void run() {
|
|
|
|
- Map<String, String> mapping = new HashMap<>();
|
|
|
|
-
|
|
|
|
- Map<String, String> dnSubcluster = getDatanodesSubcluster();
|
|
|
|
- if (dnSubcluster != null) {
|
|
|
|
- mapping.putAll(dnSubcluster);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- Map<String, String> nnSubcluster = getNamenodesSubcluster();
|
|
|
|
- if (nnSubcluster != null) {
|
|
|
|
- mapping.putAll(nnSubcluster);
|
|
|
|
- }
|
|
|
|
- nodeSubcluster = mapping;
|
|
|
|
- lastUpdated = monotonicNow();
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- updater.start();
|
|
|
|
-
|
|
|
|
- // Wait until initialized
|
|
|
|
- if (nodeSubcluster == null) {
|
|
|
|
- try {
|
|
|
|
- LOG.debug("Wait to get the mapping for the first time");
|
|
|
|
- updater.join();
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- LOG.error("Cannot wait for the updater to finish");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- return nodeSubcluster;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Get the Datanode mapping from the subclusters from the Namenodes. This
|
|
* Get the Datanode mapping from the subclusters from the Namenodes. This
|
|
* needs to be done as a privileged action to use the user for the Router and
|
|
* needs to be done as a privileged action to use the user for the Router and
|
|
@@ -221,14 +173,8 @@ public class LocalResolver implements OrderedResolver {
|
|
*
|
|
*
|
|
* @return NN IP -> Subcluster.
|
|
* @return NN IP -> Subcluster.
|
|
*/
|
|
*/
|
|
- private Map<String, String> getNamenodesSubcluster() {
|
|
|
|
-
|
|
|
|
- final MembershipStore membershipStore = getMembershipStore();
|
|
|
|
- if (membershipStore == null) {
|
|
|
|
- LOG.error("Cannot access the Membership store");
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+ private Map<String, String> getNamenodesSubcluster(
|
|
|
|
+ MembershipStore membershipStore) {
|
|
// Manage requests from this hostname (127.0.0.1)
|
|
// Manage requests from this hostname (127.0.0.1)
|
|
String localIp = "127.0.0.1";
|
|
String localIp = "127.0.0.1";
|
|
String localHostname = localIp;
|
|
String localHostname = localIp;
|
|
@@ -269,29 +215,4 @@ public class LocalResolver implements OrderedResolver {
|
|
}
|
|
}
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Get the Router RPC server.
|
|
|
|
- *
|
|
|
|
- * @return Router RPC server. Null if not possible.
|
|
|
|
- */
|
|
|
|
- private RouterRpcServer getRpcServer() {
|
|
|
|
- if (this.router == null) {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- return router.getRpcServer();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Get the Membership store.
|
|
|
|
- *
|
|
|
|
- * @return Membership store.
|
|
|
|
- */
|
|
|
|
- private MembershipStore getMembershipStore() {
|
|
|
|
- StateStoreService stateStore = router.getStateStore();
|
|
|
|
- if (stateStore == null) {
|
|
|
|
- return null;
|
|
|
|
- }
|
|
|
|
- return stateStore.getRegisteredRecordStore(MembershipStore.class);
|
|
|
|
- }
|
|
|
|
}
|
|
}
|