|
@@ -45,7 +45,6 @@ import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
import java.util.NavigableMap;
|
|
import java.util.NavigableMap;
|
|
-import java.util.Random;
|
|
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.TreeMap;
|
|
import java.util.TreeMap;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -140,7 +139,6 @@ import org.apache.hadoop.net.CachedDNSToSwitchMapping;
|
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
import org.apache.hadoop.net.NetworkTopology;
|
|
import org.apache.hadoop.net.Node;
|
|
import org.apache.hadoop.net.Node;
|
|
-import org.apache.hadoop.net.NodeBase;
|
|
|
|
import org.apache.hadoop.net.ScriptBasedMapping;
|
|
import org.apache.hadoop.net.ScriptBasedMapping;
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
@@ -242,7 +240,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
// Stores the correct file name hierarchy
|
|
// Stores the correct file name hierarchy
|
|
//
|
|
//
|
|
public FSDirectory dir;
|
|
public FSDirectory dir;
|
|
- public BlockManager blockManager;
|
|
|
|
|
|
+ BlockManager blockManager;
|
|
|
|
|
|
// Block pool ID used by this namenode
|
|
// Block pool ID used by this namenode
|
|
String blockPoolId;
|
|
String blockPoolId;
|
|
@@ -271,8 +269,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
public final NavigableMap<String, DatanodeDescriptor> datanodeMap =
|
|
public final NavigableMap<String, DatanodeDescriptor> datanodeMap =
|
|
new TreeMap<String, DatanodeDescriptor>();
|
|
new TreeMap<String, DatanodeDescriptor>();
|
|
|
|
|
|
- Random r = new Random();
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Stores a set of DatanodeDescriptor objects.
|
|
* Stores a set of DatanodeDescriptor objects.
|
|
* This is a subset of {@link #datanodeMap}, containing nodes that are
|
|
* This is a subset of {@link #datanodeMap}, containing nodes that are
|
|
@@ -320,8 +316,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
|
|
|
|
private volatile SafeModeInfo safeMode; // safe mode information
|
|
private volatile SafeModeInfo safeMode; // safe mode information
|
|
|
|
|
|
- /** datanode network toplogy */
|
|
|
|
- public NetworkTopology clusterMap = new NetworkTopology();
|
|
|
|
private DNSToSwitchMapping dnsToSwitchMapping;
|
|
private DNSToSwitchMapping dnsToSwitchMapping;
|
|
|
|
|
|
private HostsFileReader hostsReader;
|
|
private HostsFileReader hostsReader;
|
|
@@ -742,7 +736,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
return new BlocksWithLocations(new BlockWithLocations[0]);
|
|
return new BlocksWithLocations(new BlockWithLocations[0]);
|
|
}
|
|
}
|
|
Iterator<BlockInfo> iter = node.getBlockIterator();
|
|
Iterator<BlockInfo> iter = node.getBlockIterator();
|
|
- int startBlock = r.nextInt(numBlocks); // starting from a random block
|
|
|
|
|
|
+ int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
|
|
// skip blocks
|
|
// skip blocks
|
|
for(int i=0; i<startBlock; i++) {
|
|
for(int i=0; i<startBlock; i++) {
|
|
iter.next();
|
|
iter.next();
|
|
@@ -878,15 +872,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
FileNotFoundException, UnresolvedLinkException, IOException {
|
|
FileNotFoundException, UnresolvedLinkException, IOException {
|
|
LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
|
|
LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
|
|
if (blocks != null) {
|
|
if (blocks != null) {
|
|
- //sort the blocks
|
|
|
|
- final DatanodeDescriptor client =
|
|
|
|
- blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
|
|
|
|
- for (LocatedBlock b : blocks.getLocatedBlocks()) {
|
|
|
|
- clusterMap.pseudoSortByDistance(client, b.getLocations());
|
|
|
|
-
|
|
|
|
- // Move decommissioned datanodes to the bottom
|
|
|
|
- Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
|
|
|
|
- }
|
|
|
|
|
|
+ blockManager.getDatanodeManager().sortLocatedBlocks(
|
|
|
|
+ clientMachine, blocks.getLocatedBlocks());
|
|
}
|
|
}
|
|
return blocks;
|
|
return blocks;
|
|
}
|
|
}
|
|
@@ -1776,16 +1763,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
}
|
|
}
|
|
|
|
|
|
// choose targets for the new block to be allocated.
|
|
// choose targets for the new block to be allocated.
|
|
- DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
|
|
|
|
|
|
+ final DatanodeDescriptor targets[] = blockManager.chooseTarget(
|
|
src, replication, clientNode, excludedNodes, blockSize);
|
|
src, replication, clientNode, excludedNodes, blockSize);
|
|
- if (targets.length < blockManager.minReplication) {
|
|
|
|
- throw new IOException("File " + src + " could only be replicated to " +
|
|
|
|
- targets.length + " nodes, instead of " +
|
|
|
|
- blockManager.minReplication + ". There are "
|
|
|
|
- +clusterMap.getNumOfLeaves()+" datanode(s) running"
|
|
|
|
- +" but "+excludedNodes.size() +
|
|
|
|
- " node(s) are excluded in this operation.");
|
|
|
|
- }
|
|
|
|
|
|
|
|
// Allocate a new block and record it in the INode.
|
|
// Allocate a new block and record it in the INode.
|
|
writeLock();
|
|
writeLock();
|
|
@@ -1996,8 +1975,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
blockManager.checkReplication(pendingBlocks[i], numExpectedReplicas);
|
|
blockManager.checkReplication(pendingBlocks[i], numExpectedReplicas);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- static Random randBlockId = new Random();
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
* Allocate a block at the given pending filename
|
|
* Allocate a block at the given pending filename
|
|
@@ -2011,9 +1988,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
private Block allocateBlock(String src, INode[] inodes,
|
|
private Block allocateBlock(String src, INode[] inodes,
|
|
DatanodeDescriptor targets[]) throws QuotaExceededException {
|
|
DatanodeDescriptor targets[]) throws QuotaExceededException {
|
|
assert hasWriteLock();
|
|
assert hasWriteLock();
|
|
- Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);
|
|
|
|
|
|
+ Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0);
|
|
while(isValidBlock(b)) {
|
|
while(isValidBlock(b)) {
|
|
- b.setBlockId(FSNamesystem.randBlockId.nextLong());
|
|
|
|
|
|
+ b.setBlockId(DFSUtil.getRandom().nextLong());
|
|
}
|
|
}
|
|
b.setGenerationStamp(getGenerationStamp());
|
|
b.setGenerationStamp(getGenerationStamp());
|
|
b = dir.addBlock(src, inodes, b, targets);
|
|
b = dir.addBlock(src, inodes, b, targets);
|
|
@@ -2883,14 +2860,14 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
nodeReg.getStorageID());
|
|
nodeReg.getStorageID());
|
|
}
|
|
}
|
|
// update cluster map
|
|
// update cluster map
|
|
- clusterMap.remove(nodeS);
|
|
|
|
|
|
+ blockManager.getDatanodeManager().getNetworkTopology().remove(nodeS);
|
|
nodeS.updateRegInfo(nodeReg);
|
|
nodeS.updateRegInfo(nodeReg);
|
|
nodeS.setHostName(hostName);
|
|
nodeS.setHostName(hostName);
|
|
nodeS.setDisallowed(false); // Node is in the include list
|
|
nodeS.setDisallowed(false); // Node is in the include list
|
|
|
|
|
|
// resolve network location
|
|
// resolve network location
|
|
resolveNetworkLocation(nodeS);
|
|
resolveNetworkLocation(nodeS);
|
|
- clusterMap.add(nodeS);
|
|
|
|
|
|
+ blockManager.getDatanodeManager().getNetworkTopology().add(nodeS);
|
|
|
|
|
|
// also treat the registration message as a heartbeat
|
|
// also treat the registration message as a heartbeat
|
|
synchronized(heartbeats) {
|
|
synchronized(heartbeats) {
|
|
@@ -2921,7 +2898,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
|
|
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
|
|
resolveNetworkLocation(nodeDescr);
|
|
resolveNetworkLocation(nodeDescr);
|
|
blockManager.getDatanodeManager().addDatanode(nodeDescr);
|
|
blockManager.getDatanodeManager().addDatanode(nodeDescr);
|
|
- clusterMap.add(nodeDescr);
|
|
|
|
checkDecommissioning(nodeDescr, dnAddress);
|
|
checkDecommissioning(nodeDescr, dnAddress);
|
|
|
|
|
|
// also treat the registration message as a heartbeat
|
|
// also treat the registration message as a heartbeat
|
|
@@ -2984,7 +2960,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
private String newStorageID() {
|
|
private String newStorageID() {
|
|
String newID = null;
|
|
String newID = null;
|
|
while(newID == null) {
|
|
while(newID == null) {
|
|
- newID = "DS" + Integer.toString(r.nextInt());
|
|
|
|
|
|
+ newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
|
|
if (datanodeMap.get(newID) != null)
|
|
if (datanodeMap.get(newID) != null)
|
|
newID = null;
|
|
newID = null;
|
|
}
|
|
}
|
|
@@ -3338,27 +3314,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- Iterator<? extends Block> it = nodeInfo.getBlockIterator();
|
|
|
|
- while(it.hasNext()) {
|
|
|
|
- blockManager.removeStoredBlock(it.next(), nodeInfo);
|
|
|
|
- }
|
|
|
|
- unprotectedRemoveDatanode(nodeInfo);
|
|
|
|
- clusterMap.remove(nodeInfo);
|
|
|
|
|
|
+ blockManager.removeDatanode(nodeInfo);
|
|
|
|
|
|
checkSafeMode();
|
|
checkSafeMode();
|
|
}
|
|
}
|
|
|
|
|
|
- void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
|
|
|
|
- assert hasWriteLock();
|
|
|
|
- nodeDescr.resetBlocks();
|
|
|
|
- blockManager.removeFromInvalidates(nodeDescr.getStorageID());
|
|
|
|
- if(NameNode.stateChangeLog.isDebugEnabled()) {
|
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
|
- "BLOCK* NameSystem.unprotectedRemoveDatanode: "
|
|
|
|
- + nodeDescr.getName() + " is out of service now.");
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
FSImage getFSImage() {
|
|
FSImage getFSImage() {
|
|
return dir.fsImage;
|
|
return dir.fsImage;
|
|
}
|
|
}
|
|
@@ -4106,14 +4066,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
return node;
|
|
return node;
|
|
}
|
|
}
|
|
|
|
|
|
- /** Choose a random datanode
|
|
|
|
- *
|
|
|
|
- * @return a randomly chosen datanode
|
|
|
|
- */
|
|
|
|
- DatanodeDescriptor getRandomDatanode() {
|
|
|
|
- return (DatanodeDescriptor)clusterMap.chooseRandom(NodeBase.ROOT);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* SafeModeInfo contains information related to the safe mode.
|
|
* SafeModeInfo contains information related to the safe mode.
|
|
* <p>
|
|
* <p>
|
|
@@ -4280,9 +4232,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
}
|
|
}
|
|
reached = -1;
|
|
reached = -1;
|
|
safeMode = null;
|
|
safeMode = null;
|
|
|
|
+ final NetworkTopology nt = blockManager.getDatanodeManager().getNetworkTopology();
|
|
NameNode.stateChangeLog.info("STATE* Network topology has "
|
|
NameNode.stateChangeLog.info("STATE* Network topology has "
|
|
- +clusterMap.getNumOfRacks()+" racks and "
|
|
|
|
- +clusterMap.getNumOfLeaves()+ " datanodes");
|
|
|
|
|
|
+ + nt.getNumOfRacks() + " racks and "
|
|
|
|
+ + nt.getNumOfLeaves() + " datanodes");
|
|
NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
|
|
NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
|
|
+blockManager.neededReplications.size()+" blocks");
|
|
+blockManager.neededReplications.size()+" blocks");
|
|
}
|
|
}
|
|
@@ -5851,6 +5804,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
|
return blockPoolId;
|
|
return blockPoolId;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /** @return the block manager. */
|
|
|
|
+ public BlockManager getBlockManager() {
|
|
|
|
+ return blockManager;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Remove an already decommissioned data node who is neither in include nor
|
|
* Remove an already decommissioned data node who is neither in include nor
|
|
* exclude hosts lists from the the list of live or dead nodes. This is used
|
|
* exclude hosts lists from the the list of live or dead nodes. This is used
|