|
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
|
|
+import org.apache.hadoop.http.HttpServer;
|
|
|
import org.apache.hadoop.ipc.*;
|
|
|
import org.apache.hadoop.conf.*;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
@@ -67,8 +68,8 @@ import java.util.Iterator;
|
|
|
* 'NameNode' refers to both this class as well as the 'NameNode server'.
|
|
|
* The 'FSNamesystem' class actually performs most of the filesystem
|
|
|
* management. The majority of the 'NameNode' class itself is concerned
|
|
|
- * with exposing the IPC interface to the outside world, plus some
|
|
|
- * configuration management.
|
|
|
+ * with exposing the IPC interface and the http server to the outside world,
|
|
|
+ * plus some configuration management.
|
|
|
*
|
|
|
* NameNode implements the ClientProtocol interface, which allows
|
|
|
* clients to ask for DFS services. ClientProtocol is not
|
|
@@ -103,14 +104,16 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
|
|
|
public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
|
|
|
public static final Log stateChangeLog = LogFactory.getLog("org.apache.hadoop.hdfs.StateChange");
|
|
|
- public FSNamesystem namesystem;
|
|
|
+ public FSNamesystem namesystem; // TODO: This should private. Use getNamesystem() instead.
|
|
|
+ /** RPC server */
|
|
|
private Server server;
|
|
|
+ /** RPC server address */
|
|
|
+ private InetSocketAddress serverAddress = null;
|
|
|
+ /** httpServer */
|
|
|
+ private HttpServer httpServer;
|
|
|
+ /** HTTP server address */
|
|
|
+ private InetSocketAddress httpAddress = null;
|
|
|
private Thread emptier;
|
|
|
- private int handlerCount = 2;
|
|
|
- private boolean supportAppends = true; // allow appending to hdfs files
|
|
|
-
|
|
|
- private InetSocketAddress nameNodeAddress = null;
|
|
|
-
|
|
|
/** only used for testing purposes */
|
|
|
private boolean stopRequested = false;
|
|
|
|
|
@@ -122,6 +125,10 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
|
|
|
static NameNodeMetrics myMetrics;
|
|
|
|
|
|
+ public FSNamesystem getNamesystem() {
|
|
|
+ return namesystem;
|
|
|
+ }
|
|
|
+
|
|
|
public static NameNodeMetrics getNameNodeMetrics() {
|
|
|
return myMetrics;
|
|
|
}
|
|
@@ -139,45 +146,90 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
String portString = port == DEFAULT_PORT ? "" : (":"+port);
|
|
|
return URI.create("hdfs://"+ namenode.getHostName()+portString);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Initialize the server
|
|
|
+ * Initialize name-node.
|
|
|
*
|
|
|
- * @param address hostname:port to bind to
|
|
|
* @param conf the configuration
|
|
|
*/
|
|
|
- private void initialize(String address, Configuration conf) throws IOException {
|
|
|
- InetSocketAddress socAddr = NameNode.getAddress(address);
|
|
|
- this.supportAppends = conf.getBoolean("dfs.support.append", true);
|
|
|
- this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
|
|
|
+ private void initialize(Configuration conf) throws IOException {
|
|
|
+ InetSocketAddress socAddr = NameNode.getAddress(conf);
|
|
|
+ int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
|
|
|
+ // create rpc server
|
|
|
this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
|
|
|
handlerCount, false, conf);
|
|
|
|
|
|
// The rpc-server port can be ephemeral... ensure we have the correct info
|
|
|
- this.nameNodeAddress = this.server.getListenerAddress();
|
|
|
- FileSystem.setDefaultUri(conf, getUri(nameNodeAddress));
|
|
|
- LOG.info("Namenode up at: " + this.nameNodeAddress);
|
|
|
+ this.serverAddress = this.server.getListenerAddress();
|
|
|
+ FileSystem.setDefaultUri(conf, getUri(serverAddress));
|
|
|
+ LOG.info("Namenode up at: " + this.serverAddress);
|
|
|
|
|
|
myMetrics = new NameNodeMetrics(conf, this);
|
|
|
|
|
|
this.namesystem = new FSNamesystem(this, conf);
|
|
|
+ startHttpServer(conf);
|
|
|
this.server.start(); //start RPC server
|
|
|
+ startTrashEmptier(conf);
|
|
|
+ }
|
|
|
|
|
|
+ private void startTrashEmptier(Configuration conf) throws IOException {
|
|
|
this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
|
|
|
this.emptier.setDaemon(true);
|
|
|
this.emptier.start();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ private void startHttpServer(Configuration conf) throws IOException {
|
|
|
+ String infoAddr =
|
|
|
+ NetUtils.getServerAddress(conf, "dfs.info.bindAddress",
|
|
|
+ "dfs.info.port", "dfs.http.address");
|
|
|
+ InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
|
|
|
+ String infoHost = infoSocAddr.getHostName();
|
|
|
+ int infoPort = infoSocAddr.getPort();
|
|
|
+ this.httpServer = new HttpServer("hdfs", infoHost, infoPort,
|
|
|
+ infoPort == 0, conf);
|
|
|
+ if (conf.getBoolean("dfs.https.enable", false)) {
|
|
|
+ boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
|
|
|
+ InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
|
|
|
+ "dfs.https.address", infoHost + ":" + 0));
|
|
|
+ Configuration sslConf = new Configuration(false);
|
|
|
+ sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
|
|
|
+ "ssl-server.xml"));
|
|
|
+ this.httpServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
|
|
|
+ // assume same ssl port for all datanodes
|
|
|
+ InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.get(
|
|
|
+ "dfs.datanode.https.address", infoHost + ":" + 50475));
|
|
|
+ this.httpServer.setAttribute("datanode.https.port", datanodeSslPort
|
|
|
+ .getPort());
|
|
|
+ }
|
|
|
+ this.httpServer.setAttribute("name.node", this);
|
|
|
+ this.httpServer.setAttribute("name.node.address", getNameNodeAddress());
|
|
|
+ this.httpServer.setAttribute("name.system.image", getFSImage());
|
|
|
+ this.httpServer.setAttribute("name.conf", conf);
|
|
|
+ this.httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class);
|
|
|
+ this.httpServer.addInternalServlet("getimage", "/getimage", GetImageServlet.class);
|
|
|
+ this.httpServer.addInternalServlet("listPaths", "/listPaths/*", ListPathsServlet.class);
|
|
|
+ this.httpServer.addInternalServlet("data", "/data/*", FileDataServlet.class);
|
|
|
+ this.httpServer.addInternalServlet("checksum", "/fileChecksum/*",
|
|
|
+ FileChecksumServlets.RedirectServlet.class);
|
|
|
+ this.httpServer.start();
|
|
|
+
|
|
|
+ // The web-server port can be ephemeral... ensure we have the correct info
|
|
|
+ infoPort = this.httpServer.getPort();
|
|
|
+ this.httpAddress = new InetSocketAddress(infoHost, infoPort);
|
|
|
+ conf.set("dfs.http.address", infoHost + ":" + infoPort);
|
|
|
+ LOG.info("Web-server up at: " + infoHost + ":" + infoPort);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Start NameNode.
|
|
|
* <p>
|
|
|
* The name-node can be started with one of the following startup options:
|
|
|
* <ul>
|
|
|
- * <li>{@link org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption#REGULAR REGULAR} - normal startup</li>
|
|
|
- * <li>{@link org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption#FORMAT FORMAT} - format name node</li>
|
|
|
- * <li>{@link org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption#UPGRADE UPGRADE} - start the cluster
|
|
|
+ * <li>{@link StartupOption#REGULAR REGULAR} - normal name node startup</li>
|
|
|
+ * <li>{@link StartupOption#FORMAT FORMAT} - format name node</li>
|
|
|
+ * <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster
|
|
|
* upgrade and create a snapshot of the current file system state</li>
|
|
|
- * <li>{@link org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption#ROLLBACK ROLLBACK} - roll the
|
|
|
+ * <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the
|
|
|
* cluster back to the previous state</li>
|
|
|
* </ul>
|
|
|
* The option is passed via configuration field:
|
|
@@ -191,21 +243,8 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public NameNode(Configuration conf) throws IOException {
|
|
|
- this(FileSystem.getDefaultUri(conf).getAuthority(), conf);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Create a NameNode at the specified location and start it.
|
|
|
- *
|
|
|
- * The conf will be modified to reflect the actual ports on which
|
|
|
- * the NameNode is up and running if the user passes the port as
|
|
|
- * <code>zero</code>.
|
|
|
- */
|
|
|
- public NameNode(String bindAddress,
|
|
|
- Configuration conf
|
|
|
- ) throws IOException {
|
|
|
try {
|
|
|
- initialize(bindAddress, conf);
|
|
|
+ initialize(conf);
|
|
|
} catch (IOException e) {
|
|
|
this.stop();
|
|
|
throw e;
|
|
@@ -230,6 +269,11 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
if (stopRequested)
|
|
|
return;
|
|
|
stopRequested = true;
|
|
|
+ try {
|
|
|
+ if (httpServer != null) httpServer.stop();
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.error(StringUtils.stringifyException(ie));
|
|
|
+ }
|
|
|
if(namesystem != null) namesystem.close();
|
|
|
if(emptier != null) emptier.interrupt();
|
|
|
if(server != null) server.stop();
|
|
@@ -313,11 +357,6 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
stateChangeLog.debug("*DIR* NameNode.append: file "
|
|
|
+src+" for "+clientName+" at "+clientMachine);
|
|
|
}
|
|
|
- if (supportAppends == false) {
|
|
|
- throw new IOException("Append to hdfs not supported." +
|
|
|
- " Please refer to dfs.support.append configuration parameter.");
|
|
|
- }
|
|
|
-
|
|
|
LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
|
|
|
myMetrics.numFilesAppended.inc();
|
|
|
return info;
|
|
@@ -725,7 +764,17 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
* @return the address on which the NameNodes is listening to.
|
|
|
*/
|
|
|
public InetSocketAddress getNameNodeAddress() {
|
|
|
- return nameNodeAddress;
|
|
|
+ return serverAddress;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the address of the NameNodes http server,
|
|
|
+ * which is used to access the name-node web UI.
|
|
|
+ *
|
|
|
+ * @return the http address.
|
|
|
+ */
|
|
|
+ public InetSocketAddress getHttpAddress() {
|
|
|
+ return httpAddress;
|
|
|
}
|
|
|
|
|
|
NetworkTopology getNetworkTopology() {
|
|
@@ -802,8 +851,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
StartupOption.IMPORT.getName() + "]");
|
|
|
}
|
|
|
|
|
|
- private static StartupOption parseArguments(String args[],
|
|
|
- Configuration conf) {
|
|
|
+ private static StartupOption parseArguments(String args[]) {
|
|
|
int argsLen = (args == null) ? 0 : args.length;
|
|
|
StartupOption startOpt = StartupOption.REGULAR;
|
|
|
for(int i=0; i < argsLen; i++) {
|
|
@@ -823,7 +871,6 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
} else
|
|
|
return null;
|
|
|
}
|
|
|
- setStartupOption(conf, startOpt);
|
|
|
return startOpt;
|
|
|
}
|
|
|
|
|
@@ -840,11 +887,12 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
|
|
|
Configuration conf) throws IOException {
|
|
|
if (conf == null)
|
|
|
conf = new Configuration();
|
|
|
- StartupOption startOpt = parseArguments(argv, conf);
|
|
|
+ StartupOption startOpt = parseArguments(argv);
|
|
|
if (startOpt == null) {
|
|
|
printUsage();
|
|
|
return null;
|
|
|
}
|
|
|
+ setStartupOption(conf, startOpt);
|
|
|
|
|
|
switch (startOpt) {
|
|
|
case FORMAT:
|