|
@@ -53,18 +53,29 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
Map blocksMap = new HashMap();
|
|
|
|
|
|
- //
|
|
|
- // Stores the datanode-->block map. Done by storing a
|
|
|
- // set of datanode info objects, sorted by name. Updated only in
|
|
|
- // response to client-sent information.
|
|
|
- // Mapping: StorageID -> DatanodeDescriptor
|
|
|
- //
|
|
|
+ /**
|
|
|
+ * Stores the datanode -> block map.
|
|
|
+ * <p>
|
|
|
+ * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
|
|
|
+ * storage id. In order to keep the storage map consistent it tracks
|
|
|
+ * all storages ever registered with the namenode.
|
|
|
+ * A descriptor corresponding to a specific storage id can be
|
|
|
+ * <ul>
|
|
|
+ * <li>added to the map if it is a new storage id;</li>
|
|
|
+ * <li>updated with a new datanode started as a replacement for the old one
|
|
|
+ * with the same storage id; and </li>
|
|
|
+ * <li>removed if and only if an existing datanode is restarted to serve a
|
|
|
+ * different storage id.</li>
|
|
|
+ * </ul> <br>
|
|
|
+ * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
|
|
|
+ * in the namespace image file. Only the {@link DatanodeInfo} part is
|
|
|
+ * persistent, the list of blocks is restored from the datanode block
|
|
|
+ * reports.
|
|
|
+ * <p>
|
|
|
+ * Mapping: StorageID -> DatanodeDescriptor
|
|
|
+ */
|
|
|
TreeMap datanodeMap = new TreeMap();
|
|
|
|
|
|
- //
|
|
|
- // Stores the set of dead datanodes
|
|
|
- TreeMap deaddatanodeMap = new TreeMap();
|
|
|
-
|
|
|
//
|
|
|
// Keeps a Vector for every named machine. The Vector contains
|
|
|
// blocks that have recently been invalidated and are thought to live
|
|
@@ -110,9 +121,13 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
Random r = new Random();
|
|
|
|
|
|
- //
|
|
|
- // Stores a set of DatanodeDescriptor objects, sorted by heartbeat
|
|
|
- //
|
|
|
+ /**
|
|
|
+ * Stores a set of DatanodeDescriptor objects, sorted by heartbeat.
|
|
|
+ * This is a subset of {@link #datanodeMap}, containing nodes that are
|
|
|
+ * considered alive.
|
|
|
+ * The {@link HeartbeatMonitor} periodically checks for outdated entries,
|
|
|
+ * and removes them from the set.
|
|
|
+ */
|
|
|
TreeSet heartbeats = new TreeSet(new Comparator() {
|
|
|
public int compare(Object o1, Object o2) {
|
|
|
DatanodeDescriptor d1 = (DatanodeDescriptor) o1;
|
|
@@ -180,7 +195,8 @@ class FSNamesystem implements FSConstants {
|
|
|
InetSocketAddress addr = DataNode.createSocketAddr(conf.get("fs.default.name", "local"));
|
|
|
this.localMachine = addr.getHostName();
|
|
|
this.port = addr.getPort();
|
|
|
- this.dir = new FSDirectory(dir, conf);
|
|
|
+ this.dir = new FSDirectory(dir);
|
|
|
+ this.dir.loadFSImage( conf );
|
|
|
this.hbthread = new Daemon(new HeartbeatMonitor());
|
|
|
this.lmthread = new Daemon(new LeaseMonitor());
|
|
|
hbthread.start();
|
|
@@ -1068,37 +1084,39 @@ class FSNamesystem implements FSConstants {
|
|
|
// nodeN previously served a different data storage,
|
|
|
// which is not served by anybody anymore.
|
|
|
removeDatanode( nodeN );
|
|
|
+ // physically remove node from datanodeMap
|
|
|
+ wipeDatanode( nodeN );
|
|
|
+ // and log removal
|
|
|
+ getEditLog().logRemoveDatanode( nodeN );
|
|
|
nodeN = null;
|
|
|
}
|
|
|
|
|
|
// nodeN is not found
|
|
|
- if( nodeS == null ) {
|
|
|
- // this is a new datanode serving a new data storage
|
|
|
- if( nodeReg.getStorageID().equals("") ) {
|
|
|
- // this data storage has never registered
|
|
|
- // it is either empty or was created by pre-storageID version of DFS
|
|
|
- nodeReg.storageID = newStorageID();
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "BLOCK* NameSystem.registerDatanode: "
|
|
|
- + "new storageID " + nodeReg.getStorageID() + " assigned." );
|
|
|
- }
|
|
|
- // register new datanode
|
|
|
- datanodeMap.put(nodeReg.getStorageID(),
|
|
|
- new DatanodeDescriptor( nodeReg ));
|
|
|
+ if( nodeS != null ) {
|
|
|
+ // nodeS is found
|
|
|
+ // The registering datanode is a replacement node for the existing
|
|
|
+ // data storage, which from now on will be served by a new node.
|
|
|
NameNode.stateChangeLog.debug(
|
|
|
"BLOCK* NameSystem.registerDatanode: "
|
|
|
- + "node registered." );
|
|
|
+ + "node " + nodeS.name
|
|
|
+ + " is replaced by " + nodeReg.getName() + "." );
|
|
|
+ nodeS.name = nodeReg.getName();
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // nodeS is found
|
|
|
- // The registering datanode is a replacement node for the existing
|
|
|
- // data storage, which from now on will be served by a new node.
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "BLOCK* NameSystem.registerDatanode: "
|
|
|
- + "node " + nodeS.name
|
|
|
- + " is replaced by " + nodeReg.getName() + "." );
|
|
|
- nodeS.name = nodeReg.getName();
|
|
|
+ // this is a new datanode serving a new data storage
|
|
|
+ if( nodeReg.getStorageID().equals("") ) {
|
|
|
+ // this data storage has never been registered
|
|
|
+ // it is either empty or was created by pre-storageID version of DFS
|
|
|
+ nodeReg.storageID = newStorageID();
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
+ "BLOCK* NameSystem.registerDatanode: "
|
|
|
+ + "new storageID " + nodeReg.getStorageID() + " assigned." );
|
|
|
+ }
|
|
|
+ // register new datanode
|
|
|
+ DatanodeDescriptor nodeDescr = new DatanodeDescriptor( nodeReg );
|
|
|
+ unprotectedAddDatanode( nodeDescr );
|
|
|
+ getEditLog().logAddDatanode( nodeDescr );
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -1106,7 +1124,7 @@ class FSNamesystem implements FSConstants {
|
|
|
* Get registrationID for datanodes based on the namespaceID.
|
|
|
*
|
|
|
* @see #registerDatanode(DatanodeRegistration)
|
|
|
- * @see FSDirectory#newNamespaceID()
|
|
|
+ * @see FSImage#newNamespaceID()
|
|
|
* @return registration ID
|
|
|
*/
|
|
|
public String getRegistrationID() {
|
|
@@ -1142,27 +1160,14 @@ class FSNamesystem implements FSConstants {
|
|
|
int xceiverCount) throws IOException {
|
|
|
synchronized (heartbeats) {
|
|
|
synchronized (datanodeMap) {
|
|
|
- long capacityDiff = 0;
|
|
|
- long remainingDiff = 0;
|
|
|
DatanodeDescriptor nodeinfo = getDatanode( nodeID );
|
|
|
- deaddatanodeMap.remove(nodeID.getName());
|
|
|
-
|
|
|
- if (nodeinfo == null) {
|
|
|
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.gotHeartbeat: "
|
|
|
- +"brand-new heartbeat from "+nodeID.getName() );
|
|
|
- nodeinfo = new DatanodeDescriptor(nodeID, capacity, remaining, xceiverCount);
|
|
|
- datanodeMap.put(nodeinfo.getStorageID(), nodeinfo);
|
|
|
- capacityDiff = capacity;
|
|
|
- remainingDiff = remaining;
|
|
|
- } else {
|
|
|
- capacityDiff = capacity - nodeinfo.getCapacity();
|
|
|
- remainingDiff = remaining - nodeinfo.getRemaining();
|
|
|
- heartbeats.remove(nodeinfo);
|
|
|
- nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
|
|
|
- }
|
|
|
- heartbeats.add(nodeinfo);
|
|
|
- totalCapacity += capacityDiff;
|
|
|
- totalRemaining += remainingDiff;
|
|
|
+
|
|
|
+ if (nodeinfo == null)
|
|
|
+ // We do not accept unregistered guests
|
|
|
+ throw new UnregisteredDatanodeException( nodeID );
|
|
|
+ removeHeartbeat(nodeinfo);
|
|
|
+ nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
|
|
|
+ addHeartbeat(nodeinfo);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1206,18 +1211,58 @@ class FSNamesystem implements FSConstants {
|
|
|
* @author hairong
|
|
|
*/
|
|
|
private void removeDatanode( DatanodeDescriptor nodeInfo ) {
|
|
|
- heartbeats.remove(nodeInfo);
|
|
|
- datanodeMap.remove(nodeInfo.getStorageID());
|
|
|
- deaddatanodeMap.put(nodeInfo.getName(), nodeInfo);
|
|
|
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeDatanode: "
|
|
|
- + nodeInfo.getName() + " is removed from datanodeMap");
|
|
|
- totalCapacity -= nodeInfo.getCapacity();
|
|
|
- totalRemaining -= nodeInfo.getRemaining();
|
|
|
+ removeHeartbeat(nodeInfo);
|
|
|
|
|
|
Block deadblocks[] = nodeInfo.getBlocks();
|
|
|
if( deadblocks != null )
|
|
|
for( int i = 0; i < deadblocks.length; i++ )
|
|
|
removeStoredBlock(deadblocks[i], nodeInfo);
|
|
|
+ unprotectedRemoveDatanode(nodeInfo);
|
|
|
+ }
|
|
|
+
|
|
|
+ void unprotectedRemoveDatanode( DatanodeDescriptor nodeDescr ) {
|
|
|
+ // datanodeMap.remove(nodeDescr.getStorageID());
|
|
|
+ // deaddatanodeMap.put(nodeDescr.getName(), nodeDescr);
|
|
|
+ nodeDescr.resetBlocks();
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
+ "BLOCK* NameSystem.unprotectedRemoveDatanode: "
|
|
|
+ + nodeDescr.getName() + " is out of service now.");
|
|
|
+ }
|
|
|
+
|
|
|
+ void unprotectedAddDatanode( DatanodeDescriptor nodeDescr ) {
|
|
|
+ datanodeMap.put( nodeDescr.getStorageID(), nodeDescr );
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
+ "BLOCK* NameSystem.unprotectedAddDatanode: "
|
|
|
+ + "node " + nodeDescr.getName() + " is added to datanodeMap." );
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addHeartbeat( DatanodeDescriptor nodeDescr ) {
|
|
|
+ heartbeats.add(nodeDescr);
|
|
|
+ totalCapacity += nodeDescr.capacity;
|
|
|
+ totalRemaining += nodeDescr.remaining;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void removeHeartbeat( DatanodeDescriptor nodeDescr ) {
|
|
|
+ totalCapacity -= nodeDescr.getCapacity();
|
|
|
+ totalRemaining -= nodeDescr.getRemaining();
|
|
|
+ heartbeats.remove(nodeDescr);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Physically remove node from datanodeMap.
|
|
|
+ *
|
|
|
+ * @param nodeID node
|
|
|
+ */
|
|
|
+ void wipeDatanode( DatanodeID nodeID ) {
|
|
|
+ datanodeMap.remove(nodeID.getStorageID());
|
|
|
+ NameNode.stateChangeLog.debug(
|
|
|
+ "BLOCK* NameSystem.wipeDatanode: "
|
|
|
+ + nodeID.getName() + " storage " + nodeID.getStorageID()
|
|
|
+ + " is removed from datanodeMap.");
|
|
|
+ }
|
|
|
+
|
|
|
+ private FSEditLog getEditLog() {
|
|
|
+ return dir.fsImage.getEditLog();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1541,18 +1586,22 @@ class FSNamesystem implements FSConstants {
|
|
|
/**
|
|
|
*/
|
|
|
public void DFSNodesStatus(Vector live, Vector dead) {
|
|
|
- synchronized (heartbeats) {
|
|
|
- synchronized (datanodeMap) {
|
|
|
- live.addAll(datanodeMap.values());
|
|
|
- dead.addAll(deaddatanodeMap.values());
|
|
|
- }
|
|
|
+ synchronized (heartbeats) {
|
|
|
+ synchronized (datanodeMap) {
|
|
|
+ for(Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
|
|
|
+ DatanodeDescriptor node = (DatanodeDescriptor)it.next();
|
|
|
+ if( node.isDead() )
|
|
|
+ dead.add( node );
|
|
|
+ else
|
|
|
+ live.add( node );
|
|
|
+ }
|
|
|
}
|
|
|
+ }
|
|
|
}
|
|
|
/**
|
|
|
*/
|
|
|
- public DatanodeDescriptor getDataNodeInfo(String name) {
|
|
|
- UTF8 src = new UTF8(name);
|
|
|
- return (DatanodeDescriptor)datanodeMap.get(src);
|
|
|
+ public DatanodeInfo getDataNodeInfo(String name) {
|
|
|
+ return (DatanodeDescriptor)datanodeMap.get(name);
|
|
|
}
|
|
|
/**
|
|
|
*/
|
|
@@ -1715,6 +1764,9 @@ class FSNamesystem implements FSConstants {
|
|
|
/**
|
|
|
* Get a certain number of targets, if possible.
|
|
|
* If not, return as many as we can.
|
|
|
+ * Only live nodes contained in {@link #heartbeats} are
|
|
|
+ * targeted for replication.
|
|
|
+ *
|
|
|
* @param desiredReplicates
|
|
|
* number of duplicates wanted.
|
|
|
* @param forbiddenNodes
|
|
@@ -1723,11 +1775,11 @@ class FSNamesystem implements FSConstants {
|
|
|
*/
|
|
|
DatanodeDescriptor[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes,
|
|
|
UTF8 clientMachine, long blockSize) {
|
|
|
- if (desiredReplicates > datanodeMap.size()) {
|
|
|
+ if (desiredReplicates > heartbeats.size()) {
|
|
|
LOG.warn("Replication requested of "+desiredReplicates
|
|
|
- +" is larger than cluster size ("+datanodeMap.size()
|
|
|
+ +" is larger than cluster size ("+heartbeats.size()
|
|
|
+"). Using cluster size.");
|
|
|
- desiredReplicates = datanodeMap.size();
|
|
|
+ desiredReplicates = heartbeats.size();
|
|
|
}
|
|
|
|
|
|
TreeSet alreadyChosen = new TreeSet();
|
|
@@ -1761,7 +1813,7 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
// Check if there are any available targets at all
|
|
|
//
|
|
|
- int totalMachines = datanodeMap.size();
|
|
|
+ int totalMachines = heartbeats.size();
|
|
|
if (totalMachines == 0) {
|
|
|
LOG.warn("While choosing target, totalMachines is " + totalMachines);
|
|
|
return null;
|
|
@@ -1789,7 +1841,7 @@ class FSNamesystem implements FSConstants {
|
|
|
// Build list of machines we can actually choose from
|
|
|
//
|
|
|
Vector targetList = new Vector();
|
|
|
- for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
|
|
|
+ for (Iterator it = heartbeats.iterator(); it.hasNext(); ) {
|
|
|
DatanodeDescriptor node = (DatanodeDescriptor) it.next();
|
|
|
if (! forbiddenMachines.contains(node.getHost())) {
|
|
|
targetList.add(node);
|
|
@@ -1941,7 +1993,7 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
- /** Stop at and return the detanode at index (used for content browsing)*/
|
|
|
+ /** Stop at and return the datanode at index (used for content browsing)*/
|
|
|
private DatanodeInfo getDatanodeByIndex( int index ) {
|
|
|
int i = 0;
|
|
|
for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
|