@@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -35,6 +33,8 @@ import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
 import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
 import org.apache.hadoop.util.Time;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -99,6 +100,9 @@ public class DiskBalancer {
     this.isDiskBalancerEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_DISK_BALANCER_ENABLED,
         DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT);
+    this.bandwidth = conf.getInt(
+        DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT,
+        DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT);
   }
 
   /**
@@ -144,13 +148,11 @@ public class DiskBalancer {
    * @param planID - A SHA512 of the plan string
    * @param planVersion - version of the plan string - for future use.
    * @param plan - Actual Plan
-   * @param bandwidth - BytesPerSec to copy
    * @param force - Skip some validations and execute the plan file.
    * @throws DiskBalancerException
    */
   public void submitPlan(String planID, long planVersion, String plan,
-                         long bandwidth, boolean force)
-      throws DiskBalancerException {
+                         boolean force) throws DiskBalancerException {
 
     lock.lock();
     try {
@@ -160,12 +162,10 @@ public class DiskBalancer {
         throw new DiskBalancerException("Executing another plan",
             DiskBalancerException.Result.PLAN_ALREADY_IN_PROGRESS);
       }
-      NodePlan nodePlan =
-          verifyPlan(planID, planVersion, plan, bandwidth, force);
+      NodePlan nodePlan = verifyPlan(planID, planVersion, plan, force);
       createWorkPlan(nodePlan);
       this.planID = planID;
       this.currentResult = Result.PLAN_UNDER_PROGRESS;
-      this.bandwidth = bandwidth;
       executePlan();
     } finally {
       lock.unlock();
@@ -292,14 +292,12 @@ public class DiskBalancer {
    * @param planID - SHA 512 of the plan.
    * @param planVersion - Version of the plan, for future use.
    * @param plan - Plan String in Json.
-   * @param bandwidth - Max disk bandwidth to use per second.
    * @param force - Skip verifying when the plan was generated.
    * @return a NodePlan Object.
    * @throws DiskBalancerException
    */
   private NodePlan verifyPlan(String planID, long planVersion, String plan,
-                              long bandwidth, boolean force)
-      throws DiskBalancerException {
+                              boolean force) throws DiskBalancerException {
 
     Preconditions.checkState(lock.isHeldByCurrentThread());
     verifyPlanVersion(planVersion);
@@ -428,7 +426,7 @@ public class DiskBalancer {
         throw new DiskBalancerException("Unable to find destination volume.",
             DiskBalancerException.Result.INVALID_VOLUME);
       }
-      createWorkPlan(sourceVol, destVol, step.getBytesToMove());
+      createWorkPlan(sourceVol, destVol, step);
     }
   }
 
@@ -488,17 +486,18 @@ public class DiskBalancer {
    *
    * @param source - Source vol
    * @param dest - destination volume
-   * @param bytesToMove - number of bytes to move
+   * @param step - Move Step
    */
   private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest,
-                              long bytesToMove) throws DiskBalancerException {
+                              Step step) throws DiskBalancerException {
 
     if(source.getStorageID().equals(dest.getStorageID())) {
-      throw new DiskBalancerException("Same source and destination",
-          DiskBalancerException.Result.INVALID_MOVE);
+      LOG.info("Disk Balancer - source & destination volumes are the same.");
+      throw new DiskBalancerException("source and destination volumes are " +
+          "the same.", DiskBalancerException.Result.INVALID_MOVE);
     }
     VolumePair pair = new VolumePair(source, dest);
-
+    long bytesToMove = step.getBytesToMove();
     // In case we have a plan with more than
     // one line of same <source, dest>
     // we compress that into one work order.
@@ -507,6 +506,12 @@ public class DiskBalancer {
     }
 
     DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0);
+
+    // all these values can be zero, if so we will use
+    // values from configuration.
+    work.setBandwidth(step.getBandwidth());
+    work.setTolerancePercent(step.getTolerancePercent());
+    work.setMaxDiskErrors(step.getMaxDiskErrors());
     workMap.put(pair, work);
   }
 
@@ -600,11 +605,12 @@ public class DiskBalancer {
   /**
    * Actual DataMover class for DiskBalancer.
    * <p/>
-   * TODO : Add implementation for this class. This is here as a place holder so
-   * that Datanode can make calls into this class.
    */
   public static class DiskBalancerMover implements BlockMover {
     private final FsDatasetSpi dataset;
+    private long diskBandwidth;
+    private long blockTolerance;
+    private long maxDiskErrors;
 
     /**
      * Constructs diskBalancerMover.
@@ -614,7 +620,42 @@ public class DiskBalancer {
      */
     public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) {
       this.dataset = dataset;
-      // TODO : Read Config values.
+
+      this.diskBandwidth = conf.getLong(
+          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT,
+          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT);
+
+      this.blockTolerance = conf.getLong(
+          DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE,
+          DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT);
+
+      this.maxDiskErrors = conf.getLong(
+          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS,
+          DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT);
+
+      // Since these are user-provided values, make sure they are sane
+      // or ignore faulty values.
+      if (this.diskBandwidth <= 0) {
+        LOG.debug("Found 0 or less as max disk throughput, ignoring config " +
+            "value. value : " + diskBandwidth);
+        diskBandwidth =
+            DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_THRUPUT_DEFAULT;
+      }
+
+      if (this.blockTolerance <= 0) {
+        LOG.debug("Found 0 or less for block tolerance value, ignoring " +
+            "config value. value : " + blockTolerance);
+        blockTolerance =
+            DFSConfigKeys.DFS_DISK_BALANCER_BLOCK_TOLERANCE_DEFAULT;
+
+      }
+
+      if (this.maxDiskErrors < 0) {
+        LOG.debug("Found less than 0 for maxDiskErrors value, ignoring " +
+            "config value. value : " + maxDiskErrors);
+        maxDiskErrors =
+            DFSConfigKeys.DFS_DISK_BALANCER_MAX_DISK_ERRORS_DEFAULT;
+      }
     }
 
     /**
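Illustration only, not part of the patch: a minimal caller sketch, assuming the revised submitPlan signature above and an existing DiskBalancer instance; the class name, helper parameters, and literal values below are hypothetical. Bandwidth, tolerance percent and max disk errors are no longer submitPlan arguments; they come from each plan Step (or from the DFS_DISK_BALANCER_* defaults read by DiskBalancerMover).

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancer;

class SubmitPlanSketch {
  // Hypothetical caller: planJson is the plan file contents, balancer is an
  // already-constructed DiskBalancer instance.
  static void submit(DiskBalancer balancer, String planJson) throws Exception {
    String planID = DigestUtils.sha512Hex(planJson); // SHA512 of the plan string
    long planVersion = 1;                            // version field, for future use
    boolean force = false;                           // keep plan validations on
    // No bandwidth argument here; per-move bandwidth rides on the plan's Steps
    // or falls back to DFS_DISK_BALANCER_MAX_DISK_THRUPUT.
    balancer.submitPlan(planID, planVersion, planJson, force);
  }
}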