|
@@ -20,8 +20,10 @@ import org.apache.commons.logging.*;
|
|
|
import org.apache.hadoop.io.*;
|
|
|
import org.apache.hadoop.conf.*;
|
|
|
import org.apache.hadoop.util.*;
|
|
|
+import org.apache.hadoop.mapred.StatusHttpServer;
|
|
|
|
|
|
import java.io.*;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
import java.util.*;
|
|
|
|
|
|
/***************************************************
|
|
@@ -57,6 +59,11 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
TreeMap datanodeMap = new TreeMap();
|
|
|
|
|
|
+
|
|
|
+ //
|
|
|
+ // Stores the set of dead datanodes
|
|
|
+ TreeMap deaddatanodeMap = new TreeMap();
|
|
|
+
|
|
|
//
|
|
|
// Keeps a Vector for every named machine. The Vector contains
|
|
|
// blocks that have recently been invalidated and are thought to live
|
|
@@ -89,6 +96,14 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
long totalCapacity = 0, totalRemaining = 0;
|
|
|
|
|
|
+
|
|
|
+ //
|
|
|
+ // For the HTTP browsing interface
|
|
|
+ //
|
|
|
+ StatusHttpServer infoServer;
|
|
|
+ int infoPort;
|
|
|
+ Date startTime;
|
|
|
+
|
|
|
//
|
|
|
Random r = new Random();
|
|
|
|
|
@@ -143,17 +158,29 @@ class FSNamesystem implements FSConstants {
|
|
|
// HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
|
|
|
private int heartBeatRecheck;
|
|
|
|
|
|
+ public static FSNamesystem fsNamesystemObject;
|
|
|
+ private String localMachine;
|
|
|
+ private int port;
|
|
|
+
|
|
|
/**
|
|
|
* dir is where the filesystem directory state
|
|
|
* is stored
|
|
|
*/
|
|
|
public FSNamesystem(File dir, Configuration conf) throws IOException {
|
|
|
+ fsNamesystemObject = this;
|
|
|
+ this.infoPort = conf.getInt("dfs.info.port", 50070);
|
|
|
+ this.infoServer = new StatusHttpServer("dfs", infoPort, false);
|
|
|
+ this.infoServer.start();
|
|
|
+ InetSocketAddress addr = DataNode.createSocketAddr(conf.get("fs.default.name", "local"));
|
|
|
+ this.localMachine = addr.getHostName();
|
|
|
+ this.port = addr.getPort();
|
|
|
this.dir = new FSDirectory(dir, conf);
|
|
|
this.hbthread = new Daemon(new HeartbeatMonitor());
|
|
|
this.lmthread = new Daemon(new LeaseMonitor());
|
|
|
hbthread.start();
|
|
|
lmthread.start();
|
|
|
this.systemStart = System.currentTimeMillis();
|
|
|
+ this.startTime = new Date(systemStart);
|
|
|
|
|
|
this.maxReplication = conf.getInt("dfs.replication.max", 512);
|
|
|
this.minReplication = conf.getInt("dfs.replication.min", 1);
|
|
@@ -167,6 +194,12 @@ class FSNamesystem implements FSConstants {
|
|
|
this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
|
|
|
this.heartBeatRecheck= 1000;
|
|
|
}
|
|
|
+ /** Return the FSNamesystem object
|
|
|
+ *
|
|
|
+ */
|
|
|
+ public static FSNamesystem getFSNamesystem() {
|
|
|
+ return fsNamesystemObject;
|
|
|
+ }
|
|
|
|
|
|
/** Close down this filesystem manager.
|
|
|
* Causes heartbeat and lease daemons to stop; waits briefly for
|
|
@@ -177,6 +210,7 @@ class FSNamesystem implements FSConstants {
|
|
|
fsRunning = false;
|
|
|
}
|
|
|
try {
|
|
|
+ infoServer.stop();
|
|
|
hbthread.join(3000);
|
|
|
} catch (InterruptedException ie) {
|
|
|
} finally {
|
|
@@ -1044,7 +1078,7 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
// register new datanode
|
|
|
datanodeMap.put(nodeReg.getStorageID(),
|
|
|
- new DatanodeInfo( nodeReg ) );
|
|
|
+ new DatanodeInfo( nodeReg ) ) ;
|
|
|
NameNode.stateChangeLog.debug(
|
|
|
"BLOCK* NameSystem.registerDatanode: "
|
|
|
+ "node registered." );
|
|
@@ -1104,6 +1138,7 @@ class FSNamesystem implements FSConstants {
|
|
|
long capacityDiff = 0;
|
|
|
long remainingDiff = 0;
|
|
|
DatanodeInfo nodeinfo = getDatanode( nodeID );
|
|
|
+ deaddatanodeMap.remove(nodeID.getName());
|
|
|
|
|
|
if (nodeinfo == null) {
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.gotHeartbeat: "
|
|
@@ -1166,6 +1201,7 @@ class FSNamesystem implements FSConstants {
|
|
|
private void removeDatanode( DatanodeInfo nodeInfo ) {
|
|
|
heartbeats.remove(nodeInfo);
|
|
|
datanodeMap.remove(nodeInfo.getStorageID());
|
|
|
+ deaddatanodeMap.put(nodeInfo.getName(), nodeInfo);
|
|
|
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeDatanode: "
|
|
|
+ nodeInfo.getName() + " is removed from datanodeMap");
|
|
|
totalCapacity -= nodeInfo.getCapacity();
|
|
@@ -1488,6 +1524,38 @@ class FSNamesystem implements FSConstants {
|
|
|
return results;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ */
|
|
|
+ public void DFSNodesStatus(Vector live, Vector dead) {
|
|
|
+ synchronized (heartbeats) {
|
|
|
+ synchronized (datanodeMap) {
|
|
|
+ live.addAll(datanodeMap.values());
|
|
|
+ dead.addAll(deaddatanodeMap.values());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ */
|
|
|
+ public DatanodeInfo getDataNodeInfo(String name) {
|
|
|
+ UTF8 src = new UTF8(name);
|
|
|
+ return (DatanodeInfo)datanodeMap.get(src);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ */
|
|
|
+ public String getDFSNameNodeMachine() {
|
|
|
+ return localMachine;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ */
|
|
|
+ public int getDFSNameNodePort() {
|
|
|
+ return port;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ */
|
|
|
+ public Date getStartTime() {
|
|
|
+ return startTime;
|
|
|
+ }
|
|
|
/////////////////////////////////////////////////////////
|
|
|
//
|
|
|
// These methods are called by the Namenode system, to see
|