|
@@ -38,6 +38,8 @@ import java.util.Set;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
|
|
+import javax.management.ObjectName;
|
|
|
|
+
|
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
|
import org.apache.commons.lang3.builder.ToStringBuilder;
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
|
@@ -66,6 +68,7 @@ import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
|
+import org.apache.hadoop.metrics2.util.MBeans;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.net.NetUtils;
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
@@ -76,6 +79,7 @@ import org.apache.hadoop.util.Tool;
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
import org.apache.hadoop.util.ToolRunner;
|
|
|
|
|
|
import org.apache.hadoop.util.Preconditions;
|
|
import org.apache.hadoop.util.Preconditions;
|
|
|
|
+import org.apache.hadoop.util.VersionInfo;
|
|
|
|
|
|
/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
|
|
/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
|
|
* when some datanodes become full or when new empty nodes join the cluster.
|
|
* when some datanodes become full or when new empty nodes join the cluster.
|
|
@@ -180,7 +184,7 @@ import org.apache.hadoop.util.Preconditions;
|
|
*/
|
|
*/
|
|
|
|
|
|
@InterfaceAudience.Private
|
|
@InterfaceAudience.Private
|
|
-public class Balancer {
|
|
|
|
|
|
+public class Balancer implements BalancerMXBean {
|
|
static final Logger LOG = LoggerFactory.getLogger(Balancer.class);
|
|
static final Logger LOG = LoggerFactory.getLogger(Balancer.class);
|
|
|
|
|
|
static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
|
|
static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
|
|
@@ -241,6 +245,7 @@ public class Balancer {
|
|
private final boolean sortTopNodes;
|
|
private final boolean sortTopNodes;
|
|
private final int limitOverUtilizedNum;
|
|
private final int limitOverUtilizedNum;
|
|
private final BalancerMetrics metrics;
|
|
private final BalancerMetrics metrics;
|
|
|
|
+ private ObjectName balancerInfoBeanName;
|
|
|
|
|
|
// all data node lists
|
|
// all data node lists
|
|
private final Collection<Source> overUtilized = new LinkedList<Source>();
|
|
private final Collection<Source> overUtilized = new LinkedList<Source>();
|
|
@@ -377,6 +382,8 @@ public class Balancer {
|
|
DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
|
|
DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
|
|
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
|
|
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
|
|
this.metrics = BalancerMetrics.create(this);
|
|
this.metrics = BalancerMetrics.create(this);
|
|
|
|
+
|
|
|
|
+ registerBalancerMXBean();
|
|
}
|
|
}
|
|
|
|
|
|
private static long getCapacity(DatanodeStorageReport report, StorageType t) {
|
|
private static long getCapacity(DatanodeStorageReport report, StorageType t) {
|
|
@@ -680,6 +687,13 @@ public class Balancer {
|
|
left.getDatanodeInfo(), right.getDatanodeInfo());
|
|
left.getDatanodeInfo(), right.getDatanodeInfo());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Register BalancerMXBean.
|
|
|
|
+ */
|
|
|
|
+ private void registerBalancerMXBean() {
|
|
|
|
+ balancerInfoBeanName = MBeans.register("Balancer", "BalancerInfo", this);
|
|
|
|
+ }
|
|
|
|
+
|
|
/* reset all fields in a balancer preparing for the next iteration */
|
|
/* reset all fields in a balancer preparing for the next iteration */
|
|
void resetData(Configuration conf) {
|
|
void resetData(Configuration conf) {
|
|
this.overUtilized.clear();
|
|
this.overUtilized.clear();
|
|
@@ -689,12 +703,32 @@ public class Balancer {
|
|
this.policy.reset();
|
|
this.policy.reset();
|
|
this.dispatcher.reset(conf);
|
|
this.dispatcher.reset(conf);
|
|
DefaultMetricsSystem.removeSourceName(metrics.getName());
|
|
DefaultMetricsSystem.removeSourceName(metrics.getName());
|
|
|
|
+ if (balancerInfoBeanName != null) {
|
|
|
|
+ MBeans.unregister(balancerInfoBeanName);
|
|
|
|
+ balancerInfoBeanName = null;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
NameNodeConnector getNnc() {
|
|
NameNodeConnector getNnc() {
|
|
return nnc;
|
|
return nnc;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public String getVersion() {
|
|
|
|
+ return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public String getSoftwareVersion() {
|
|
|
|
+ return VersionInfo.getVersion();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public String getCompileInfo() {
|
|
|
|
+ return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from "
|
|
|
|
+ + VersionInfo.getBranch();
|
|
|
|
+ }
|
|
|
|
+
|
|
static class Result {
|
|
static class Result {
|
|
private final ExitStatus exitStatus;
|
|
private final ExitStatus exitStatus;
|
|
private final long bytesLeftToMove;
|
|
private final long bytesLeftToMove;
|
|
@@ -860,6 +894,7 @@ public class Balancer {
|
|
+ " NameNode");
|
|
+ " NameNode");
|
|
|
|
|
|
List<NameNodeConnector> connectors = Collections.emptyList();
|
|
List<NameNodeConnector> connectors = Collections.emptyList();
|
|
|
|
+ BalancerHttpServer balancerHttpServer = startBalancerHttpServer(conf);
|
|
try {
|
|
try {
|
|
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
|
|
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, nsIds,
|
|
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
|
|
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
|
|
@@ -872,6 +907,9 @@ public class Balancer {
|
|
if (p.getBlockPools().size() == 0
|
|
if (p.getBlockPools().size() == 0
|
|
|| p.getBlockPools().contains(nnc.getBlockpoolID())) {
|
|
|| p.getBlockPools().contains(nnc.getBlockpoolID())) {
|
|
final Balancer b = new Balancer(nnc, p, conf);
|
|
final Balancer b = new Balancer(nnc, p, conf);
|
|
|
|
+ if (balancerHttpServer != null) {
|
|
|
|
+ balancerHttpServer.setBalancerAttribute(b);
|
|
|
|
+ }
|
|
final Result r = b.runOneIteration();
|
|
final Result r = b.runOneIteration();
|
|
r.print(iteration, nnc, System.out);
|
|
r.print(iteration, nnc, System.out);
|
|
|
|
|
|
@@ -898,6 +936,9 @@ public class Balancer {
|
|
for(NameNodeConnector nnc : connectors) {
|
|
for(NameNodeConnector nnc : connectors) {
|
|
IOUtils.cleanupWithLogger(LOG, nnc);
|
|
IOUtils.cleanupWithLogger(LOG, nnc);
|
|
}
|
|
}
|
|
|
|
+ if (balancerHttpServer != null) {
|
|
|
|
+ balancerHttpServer.stop();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
return ExitStatus.SUCCESS.getExitCode();
|
|
return ExitStatus.SUCCESS.getExitCode();
|
|
}
|
|
}
|
|
@@ -969,6 +1010,18 @@ public class Balancer {
|
|
serviceRunning = false;
|
|
serviceRunning = false;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static BalancerHttpServer startBalancerHttpServer(Configuration conf) throws IOException {
|
|
|
|
+ boolean httpServerEnabled = conf.getBoolean(DFSConfigKeys.DFS_BALANCER_HTTPSERVER_ENABLED_KEY,
|
|
|
|
+ DFSConfigKeys.DFS_BALANCER_HTTPSERVER_ENABLED_DEFAULT);
|
|
|
|
+ if (httpServerEnabled) {
|
|
|
|
+ BalancerHttpServer balancerHttpServer = new BalancerHttpServer(conf);
|
|
|
|
+ balancerHttpServer.start();
|
|
|
|
+ return balancerHttpServer;
|
|
|
|
+ } else {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
private static void checkKeytabAndInit(Configuration conf)
|
|
private static void checkKeytabAndInit(Configuration conf)
|
|
throws IOException {
|
|
throws IOException {
|
|
if (conf.getBoolean(DFSConfigKeys.DFS_BALANCER_KEYTAB_ENABLED_KEY,
|
|
if (conf.getBoolean(DFSConfigKeys.DFS_BALANCER_KEYTAB_ENABLED_KEY,
|