|
@@ -48,6 +48,7 @@ public class MiniDFSCluster {
|
|
*/
|
|
*/
|
|
class NameNodeRunner implements Runnable {
|
|
class NameNodeRunner implements Runnable {
|
|
private NameNode node;
|
|
private NameNode node;
|
|
|
|
+ private boolean isRunning = true;
|
|
|
|
|
|
public boolean isUp() {
|
|
public boolean isUp() {
|
|
if (node == null) {
|
|
if (node == null) {
|
|
@@ -66,7 +67,11 @@ public class MiniDFSCluster {
|
|
*/
|
|
*/
|
|
public void run() {
|
|
public void run() {
|
|
try {
|
|
try {
|
|
- node = new NameNode(conf);
|
|
|
|
|
|
+ synchronized( this ) {
|
|
|
|
+ if( isRunning ) {
|
|
|
|
+ node = new NameNode(conf);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|
|
node = null;
|
|
node = null;
|
|
System.err.println("Name node crashed:");
|
|
System.err.println("Name node crashed:");
|
|
@@ -77,7 +82,8 @@ public class MiniDFSCluster {
|
|
/**
|
|
/**
|
|
* Shutdown the name node and wait for it to finish.
|
|
* Shutdown the name node and wait for it to finish.
|
|
*/
|
|
*/
|
|
- public void shutdown() {
|
|
|
|
|
|
+ public synchronized void shutdown() {
|
|
|
|
+ isRunning = false;
|
|
if (node != null) {
|
|
if (node != null) {
|
|
node.stop();
|
|
node.stop();
|
|
node.join();
|
|
node.join();
|
|
@@ -91,6 +97,7 @@ public class MiniDFSCluster {
|
|
class DataNodeRunner implements Runnable {
|
|
class DataNodeRunner implements Runnable {
|
|
private DataNode node;
|
|
private DataNode node;
|
|
Configuration conf = null;
|
|
Configuration conf = null;
|
|
|
|
+ private boolean isRunning = true;
|
|
|
|
|
|
public DataNodeRunner(Configuration conf, File dataDir, int index) {
|
|
public DataNodeRunner(Configuration conf, File dataDir, int index) {
|
|
this.conf = new Configuration(conf);
|
|
this.conf = new Configuration(conf);
|
|
@@ -122,10 +129,15 @@ public class MiniDFSCluster {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- node = new DataNode(conf, conf.get("dfs.datanode.rack",
|
|
|
|
- NetworkTopology.DEFAULT_RACK), dirs);
|
|
|
|
|
|
+ synchronized (this){
|
|
|
|
+ if (isRunning) {
|
|
|
|
+ node = new DataNode(conf, conf.get("dfs.datanode.rack",
|
|
|
|
+ NetworkTopology.DEFAULT_RACK), dirs);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
node.run();
|
|
node.run();
|
|
} catch (Throwable e) {
|
|
} catch (Throwable e) {
|
|
|
|
+ node.shutdown();
|
|
node = null;
|
|
node = null;
|
|
System.err.println("Data node crashed:");
|
|
System.err.println("Data node crashed:");
|
|
e.printStackTrace();
|
|
e.printStackTrace();
|
|
@@ -135,7 +147,8 @@ public class MiniDFSCluster {
|
|
/**
|
|
/**
|
|
* Shut down the server and wait for it to finish.
|
|
* Shut down the server and wait for it to finish.
|
|
*/
|
|
*/
|
|
- public void shutdown() {
|
|
|
|
|
|
+ public synchronized void shutdown() {
|
|
|
|
+ isRunning = false;
|
|
if (node != null) {
|
|
if (node != null) {
|
|
node.shutdown();
|
|
node.shutdown();
|
|
}
|
|
}
|
|
@@ -296,10 +309,7 @@ public class MiniDFSCluster {
|
|
System.out.println("\tNameNode RPC port: " + nameNodePort);
|
|
System.out.println("\tNameNode RPC port: " + nameNodePort);
|
|
System.out.println("\tNameNode info port: " + nameNodeInfoPort);
|
|
System.out.println("\tNameNode info port: " + nameNodeInfoPort);
|
|
|
|
|
|
- nameNode.shutdown();
|
|
|
|
- for (int idx = 0; idx < nDatanodes; idx++) {
|
|
|
|
- dataNodes[idx].shutdown();
|
|
|
|
- }
|
|
|
|
|
|
+ shutdown();
|
|
|
|
|
|
} else {
|
|
} else {
|
|
foundPorts = true;
|
|
foundPorts = true;
|
|
@@ -324,10 +334,21 @@ public class MiniDFSCluster {
|
|
* Shut down the servers.
|
|
* Shut down the servers.
|
|
*/
|
|
*/
|
|
public void shutdown() {
|
|
public void shutdown() {
|
|
|
|
+ System.out.println("Shutting down the cluster");
|
|
for (int idx = 0; idx < nDatanodes; idx++) {
|
|
for (int idx = 0; idx < nDatanodes; idx++) {
|
|
dataNodes[idx].shutdown();
|
|
dataNodes[idx].shutdown();
|
|
}
|
|
}
|
|
nameNode.shutdown();
|
|
nameNode.shutdown();
|
|
|
|
+ for (int idx = 0; idx < nDatanodes; idx++) {
|
|
|
|
+ try {
|
|
|
|
+ dataNodeThreads[idx].join();
|
|
|
|
+ } catch(InterruptedException e) {
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ try {
|
|
|
|
+ nameNodeThread.join();
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|