|
@@ -29,11 +29,16 @@ import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.function.Function;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
import javax.management.NotCompliantMBeanException;
|
|
|
import javax.management.ObjectName;
|
|
|
import javax.management.StandardMBean;
|
|
|
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
@@ -41,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
|
|
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
|
|
|
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
|
|
|
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
|
|
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
|
|
|
import org.apache.hadoop.hdfs.server.federation.store.MembershipStore;
|
|
@@ -58,6 +64,10 @@ import org.mortbay.util.ajax.JSON;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.google.common.cache.CacheBuilder;
|
|
|
+import com.google.common.cache.CacheLoader;
|
|
|
+import com.google.common.cache.LoadingCache;
|
|
|
+
|
|
|
/**
|
|
|
* Expose the Namenode metrics as the Router was one.
|
|
|
*/
|
|
@@ -67,6 +77,22 @@ public class NamenodeBeanMetrics
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(NamenodeBeanMetrics.class);
|
|
|
|
|
|
+ /** Prevent holding the page from loading too long. */
|
|
|
+ private static final String DN_REPORT_TIME_OUT =
|
|
|
+ RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.time-out";
|
|
|
+ /** We only wait for 1 second. */
|
|
|
+ private static final long DN_REPORT_TIME_OUT_DEFAULT =
|
|
|
+ TimeUnit.SECONDS.toMillis(1);
|
|
|
+
|
|
|
+ /** Time to cache the DN information. */
|
|
|
+ public static final String DN_REPORT_CACHE_EXPIRE =
|
|
|
+ RBFConfigKeys.FEDERATION_ROUTER_PREFIX + "dn-report.cache-expire";
|
|
|
+ /** We cache the DN information for 10 seconds by default. */
|
|
|
+ public static final long DN_REPORT_CACHE_EXPIRE_DEFAULT =
|
|
|
+ TimeUnit.SECONDS.toMillis(10);
|
|
|
+
|
|
|
+
|
|
|
+ /** Instance of the Router being monitored. */
|
|
|
private final Router router;
|
|
|
|
|
|
/** FSNamesystem bean. */
|
|
@@ -78,6 +104,11 @@ public class NamenodeBeanMetrics
|
|
|
/** NameNodeStatus bean. */
|
|
|
private ObjectName nnStatusBeanName;
|
|
|
|
|
|
+ /** Timeout to get the DN report. */
|
|
|
+ private final long dnReportTimeOut;
|
|
|
+ /** DN type -> full DN report in JSON. */
|
|
|
+ private final LoadingCache<DatanodeReportType, String> dnCache;
|
|
|
+
|
|
|
|
|
|
public NamenodeBeanMetrics(Router router) {
|
|
|
this.router = router;
|
|
@@ -116,6 +147,23 @@ public class NamenodeBeanMetrics
|
|
|
} catch (NotCompliantMBeanException e) {
|
|
|
throw new RuntimeException("Bad NameNodeStatus MBean setup", e);
|
|
|
}
|
|
|
+
|
|
|
+ // Initialize the cache for the DN reports
|
|
|
+ Configuration conf = router.getConfig();
|
|
|
+ this.dnReportTimeOut = conf.getTimeDuration(
|
|
|
+ DN_REPORT_TIME_OUT, DN_REPORT_TIME_OUT_DEFAULT, TimeUnit.MILLISECONDS);
|
|
|
+ long dnCacheExpire = conf.getTimeDuration(
|
|
|
+ DN_REPORT_CACHE_EXPIRE,
|
|
|
+ DN_REPORT_CACHE_EXPIRE_DEFAULT, TimeUnit.MILLISECONDS);
|
|
|
+ this.dnCache = CacheBuilder.newBuilder()
|
|
|
+ .expireAfterWrite(dnCacheExpire, TimeUnit.MILLISECONDS)
|
|
|
+ .build(
|
|
|
+ new CacheLoader<DatanodeReportType, String>() {
|
|
|
+ @Override
|
|
|
+ public String load(DatanodeReportType type) throws Exception {
|
|
|
+ return getNodesImpl(type);
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -283,17 +331,33 @@ public class NamenodeBeanMetrics
|
|
|
return this.getNodes(DatanodeReportType.DECOMMISSIONING);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get all the nodes in the federation from a particular type. Getting this
|
|
|
+ * information is expensive and we use a cache.
|
|
|
+ * @param type Type of the datanodes to check.
|
|
|
+ * @return JSON with the nodes.
|
|
|
+ */
|
|
|
+ private String getNodes(final DatanodeReportType type) {
|
|
|
+ try {
|
|
|
+ return this.dnCache.get(type);
|
|
|
+ } catch (ExecutionException e) {
|
|
|
+ LOG.error("Cannot get the DN storage report for {}", type, e);
|
|
|
+ }
|
|
|
+ // If we cannot get the report, return empty JSON
|
|
|
+ return "{}";
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get all the nodes in the federation from a particular type.
|
|
|
- * TODO this is expensive, we may want to cache it.
|
|
|
* @param type Type of the datanodes to check.
|
|
|
* @return JSON with the nodes.
|
|
|
*/
|
|
|
- private String getNodes(DatanodeReportType type) {
|
|
|
+ private String getNodesImpl(final DatanodeReportType type) {
|
|
|
final Map<String, Map<String, Object>> info = new HashMap<>();
|
|
|
try {
|
|
|
RouterRpcServer rpcServer = this.router.getRpcServer();
|
|
|
- DatanodeInfo[] datanodes = rpcServer.getDatanodeReport(type);
|
|
|
+ DatanodeInfo[] datanodes =
|
|
|
+ rpcServer.getDatanodeReport(type, false, dnReportTimeOut);
|
|
|
for (DatanodeInfo node : datanodes) {
|
|
|
Map<String, Object> innerinfo = new HashMap<>();
|
|
|
innerinfo.put("infoAddr", node.getInfoAddr());
|