|
@@ -19,389 +19,275 @@ package org.apache.hadoop.dfs;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.net.*;
|
|
|
+import java.util.ArrayList;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.dfs.FSConstants.StartupOption;
|
|
|
import org.apache.hadoop.fs.*;
|
|
|
-import org.apache.hadoop.net.NetworkTopology;
|
|
|
|
|
|
/**
|
|
|
* This class creates a single-process DFS cluster for junit testing.
|
|
|
- * One thread is created for each server.
|
|
|
* The data directories for DFS are undering the testing directory.
|
|
|
- * @author Owen O'Malley
|
|
|
*/
|
|
|
public class MiniDFSCluster {
|
|
|
|
|
|
private Configuration conf;
|
|
|
- int nDatanodes;
|
|
|
- private Thread nameNodeThread;
|
|
|
- private Thread dataNodeThreads[];
|
|
|
- private NameNodeRunner nameNode;
|
|
|
- private DataNodeRunner dataNodes[];
|
|
|
-
|
|
|
- private int nameNodePort = 0;
|
|
|
- private int nameNodeInfoPort = 0;
|
|
|
+ private NameNode nameNode;
|
|
|
+ private ArrayList<DataNode> dataNodes = new ArrayList<DataNode>();
|
|
|
+ private File base_dir;
|
|
|
+ private File data_dir;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Modify the config and start up the servers with the given operation.
|
|
|
+ * Servers will be started on free ports.
|
|
|
+ * <p>
|
|
|
+ * The caller must manage the creation of NameNode and DataNode directories
|
|
|
+ * and have already set dfs.name.dir and dfs.data.dir in the given conf.
|
|
|
+ *
|
|
|
+ * @param conf the base configuration to use in starting the servers. This
|
|
|
+ * will be modified as necessary.
|
|
|
+ * @param numDataNodes Number of DataNodes to start; may be zero
|
|
|
+ * @param nameNodeOperation the operation with which to start the servers. If null
|
|
|
+ * or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
|
|
|
+ */
|
|
|
+ public MiniDFSCluster(Configuration conf,
|
|
|
+ int numDataNodes,
|
|
|
+ StartupOption nameNodeOperation) throws IOException {
|
|
|
+ this(0, conf, numDataNodes, false, false, nameNodeOperation, null);
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
- * An inner class that runs a name node.
|
|
|
+ * Modify the config and start up the servers. The rpc and info ports for
|
|
|
+ * servers are guaranteed to use free ports.
|
|
|
+ * <p>
|
|
|
+ * NameNode and DataNode directory creation and configuration will be
|
|
|
+ * managed by this class.
|
|
|
+ *
|
|
|
+ * @param conf the base configuration to use in starting the servers. This
|
|
|
+ * will be modified as necessary.
|
|
|
+ * @param numDataNodes Number of DataNodes to start; may be zero
|
|
|
+ * @param format if true, format the NameNode and DataNodes before starting up
|
|
|
+ * @param racks array of strings indicating the rack that each DataNode is on
|
|
|
*/
|
|
|
- class NameNodeRunner implements Runnable {
|
|
|
- private NameNode node;
|
|
|
- private volatile boolean isInitialized = false;
|
|
|
- private boolean isCrashed = false;
|
|
|
- private boolean isRunning = true;
|
|
|
+ public MiniDFSCluster(Configuration conf,
|
|
|
+ int numDataNodes,
|
|
|
+ boolean format,
|
|
|
+ String[] racks) throws IOException {
|
|
|
+ this(0, conf, numDataNodes, format, true, null, racks);
|
|
|
+ }
|
|
|
|
|
|
- public InetSocketAddress getAddress() {
|
|
|
- return node.getNameNodeAddress();
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * NOTE: if possible, the other constructors should be used as they will
|
|
|
+ * ensure that the servers use free ports.
|
|
|
+ * <p>
|
|
|
+ * Modify the config and start up the servers.
|
|
|
+ *
|
|
|
+ * @param nameNodePort suggestion for which rpc port to use. caller should
|
|
|
+ * use getNameNodePort() to get the actual port used.
|
|
|
+ * @param conf the base configuration to use in starting the servers. This
|
|
|
+ * will be modified as necessary.
|
|
|
+ * @param numDataNodes Number of DataNodes to start; may be zero
|
|
|
+ * @param format if true, format the NameNode and DataNodes before starting up
|
|
|
+ * @param manageDfsDirs if true, the data directories for servers will be
|
|
|
+ * created and dfs.name.dir and dfs.data.dir will be set in the conf
|
|
|
+ * @param operation the operation with which to start the servers. If null
|
|
|
+ * or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
|
|
|
+ * @param racks array of strings indicating the rack that each DataNode is on
|
|
|
+ */
|
|
|
+ public MiniDFSCluster(int nameNodePort,
|
|
|
+ Configuration conf,
|
|
|
+ int numDataNodes,
|
|
|
+ boolean format,
|
|
|
+ boolean manageDfsDirs,
|
|
|
+ StartupOption operation,
|
|
|
+ String[] racks) throws IOException {
|
|
|
+ this.conf = conf;
|
|
|
+ base_dir = new File(System.getProperty("test.build.data"), "dfs/");
|
|
|
+ data_dir = new File(base_dir, "data");
|
|
|
|
|
|
- synchronized public boolean isInitialized() {
|
|
|
- return isInitialized;
|
|
|
+ // Setup the NameNode configuration
|
|
|
+ conf.set("fs.default.name", "localhost:"+ Integer.toString(nameNodePort));
|
|
|
+ conf.setInt("dfs.info.port", 0);
|
|
|
+ if (manageDfsDirs) {
|
|
|
+ conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
|
|
|
+ new File(base_dir, "name2").getPath());
|
|
|
}
|
|
|
+ conf.setInt("dfs.replication", Math.min(3, numDataNodes));
|
|
|
+ conf.setInt("dfs.safemode.extension", 0);
|
|
|
|
|
|
- synchronized public boolean isCrashed() {
|
|
|
- return isCrashed;
|
|
|
- }
|
|
|
-
|
|
|
- public boolean isUp() {
|
|
|
- if (node == null) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- try {
|
|
|
- long[] sizes = node.getStats();
|
|
|
- boolean isUp = false;
|
|
|
- synchronized (this) {
|
|
|
- isUp = (isInitialized && !node.isInSafeMode() && sizes[0] != 0);
|
|
|
- }
|
|
|
- return isUp;
|
|
|
- } catch (IOException ie) {
|
|
|
- return false;
|
|
|
+ // Format and clean out DataNode directories
|
|
|
+ if (format) {
|
|
|
+ if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
|
|
|
+ throw new IOException("Cannot remove data directory: " + data_dir);
|
|
|
}
|
|
|
+ NameNode.format(conf);
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Create the name node and run it.
|
|
|
- */
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- synchronized( this ) {
|
|
|
- if( isRunning ) {
|
|
|
- node = new NameNode(conf);
|
|
|
- }
|
|
|
- isInitialized = true;
|
|
|
- }
|
|
|
- } catch (Throwable e) {
|
|
|
- shutdown();
|
|
|
- System.err.println("Name node crashed:");
|
|
|
- e.printStackTrace();
|
|
|
- synchronized (this) {
|
|
|
- isCrashed = true;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ // Start the NameNode
|
|
|
+ String[] args = (operation == null ||
|
|
|
+ operation == StartupOption.FORMAT ||
|
|
|
+ operation == StartupOption.REGULAR) ?
|
|
|
+ new String[] {} : new String[] {"-"+operation.toString()};
|
|
|
+ nameNode = NameNode.createNameNode(args, conf);
|
|
|
|
|
|
- /**
|
|
|
- * Shutdown the name node and wait for it to finish.
|
|
|
- */
|
|
|
- public synchronized void shutdown() {
|
|
|
- isRunning = false;
|
|
|
- if (node != null) {
|
|
|
- node.stop();
|
|
|
- node.join();
|
|
|
- node = null;
|
|
|
+ // Start the DataNodes
|
|
|
+ startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks);
|
|
|
+
|
|
|
+ if (numDataNodes > 0) {
|
|
|
+ while (!isClusterUp()) {
|
|
|
+ try {
|
|
|
+ System.err.println("Waiting for the Mini HDFS Cluster to start...");
|
|
|
+ Thread.sleep(1000);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * An inner class to run the data node.
|
|
|
+ * Modify the config and start up the DataNodes. The info port for
|
|
|
+ * DataNodes is guaranteed to use a free port.
|
|
|
+ *
|
|
|
+ * @param conf the base configuration to use in starting the DataNodes. This
|
|
|
+ * will be modified as necessary.
|
|
|
+ * @param numDataNodes Number of DataNodes to start; may be zero
|
|
|
+ * @param manageDfsDirs if true, the data directories for DataNodes will be
|
|
|
+ * created and dfs.data.dir will be set in the conf
|
|
|
+ * @param operation the operation with which to start the DataNodes. If null
|
|
|
+ * or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
|
|
|
+ * @param racks array of strings indicating the rack that each DataNode is on
|
|
|
+ *
|
|
|
+ * @throws IllegalStateException if the NameNode has been shut down
|
|
|
*/
|
|
|
- class DataNodeRunner implements Runnable {
|
|
|
- private DataNode node;
|
|
|
- Configuration conf = null;
|
|
|
- private boolean isRunning = true;
|
|
|
-
|
|
|
- public DataNodeRunner(Configuration conf, File dataDir, int index) {
|
|
|
- this.conf = new Configuration(conf);
|
|
|
- this.conf.set("dfs.data.dir",
|
|
|
- new File(dataDir, "data"+(2*index+1)).getPath()+","+
|
|
|
- new File(dataDir, "data"+(2*index+2)).getPath());
|
|
|
+ public void startDataNodes(Configuration conf, int numDataNodes,
|
|
|
+ boolean manageDfsDirs, StartupOption operation,
|
|
|
+ String[] racks) throws IOException {
|
|
|
+ if (nameNode == null) {
|
|
|
+ throw new IllegalStateException("NameNode is not running");
|
|
|
}
|
|
|
+
|
|
|
+ // Set up the right ports for the datanodes
|
|
|
+ conf.setInt("dfs.datanode.info.port", 0);
|
|
|
+ InetSocketAddress nnAddr = nameNode.getNameNodeAddress();
|
|
|
+ int nameNodePort = nnAddr.getPort();
|
|
|
+ conf.set("fs.default.name",
|
|
|
+ nnAddr.getHostName()+ ":" + Integer.toString(nameNodePort));
|
|
|
|
|
|
- public DataNodeRunner(Configuration conf, File dataDir,
|
|
|
- String networkLoc, int index) {
|
|
|
- this(conf, dataDir, index);
|
|
|
- this.conf.set("dfs.datanode.rack", networkLoc);
|
|
|
- }
|
|
|
+ String[] args = (operation == null ||
|
|
|
+ operation == StartupOption.FORMAT ||
|
|
|
+ operation == StartupOption.REGULAR) ?
|
|
|
+ new String[] {} : new String[] {"-"+operation.toString()};
|
|
|
|
|
|
- /**
|
|
|
- * Create and run the data node.
|
|
|
- */
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- String[] dirs = conf.getStrings("dfs.data.dir");
|
|
|
- for (int idx = 0; idx < dirs.length; idx++) {
|
|
|
- File dataDir = new File(dirs[idx]);
|
|
|
- synchronized (DataNodeRunner.class) {
|
|
|
- if (!dataDir.mkdirs()) {
|
|
|
- if (!dataDir.isDirectory()) {
|
|
|
- throw new RuntimeException("Mkdirs failed to create directory " +
|
|
|
- dataDir.toString());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- synchronized (this){
|
|
|
- if (isRunning) {
|
|
|
- node = new DataNode(conf, conf.get("dfs.datanode.rack",
|
|
|
- NetworkTopology.DEFAULT_RACK), dirs);
|
|
|
- }
|
|
|
+ for (int i = 0; i < numDataNodes; i++) {
|
|
|
+ Configuration dnConf = new Configuration(conf);
|
|
|
+ if (manageDfsDirs) {
|
|
|
+ File dir1 = new File(data_dir, "data"+(2*i+1));
|
|
|
+ File dir2 = new File(data_dir, "data"+(2*i+2));
|
|
|
+ dir1.mkdirs();
|
|
|
+ dir2.mkdirs();
|
|
|
+ if (!dir1.isDirectory() || !dir2.isDirectory()) {
|
|
|
+ throw new IOException("Mkdirs failed to create directory for DataNode "
|
|
|
+ + i + ": " + dir1 + " or " + dir2);
|
|
|
}
|
|
|
- node.run();
|
|
|
- } catch (Throwable e) {
|
|
|
- shutdown();
|
|
|
- System.err.println("Data node crashed:");
|
|
|
- e.printStackTrace();
|
|
|
+ dnConf.set("dfs.data.dir", dir1.getPath() + "," + dir2.getPath());
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Shut down the server and wait for it to finish.
|
|
|
- */
|
|
|
- public synchronized void shutdown() {
|
|
|
- isRunning = false;
|
|
|
- if (node != null) {
|
|
|
- node.shutdown();
|
|
|
- node = null;
|
|
|
+ if (racks != null && i < racks.length) {
|
|
|
+ dnConf.set("dfs.datanode.rack", racks[i]);
|
|
|
}
|
|
|
+ System.out.println("Starting DataNode " + i + " with dfs.data.dir: "
|
|
|
+ + dnConf.get("dfs.data.dir"));
|
|
|
+ dataNodes.add(DataNode.createDataNode(args, dnConf));
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- public MiniDFSCluster(Configuration conf,
|
|
|
- int nDatanodes,
|
|
|
- boolean formatNamenode,
|
|
|
- String[] racks) throws IOException {
|
|
|
- this(0, conf, nDatanodes, formatNamenode, racks);
|
|
|
- }
|
|
|
|
|
|
/**
|
|
|
- * Create the config and start up the servers. If either the rpc or info port is already
|
|
|
- * in use, we will try new ports.
|
|
|
- * @param namenodePort suggestion for which rpc port to use. caller should use
|
|
|
- * getNameNodePort() to get the actual port used.
|
|
|
- * @param dataNodeFirst should the datanode be brought up before the namenode?
|
|
|
- * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
|
|
|
+ * If the NameNode is running, attempt to finalize a previous upgrade.
|
|
|
+ * When this method returns, the NameNode should be finalized, but
|
|
|
+ * DataNodes may not be since that occurs asynchronously.
|
|
|
+ *
|
|
|
+ * @throws IllegalStateException if the NameNode is not running.
|
|
|
*/
|
|
|
- public MiniDFSCluster(int namenodePort,
|
|
|
- Configuration conf,
|
|
|
- boolean dataNodeFirst) throws IOException {
|
|
|
- this(namenodePort, conf, 1, dataNodeFirst, true, null);
|
|
|
+ public void finalizeCluster(Configuration conf) throws Exception {
|
|
|
+ if (nameNode == null) {
|
|
|
+ throw new IllegalStateException("Attempting to finalize "
|
|
|
+ + "Namenode but it is not running");
|
|
|
+ }
|
|
|
+ new DFSAdmin().doMain(conf, new String[] {"-finalizeUpgrade"});
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Create the config and start up the only the namenode. If either the rpc or info port is already
|
|
|
- * in use, we will try new ports.
|
|
|
- * @param namenodePort suggestion for which rpc port to use. caller should use
|
|
|
- * getNameNodePort() to get the actual port used.
|
|
|
- * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
|
|
|
+ * Gets the started NameNode. May be null.
|
|
|
*/
|
|
|
- public MiniDFSCluster(int namenodePort,
|
|
|
- Configuration conf,
|
|
|
- int numRetries,
|
|
|
- int numRetriesPerPort) throws IOException {
|
|
|
- this(namenodePort, conf, 0, false, false, null);
|
|
|
+ public NameNode getNameNode() {
|
|
|
+ return nameNode;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Create the config and start up the servers. If either the rpc or info port is already
|
|
|
- * in use, we will try new ports.
|
|
|
- * @param namenodePort suggestion for which rpc port to use. caller should use
|
|
|
- * getNameNodePort() to get the actual port used.
|
|
|
- * @param nDatanodes Number of datanodes
|
|
|
- * @param dataNodeFirst should the datanode be brought up before the namenode?
|
|
|
- * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
|
|
|
+ * Gets a list of the started DataNodes. May be empty.
|
|
|
*/
|
|
|
- public MiniDFSCluster(int namenodePort,
|
|
|
- Configuration conf,
|
|
|
- int nDatanodes,
|
|
|
- boolean dataNodeFirst) throws IOException {
|
|
|
- this(namenodePort, conf, nDatanodes, dataNodeFirst, true, null);
|
|
|
+ public ArrayList<DataNode> getDataNodes() {
|
|
|
+ return dataNodes;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Create the config and start up the servers. If either the rpc or info port is already
|
|
|
- * in use, we will try new ports.
|
|
|
- * @param namenodePort suggestion for which rpc port to use. caller should use
|
|
|
- * getNameNodePort() to get the actual port used.
|
|
|
- * @param nDatanodes Number of datanodes
|
|
|
- * @param dataNodeFirst should the datanode be brought up before the namenode?
|
|
|
- * @param formatNamenode should the namenode be formatted before starting up ?
|
|
|
- * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
|
|
|
- */
|
|
|
- public MiniDFSCluster(int namenodePort,
|
|
|
- Configuration conf,
|
|
|
- int nDatanodes,
|
|
|
- boolean dataNodeFirst,
|
|
|
- boolean formatNamenode ) throws IOException {
|
|
|
- this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode, null);
|
|
|
+ * Gets the rpc port used by the NameNode, because the caller
|
|
|
+ * supplied port is not necessarily the actual port used.
|
|
|
+ */
|
|
|
+ public int getNameNodePort() {
|
|
|
+ return nameNode.getNameNodeAddress().getPort();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Create the config and start up the servers. If either the rpc or info port is already
|
|
|
- * in use, we will try new ports.
|
|
|
- * @param namenodePort suggestion for which rpc port to use. caller should use
|
|
|
- * getNameNodePort() to get the actual port used.
|
|
|
- * @param nDatanodes Number of datanodes
|
|
|
- * @param dataNodeFirst should the datanode be brought up before the namenode?
|
|
|
- * @param formatNamenode should the namenode be formatted before starting up ?
|
|
|
- * @param racks array of strings indicating racks that each datanode is on
|
|
|
- * @deprecated use {@link #MiniDFSCluster(Configuration, int, String[])}
|
|
|
+ * Shut down the servers that are up.
|
|
|
*/
|
|
|
- public MiniDFSCluster(int namenodePort,
|
|
|
- Configuration conf,
|
|
|
- int nDatanodes,
|
|
|
- boolean dataNodeFirst,
|
|
|
- boolean formatNamenode,
|
|
|
- String[] racks) throws IOException {
|
|
|
- this(namenodePort, conf, nDatanodes,
|
|
|
- ! cannotStartDataNodeFirst(dataNodeFirst) &&
|
|
|
- formatNamenode, racks);
|
|
|
+ public void shutdown() {
|
|
|
+ System.out.println("Shutting down the Mini HDFS Cluster");
|
|
|
+ shutdownDataNodes();
|
|
|
+ if (nameNode != null) {
|
|
|
+ nameNode.stop();
|
|
|
+ nameNode.join();
|
|
|
+ nameNode = null;
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * NameNode should be always started first.
|
|
|
- * Data-nodes need to handshake with the name-node before they can start.
|
|
|
- *
|
|
|
- * @param dataNodeFirst should the datanode be brought up before the namenode?
|
|
|
- * @return false if dataNodeFirst is false
|
|
|
- * @throws IOException if dataNodeFirst is true
|
|
|
- *
|
|
|
- * @deprecated should be removed when dataNodeFirst is gone.
|
|
|
+ * Shut down all DataNodes started by this class. The NameNode
|
|
|
+ * is left running so that new DataNodes may be started.
|
|
|
*/
|
|
|
- private static boolean cannotStartDataNodeFirst( boolean dataNodeFirst
|
|
|
- ) throws IOException {
|
|
|
- if( dataNodeFirst )
|
|
|
- throw new IOException( "NameNode should be always started first." );
|
|
|
- return false;
|
|
|
+ public void shutdownDataNodes() {
|
|
|
+ for (int i = dataNodes.size()-1; i >= 0; i--) {
|
|
|
+ System.out.println("Shutting down DataNode " + i);
|
|
|
+ DataNode dn = dataNodes.remove(i);
|
|
|
+ dn.shutdown();
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * Create the config and start up the servers. If either the rpc or info port is already
|
|
|
- * in use, we will try new ports.
|
|
|
- * @param namenodePort suggestion for which rpc port to use. caller should use
|
|
|
- * getNameNodePort() to get the actual port used.
|
|
|
- * @param nDatanodes Number of datanodes
|
|
|
- * @param formatNamenode should the namenode be formatted before starting up ?
|
|
|
- * @param racks array of strings indicating racks that each datanode is on
|
|
|
+ * Returns true if the NameNode is running and is out of Safe Mode.
|
|
|
*/
|
|
|
- public MiniDFSCluster(int namenodePort,
|
|
|
- Configuration conf,
|
|
|
- int nDatanodes,
|
|
|
- boolean formatNamenode,
|
|
|
- String[] racks) throws IOException {
|
|
|
- this.conf = conf;
|
|
|
-
|
|
|
- this.nDatanodes = nDatanodes;
|
|
|
- this.nameNodePort = namenodePort;
|
|
|
-
|
|
|
- this.conf.set("fs.default.name", "localhost:"+ Integer.toString(nameNodePort));
|
|
|
- this.conf.setInt("dfs.info.port", nameNodeInfoPort);
|
|
|
- this.conf.setInt("dfs.datanode.info.port", 0);
|
|
|
-
|
|
|
- File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
|
|
|
- File data_dir = new File(base_dir, "data");
|
|
|
- this.conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
|
|
|
- new File(base_dir, "name2").getPath());
|
|
|
- this.conf.setInt("dfs.replication", Math.min(3, nDatanodes));
|
|
|
- this.conf.setInt("dfs.safemode.extension", 0);
|
|
|
-
|
|
|
- // Create the NameNode
|
|
|
- StartupOption startOpt =
|
|
|
- formatNamenode ? StartupOption.FORMAT : StartupOption.REGULAR;
|
|
|
- conf.setObject( "dfs.namenode.startup", startOpt );
|
|
|
- conf.setObject( "dfs.datanode.startup", startOpt );
|
|
|
- nameNode = new NameNodeRunner();
|
|
|
- nameNodeThread = new Thread(nameNode);
|
|
|
-
|
|
|
- //
|
|
|
- // Start the MiniDFSCluster
|
|
|
- //
|
|
|
- // Start the namenode and wait for it to be initialized
|
|
|
- nameNodeThread.start();
|
|
|
- while (!nameNode.isCrashed() && !nameNode.isInitialized()) {
|
|
|
- try { // let daemons get started
|
|
|
- System.err.println("Waiting for the NameNode to initialize...");
|
|
|
- Thread.sleep(1000);
|
|
|
- } catch(InterruptedException e) {
|
|
|
- }
|
|
|
- if (nameNode.isCrashed()) {
|
|
|
- throw new RuntimeException("Namenode crashed");
|
|
|
- }
|
|
|
+ public boolean isClusterUp() {
|
|
|
+ if (nameNode == null) {
|
|
|
+ return false;
|
|
|
}
|
|
|
-
|
|
|
- // Set up the right ports for the datanodes
|
|
|
- InetSocketAddress nnAddr = nameNode.getAddress();
|
|
|
- nameNodePort = nnAddr.getPort();
|
|
|
- this.conf.set("fs.default.name", nnAddr.getHostName()+ ":" + Integer.toString(nameNodePort));
|
|
|
-
|
|
|
- // Start the datanodes
|
|
|
- startDataNodes(conf, racks, data_dir);
|
|
|
-
|
|
|
- while (!nameNode.isCrashed() && !nameNode.isUp()) {
|
|
|
- try { // let daemons get started
|
|
|
- System.err.println("Waiting for the Mini HDFS Cluster to start...");
|
|
|
- Thread.sleep(1000);
|
|
|
- } catch(InterruptedException e) {
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (nameNode.isCrashed()) {
|
|
|
- throw new RuntimeException("Namenode crashed");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void startDataNodes(Configuration conf, String[] racks, File data_dir) {
|
|
|
- // Create the DataNodes & start them
|
|
|
- dataNodes = new DataNodeRunner[nDatanodes];
|
|
|
- dataNodeThreads = new Thread[nDatanodes];
|
|
|
- for (int idx = 0; idx < nDatanodes; idx++) {
|
|
|
- if( racks == null || idx >= racks.length) {
|
|
|
- dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
|
|
|
- } else {
|
|
|
- dataNodes[idx] = new DataNodeRunner(conf, data_dir, racks[idx], idx);
|
|
|
+ try {
|
|
|
+ long[] sizes = nameNode.getStats();
|
|
|
+ boolean isUp = false;
|
|
|
+ synchronized (this) {
|
|
|
+ isUp = (!nameNode.isInSafeMode() && sizes[0] != 0);
|
|
|
}
|
|
|
- dataNodeThreads[idx] = new Thread(dataNodes[idx]);
|
|
|
- dataNodeThreads[idx].start();
|
|
|
+ return isUp;
|
|
|
+ } catch (IOException ie) {
|
|
|
+ return false;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns the rpc port used by the mini cluster, because the caller supplied port is
|
|
|
- * not necessarily the actual port used.
|
|
|
- */
|
|
|
- public int getNameNodePort() {
|
|
|
- return nameNode.getAddress().getPort();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Shut down the servers.
|
|
|
+ * Returns true if there is at least one DataNode running.
|
|
|
*/
|
|
|
- public void shutdown() {
|
|
|
- System.out.println("Shutting down the cluster");
|
|
|
- for (int idx = 0; idx < nDatanodes; idx++) {
|
|
|
- dataNodes[idx].shutdown();
|
|
|
- }
|
|
|
- nameNode.shutdown();
|
|
|
- for (int idx = 0; idx < nDatanodes; idx++) {
|
|
|
- try {
|
|
|
- dataNodeThreads[idx].join();
|
|
|
- } catch(InterruptedException e) {
|
|
|
- }
|
|
|
- }
|
|
|
- try {
|
|
|
- nameNodeThread.join();
|
|
|
- } catch (InterruptedException e) {
|
|
|
+ public boolean isDataNodeUp() {
|
|
|
+ if (dataNodes == null || dataNodes.size() == 0) {
|
|
|
+ return false;
|
|
|
}
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -412,14 +298,14 @@ public class MiniDFSCluster {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Get the directories where the namenode stores image
|
|
|
+ * Get the directories where the namenode stores its state.
|
|
|
*/
|
|
|
public File[] getNameDirs() {
|
|
|
return NameNode.getDirs(conf);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Wait till the cluster is active and running.
|
|
|
+ * Wait until the cluster is active and running.
|
|
|
*/
|
|
|
public void waitActive() throws IOException {
|
|
|
InetSocketAddress addr = new InetSocketAddress("localhost",
|
|
@@ -430,7 +316,7 @@ public class MiniDFSCluster {
|
|
|
// get initial state of datanodes
|
|
|
//
|
|
|
DatanodeInfo[] oldinfo = client.datanodeReport();
|
|
|
- while (oldinfo.length != nDatanodes) {
|
|
|
+ while (oldinfo.length != dataNodes.size()) {
|
|
|
try {
|
|
|
Thread.sleep(500);
|
|
|
} catch (Exception e) {
|
|
@@ -448,7 +334,7 @@ public class MiniDFSCluster {
|
|
|
} catch (Exception e) {
|
|
|
}
|
|
|
DatanodeInfo[] info = client.datanodeReport();
|
|
|
- if (info.length != nDatanodes) {
|
|
|
+ if (info.length != dataNodes.size()) {
|
|
|
continue;
|
|
|
}
|
|
|
numdead = 0;
|