|
@@ -37,8 +37,6 @@ public class MiniDFSCluster {
|
|
private Thread dataNodeThreads[];
|
|
private Thread dataNodeThreads[];
|
|
private NameNodeRunner nameNode;
|
|
private NameNodeRunner nameNode;
|
|
private DataNodeRunner dataNodes[];
|
|
private DataNodeRunner dataNodes[];
|
|
- static public int MAX_RETRIES = 10;
|
|
|
|
- static public int MAX_RETRIES_PER_PORT = 10;
|
|
|
|
|
|
|
|
private int nameNodePort = 0;
|
|
private int nameNodePort = 0;
|
|
private int nameNodeInfoPort = 0;
|
|
private int nameNodeInfoPort = 0;
|
|
@@ -48,15 +46,33 @@ public class MiniDFSCluster {
|
|
*/
|
|
*/
|
|
class NameNodeRunner implements Runnable {
|
|
class NameNodeRunner implements Runnable {
|
|
private NameNode node;
|
|
private NameNode node;
|
|
|
|
+ private volatile boolean isInitialized = false;
|
|
|
|
+ private boolean isCrashed = false;
|
|
private boolean isRunning = true;
|
|
private boolean isRunning = true;
|
|
|
|
+
|
|
|
|
+ public InetSocketAddress getAddress() {
|
|
|
|
+ return node.getNameNodeAddress();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ synchronized public boolean isInitialized() {
|
|
|
|
+ return isInitialized;
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ synchronized public boolean isCrashed() {
|
|
|
|
+ return isCrashed;
|
|
|
|
+ }
|
|
|
|
+
|
|
public boolean isUp() {
|
|
public boolean isUp() {
|
|
if (node == null) {
|
|
if (node == null) {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
try {
|
|
try {
|
|
long[] sizes = node.getStats();
|
|
long[] sizes = node.getStats();
|
|
- return !node.isInSafeMode() && sizes[0] != 0;
|
|
|
|
|
|
+ boolean isUp = false;
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ isUp = (isInitialized && !node.isInSafeMode() && sizes[0] != 0);
|
|
|
|
+ }
|
|
|
|
+ return isUp;
|
|
} catch (IOException ie) {
|
|
} catch (IOException ie) {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
@@ -71,11 +87,15 @@ public class MiniDFSCluster {
|
|
if( isRunning ) {
|
|
if( isRunning ) {
|
|
node = new NameNode(conf);
|
|
node = new NameNode(conf);
|
|
}
|
|
}
|
|
|
|
+ isInitialized = true;
|
|
}
|
|
}
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|
|
node = null;
|
|
node = null;
|
|
System.err.println("Name node crashed:");
|
|
System.err.println("Name node crashed:");
|
|
e.printStackTrace();
|
|
e.printStackTrace();
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ isCrashed = true;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -155,18 +175,25 @@ public class MiniDFSCluster {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public MiniDFSCluster(Configuration conf,
|
|
|
|
+ int nDatanodes,
|
|
|
|
+ boolean formatNamenode,
|
|
|
|
+ String[] racks) throws IOException {
|
|
|
|
+ this(0, conf, nDatanodes, false, formatNamenode, racks);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Create the config and start up the servers. If either the rpc or info port is already
|
|
* Create the config and start up the servers. If either the rpc or info port is already
|
|
* in use, we will try new ports.
|
|
* in use, we will try new ports.
|
|
* @param namenodePort suggestion for which rpc port to use. caller should use
|
|
* @param namenodePort suggestion for which rpc port to use. caller should use
|
|
* getNameNodePort() to get the actual port used.
|
|
* getNameNodePort() to get the actual port used.
|
|
* @param dataNodeFirst should the datanode be brought up before the namenode?
|
|
* @param dataNodeFirst should the datanode be brought up before the namenode?
|
|
|
|
+ * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
|
|
*/
|
|
*/
|
|
public MiniDFSCluster(int namenodePort,
|
|
public MiniDFSCluster(int namenodePort,
|
|
Configuration conf,
|
|
Configuration conf,
|
|
boolean dataNodeFirst) throws IOException {
|
|
boolean dataNodeFirst) throws IOException {
|
|
- this(namenodePort, conf, 1, dataNodeFirst, true,
|
|
|
|
- MAX_RETRIES, MAX_RETRIES_PER_PORT, null);
|
|
|
|
|
|
+ this(namenodePort, conf, 1, dataNodeFirst, true, null);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -174,13 +201,13 @@ public class MiniDFSCluster {
|
|
* in use, we will try new ports.
|
|
* in use, we will try new ports.
|
|
* @param namenodePort suggestion for which rpc port to use. caller should use
|
|
* @param namenodePort suggestion for which rpc port to use. caller should use
|
|
* getNameNodePort() to get the actual port used.
|
|
* getNameNodePort() to get the actual port used.
|
|
|
|
+ * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
|
|
*/
|
|
*/
|
|
public MiniDFSCluster(int namenodePort,
|
|
public MiniDFSCluster(int namenodePort,
|
|
Configuration conf,
|
|
Configuration conf,
|
|
int numRetries,
|
|
int numRetries,
|
|
int numRetriesPerPort) throws IOException {
|
|
int numRetriesPerPort) throws IOException {
|
|
- this(namenodePort, conf, 0, false, false,
|
|
|
|
- numRetries, numRetriesPerPort, null);
|
|
|
|
|
|
+ this(namenodePort, conf, 0, false, false, null);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -190,13 +217,13 @@ public class MiniDFSCluster {
|
|
* getNameNodePort() to get the actual port used.
|
|
* getNameNodePort() to get the actual port used.
|
|
* @param nDatanodes Number of datanodes
|
|
* @param nDatanodes Number of datanodes
|
|
* @param dataNodeFirst should the datanode be brought up before the namenode?
|
|
* @param dataNodeFirst should the datanode be brought up before the namenode?
|
|
|
|
+ * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
|
|
*/
|
|
*/
|
|
public MiniDFSCluster(int namenodePort,
|
|
public MiniDFSCluster(int namenodePort,
|
|
Configuration conf,
|
|
Configuration conf,
|
|
int nDatanodes,
|
|
int nDatanodes,
|
|
boolean dataNodeFirst) throws IOException {
|
|
boolean dataNodeFirst) throws IOException {
|
|
- this(namenodePort, conf, nDatanodes, dataNodeFirst, true,
|
|
|
|
- MAX_RETRIES, MAX_RETRIES_PER_PORT, null);
|
|
|
|
|
|
+ this(namenodePort, conf, nDatanodes, dataNodeFirst, true, null);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -207,26 +234,16 @@ public class MiniDFSCluster {
|
|
* @param nDatanodes Number of datanodes
|
|
* @param nDatanodes Number of datanodes
|
|
* @param dataNodeFirst should the datanode be brought up before the namenode?
|
|
* @param dataNodeFirst should the datanode be brought up before the namenode?
|
|
* @param formatNamenode should the namenode be formatted before starting up ?
|
|
* @param formatNamenode should the namenode be formatted before starting up ?
|
|
|
|
+ * @deprecated use {@link #MiniDFSCluster(Configuration, int, boolean, String[])}
|
|
*/
|
|
*/
|
|
public MiniDFSCluster(int namenodePort,
|
|
public MiniDFSCluster(int namenodePort,
|
|
Configuration conf,
|
|
Configuration conf,
|
|
int nDatanodes,
|
|
int nDatanodes,
|
|
boolean dataNodeFirst,
|
|
boolean dataNodeFirst,
|
|
boolean formatNamenode ) throws IOException {
|
|
boolean formatNamenode ) throws IOException {
|
|
- this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode,
|
|
|
|
- MAX_RETRIES, MAX_RETRIES_PER_PORT, null);
|
|
|
|
|
|
+ this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode, null);
|
|
}
|
|
}
|
|
|
|
|
|
- public MiniDFSCluster(int namenodePort,
|
|
|
|
- Configuration conf,
|
|
|
|
- int nDatanodes,
|
|
|
|
- boolean dataNodeFirst,
|
|
|
|
- boolean formatNamenode,
|
|
|
|
- String[] racks) throws IOException {
|
|
|
|
- this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode,
|
|
|
|
- MAX_RETRIES, MAX_RETRIES_PER_PORT, racks);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Create the config and start up the servers. If either the rpc or info port is already
|
|
* Create the config and start up the servers. If either the rpc or info port is already
|
|
* in use, we will try new ports.
|
|
* in use, we will try new ports.
|
|
@@ -242,92 +259,94 @@ public class MiniDFSCluster {
|
|
int nDatanodes,
|
|
int nDatanodes,
|
|
boolean dataNodeFirst,
|
|
boolean dataNodeFirst,
|
|
boolean formatNamenode,
|
|
boolean formatNamenode,
|
|
- int numRetries,
|
|
|
|
- int numRetriesPerPort,
|
|
|
|
String[] racks) throws IOException {
|
|
String[] racks) throws IOException {
|
|
|
|
|
|
this.conf = conf;
|
|
this.conf = conf;
|
|
-
|
|
|
|
|
|
+
|
|
this.nDatanodes = nDatanodes;
|
|
this.nDatanodes = nDatanodes;
|
|
this.nameNodePort = namenodePort;
|
|
this.nameNodePort = namenodePort;
|
|
- this.nameNodeInfoPort = 50080; // We just want this port to be different from the default.
|
|
|
|
- File base_dir = new File(System.getProperty("test.build.data"),
|
|
|
|
- "dfs/");
|
|
|
|
|
|
+
|
|
|
|
+ this.conf.set("fs.default.name", "localhost:"+ Integer.toString(nameNodePort));
|
|
|
|
+ this.conf.setInt("dfs.info.port", nameNodeInfoPort);
|
|
|
|
+ this.conf.setInt("dfs.datanode.info.port", 0);
|
|
|
|
+
|
|
|
|
+ File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
|
|
File data_dir = new File(base_dir, "data");
|
|
File data_dir = new File(base_dir, "data");
|
|
- conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
|
|
|
|
|
|
+ this.conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
|
|
new File(base_dir, "name2").getPath());
|
|
new File(base_dir, "name2").getPath());
|
|
- conf.setInt("dfs.replication", Math.min(3, nDatanodes));
|
|
|
|
- conf.setInt("dfs.safemode.extension", 0);
|
|
|
|
|
|
+ this.conf.setInt("dfs.replication", Math.min(3, nDatanodes));
|
|
|
|
+ this.conf.setInt("dfs.safemode.extension", 0);
|
|
|
|
|
|
- // Loops until we find ports that work or we give up because
|
|
|
|
- // too many tries have failed.
|
|
|
|
- boolean foundPorts = false;
|
|
|
|
- int portsTried = 0;
|
|
|
|
- while ((!foundPorts) && (portsTried < numRetries)) {
|
|
|
|
- conf.set("fs.default.name",
|
|
|
|
- "localhost:"+ Integer.toString(nameNodePort));
|
|
|
|
- conf.set("dfs.info.port", nameNodeInfoPort);
|
|
|
|
-
|
|
|
|
- if (formatNamenode) { NameNode.format(conf); }
|
|
|
|
- nameNode = new NameNodeRunner();
|
|
|
|
- nameNodeThread = new Thread(nameNode);
|
|
|
|
- dataNodes = new DataNodeRunner[nDatanodes];
|
|
|
|
- dataNodeThreads = new Thread[nDatanodes];
|
|
|
|
- for (int idx = 0; idx < nDatanodes; idx++) {
|
|
|
|
- if( racks == null || idx >= racks.length) {
|
|
|
|
- dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
|
|
|
|
- } else {
|
|
|
|
- dataNodes[idx] = new DataNodeRunner(conf, data_dir, racks[idx], idx);
|
|
|
|
- }
|
|
|
|
- dataNodeThreads[idx] = new Thread(dataNodes[idx]);
|
|
|
|
|
|
+ // Create the NameNode
|
|
|
|
+ if (formatNamenode) { NameNode.format(conf); }
|
|
|
|
+ nameNode = new NameNodeRunner();
|
|
|
|
+ nameNodeThread = new Thread(nameNode);
|
|
|
|
+
|
|
|
|
+ //
|
|
|
|
+ // Start the MiniDFSCluster
|
|
|
|
+ //
|
|
|
|
+
|
|
|
|
+ if (dataNodeFirst) {
|
|
|
|
+ startDataNodes(conf, racks, data_dir);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Start the namenode and wait for it to be initialized
|
|
|
|
+ nameNodeThread.start();
|
|
|
|
+ while (!nameNode.isCrashed() && !nameNode.isInitialized()) {
|
|
|
|
+ try { // let daemons get started
|
|
|
|
+ System.err.println("Waiting for the NameNode to initialize...");
|
|
|
|
+ Thread.sleep(1000);
|
|
|
|
+ } catch(InterruptedException e) {
|
|
}
|
|
}
|
|
- if (dataNodeFirst) {
|
|
|
|
- for (int idx = 0; idx < nDatanodes; idx++) {
|
|
|
|
- dataNodeThreads[idx].start();
|
|
|
|
- }
|
|
|
|
- nameNodeThread.start();
|
|
|
|
- } else {
|
|
|
|
- nameNodeThread.start();
|
|
|
|
- for (int idx = 0; idx < nDatanodes; idx++) {
|
|
|
|
- dataNodeThreads[idx].start();
|
|
|
|
- }
|
|
|
|
|
|
+ if (nameNode.isCrashed()) {
|
|
|
|
+ throw new RuntimeException("Namenode crashed");
|
|
}
|
|
}
|
|
-
|
|
|
|
- int retry = 0;
|
|
|
|
- while (!nameNode.isUp() && (retry < numRetriesPerPort)) {
|
|
|
|
- try { // let daemons get started
|
|
|
|
- System.out.println("waiting for dfs minicluster to start");
|
|
|
|
- Thread.sleep(1000);
|
|
|
|
- } catch(InterruptedException e) {
|
|
|
|
- }
|
|
|
|
- retry++;
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Set up the right ports for the datanodes
|
|
|
|
+ InetSocketAddress nnAddr = nameNode.getAddress();
|
|
|
|
+ nameNodePort = nnAddr.getPort();
|
|
|
|
+ this.conf.set("fs.default.name", nnAddr.getHostName()+ ":" + Integer.toString(nameNodePort));
|
|
|
|
+
|
|
|
|
+ // Start the datanodes
|
|
|
|
+ if (!dataNodeFirst) {
|
|
|
|
+ startDataNodes(conf, racks, data_dir);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ while (!nameNode.isCrashed() && !nameNode.isUp()) {
|
|
|
|
+ try { // let daemons get started
|
|
|
|
+ System.err.println("Waiting for the Mini HDFS Cluster to start...");
|
|
|
|
+ Thread.sleep(1000);
|
|
|
|
+ } catch(InterruptedException e) {
|
|
}
|
|
}
|
|
- if (retry >= numRetriesPerPort) {
|
|
|
|
- this.nameNodePort += 3;
|
|
|
|
- this.nameNodeInfoPort += 7;
|
|
|
|
- System.out.println("Failed to start DFS minicluster in " + retry + " attempts. Trying new ports:");
|
|
|
|
- System.out.println("\tNameNode RPC port: " + nameNodePort);
|
|
|
|
- System.out.println("\tNameNode info port: " + nameNodeInfoPort);
|
|
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (nameNode.isCrashed()) {
|
|
|
|
+ throw new RuntimeException("Namenode crashed");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- shutdown();
|
|
|
|
-
|
|
|
|
|
|
+ private void startDataNodes(Configuration conf, String[] racks, File data_dir) {
|
|
|
|
+ // Create the DataNodes & start them
|
|
|
|
+ dataNodes = new DataNodeRunner[nDatanodes];
|
|
|
|
+ dataNodeThreads = new Thread[nDatanodes];
|
|
|
|
+ for (int idx = 0; idx < nDatanodes; idx++) {
|
|
|
|
+ if( racks == null || idx >= racks.length) {
|
|
|
|
+ dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
|
|
} else {
|
|
} else {
|
|
- foundPorts = true;
|
|
|
|
|
|
+ dataNodes[idx] = new DataNodeRunner(conf, data_dir, racks[idx], idx);
|
|
}
|
|
}
|
|
- portsTried++;
|
|
|
|
- }
|
|
|
|
- System.out.println("\tNameNode portsTried " + portsTried);
|
|
|
|
- if (portsTried >= numRetries) {
|
|
|
|
- throw new IOException("Failed to start a DFS minicluster after trying " + portsTried + " ports.");
|
|
|
|
|
|
+ dataNodeThreads[idx] = new Thread(dataNodes[idx]);
|
|
|
|
+ dataNodeThreads[idx].start();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Returns the rpc port used by the mini cluster, because the caller supplied port is
|
|
* Returns the rpc port used by the mini cluster, because the caller supplied port is
|
|
* not necessarily the actual port used.
|
|
* not necessarily the actual port used.
|
|
*/
|
|
*/
|
|
public int getNameNodePort() {
|
|
public int getNameNodePort() {
|
|
- return nameNodePort;
|
|
|
|
|
|
+ return nameNode.getAddress().getPort();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|