|
@@ -253,7 +253,6 @@ public class DatanodeManager {
|
|
|
final boolean dataNodePeerStatsEnabledVal =
|
|
|
conf.getBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
|
|
|
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT);
|
|
|
- initSlowPeerTracker(conf, timer, dataNodePeerStatsEnabledVal);
|
|
|
this.maxSlowPeerReportNodes = conf.getInt(
|
|
|
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
|
|
|
DFSConfigKeys.DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_DEFAULT);
|
|
@@ -261,9 +260,7 @@ public class DatanodeManager {
|
|
|
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_KEY,
|
|
|
DFSConfigKeys.DFS_NAMENODE_SLOWPEER_COLLECT_INTERVAL_DEFAULT,
|
|
|
TimeUnit.MILLISECONDS);
|
|
|
- if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
|
|
|
- startSlowPeerCollector();
|
|
|
- }
|
|
|
+ initSlowPeerTracker(conf, timer, dataNodePeerStatsEnabledVal);
|
|
|
this.slowDiskTracker = dataNodeDiskStatsEnabled ?
|
|
|
new SlowDiskTracker(conf, timer) : null;
|
|
|
this.defaultXferPort = NetUtils.createSocketAddr(
|
|
@@ -376,10 +373,16 @@ public class DatanodeManager {
|
|
|
this.slowPeerTracker = dataNodePeerStatsEnabled ?
|
|
|
new SlowPeerTracker(conf, timer) :
|
|
|
new SlowPeerDisabledTracker(conf, timer);
|
|
|
+ if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
|
|
|
+ startSlowPeerCollector();
|
|
|
+ } else {
|
|
|
+ stopSlowPeerCollector();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void startSlowPeerCollector() {
|
|
|
if (slowPeerCollectorDaemon != null) {
|
|
|
+ LOG.warn("Slow peers collection thread has been started.");
|
|
|
return;
|
|
|
}
|
|
|
slowPeerCollectorDaemon = new Daemon(new Runnable() {
|
|
@@ -402,9 +405,11 @@ public class DatanodeManager {
|
|
|
}
|
|
|
});
|
|
|
slowPeerCollectorDaemon.start();
|
|
|
+ LOG.info("Slow peers collection thread start.");
|
|
|
}
|
|
|
|
|
|
public void stopSlowPeerCollector() {
|
|
|
+ LOG.info("Slow peers collection thread shutdown");
|
|
|
if (slowPeerCollectorDaemon == null) {
|
|
|
return;
|
|
|
}
|
|
@@ -413,6 +418,8 @@ public class DatanodeManager {
|
|
|
slowPeerCollectorDaemon.join();
|
|
|
} catch (InterruptedException e) {
|
|
|
LOG.error("Slow peers collection thread did not shutdown", e);
|
|
|
+ } finally {
|
|
|
+ slowPeerCollectorDaemon = null;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2270,4 +2277,9 @@ public class DatanodeManager {
|
|
|
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
|
|
|
slowPeerTracker.setMaxSlowPeersToReport(maxSlowPeersToReport);
|
|
|
}
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public boolean isSlowPeerCollectorInitialized() {
|
|
|
+ return slowPeerCollectorDaemon == null;
|
|
|
+ }
|
|
|
}
|