|
@@ -38,6 +38,7 @@ import java.util.TreeSet;
|
|
|
|
|
|
import com.google.common.base.Joiner;
|
|
|
import com.google.common.base.Optional;
|
|
|
+
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -70,6 +71,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
|
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
|
|
+import org.apache.hadoop.hdfs.protocol.ReconfigurationProtocol;
|
|
|
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
@@ -415,7 +417,8 @@ public class DFSAdmin extends FsShell {
|
|
|
"\t[-refreshSuperUserGroupsConfiguration]\n" +
|
|
|
"\t[-refreshCallQueue]\n" +
|
|
|
"\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
|
|
|
- "\t[-reconfig <datanode|...> <host:ipc_port> <start|status|properties>]\n" +
|
|
|
+ "\t[-reconfig <namenode|datanode> <host:ipc_port> " +
|
|
|
+ "<start|status|properties>]\n" +
|
|
|
"\t[-printTopology]\n" +
|
|
|
"\t[-refreshNamenodes datanode_host:ipc_port]\n"+
|
|
|
"\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+
|
|
@@ -1000,12 +1003,12 @@ public class DFSAdmin extends FsShell {
|
|
|
|
|
|
String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
|
|
|
|
|
|
- String reconfig = "-reconfig <datanode|...> <host:ipc_port> <start|status|properties>:\n" +
|
|
|
+ String reconfig = "-reconfig <namenode|datanode> <host:ipc_port> " +
|
|
|
+ "<start|status|properties>:\n" +
|
|
|
"\tStarts or gets the status of a reconfiguration operation, \n" +
|
|
|
"\tor gets a list of reconfigurable properties.\n" +
|
|
|
- "\tThe second parameter specifies the node type.\n" +
|
|
|
- "\tCurrently, only reloading DataNode's configuration is supported.\n";
|
|
|
|
|
|
+ "\tThe second parameter specifies the node type\n";
|
|
|
String genericRefresh = "-refresh: Arguments are <hostname:port> <resource_identifier> [arg1..argn]\n" +
|
|
|
"\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" +
|
|
|
"\ton <hostname:port>. All other args after are sent to the host.\n";
|
|
@@ -1466,104 +1469,186 @@ public class DFSAdmin extends FsShell {
|
|
|
String nodeType = argv[i];
|
|
|
String address = argv[i + 1];
|
|
|
String op = argv[i + 2];
|
|
|
+
|
|
|
if ("start".equals(op)) {
|
|
|
- return startReconfiguration(nodeType, address);
|
|
|
+ return startReconfiguration(nodeType, address, System.out, System.err);
|
|
|
} else if ("status".equals(op)) {
|
|
|
return getReconfigurationStatus(nodeType, address, System.out, System.err);
|
|
|
} else if ("properties".equals(op)) {
|
|
|
- return getReconfigurableProperties(
|
|
|
- nodeType, address, System.out, System.err);
|
|
|
+ return getReconfigurableProperties(nodeType, address, System.out,
|
|
|
+ System.err);
|
|
|
}
|
|
|
System.err.println("Unknown operation: " + op);
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- int startReconfiguration(String nodeType, String address) throws IOException {
|
|
|
- if ("datanode".equals(nodeType)) {
|
|
|
- ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
|
|
|
- dnProxy.startReconfiguration();
|
|
|
- System.out.println("Started reconfiguration task on DataNode " + address);
|
|
|
+ int startReconfiguration(final String nodeThpe, final String address)
|
|
|
+ throws IOException {
|
|
|
+ return startReconfiguration(nodeThpe, address, System.out, System.err);
|
|
|
+ }
|
|
|
+
|
|
|
+ int startReconfiguration(final String nodeType, final String address,
|
|
|
+ final PrintStream out, final PrintStream err) throws IOException {
|
|
|
+ String outMsg = null;
|
|
|
+ String errMsg = null;
|
|
|
+ int ret = 0;
|
|
|
+
|
|
|
+ try {
|
|
|
+ ret = startReconfigurationDispatch(nodeType, address, out, err);
|
|
|
+ outMsg = String.format("Started reconfiguration task on node [%s].",
|
|
|
+ address);
|
|
|
+ } catch (IOException e) {
|
|
|
+ errMsg = String.format("Node [%s] reconfiguring: %s.", address,
|
|
|
+ e.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (errMsg != null) {
|
|
|
+ err.println(errMsg);
|
|
|
+ return 1;
|
|
|
+ } else {
|
|
|
+ out.println(outMsg);
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int startReconfigurationDispatch(final String nodeType,
|
|
|
+ final String address, final PrintStream out, final PrintStream err)
|
|
|
+ throws IOException {
|
|
|
+ if ("namenode".equals(nodeType)) {
|
|
|
+ ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
|
|
|
+ reconfProxy.startReconfiguration();
|
|
|
+ return 0;
|
|
|
+ } else if ("datanode".equals(nodeType)) {
|
|
|
+ ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
|
|
|
+ reconfProxy.startReconfiguration();
|
|
|
return 0;
|
|
|
} else {
|
|
|
- System.err.println("Node type " + nodeType +
|
|
|
- " does not support reconfiguration.");
|
|
|
+ System.err.println("Node type " + nodeType
|
|
|
+ + " does not support reconfiguration.");
|
|
|
return 1;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- int getReconfigurationStatus(String nodeType, String address,
|
|
|
- PrintStream out, PrintStream err) throws IOException {
|
|
|
- if ("datanode".equals(nodeType)) {
|
|
|
- ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
|
|
|
- try {
|
|
|
- ReconfigurationTaskStatus status = dnProxy.getReconfigurationStatus();
|
|
|
- out.print("Reconfiguring status for DataNode[" + address + "]: ");
|
|
|
- if (!status.hasTask()) {
|
|
|
- out.println("no task was found.");
|
|
|
- return 0;
|
|
|
- }
|
|
|
- out.print("started at " + new Date(status.getStartTime()));
|
|
|
- if (!status.stopped()) {
|
|
|
- out.println(" and is still running.");
|
|
|
- return 0;
|
|
|
- }
|
|
|
+ int getReconfigurationStatus(final String nodeType, final String address,
|
|
|
+ final PrintStream out, final PrintStream err) throws IOException {
|
|
|
+ String outMsg = null;
|
|
|
+ String errMsg = null;
|
|
|
+ ReconfigurationTaskStatus status = null;
|
|
|
|
|
|
- out.println(" and finished at " +
|
|
|
- new Date(status.getEndTime()).toString() + ".");
|
|
|
- if (status.getStatus() == null) {
|
|
|
- // Nothing to report.
|
|
|
- return 0;
|
|
|
- }
|
|
|
- for (Map.Entry<PropertyChange, Optional<String>> result :
|
|
|
- status.getStatus().entrySet()) {
|
|
|
- if (!result.getValue().isPresent()) {
|
|
|
- out.printf(
|
|
|
- "SUCCESS: Changed property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
|
|
|
- result.getKey().prop, result.getKey().oldVal,
|
|
|
- result.getKey().newVal);
|
|
|
- } else {
|
|
|
- final String errorMsg = result.getValue().get();
|
|
|
- out.printf(
|
|
|
- "FAILED: Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
|
|
|
- result.getKey().prop, result.getKey().oldVal,
|
|
|
- result.getKey().newVal);
|
|
|
- out.println("\tError: " + errorMsg + ".");
|
|
|
- }
|
|
|
+ try {
|
|
|
+ status = getReconfigurationStatusDispatch(nodeType, address, out, err);
|
|
|
+ outMsg = String.format("Reconfiguring status for node [%s]: ", address);
|
|
|
+ } catch (IOException e) {
|
|
|
+ errMsg = String.format("Node [%s] reloading configuration: %s.", address,
|
|
|
+ e.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (errMsg != null) {
|
|
|
+ err.println(errMsg);
|
|
|
+ return 1;
|
|
|
+ } else {
|
|
|
+ out.print(outMsg);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (status != null) {
|
|
|
+ if (!status.hasTask()) {
|
|
|
+ out.println("no task was found.");
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ out.print("started at " + new Date(status.getStartTime()));
|
|
|
+ if (!status.stopped()) {
|
|
|
+ out.println(" and is still running.");
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ out.println(" and finished at "
|
|
|
+ + new Date(status.getEndTime()).toString() + ".");
|
|
|
+ if (status.getStatus() == null) {
|
|
|
+ // Nothing to report.
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ for (Map.Entry<PropertyChange, Optional<String>> result : status
|
|
|
+ .getStatus().entrySet()) {
|
|
|
+ if (!result.getValue().isPresent()) {
|
|
|
+ out.printf(
|
|
|
+ "SUCCESS: Changed property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
|
|
|
+ result.getKey().prop, result.getKey().oldVal,
|
|
|
+ result.getKey().newVal);
|
|
|
+ } else {
|
|
|
+ final String errorMsg = result.getValue().get();
|
|
|
+ out.printf(
|
|
|
+ "FAILED: Change property %s%n\tFrom: \"%s\"%n\tTo: \"%s\"%n",
|
|
|
+ result.getKey().prop, result.getKey().oldVal,
|
|
|
+ result.getKey().newVal);
|
|
|
+ out.println("\tError: " + errorMsg + ".");
|
|
|
}
|
|
|
- } catch (IOException e) {
|
|
|
- err.println("DataNode reloading configuration: " + e + ".");
|
|
|
- return 1;
|
|
|
}
|
|
|
} else {
|
|
|
- err.println("Node type " + nodeType +
|
|
|
- " does not support reconfiguration.");
|
|
|
return 1;
|
|
|
}
|
|
|
+
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
- int getReconfigurableProperties(String nodeType, String address,
|
|
|
- PrintStream out, PrintStream err) throws IOException {
|
|
|
- if ("datanode".equals(nodeType)) {
|
|
|
- ClientDatanodeProtocol dnProxy = getDataNodeProxy(address);
|
|
|
- try {
|
|
|
- List<String> properties =
|
|
|
- dnProxy.listReconfigurableProperties();
|
|
|
- out.println(
|
|
|
- "Configuration properties that are allowed to be reconfigured:");
|
|
|
- for (String name : properties) {
|
|
|
- out.println(name);
|
|
|
- }
|
|
|
- } catch (IOException e) {
|
|
|
- err.println("DataNode reconfiguration: " + e + ".");
|
|
|
- return 1;
|
|
|
- }
|
|
|
+ ReconfigurationTaskStatus getReconfigurationStatusDispatch(
|
|
|
+ final String nodeType, final String address, final PrintStream out,
|
|
|
+ final PrintStream err) throws IOException {
|
|
|
+ if ("namenode".equals(nodeType)) {
|
|
|
+ ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
|
|
|
+ return reconfProxy.getReconfigurationStatus();
|
|
|
+ } else if ("datanode".equals(nodeType)) {
|
|
|
+ ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
|
|
|
+ return reconfProxy.getReconfigurationStatus();
|
|
|
} else {
|
|
|
- err.println("Node type " + nodeType +
|
|
|
- " does not support reconfiguration.");
|
|
|
+ err.println("Node type " + nodeType
|
|
|
+ + " does not support reconfiguration.");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int getReconfigurableProperties(final String nodeType, final String address,
|
|
|
+ final PrintStream out, final PrintStream err) throws IOException {
|
|
|
+ String outMsg = null;
|
|
|
+ String errMsg = null;
|
|
|
+ List<String> properties = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ properties = getReconfigurablePropertiesDispatch(nodeType, address, out,
|
|
|
+ err);
|
|
|
+ outMsg = String.format("Node [%s] Reconfigurable properties:", address);
|
|
|
+ } catch (IOException e) {
|
|
|
+ errMsg = String.format("Node [%s] reconfiguration: %s.", address,
|
|
|
+ e.toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (errMsg != null) {
|
|
|
+ err.println(errMsg);
|
|
|
return 1;
|
|
|
+ } else if (properties == null) {
|
|
|
+ return 1;
|
|
|
+ } else {
|
|
|
+ out.println(outMsg);
|
|
|
+ for (String name : properties) {
|
|
|
+ out.println(name);
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ List<String> getReconfigurablePropertiesDispatch(final String nodeType,
|
|
|
+ final String address, final PrintStream out, final PrintStream err)
|
|
|
+ throws IOException {
|
|
|
+ if ("namenode".equals(nodeType)) {
|
|
|
+ ReconfigurationProtocol reconfProxy = getNameNodeProxy(address);
|
|
|
+ return reconfProxy.listReconfigurableProperties();
|
|
|
+ } else if ("datanode".equals(nodeType)) {
|
|
|
+ ClientDatanodeProtocol reconfProxy = getDataNodeProxy(address);
|
|
|
+ return reconfProxy.listReconfigurableProperties();
|
|
|
+ } else {
|
|
|
+ err.println("Node type " + nodeType
|
|
|
+ + " does not support reconfiguration.");
|
|
|
+ return null;
|
|
|
}
|
|
|
- return 0;
|
|
|
}
|
|
|
|
|
|
public int genericRefresh(String[] argv, int i) throws IOException {
|
|
@@ -1685,7 +1770,7 @@ public class DFSAdmin extends FsShell {
|
|
|
+ " [-refreshCallQueue]");
|
|
|
} else if ("-reconfig".equals(cmd)) {
|
|
|
System.err.println("Usage: hdfs dfsadmin"
|
|
|
- + " [-reconfig <datanode|...> <host:port> <start|status>]");
|
|
|
+ + " [-reconfig <namenode|datanode> <host:port> <start|status>]");
|
|
|
} else if ("-refresh".equals(cmd)) {
|
|
|
System.err.println("Usage: hdfs dfsadmin"
|
|
|
+ " [-refresh <hostname:port> <resource_identifier> [arg1..argn]");
|
|
@@ -2001,6 +2086,23 @@ public class DFSAdmin extends FsShell {
|
|
|
NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class));
|
|
|
return dnProtocol;
|
|
|
}
|
|
|
+
|
|
|
+ private ReconfigurationProtocol getNameNodeProxy(String node)
|
|
|
+ throws IOException {
|
|
|
+ InetSocketAddress nodeAddr = NetUtils.createSocketAddr(node);
|
|
|
+ // Get the current configuration
|
|
|
+ Configuration conf = getConf();
|
|
|
+
|
|
|
+ // For namenode proxy the server principal should be NN's one.
|
|
|
+ conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
|
|
|
+ conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
|
|
|
+
|
|
|
+ // Create the client
|
|
|
+ ReconfigurationProtocol reconfigProtocol = DFSUtilClient
|
|
|
+ .createReconfigurationProtocolProxy(nodeAddr, getUGI(), conf,
|
|
|
+ NetUtils.getSocketFactory(conf, ReconfigurationProtocol.class));
|
|
|
+ return reconfigProtocol;
|
|
|
+ }
|
|
|
|
|
|
private int deleteBlockPool(String[] argv, int i) throws IOException {
|
|
|
ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[i]);
|