|
@@ -203,7 +203,7 @@ class FSNamesystem implements FSConstants {
|
|
|
// datanode networktoplogy
|
|
|
NetworkTopology clusterMap = new NetworkTopology();
|
|
|
// for block replicas placement
|
|
|
- Replicator replicator = new Replicator();
|
|
|
+ ReplicationTargetChooser replicator = new ReplicationTargetChooser();
|
|
|
|
|
|
private HostsFileReader hostsReader;
|
|
|
private Daemon dnthread = null;
|
|
@@ -2691,7 +2691,7 @@ class FSNamesystem implements FSConstants {
|
|
|
* @author hairong
|
|
|
*
|
|
|
*/
|
|
|
- class Replicator {
|
|
|
+ class ReplicationTargetChooser {
|
|
|
private class NotEnoughReplicasException extends Exception {
|
|
|
NotEnoughReplicasException( String msg ) {
|
|
|
super( msg );
|
|
@@ -2722,21 +2722,19 @@ class FSNamesystem implements FSConstants {
|
|
|
new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
|
|
|
}
|
|
|
|
|
|
- /*
|
|
|
- * re-replicate <i>numOfReplicas</i>
|
|
|
- /**
|
|
|
- * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
|
|
|
- * to re-replicate a block with size <i>blocksize</i>
|
|
|
- * If not, return as many as we can.
|
|
|
- *
|
|
|
- * @param numOfReplicas: additional number of replicas wanted.
|
|
|
- * @param writer: the writer's machine, null if not in the cluster.
|
|
|
- * @param choosenNodes: datanodes that have been choosen as targets.
|
|
|
- * @param excludedNodes: datanodesthat should not be considered targets.
|
|
|
- * @param blocksize: size of the data to be written.
|
|
|
- * @return array of DatanodeDescriptor instances chosen as target
|
|
|
- * and sorted as a pipeline.
|
|
|
- */
|
|
|
+ /**
|
|
|
+ * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
|
|
|
+ * to re-replicate a block with size <i>blocksize</i>
|
|
|
+ * If not, return as many as we can.
|
|
|
+ *
|
|
|
+ * @param numOfReplicas: additional number of replicas wanted.
|
|
|
+ * @param writer: the writer's machine, null if not in the cluster.
|
|
|
+ * @param choosenNodes: datanodes that have been choosen as targets.
|
|
|
+ * @param excludedNodes: datanodesthat should not be considered targets.
|
|
|
+ * @param blocksize: size of the data to be written.
|
|
|
+ * @return array of DatanodeDescriptor instances chosen as target
|
|
|
+ * and sorted as a pipeline.
|
|
|
+ */
|
|
|
DatanodeDescriptor[] chooseTarget(int numOfReplicas,
|
|
|
DatanodeDescriptor writer,
|
|
|
List<DatanodeDescriptor> choosenNodes,
|
|
@@ -2767,7 +2765,6 @@ class FSNamesystem implements FSConstants {
|
|
|
writer=null;
|
|
|
|
|
|
DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
|
|
|
- clusterMap.getLeaves(NodeBase.ROOT),
|
|
|
excludedNodes, blocksize, maxNodesPerRack, results );
|
|
|
|
|
|
results.removeAll(choosenNodes);
|
|
@@ -2776,16 +2773,17 @@ class FSNamesystem implements FSConstants {
|
|
|
return getPipeline((writer==null)?localNode:writer, results);
|
|
|
}
|
|
|
|
|
|
- /* choose <i>numOfReplicas</i> from <i>clusterNodes</i> */
|
|
|
+ /* choose <i>numOfReplicas</i> from all data nodes */
|
|
|
private DatanodeDescriptor chooseTarget(int numOfReplicas,
|
|
|
DatanodeDescriptor writer,
|
|
|
- DatanodeDescriptor[] clusterNodes,
|
|
|
List<DatanodeDescriptor> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
|
List<DatanodeDescriptor> results) {
|
|
|
|
|
|
- if( numOfReplicas == 0 ) return writer;
|
|
|
+ if( numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0 ) {
|
|
|
+ return writer;
|
|
|
+ }
|
|
|
|
|
|
int numOfResults = results.size();
|
|
|
if(writer == null && (numOfResults==1 || numOfResults==2) ) {
|
|
@@ -2795,28 +2793,28 @@ class FSNamesystem implements FSConstants {
|
|
|
try {
|
|
|
switch( numOfResults ) {
|
|
|
case 0:
|
|
|
- writer = chooseLocalNode(writer, clusterNodes, excludedNodes,
|
|
|
+ writer = chooseLocalNode(writer, excludedNodes,
|
|
|
blocksize, maxNodesPerRack, results);
|
|
|
if(--numOfReplicas == 0) break;
|
|
|
case 1:
|
|
|
- chooseRemoteRack(1, writer, clusterNodes, excludedNodes,
|
|
|
+ chooseRemoteRack(1, writer, excludedNodes,
|
|
|
blocksize, maxNodesPerRack, results);
|
|
|
if(--numOfReplicas == 0) break;
|
|
|
case 2:
|
|
|
if(clusterMap.isOnSameRack(results.get(0), results.get(1))) {
|
|
|
- chooseRemoteRack(1, writer, clusterNodes, excludedNodes,
|
|
|
+ chooseRemoteRack(1, writer, excludedNodes,
|
|
|
blocksize, maxNodesPerRack, results);
|
|
|
} else {
|
|
|
- chooseLocalRack(writer, clusterNodes, excludedNodes,
|
|
|
+ chooseLocalRack(writer, excludedNodes,
|
|
|
blocksize, maxNodesPerRack, results);
|
|
|
}
|
|
|
if(--numOfReplicas == 0) break;
|
|
|
default:
|
|
|
- chooseRandom(numOfReplicas, clusterNodes, excludedNodes,
|
|
|
+ chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
|
|
|
blocksize, maxNodesPerRack, results);
|
|
|
}
|
|
|
} catch (NotEnoughReplicasException e) {
|
|
|
- LOG.warn("Not be able to place enough replicas, still in need of "
|
|
|
+ LOG.warn("Not able to place enough replicas, still in need of "
|
|
|
+ numOfReplicas );
|
|
|
}
|
|
|
return writer;
|
|
@@ -2829,7 +2827,6 @@ class FSNamesystem implements FSConstants {
|
|
|
*/
|
|
|
private DatanodeDescriptor chooseLocalNode(
|
|
|
DatanodeDescriptor localMachine,
|
|
|
- DatanodeDescriptor[] nodes,
|
|
|
List<DatanodeDescriptor> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
@@ -2837,20 +2834,21 @@ class FSNamesystem implements FSConstants {
|
|
|
throws NotEnoughReplicasException {
|
|
|
// if no local machine, randomly choose one node
|
|
|
if(localMachine == null)
|
|
|
- return chooseRandom(nodes, excludedNodes,
|
|
|
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
blocksize, maxNodesPerRack, results);
|
|
|
|
|
|
// otherwise try local machine first
|
|
|
if(!excludedNodes.contains(localMachine)) {
|
|
|
excludedNodes.add(localMachine);
|
|
|
- if( isGoodTarget(localMachine, blocksize, maxNodesPerRack, results)) {
|
|
|
+ if( isGoodTarget(localMachine, blocksize,
|
|
|
+ maxNodesPerRack, false, results)) {
|
|
|
results.add(localMachine);
|
|
|
return localMachine;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// try a node on local rack
|
|
|
- return chooseLocalRack(localMachine, nodes, excludedNodes,
|
|
|
+ return chooseLocalRack(localMachine, excludedNodes,
|
|
|
blocksize, maxNodesPerRack, results);
|
|
|
}
|
|
|
|
|
@@ -2858,12 +2856,11 @@ class FSNamesystem implements FSConstants {
|
|
|
* if no such node is availabe, choose one node from the rack where
|
|
|
* a second replica is on.
|
|
|
* if still no such node is available, choose a random node
|
|
|
- * in the cluster <i>nodes</i>.
|
|
|
+ * in the cluster.
|
|
|
* @return the choosen node
|
|
|
*/
|
|
|
private DatanodeDescriptor chooseLocalRack(
|
|
|
DatanodeDescriptor localMachine,
|
|
|
- DatanodeDescriptor[] nodes,
|
|
|
List<DatanodeDescriptor> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
@@ -2871,14 +2868,14 @@ class FSNamesystem implements FSConstants {
|
|
|
throws NotEnoughReplicasException {
|
|
|
// no local machine, so choose a random machine
|
|
|
if( localMachine == null ) {
|
|
|
- return chooseRandom(nodes, excludedNodes,
|
|
|
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
blocksize, maxNodesPerRack, results );
|
|
|
}
|
|
|
|
|
|
// choose one from the local rack
|
|
|
try {
|
|
|
return chooseRandom(
|
|
|
- clusterMap.getLeaves( localMachine.getNetworkLocation() ),
|
|
|
+ localMachine.getNetworkLocation(),
|
|
|
excludedNodes, blocksize, maxNodesPerRack, results);
|
|
|
} catch (NotEnoughReplicasException e1) {
|
|
|
// find the second replica
|
|
@@ -2894,16 +2891,16 @@ class FSNamesystem implements FSConstants {
|
|
|
if( newLocal != null ) {
|
|
|
try {
|
|
|
return chooseRandom(
|
|
|
- clusterMap.getLeaves( newLocal.getNetworkLocation() ),
|
|
|
+ newLocal.getNetworkLocation(),
|
|
|
excludedNodes, blocksize, maxNodesPerRack, results);
|
|
|
} catch( NotEnoughReplicasException e2 ) {
|
|
|
//otherwise randomly choose one from the network
|
|
|
- return chooseRandom(nodes, excludedNodes,
|
|
|
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
blocksize, maxNodesPerRack, results);
|
|
|
}
|
|
|
} else {
|
|
|
//otherwise randomly choose one from the network
|
|
|
- return chooseRandom(nodes, excludedNodes,
|
|
|
+ return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
blocksize, maxNodesPerRack, results);
|
|
|
}
|
|
|
}
|
|
@@ -2917,38 +2914,19 @@ class FSNamesystem implements FSConstants {
|
|
|
|
|
|
private void chooseRemoteRack( int numOfReplicas,
|
|
|
DatanodeDescriptor localMachine,
|
|
|
- DatanodeDescriptor[] nodes,
|
|
|
List<DatanodeDescriptor> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxReplicasPerRack,
|
|
|
List<DatanodeDescriptor> results)
|
|
|
throws NotEnoughReplicasException {
|
|
|
- // get all the nodes on the local rack
|
|
|
- DatanodeDescriptor[] nodesOnRack = clusterMap.getLeaves(
|
|
|
- localMachine.getNetworkLocation() );
|
|
|
-
|
|
|
- // can we speed up this??? using hashing sets?
|
|
|
- DatanodeDescriptor[] nodesOnRemoteRack
|
|
|
- = new DatanodeDescriptor[nodes.length-nodesOnRack.length];
|
|
|
- HashSet<DatanodeDescriptor> set1 = new HashSet<DatanodeDescriptor>(nodes.length);
|
|
|
- HashSet<DatanodeDescriptor> set2 = new HashSet<DatanodeDescriptor>(nodesOnRack.length);
|
|
|
- for(int i=0; i<nodes.length; i++) {
|
|
|
- set1.add(nodes[i]);
|
|
|
- }
|
|
|
- for(int i=0; i<nodesOnRack.length; i++) {
|
|
|
- set2.add(nodesOnRack[i]);
|
|
|
- }
|
|
|
- set1.removeAll(set2);
|
|
|
- nodesOnRemoteRack = set1.toArray(nodesOnRemoteRack);
|
|
|
-
|
|
|
int oldNumOfReplicas = results.size();
|
|
|
// randomly choose one node from remote racks
|
|
|
try {
|
|
|
- chooseRandom( numOfReplicas, nodesOnRemoteRack, excludedNodes,
|
|
|
- blocksize, maxReplicasPerRack, results );
|
|
|
+ chooseRandom( numOfReplicas, "~"+localMachine.getNetworkLocation(),
|
|
|
+ excludedNodes, blocksize, maxReplicasPerRack, results );
|
|
|
} catch (NotEnoughReplicasException e) {
|
|
|
chooseRandom( numOfReplicas-(results.size()-oldNumOfReplicas),
|
|
|
- nodesOnRack, excludedNodes, blocksize,
|
|
|
+ localMachine.getNetworkLocation(), excludedNodes, blocksize,
|
|
|
maxReplicasPerRack, results);
|
|
|
}
|
|
|
}
|
|
@@ -2957,7 +2935,7 @@ class FSNamesystem implements FSConstants {
|
|
|
* @return the choosen node
|
|
|
*/
|
|
|
private DatanodeDescriptor chooseRandom(
|
|
|
- DatanodeDescriptor[] nodes,
|
|
|
+ String nodes,
|
|
|
List<DatanodeDescriptor> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
@@ -2980,7 +2958,7 @@ class FSNamesystem implements FSConstants {
|
|
|
/* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
|
|
|
*/
|
|
|
private void chooseRandom(int numOfReplicas,
|
|
|
- DatanodeDescriptor[] nodes,
|
|
|
+ String nodes,
|
|
|
List<DatanodeDescriptor> excludedNodes,
|
|
|
long blocksize,
|
|
|
int maxNodesPerRack,
|
|
@@ -3008,24 +2986,20 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /* Randomly choose one node from <i>nodes</i>.
|
|
|
- * @return the choosen node
|
|
|
+ /* Randomly choose <i>numOfNodes</i> nodes from <i>scope</i>.
|
|
|
+ * @return the choosen nodes
|
|
|
*/
|
|
|
private DatanodeDescriptor[] chooseRandom(int numOfReplicas,
|
|
|
- DatanodeDescriptor[] nodes,
|
|
|
+ String nodes,
|
|
|
List<DatanodeDescriptor> excludedNodes) {
|
|
|
List<DatanodeDescriptor> results =
|
|
|
new ArrayList<DatanodeDescriptor>();
|
|
|
- int numOfAvailableNodes = 0;
|
|
|
- for(int i=0; i<nodes.length; i++) {
|
|
|
- if( !excludedNodes.contains(nodes[i]) ) {
|
|
|
- numOfAvailableNodes++;
|
|
|
- }
|
|
|
- }
|
|
|
+ int numOfAvailableNodes =
|
|
|
+ clusterMap.countNumOfAvailableNodes(nodes, excludedNodes);
|
|
|
numOfReplicas = (numOfAvailableNodes<numOfReplicas)?
|
|
|
numOfAvailableNodes:numOfReplicas;
|
|
|
while( numOfReplicas > 0 ) {
|
|
|
- DatanodeDescriptor choosenNode = nodes[r.nextInt(nodes.length)];
|
|
|
+ DatanodeDescriptor choosenNode = clusterMap.chooseRandom(nodes);
|
|
|
if(!excludedNodes.contains(choosenNode)) {
|
|
|
results.add( choosenNode );
|
|
|
excludedNodes.add(choosenNode);
|
|
@@ -3040,28 +3014,43 @@ class FSNamesystem implements FSConstants {
|
|
|
* return true if <i>node</i> has enough space,
|
|
|
* does not have too much load, and the rack does not have too many nodes
|
|
|
*/
|
|
|
+ private boolean isGoodTarget( DatanodeDescriptor node,
|
|
|
+ long blockSize, int maxTargetPerLoc,
|
|
|
+ List<DatanodeDescriptor> results) {
|
|
|
+ return isGoodTarget(node, blockSize, maxTargetPerLoc, true, results);
|
|
|
+ }
|
|
|
+
|
|
|
private boolean isGoodTarget( DatanodeDescriptor node,
|
|
|
long blockSize, int maxTargetPerLoc,
|
|
|
+ boolean considerLoad,
|
|
|
List<DatanodeDescriptor> results) {
|
|
|
|
|
|
// check if the node is (being) decommissed
|
|
|
if(node.isDecommissionInProgress() || node.isDecommissioned()) {
|
|
|
+ LOG.debug("Node "+node.getPath()+
|
|
|
+ " is not chosen because the node is (being) decommissioned");
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
// check the remaining capacity of the target machine
|
|
|
if(blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining() ) {
|
|
|
+ LOG.debug("Node "+node.getPath()+
|
|
|
+ " is not chosen because the node does not have enough space");
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
// check the communication traffic of the target machine
|
|
|
- double avgLoad = 0;
|
|
|
- int size = clusterMap.getNumOfLeaves();
|
|
|
- if( size != 0 ) {
|
|
|
- avgLoad = (double)totalLoad()/size;
|
|
|
- }
|
|
|
- if(node.getXceiverCount() > (2.0 * avgLoad)) {
|
|
|
- return false;
|
|
|
+ if(considerLoad) {
|
|
|
+ double avgLoad = 0;
|
|
|
+ int size = clusterMap.getNumOfLeaves();
|
|
|
+ if( size != 0 ) {
|
|
|
+ avgLoad = (double)totalLoad()/size;
|
|
|
+ }
|
|
|
+ if(node.getXceiverCount() > (2.0 * avgLoad)) {
|
|
|
+ LOG.debug("Node "+node.getPath()+
|
|
|
+ " is not chosen because the node is too busy");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// check if the target rack has chosen too many nodes
|
|
@@ -3075,6 +3064,8 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
}
|
|
|
if(counter>maxTargetPerLoc) {
|
|
|
+ LOG.debug("Node "+node.getPath()+
|
|
|
+ " is not chosen because the rack has too many chosen nodes");
|
|
|
return false;
|
|
|
}
|
|
|
return true;
|