|
@@ -193,10 +193,6 @@ public class DatanodeManager {
|
|
|
private final HashMap<String, Integer> datanodesSoftwareVersions =
|
|
|
new HashMap<>(4, 0.75f);
|
|
|
|
|
|
- /**
|
|
|
- * True if we should process latency metrics from downstream peers.
|
|
|
- */
|
|
|
- private final boolean dataNodePeerStatsEnabled;
|
|
|
/**
|
|
|
* True if we should process latency metrics from individual DN disks.
|
|
|
*/
|
|
@@ -210,7 +206,7 @@ public class DatanodeManager {
|
|
|
private static final String IP_PORT_SEPARATOR = ":";
|
|
|
|
|
|
@Nullable
|
|
|
- private final SlowPeerTracker slowPeerTracker;
|
|
|
+ private SlowPeerTracker slowPeerTracker;
|
|
|
private static Set<String> slowNodesUuidSet = Sets.newConcurrentHashSet();
|
|
|
private Daemon slowPeerCollectorDaemon;
|
|
|
private final long slowPeerCollectionInterval;
|
|
@@ -247,16 +243,15 @@ public class DatanodeManager {
|
|
|
this.datanodeAdminManager = new DatanodeAdminManager(namesystem,
|
|
|
blockManager, heartbeatManager);
|
|
|
this.fsClusterStats = newFSClusterStats();
|
|
|
- this.dataNodePeerStatsEnabled = conf.getBoolean(
|
|
|
- DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
|
|
|
- DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
|
|
|
this.dataNodeDiskStatsEnabled = Util.isDiskStatsEnabled(conf.getInt(
|
|
|
DFSConfigKeys.DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
|
|
|
DFSConfigKeys.
|
|
|
DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_DEFAULT));
|
|
|
final Timer timer = new Timer();
|
|
|
- this.slowPeerTracker = dataNodePeerStatsEnabled ?
|
|
|
- new SlowPeerTracker(conf, timer) : null;
|
|
|
+ final boolean dataNodePeerStatsEnabledVal =
|
|
|
+ conf.getBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
|
|
|
+ DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
|
|
|
+ initSlowPeerTracker(conf, timer, dataNodePeerStatsEnabledVal);
|
|
|
this.maxSlowPeerReportNodes = conf.getInt(
|
|
|
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
|
|
|
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT);
|
|
@@ -264,7 +259,7 @@ public class DatanodeManager {
|
|
|
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
|
|
|
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT,
|
|
|
TimeUnit.MILLISECONDS);
|
|
|
- if (slowPeerTracker != null) {
|
|
|
+ if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
|
|
|
startSlowPeerCollector();
|
|
|
}
|
|
|
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
|
|
@@ -366,6 +361,21 @@ public class DatanodeManager {
|
|
|
DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Determines whether slow peer tracker should be enabled. If dataNodePeerStatsEnabledVal is
|
|
|
+ * true, slow peer tracker is initialized.
|
|
|
+ *
|
|
|
+ * @param conf The configuration to use while initializing slowPeerTracker.
|
|
|
+ * @param timer Timer object for slowPeerTracker.
|
|
|
+ * @param dataNodePeerStatsEnabled To determine whether slow peer tracking should be enabled.
|
|
|
+ */
|
|
|
+ public void initSlowPeerTracker(Configuration conf, Timer timer,
|
|
|
+ boolean dataNodePeerStatsEnabled) {
|
|
|
+ this.slowPeerTracker = dataNodePeerStatsEnabled ?
|
|
|
+ new SlowPeerTracker(conf, timer) :
|
|
|
+ new SlowPeerDisabledTracker(conf, timer);
|
|
|
+ }
|
|
|
+
|
|
|
private void startSlowPeerCollector() {
|
|
|
if (slowPeerCollectorDaemon != null) {
|
|
|
return;
|
|
@@ -1871,12 +1881,13 @@ public class DatanodeManager {
|
|
|
nodeinfo.setBalancerBandwidth(0);
|
|
|
}
|
|
|
|
|
|
- if (slowPeerTracker != null) {
|
|
|
+ Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
|
|
|
+
|
|
|
+ if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
|
|
|
final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
|
|
|
if (!slowPeersMap.isEmpty()) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
|
|
|
- slowPeersMap);
|
|
|
+ LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
|
|
|
}
|
|
|
for (String slowNodeId : slowPeersMap.keySet()) {
|
|
|
slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
|
|
@@ -2124,7 +2135,8 @@ public class DatanodeManager {
|
|
|
* @return
|
|
|
*/
|
|
|
public String getSlowPeersReport() {
|
|
|
- return slowPeerTracker != null ? slowPeerTracker.getJson() : null;
|
|
|
+ Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
|
|
|
+ return slowPeerTracker.getJson();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2133,11 +2145,9 @@ public class DatanodeManager {
|
|
|
*/
|
|
|
public Set<String> getSlowPeersUuidSet() {
|
|
|
Set<String> slowPeersUuidSet = Sets.newConcurrentHashSet();
|
|
|
- if (slowPeerTracker == null) {
|
|
|
- return slowPeersUuidSet;
|
|
|
- }
|
|
|
- ArrayList<String> slowNodes =
|
|
|
- slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
|
|
|
+ List<String> slowNodes;
|
|
|
+ Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
|
|
|
+ slowNodes = slowPeerTracker.getSlowNodes(maxSlowPeerReportNodes);
|
|
|
for (String slowNode : slowNodes) {
|
|
|
if (StringUtils.isBlank(slowNode)
|
|
|
|| !slowNode.contains(IP_PORT_SEPARATOR)) {
|