|
@@ -462,7 +462,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
private final Tracer tracer;
|
|
|
private static final int NUM_CORES = Runtime.getRuntime()
|
|
|
.availableProcessors();
|
|
|
- private static final double CONGESTION_RATIO = 1.5;
|
|
|
+ private final double congestionRatio;
|
|
|
private DiskBalancer diskBalancer;
|
|
|
private DataSetLockManager dataSetLockManager;
|
|
|
|
|
@@ -515,6 +515,10 @@ public class DataNode extends ReconfigurableBase
|
|
|
volumeChecker = new DatasetVolumeChecker(conf, new Timer());
|
|
|
this.xferService =
|
|
|
HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());
|
|
|
+ double congestionRationTmp = conf.getDouble(DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO,
|
|
|
+ DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT);
|
|
|
+ this.congestionRatio = congestionRationTmp > 0 ?
|
|
|
+ congestionRationTmp : DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -614,6 +618,10 @@ public class DataNode extends ReconfigurableBase
|
|
|
new DataTransferThrottler(100, ecReconstuctReadBandwidth) : null;
|
|
|
this.ecReconstuctWriteThrottler = ecReconstuctWriteBandwidth > 0 ?
|
|
|
new DataTransferThrottler(100, ecReconstuctWriteBandwidth) : null;
|
|
|
+ double congestionRationTmp = conf.getDouble(DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO,
|
|
|
+ DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT);
|
|
|
+ this.congestionRatio = congestionRationTmp > 0 ?
|
|
|
+ congestionRationTmp : DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT;
|
|
|
}
|
|
|
|
|
|
@Override // ReconfigurableBase
|
|
@@ -1070,7 +1078,7 @@ public class DataNode extends ReconfigurableBase
|
|
|
}
|
|
|
double load = ManagementFactory.getOperatingSystemMXBean()
|
|
|
.getSystemLoadAverage();
|
|
|
- return load > NUM_CORES * CONGESTION_RATIO ? PipelineAck.ECN.CONGESTED :
|
|
|
+ return load > NUM_CORES * congestionRatio ? PipelineAck.ECN.CONGESTED :
|
|
|
PipelineAck.ECN.SUPPORTED;
|
|
|
}
|
|
|
|