|
@@ -32,12 +32,6 @@ import java.io.*;
|
|
|
import java.util.*;
|
|
|
import java.lang.UnsupportedOperationException;
|
|
|
|
|
|
-import javax.servlet.ServletContext;
|
|
|
-import javax.servlet.ServletException;
|
|
|
-import javax.servlet.http.HttpServlet;
|
|
|
-import javax.servlet.http.HttpServletRequest;
|
|
|
-import javax.servlet.http.HttpServletResponse;
|
|
|
-
|
|
|
/***************************************************
|
|
|
* FSNamesystem does the actual bookkeeping work for the
|
|
|
* DataNode.
|
|
@@ -199,6 +193,7 @@ class FSNamesystem implements FSConstants {
|
|
|
private String localMachine;
|
|
|
private int port;
|
|
|
private SafeModeInfo safeMode; // safe mode information
|
|
|
+ private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
|
|
|
|
|
|
// datanode networktoplogy
|
|
|
NetworkTopology clusterMap = new NetworkTopology();
|
|
@@ -217,7 +212,10 @@ class FSNamesystem implements FSConstants {
|
|
|
NameNode nn, Configuration conf) throws IOException {
|
|
|
fsNamesystemObject = this;
|
|
|
this.replicator = new ReplicationTargetChooser(
|
|
|
- conf.getBoolean("dfs.replication.considerLoad", true));
|
|
|
+ conf.getBoolean("dfs.replication.considerLoad", true),
|
|
|
+ this,
|
|
|
+ clusterMap,
|
|
|
+ LOG);
|
|
|
this.defaultReplication = conf.getInt("dfs.replication", 3);
|
|
|
this.maxReplication = conf.getInt("dfs.replication.max", 512);
|
|
|
this.minReplication = conf.getInt("dfs.replication.min", 1);
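For orientation: the call above now hands the chooser its collaborators explicitly. The matching constructor is not shown in this patch, so the skeleton below is only an inferred sketch of what the extracted ReplicationTargetChooser presumably accepts; the field names and the logger type are assumptions, not code from this change.

    // Inferred sketch of the extracted class's constructor; not part of this patch.
    class ReplicationTargetChooser {
      private final boolean considerLoad;
      private final FSNamesystem fs;                      // callback for load/replica counts
      private final NetworkTopology clusterMap;           // rack topology
      private final org.apache.commons.logging.Log log;   // assumed LOG type

      ReplicationTargetChooser(boolean considerLoad, FSNamesystem fs,
                               NetworkTopology clusterMap,
                               org.apache.commons.logging.Log log) {
        this.considerLoad = considerLoad;
        this.fs = fs;
        this.clusterMap = clusterMap;
        this.log = log;
      }
    }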
|
|
@@ -406,193 +404,15 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /* Class for keeping track of under-replicated blocks.
|
|
|
- * Blocks have replication priority, with priority 0 indicating the highest
|
|
|
- * Blocks that have only one replica have the highest priority.
|
|
|
- */
|
|
|
- private class UnderReplicatedBlocks {
|
|
|
- private static final int LEVEL = 3;
|
|
|
- List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
|
|
|
-
|
|
|
- /* constructor */
|
|
|
- UnderReplicatedBlocks() {
|
|
|
- for(int i=0; i<LEVEL; i++) {
|
|
|
- priorityQueues.add(new TreeSet<Block>());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /* Return the total number of under-replicated blocks */
|
|
|
- synchronized int size() {
|
|
|
- int size = 0;
|
|
|
- for(int i=0; i<LEVEL; i++) {
|
|
|
- size += priorityQueues.get(i).size();
|
|
|
- }
|
|
|
- return size;
|
|
|
- }
|
|
|
-
|
|
|
- /* Check if a block is in the neededReplication queue */
|
|
|
- synchronized boolean contains(Block block) {
|
|
|
- for(TreeSet<Block> set:priorityQueues) {
|
|
|
- if (set.contains(block)) return true;
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- /* Return the priority of a block
|
|
|
- * @param block an under-replicated block
|
|
|
- * @param curReplicas current number of replicas of the block
|
|
|
- * @param expectedReplicas expected number of replicas of the block
|
|
|
- */
|
|
|
- private int getPriority(Block block,
|
|
|
- int curReplicas, int expectedReplicas) {
|
|
|
- if (curReplicas<=0 || curReplicas>=expectedReplicas) {
|
|
|
- return LEVEL; // no need to replicate
|
|
|
- } else if (curReplicas==1) {
|
|
|
- return 0; // highest priority
|
|
|
- } else if (curReplicas*3<expectedReplicas) {
|
|
|
- return 1;
|
|
|
- } else {
|
|
|
- return 2;
|
|
|
- }
|
|
|
- }
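Because the branch order above matters (the curReplicas >= expectedReplicas check runs before the curReplicas == 1 check), a small self-contained mirror of the rule makes the resulting levels easy to verify; the class and method names below are illustrative only and not part of this patch.

    // Standalone mirror of getPriority() above; illustrative, not part of the patch.
    class ReplicationPriorityDemo {
      static final int LEVEL = 3;
      static int priorityOf(int curReplicas, int expectedReplicas) {
        if (curReplicas <= 0 || curReplicas >= expectedReplicas) {
          return LEVEL;                               // fully replicated: not queued
        } else if (curReplicas == 1) {
          return 0;                                   // a single live replica: most urgent
        } else if (curReplicas * 3 < expectedReplicas) {
          return 1;                                   // less than a third of the target
        } else {
          return 2;                                   // mildly under-replicated
        }
      }
      public static void main(String[] args) {
        // Expected replication 3: 1 replica -> 0, 2 replicas -> 2, 3 replicas -> 3 (not queued).
        System.out.println(priorityOf(1, 3) + " " + priorityOf(2, 3) + " " + priorityOf(3, 3));
      }
    }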
|
|
|
-
|
|
|
- /* Add a block to an under-replication queue according to its priority
|
|
|
- * @param block an under-replicated block
|
|
|
- * @param curReplicas current number of replicas of the block
|
|
|
- * @param expectedReplicas expected number of replicas of the block
|
|
|
- */
|
|
|
- synchronized boolean add(
|
|
|
- Block block, int curReplicas, int expectedReplicas) {
|
|
|
- if (curReplicas<=0 || expectedReplicas <= curReplicas) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- int priLevel = getPriority(block, curReplicas, expectedReplicas);
|
|
|
- if (priorityQueues.get(priLevel).add(block)) {
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "BLOCK* NameSystem.UnderReplicationBlock.add:"
|
|
|
- + block.getBlockName()
|
|
|
- + " has only "+curReplicas
|
|
|
- + " replicas and need " + expectedReplicas
|
|
|
- + " replicas so is added to neededReplications"
|
|
|
- + " at priority level " + priLevel);
|
|
|
- return true;
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- /* Add a block to an under-replication queue */
|
|
|
- synchronized boolean add(Block block) {
|
|
|
- int expectedReplicas = getReplication(block);
|
|
|
- return add(block,
|
|
|
- countContainingNodes(block),
|
|
|
- expectedReplicas);
|
|
|
- }
|
|
|
-
|
|
|
- /* Remove a block from an under-replication queue */
|
|
|
- synchronized boolean remove(Block block,
|
|
|
- int oldReplicas, int oldExpectedReplicas) {
|
|
|
- int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
|
|
|
- return remove(block, priLevel);
|
|
|
- }
|
|
|
-
|
|
|
- /* Remove a block from an under-replication queue given a priority */
|
|
|
- private boolean remove(Block block, int priLevel) {
|
|
|
- if (priLevel >= 0 && priLevel < LEVEL
|
|
|
- && priorityQueues.get(priLevel).remove(block)) {
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "BLOCK* NameSystem.UnderReplicationBlock.remove: "
|
|
|
- + "Removing block " + block.getBlockName()
|
|
|
- + " from priority queue "+ priLevel);
|
|
|
- return true;
|
|
|
- } else {
|
|
|
- for(int i=0; i<LEVEL; i++) {
|
|
|
- if (i!=priLevel && priorityQueues.get(i).remove(block)) {
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "BLOCK* NameSystem.UnderReplicationBlock.remove: "
|
|
|
- + "Removing block " + block.getBlockName()
|
|
|
- + " from priority queue "+ i);
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- /* Remove a block from an under-replication queue */
|
|
|
- synchronized boolean remove(Block block) {
|
|
|
- int curReplicas = countContainingNodes(block);
|
|
|
- int expectedReplicas = getReplication(block);
|
|
|
- return remove(block, curReplicas, expectedReplicas);
|
|
|
- }
|
|
|
-
|
|
|
- /* update the priority level of a block */
|
|
|
- synchronized void update(Block block,
|
|
|
- int curReplicasDelta, int expectedReplicasDelta) {
|
|
|
- int curReplicas = countContainingNodes(block);
|
|
|
- int curExpectedReplicas = getReplication(block);
|
|
|
- int oldReplicas = curReplicas-curReplicasDelta;
|
|
|
- int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
|
|
|
- int curPri = getPriority(block, curReplicas, curExpectedReplicas);
|
|
|
- int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
|
|
|
- NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
|
|
|
- block +
|
|
|
- " curReplicas " + curReplicas +
|
|
|
- " curExpectedReplicas " + curExpectedReplicas +
|
|
|
- " oldReplicas " + oldReplicas +
|
|
|
- " oldExpectedReplicas " + oldExpectedReplicas +
|
|
|
- " curPri " + curPri +
|
|
|
- " oldPri " + oldPri);
|
|
|
- if (oldPri != LEVEL && oldPri != curPri) {
|
|
|
- remove(block, oldPri);
|
|
|
- }
|
|
|
- if (curPri != LEVEL && oldPri != curPri
|
|
|
- && priorityQueues.get(curPri).add(block)) {
|
|
|
- NameNode.stateChangeLog.debug(
|
|
|
- "BLOCK* NameSystem.UnderReplicationBlock.update:"
|
|
|
- + block.getBlockName()
|
|
|
- + " has only "+curReplicas
|
|
|
- + " replicas and need " + curExpectedReplicas
|
|
|
- + " replicas so is added to neededReplications"
|
|
|
- + " at priority level " + curPri);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /* Return an iterator over all the under-replicated blocks */
|
|
|
- synchronized Iterator<Block> iterator() {
|
|
|
- return new Iterator<Block>() {
|
|
|
- int level;
|
|
|
- List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
|
|
|
-
|
|
|
- {
|
|
|
- level=0;
|
|
|
- for(int i=0; i<LEVEL; i++) {
|
|
|
- iterators.add(priorityQueues.get(i).iterator());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void update() {
|
|
|
- while(level< LEVEL-1 && !iterators.get(level).hasNext() ) {
|
|
|
- level++;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public Block next() {
|
|
|
- update();
|
|
|
- return iterators.get(level).next();
|
|
|
- }
|
|
|
-
|
|
|
- public boolean hasNext() {
|
|
|
- update();
|
|
|
- return iterators.get(level).hasNext();
|
|
|
- }
|
|
|
-
|
|
|
- public void remove() {
|
|
|
- iterators.get(level).remove();
|
|
|
- }
|
|
|
- };
|
|
|
- }
|
|
|
+ /* Updates a block's priority in the under-replication queue */
|
|
|
+ synchronized void updateNeededReplications(Block block,
|
|
|
+ int curReplicasDelta, int expectedReplicasDelta) {
|
|
|
+ int curReplicas = countContainingNodes( block );
|
|
|
+ int curExpectedReplicas = getReplication(block);
|
|
|
+ neededReplications.update(block, curReplicas, curExpectedReplicas,
|
|
|
+ curReplicasDelta, expectedReplicasDelta);
|
|
|
}
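The wrapper above now computes the absolute replica counts itself before delegating, which suggests the queue has been moved out of FSNamesystem and no longer calls back into it. The interface below is only an inference from the call sites visible in this patch; the interface name is made up.

    // Assumed shape of the extracted under-replication queue; inferred, not from this patch.
    interface UnderReplicatedBlocksView {
      boolean add(Block block, int curReplicas, int expectedReplicas);
      boolean remove(Block block, int oldReplicas, int oldExpectedReplicas);
      void update(Block block, int curReplicas, int curExpectedReplicas,
                  int curReplicasDelta, int expectedReplicasDelta);
    }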
|
|
|
-
|
|
|
+
|
|
|
/////////////////////////////////////////////////////////
|
|
|
//
|
|
|
// These methods are called by HadoopFS clients
|
|
@@ -670,7 +490,7 @@ class FSNamesystem implements FSConstants {
|
|
|
LOG.info("Increasing replication for file " + src
|
|
|
+ ". New replication is " + replication);
|
|
|
for(int idx = 0; idx < fileBlocks.length; idx++)
|
|
|
- neededReplications.update(fileBlocks[idx], 0, replication-oldRepl);
|
|
|
+ updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
|
|
|
|
|
|
if (oldRepl > replication) {
|
|
|
// old replication > the new one; need to remove copies
|
|
@@ -1929,7 +1749,9 @@ class FSNamesystem implements FSConstants {
|
|
|
if (timedOutItems != null) {
|
|
|
synchronized (this) {
|
|
|
for (int i = 0; i < timedOutItems.length; i++) {
|
|
|
- neededReplications.add(timedOutItems[i]);
|
|
|
+ neededReplications.add(timedOutItems[i],
|
|
|
+ countContainingNodes(timedOutItems[i]),
|
|
|
+ getReplication(timedOutItems[i]));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -2254,9 +2076,9 @@ class FSNamesystem implements FSConstants {
|
|
|
// handle underReplication/overReplication
|
|
|
short fileReplication = fileINode.getReplication();
|
|
|
if (numCurrentReplica >= fileReplication) {
|
|
|
- neededReplications.remove(block);
|
|
|
+ neededReplications.remove(block, numCurrentReplica, fileReplication);
|
|
|
} else {
|
|
|
- neededReplications.update(block, curReplicaDelta, 0);
|
|
|
+ updateNeededReplications(block, curReplicaDelta, 0);
|
|
|
}
|
|
|
if (numCurrentReplica > fileReplication) {
|
|
|
proccessOverReplicatedBlock(block, fileReplication);
|
|
@@ -2363,7 +2185,7 @@ class FSNamesystem implements FSConstants {
|
|
|
//
|
|
|
FSDirectory.INode fileINode = blocksMap.getINode(block);
|
|
|
if (fileINode != null) {
|
|
|
- neededReplications.update(block, -1, 0);
|
|
|
+ updateNeededReplications(block, -1, 0);
|
|
|
}
|
|
|
|
|
|
//
|
|
@@ -2496,7 +2318,7 @@ class FSNamesystem implements FSConstants {
|
|
|
// replicated.
|
|
|
Block decommissionBlocks[] = node.getBlocks();
|
|
|
for (int j = 0; j < decommissionBlocks.length; j++) {
|
|
|
- neededReplications.update(decommissionBlocks[j], -1, 0);
|
|
|
+ updateNeededReplications(decommissionBlocks[j], -1, 0);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -2852,447 +2674,7 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** The class is responsible for choosing the desired number of targets
|
|
|
- * for placing block replicas.
|
|
|
- * The replica placement strategy is that if the writer is on a datanode,
|
|
|
- * the 1st replica is placed on the local machine,
|
|
|
- * otherwise a random datanode. The 2nd replica is placed on a datanode
|
|
|
- * that is on a different rack. The 3rd replica is placed on a datanode
|
|
|
- * which is on the same rack as the first replica.
|
|
|
- * @author hairong
|
|
|
- *
|
|
|
- */
|
|
|
- class ReplicationTargetChooser {
|
|
|
- final boolean considerLoad;
|
|
|
-
|
|
|
- ReplicationTargetChooser(boolean considerLoad) {
|
|
|
- this.considerLoad = considerLoad;
|
|
|
- }
|
|
|
-
|
|
|
- private class NotEnoughReplicasException extends Exception {
|
|
|
- NotEnoughReplicasException(String msg) {
|
|
|
- super(msg);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * choose <i>numOfReplicas</i> data nodes for <i>writer</i> to replicate
|
|
|
- * a block with size <i>blocksize</i>
|
|
|
- * If not enough targets can be found, return as many as possible.
|
|
|
- *
|
|
|
- * @param numOfReplicas: number of replicas wanted.
|
|
|
- * @param writer: the writer's machine, null if not in the cluster.
|
|
|
- * @param excludedNodes: datanodes that should not be considered targets.
|
|
|
- * @param blocksize: size of the data to be written.
|
|
|
- * @return array of DatanodeDescriptor instances chosen as targets
|
|
|
- * and sorted as a pipeline.
|
|
|
- */
|
|
|
- DatanodeDescriptor[] chooseTarget(int numOfReplicas,
|
|
|
- DatanodeDescriptor writer,
|
|
|
- List<DatanodeDescriptor> excludedNodes,
|
|
|
- long blocksize) {
|
|
|
- if (excludedNodes == null) {
|
|
|
- excludedNodes = new ArrayList<DatanodeDescriptor>();
|
|
|
- }
|
|
|
-
|
|
|
- return chooseTarget(numOfReplicas, writer,
|
|
|
- new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
|
|
|
- }
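A typical call into this entry point, as it might appear elsewhere in FSNamesystem where the replicator field is in scope; the replica count and block size below are illustrative values, not taken from this patch.

    // Illustrative call: three targets for a 64 MB block written by a non-datanode client.
    DatanodeDescriptor[] targets =
        replicator.chooseTarget(3 /*numOfReplicas*/, null /*writer*/,
                                null /*excludedNodes*/, 64L * 1024 * 1024);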
|
|
|
-
|
|
|
- /**
|
|
|
- * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
|
|
|
- * to re-replicate a block with size <i>blocksize</i>
|
|
|
- * If not enough targets can be found, return as many as possible.
|
|
|
- *
|
|
|
- * @param numOfReplicas: additional number of replicas wanted.
|
|
|
- * @param writer: the writer's machine, null if not in the cluster.
|
|
|
- * @param choosenNodes: datanodes that have already been chosen as targets.
|
|
|
- * @param excludedNodes: datanodes that should not be considered targets.
|
|
|
- * @param blocksize: size of the data to be written.
|
|
|
- * @return array of DatanodeDescriptor instances chosen as targets
|
|
|
- * and sorted as a pipeline.
|
|
|
- */
|
|
|
- DatanodeDescriptor[] chooseTarget(int numOfReplicas,
|
|
|
- DatanodeDescriptor writer,
|
|
|
- List<DatanodeDescriptor> choosenNodes,
|
|
|
- List<DatanodeDescriptor> excludedNodes,
|
|
|
- long blocksize) {
|
|
|
- if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
|
|
|
- return new DatanodeDescriptor[0];
|
|
|
- }
|
|
|
-
|
|
|
- if (excludedNodes == null) {
|
|
|
- excludedNodes = new ArrayList<DatanodeDescriptor>();
|
|
|
- }
|
|
|
-
|
|
|
- int clusterSize = clusterMap.getNumOfLeaves();
|
|
|
- int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
|
|
|
- if (totalNumOfReplicas > clusterSize) {
|
|
|
- numOfReplicas -= (totalNumOfReplicas-clusterSize);
|
|
|
- totalNumOfReplicas = clusterSize;
|
|
|
- }
|
|
|
-
|
|
|
- int maxNodesPerRack =
|
|
|
- (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
|
|
|
-
|
|
|
- List<DatanodeDescriptor> results =
|
|
|
- new ArrayList<DatanodeDescriptor>(choosenNodes);
|
|
|
- excludedNodes.addAll(choosenNodes);
|
|
|
-
|
|
|
- if (!clusterMap.contains(writer))
|
|
|
- writer=null;
|
|
|
-
|
|
|
- DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
|
|
|
- excludedNodes, blocksize, maxNodesPerRack, results);
|
|
|
-
|
|
|
- results.removeAll(choosenNodes);
|
|
|
-
|
|
|
- // sorting nodes to form a pipeline
|
|
|
- return getPipeline((writer==null)?localNode:writer,
|
|
|
- results.toArray(new DatanodeDescriptor[results.size()]));
|
|
|
- }
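The maxNodesPerRack bound computed above is easy to misjudge because of integer division; a tiny self-contained check follows, with an illustrative class name that is not part of this patch.

    // Same integer arithmetic as the maxNodesPerRack expression above; illustrative only.
    class MaxNodesPerRackDemo {
      static int maxNodesPerRack(int totalReplicas, int numRacks) {
        return (totalReplicas - 1) / numRacks + 2;
      }
      public static void main(String[] args) {
        // 3 replicas on 2 racks -> at most 3 per rack; 3 replicas on 10 racks -> at most 2.
        System.out.println(maxNodesPerRack(3, 2) + " " + maxNodesPerRack(3, 10));
      }
    }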
|
|
|
-
|
|
|
- /* choose <i>numOfReplicas</i> from all data nodes */
|
|
|
- private DatanodeDescriptor chooseTarget(int numOfReplicas,
|
|
|
- DatanodeDescriptor writer,
|
|
|
- List<DatanodeDescriptor> excludedNodes,
|
|
|
- long blocksize,
|
|
|
- int maxNodesPerRack,
|
|
|
- List<DatanodeDescriptor> results) {
|
|
|
-
|
|
|
- if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
|
|
|
- return writer;
|
|
|
- }
|
|
|
-
|
|
|
- int numOfResults = results.size();
|
|
|
- if (writer == null && (numOfResults==1 || numOfResults==2)) {
|
|
|
- writer = results.get(0);
|
|
|
- }
|
|
|
-
|
|
|
- try {
|
|
|
- switch(numOfResults) {
|
|
|
- case 0:
|
|
|
- writer = chooseLocalNode(writer, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
- if (--numOfReplicas == 0) break;
|
|
|
- case 1:
|
|
|
- chooseRemoteRack(1, writer, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
- if (--numOfReplicas == 0) break;
|
|
|
- case 2:
|
|
|
- if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
|
|
|
- chooseRemoteRack(1, writer, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
- } else {
|
|
|
- chooseLocalRack(writer, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
- }
|
|
|
- if (--numOfReplicas == 0) break;
|
|
|
- default:
|
|
|
- chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
- }
|
|
|
- } catch (NotEnoughReplicasException e) {
|
|
|
- LOG.warn("Not able to place enough replicas, still in need of "
|
|
|
- + numOfReplicas);
|
|
|
- }
|
|
|
- return writer;
|
|
|
- }
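Read as a fall-through switch, the cases above reproduce the placement strategy described in the class comment; the trace below is an illustrative walk-through for a datanode-local writer and replication 3, not output from the code.

    // case 0: chooseLocalNode  -> replica 1 on the writer's own datanode
    // case 1: chooseRemoteRack -> replica 2 on a node in a different rack
    // case 2: replicas 1 and 2 sit on different racks, so chooseLocalRack
    //         -> replica 3 on the writer's (first replica's) rack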
|
|
|
-
|
|
|
- /* choose <i>localMachine</i> as the target.
|
|
|
- * if <i>localMachine</i> is not available,
|
|
|
- * choose a node on the same rack
|
|
|
- * @return the chosen node
|
|
|
- */
|
|
|
- private DatanodeDescriptor chooseLocalNode(
|
|
|
- DatanodeDescriptor localMachine,
|
|
|
- List<DatanodeDescriptor> excludedNodes,
|
|
|
- long blocksize,
|
|
|
- int maxNodesPerRack,
|
|
|
- List<DatanodeDescriptor> results)
|
|
|
- throws NotEnoughReplicasException {
|
|
|
- // if no local machine, randomly choose one node
|
|
|
- if (localMachine == null)
|
|
|
- return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
-
|
|
|
- // otherwise try local machine first
|
|
|
- if (!excludedNodes.contains(localMachine)) {
|
|
|
- excludedNodes.add(localMachine);
|
|
|
- if (isGoodTarget(localMachine, blocksize,
|
|
|
- maxNodesPerRack, false, results)) {
|
|
|
- results.add(localMachine);
|
|
|
- return localMachine;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // try a node on local rack
|
|
|
- return chooseLocalRack(localMachine, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
- }
|
|
|
-
|
|
|
- /* choose one node from the rack that <i>localMachine</i> is on.
|
|
|
- * if no such node is available, choose one node from the rack where
|
|
|
- * a second replica is on.
|
|
|
- * if still no such node is available, choose a random node
|
|
|
- * in the cluster.
|
|
|
- * @return the chosen node
|
|
|
- */
|
|
|
- private DatanodeDescriptor chooseLocalRack(
|
|
|
- DatanodeDescriptor localMachine,
|
|
|
- List<DatanodeDescriptor> excludedNodes,
|
|
|
- long blocksize,
|
|
|
- int maxNodesPerRack,
|
|
|
- List<DatanodeDescriptor> results)
|
|
|
- throws NotEnoughReplicasException {
|
|
|
- // no local machine, so choose a random machine
|
|
|
- if (localMachine == null) {
|
|
|
- return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
- }
|
|
|
-
|
|
|
- // choose one from the local rack
|
|
|
- try {
|
|
|
- return chooseRandom(
|
|
|
- localMachine.getNetworkLocation(),
|
|
|
- excludedNodes, blocksize, maxNodesPerRack, results);
|
|
|
- } catch (NotEnoughReplicasException e1) {
|
|
|
- // find the second replica
|
|
|
- DatanodeDescriptor newLocal=null;
|
|
|
- for(Iterator<DatanodeDescriptor> iter=results.iterator();
|
|
|
- iter.hasNext();) {
|
|
|
- DatanodeDescriptor nextNode = iter.next();
|
|
|
- if (nextNode != localMachine) {
|
|
|
- newLocal = nextNode;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (newLocal != null) {
|
|
|
- try {
|
|
|
- return chooseRandom(
|
|
|
- newLocal.getNetworkLocation(),
|
|
|
- excludedNodes, blocksize, maxNodesPerRack, results);
|
|
|
- } catch(NotEnoughReplicasException e2) {
|
|
|
- //otherwise randomly choose one from the network
|
|
|
- return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
- }
|
|
|
- } else {
|
|
|
- //otherwise randomly choose one from the network
|
|
|
- return chooseRandom(NodeBase.ROOT, excludedNodes,
|
|
|
- blocksize, maxNodesPerRack, results);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /* choose <i>numOfReplicas</i> nodes from the racks
|
|
|
- * that <i>localMachine</i> is NOT on.
|
|
|
- * if not enough nodes are available, choose the remaining ones
|
|
|
- * from the local rack
|
|
|
- */
|
|
|
-
|
|
|
- private void chooseRemoteRack(int numOfReplicas,
|
|
|
- DatanodeDescriptor localMachine,
|
|
|
- List<DatanodeDescriptor> excludedNodes,
|
|
|
- long blocksize,
|
|
|
- int maxReplicasPerRack,
|
|
|
- List<DatanodeDescriptor> results)
|
|
|
- throws NotEnoughReplicasException {
|
|
|
- int oldNumOfReplicas = results.size();
|
|
|
- // randomly choose one node from remote racks
|
|
|
- try {
|
|
|
- chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
|
|
|
- excludedNodes, blocksize, maxReplicasPerRack, results);
|
|
|
- } catch (NotEnoughReplicasException e) {
|
|
|
- chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
|
|
|
- localMachine.getNetworkLocation(), excludedNodes, blocksize,
|
|
|
- maxReplicasPerRack, results);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /* Randomly choose one target from <i>nodes</i>.
|
|
|
- * @return the chosen node
|
|
|
- */
|
|
|
- private DatanodeDescriptor chooseRandom(
|
|
|
- String nodes,
|
|
|
- List<DatanodeDescriptor> excludedNodes,
|
|
|
- long blocksize,
|
|
|
- int maxNodesPerRack,
|
|
|
- List<DatanodeDescriptor> results)
|
|
|
- throws NotEnoughReplicasException {
|
|
|
- DatanodeDescriptor result;
|
|
|
- do {
|
|
|
- DatanodeDescriptor[] selectedNodes =
|
|
|
- chooseRandom(1, nodes, excludedNodes);
|
|
|
- if (selectedNodes.length == 0) {
|
|
|
- throw new NotEnoughReplicasException(
|
|
|
- "Not able to place enough replicas");
|
|
|
- }
|
|
|
- result = (DatanodeDescriptor)(selectedNodes[0]);
|
|
|
- } while(!isGoodTarget(result, blocksize, maxNodesPerRack, results));
|
|
|
- results.add(result);
|
|
|
- return result;
|
|
|
- }
|
|
|
-
|
|
|
- /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
|
|
|
- */
|
|
|
- private void chooseRandom(int numOfReplicas,
|
|
|
- String nodes,
|
|
|
- List<DatanodeDescriptor> excludedNodes,
|
|
|
- long blocksize,
|
|
|
- int maxNodesPerRack,
|
|
|
- List<DatanodeDescriptor> results)
|
|
|
- throws NotEnoughReplicasException {
|
|
|
- boolean toContinue = true;
|
|
|
- do {
|
|
|
- DatanodeDescriptor[] selectedNodes =
|
|
|
- chooseRandom(numOfReplicas, nodes, excludedNodes);
|
|
|
- if (selectedNodes.length < numOfReplicas) {
|
|
|
- toContinue = false;
|
|
|
- }
|
|
|
- for(int i=0; i<selectedNodes.length; i++) {
|
|
|
- DatanodeDescriptor result = (DatanodeDescriptor)(selectedNodes[i]);
|
|
|
- if (isGoodTarget(result, blocksize, maxNodesPerRack, results)) {
|
|
|
- numOfReplicas--;
|
|
|
- results.add(result);
|
|
|
- }
|
|
|
- } // end of for
|
|
|
- } while (numOfReplicas>0 && toContinue);
|
|
|
-
|
|
|
- if (numOfReplicas>0) {
|
|
|
- throw new NotEnoughReplicasException(
|
|
|
- "Not able to place enough replicas");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /* Randomly choose <i>numOfReplicas</i> nodes from <i>nodes</i>.
|
|
|
- * @return the chosen nodes
|
|
|
- */
|
|
|
- private DatanodeDescriptor[] chooseRandom(int numOfReplicas,
|
|
|
- String nodes,
|
|
|
- List<DatanodeDescriptor> excludedNodes) {
|
|
|
- List<DatanodeDescriptor> results =
|
|
|
- new ArrayList<DatanodeDescriptor>();
|
|
|
- int numOfAvailableNodes =
|
|
|
- clusterMap.countNumOfAvailableNodes(nodes, excludedNodes);
|
|
|
- numOfReplicas = (numOfAvailableNodes<numOfReplicas)?
|
|
|
- numOfAvailableNodes:numOfReplicas;
|
|
|
- while(numOfReplicas > 0) {
|
|
|
- DatanodeDescriptor choosenNode = clusterMap.chooseRandom(nodes);
|
|
|
- if (!excludedNodes.contains(choosenNode)) {
|
|
|
- results.add(choosenNode);
|
|
|
- excludedNodes.add(choosenNode);
|
|
|
- numOfReplicas--;
|
|
|
- }
|
|
|
- }
|
|
|
- return (DatanodeDescriptor[])results.toArray(
|
|
|
- new DatanodeDescriptor[results.size()]);
|
|
|
- }
|
|
|
-
|
|
|
- /* judge if a node is a good target.
|
|
|
- * return true if <i>node</i> has enough space,
|
|
|
- * does not have too much load, and the rack does not have too many nodes
|
|
|
- */
|
|
|
- private boolean isGoodTarget(DatanodeDescriptor node,
|
|
|
- long blockSize, int maxTargetPerLoc,
|
|
|
- List<DatanodeDescriptor> results) {
|
|
|
- return isGoodTarget(node, blockSize, maxTargetPerLoc,
|
|
|
- this.considerLoad, results);
|
|
|
- }
|
|
|
-
|
|
|
- private boolean isGoodTarget(DatanodeDescriptor node,
|
|
|
- long blockSize, int maxTargetPerLoc,
|
|
|
- boolean considerLoad,
|
|
|
- List<DatanodeDescriptor> results) {
|
|
|
-
|
|
|
- // check if the node is (being) decommissioned
|
|
|
- if (node.isDecommissionInProgress() || node.isDecommissioned()) {
|
|
|
- LOG.debug("Node "+node.getPath()+
|
|
|
- " is not chosen because the node is (being) decommissioned");
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- // check the remaining capacity of the target machine
|
|
|
- if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining()) {
|
|
|
- LOG.debug("Node "+node.getPath()+
|
|
|
- " is not chosen because the node does not have enough space");
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- // check the communication traffic of the target machine
|
|
|
- if (considerLoad) {
|
|
|
- double avgLoad = 0;
|
|
|
- int size = clusterMap.getNumOfLeaves();
|
|
|
- if (size != 0) {
|
|
|
- avgLoad = (double)totalLoad()/size;
|
|
|
- }
|
|
|
- if (node.getXceiverCount() > (2.0 * avgLoad)) {
|
|
|
- LOG.debug("Node "+node.getPath()+
|
|
|
- " is not chosen because the node is too busy");
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // check if the target rack has chosen too many nodes
|
|
|
- String rackname = node.getNetworkLocation();
|
|
|
- int counter=1;
|
|
|
- for(Iterator<DatanodeDescriptor> iter = results.iterator();
|
|
|
- iter.hasNext();) {
|
|
|
- DatanodeDescriptor result = iter.next();
|
|
|
- if (rackname.equals(result.getNetworkLocation())) {
|
|
|
- counter++;
|
|
|
- }
|
|
|
- }
|
|
|
- if (counter>maxTargetPerLoc) {
|
|
|
- LOG.debug("Node "+node.getPath()+
|
|
|
- " is not chosen because the rack has too many chosen nodes");
|
|
|
- return false;
|
|
|
- }
|
|
|
- return true;
|
|
|
- }
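The load test above rejects a node whose transceiver count exceeds twice the cluster-wide average; a minimal numeric sketch follows, using made-up figures and an illustrative class name.

    // Hypothetical figures: 10 datanodes, 50 transceivers in total -> average load 5.0.
    class LoadCutoffDemo {
      public static void main(String[] args) {
        double avgLoad = 50 / 10.0;
        System.out.println(12 > 2.0 * avgLoad);   // a node with 12 transceivers is "too busy"
      }
    }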
|
|
|
-
|
|
|
- /* Return a pipeline of nodes.
|
|
|
- * The pipeline is formed by finding a shortest path that
|
|
|
- * starts from the writer and traverses all <i>nodes</i>.
|
|
|
- * This is basically a traveling salesman problem.
|
|
|
- */
|
|
|
- private DatanodeDescriptor[] getPipeline(
|
|
|
- DatanodeDescriptor writer,
|
|
|
- DatanodeDescriptor[] nodes) {
|
|
|
- if (nodes.length==0) return nodes;
|
|
|
-
|
|
|
- synchronized(clusterMap) {
|
|
|
- int index=0;
|
|
|
- if (writer == null || !clusterMap.contains(writer)) {
|
|
|
- writer = nodes[0];
|
|
|
- }
|
|
|
- for(;index<nodes.length; index++) {
|
|
|
- DatanodeDescriptor shortestNode = null;
|
|
|
- int shortestDistance = Integer.MAX_VALUE;
|
|
|
- int shortestIndex = index;
|
|
|
- for(int i=index; i<nodes.length; i++) {
|
|
|
- DatanodeDescriptor currentNode = nodes[i];
|
|
|
- int currentDistance = clusterMap.getDistance(writer, currentNode);
|
|
|
- if (shortestDistance>currentDistance) {
|
|
|
- shortestDistance = currentDistance;
|
|
|
- shortestNode = currentNode;
|
|
|
- shortestIndex = i;
|
|
|
- }
|
|
|
- }
|
|
|
- //switch position index & shortestIndex
|
|
|
- if (index != shortestIndex) {
|
|
|
- nodes[shortestIndex] = nodes[index];
|
|
|
- nodes[index] = shortestNode;
|
|
|
- }
|
|
|
- writer = shortestNode;
|
|
|
- }
|
|
|
- }
|
|
|
- return nodes;
|
|
|
- }
|
|
|
- } //end of Replicator
|
|
|
-
|
|
|
- // Keeps track of which datanodes are allowed to connect to the namenode.
|
|
|
-
|
|
|
+ // Keeps track of which datanodes are allowed to connect to the namenode.
|
|
|
private boolean inHostsList(DatanodeID node) {
|
|
|
Set<String> hostsList = hostsReader.getHosts();
|
|
|
return (hostsList.isEmpty() ||
|
|
@@ -3412,72 +2794,6 @@ class FSNamesystem implements FSConstants {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- /**
|
|
|
- * Information about the file while it is being written to.
|
|
|
- * Note that during this time the file is not visible to the outside.
|
|
|
- *
|
|
|
- * This class contains a <code>Collection</code> of {@link Block}s that has
|
|
|
- * been written into the file so far, and file replication.
|
|
|
- *
|
|
|
- * @author shv
|
|
|
- */
|
|
|
- private class FileUnderConstruction {
|
|
|
- private short blockReplication; // file replication
|
|
|
- private long blockSize;
|
|
|
- private Collection<Block> blocks;
|
|
|
- private UTF8 clientName; // lease holder
|
|
|
- private UTF8 clientMachine;
|
|
|
- private DatanodeDescriptor clientNode; // if client is a cluster node too.
|
|
|
-
|
|
|
- FileUnderConstruction(short replication,
|
|
|
- long blockSize,
|
|
|
- UTF8 clientName,
|
|
|
- UTF8 clientMachine,
|
|
|
- DatanodeDescriptor clientNode) throws IOException {
|
|
|
- this.blockReplication = replication;
|
|
|
- this.blockSize = blockSize;
|
|
|
- this.blocks = new ArrayList<Block>();
|
|
|
- this.clientName = clientName;
|
|
|
- this.clientMachine = clientMachine;
|
|
|
- this.clientNode = clientNode;
|
|
|
- }
|
|
|
-
|
|
|
- public short getReplication() {
|
|
|
- return this.blockReplication;
|
|
|
- }
|
|
|
-
|
|
|
- public long getBlockSize() {
|
|
|
- return blockSize;
|
|
|
- }
|
|
|
-
|
|
|
- public Collection<Block> getBlocks() {
|
|
|
- return blocks;
|
|
|
- }
|
|
|
-
|
|
|
- public UTF8 getClientName() {
|
|
|
- return clientName;
|
|
|
- }
|
|
|
-
|
|
|
- public UTF8 getClientMachine() {
|
|
|
- return clientMachine;
|
|
|
- }
|
|
|
-
|
|
|
- public DatanodeDescriptor getClientNode() {
|
|
|
- return clientNode;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Return the penultimate allocated block for this file
|
|
|
- */
|
|
|
- public Block getPenultimateBlock() {
|
|
|
- if (blocks.size() <= 1) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- return ((ArrayList<Block>)blocks).get(blocks.size() - 2);
|
|
|
- }
|
|
|
- }
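getPenultimateBlock() above is a plain size()-2 lookup guarded against short files; the toy example below shows only the indexing, using stand-in strings instead of Block objects, with illustrative names.

    // Illustrative indexing only; stand-in strings replace Block objects.
    class PenultimateDemo {
      public static void main(String[] args) {
        java.util.ArrayList<String> blocks = new java.util.ArrayList<String>();
        java.util.Collections.addAll(blocks, "blk_1", "blk_2", "blk_3");
        System.out.println(blocks.size() <= 1 ? null : blocks.get(blocks.size() - 2)); // blk_2
      }
    }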
|
|
|
-
|
|
|
/**
|
|
|
* Get data node by storage ID.
|
|
|
*
|
|
@@ -3499,136 +2815,6 @@ class FSNamesystem implements FSConstants {
|
|
|
return node;
|
|
|
}
|
|
|
|
|
|
- static class Host2NodesMap {
|
|
|
- private HashMap<String, DatanodeDescriptor[]> map
|
|
|
- = new HashMap<String, DatanodeDescriptor[]>();
|
|
|
- private Random r = new Random();
|
|
|
-
|
|
|
- /** Check if node is already in the map */
|
|
|
- synchronized boolean contains(DatanodeDescriptor node) {
|
|
|
- if (node==null) return false;
|
|
|
-
|
|
|
- String host = node.getHost();
|
|
|
- DatanodeDescriptor[] nodes = map.get(host);
|
|
|
- if (nodes != null) {
|
|
|
- for(DatanodeDescriptor containedNode:nodes) {
|
|
|
- if (node==containedNode)
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- /** add <node.getHost(), node> to the map
|
|
|
- * return true if the node is added; false otherwise
|
|
|
- */
|
|
|
- synchronized boolean add(DatanodeDescriptor node) {
|
|
|
- if (node==null || contains(node)) return false;
|
|
|
-
|
|
|
- String host = node.getHost();
|
|
|
- DatanodeDescriptor[] nodes = map.get(host);
|
|
|
- DatanodeDescriptor[] newNodes;
|
|
|
- if (nodes==null) {
|
|
|
- newNodes = new DatanodeDescriptor[1];
|
|
|
- newNodes[0]=node;
|
|
|
- } else { // rare case: more than one datanode on the host
|
|
|
- newNodes = new DatanodeDescriptor[nodes.length+1];
|
|
|
- System.arraycopy(nodes, 0, newNodes, 0, nodes.length);
|
|
|
- newNodes[nodes.length] = node;
|
|
|
- }
|
|
|
- map.put(host, newNodes);
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- /** remove node from the map
|
|
|
- * return true if the node is removed; false otherwise
|
|
|
- */
|
|
|
- synchronized boolean remove(DatanodeDescriptor node) {
|
|
|
- if (node==null) return false;
|
|
|
-
|
|
|
- String host = node.getHost();
|
|
|
- DatanodeDescriptor[] nodes = map.get(host);
|
|
|
- if (nodes==null) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- if (nodes.length==1) {
|
|
|
- if (nodes[0]==node) {
|
|
|
- map.remove(host);
|
|
|
- return true;
|
|
|
- } else {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
- //rare case
|
|
|
- int i=0;
|
|
|
- for(; i<nodes.length; i++) {
|
|
|
- if (nodes[i]==node) {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (i==nodes.length) {
|
|
|
- return false;
|
|
|
- } else {
|
|
|
- DatanodeDescriptor[] newNodes;
|
|
|
- newNodes = new DatanodeDescriptor[nodes.length-1];
|
|
|
- System.arraycopy(nodes, 0, newNodes, 0, i);
|
|
|
- System.arraycopy(nodes, i+1, newNodes, i, nodes.length-i-1);
|
|
|
- map.put(host, newNodes);
|
|
|
- return true;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /** get a data node by its host
|
|
|
- * @return DatanodeDescriptor if found; otherwise null
|
|
|
- */
|
|
|
- synchronized DatanodeDescriptor getDatanodeByHost(String host) {
|
|
|
- if (host==null) return null;
|
|
|
-
|
|
|
- DatanodeDescriptor[] nodes = map.get(host);
|
|
|
- // no entry
|
|
|
- if (nodes== null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- // one node
|
|
|
- if (nodes.length == 1) {
|
|
|
- return nodes[0];
|
|
|
- }
|
|
|
- // more than one node
|
|
|
- return nodes[r.nextInt(nodes.length)];
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Find data node by its name.
|
|
|
- *
|
|
|
- * @return DatanodeDescriptor if found or null otherwise
|
|
|
- */
|
|
|
- public DatanodeDescriptor getDatanodeByName(String name) {
|
|
|
- if (name==null) return null;
|
|
|
-
|
|
|
- int colon = name.indexOf(":");
|
|
|
- String host;
|
|
|
- if (colon < 0) {
|
|
|
- host = name;
|
|
|
- } else {
|
|
|
- host = name.substring(0, colon);
|
|
|
- }
|
|
|
-
|
|
|
- DatanodeDescriptor[] nodes = map.get(host);
|
|
|
- // no entry
|
|
|
- if (nodes== null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- for(DatanodeDescriptor containedNode:nodes) {
|
|
|
- if (name.equals(containedNode.getName())) {
|
|
|
- return containedNode;
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
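getDatanodeByName() above keys the lookup on the host part of a host:port datanode name; the parsing step is shown in isolation below, with an illustrative class name and made-up host names.

    // Same host extraction as getDatanodeByName() above; illustrative only.
    class HostFromNameDemo {
      static String hostOf(String name) {
        int colon = name.indexOf(":");
        return colon < 0 ? name : name.substring(0, colon);
      }
      public static void main(String[] args) {
        System.out.println(hostOf("datanode1.example.com:50010"));  // datanode1.example.com
        System.out.println(hostOf("datanode1.example.com"));        // datanode1.example.com
      }
    }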
|
|
|
-
|
|
|
- private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
|
|
|
-
|
|
|
/** Stop at and return the datanode at index (used for content browsing)*/
|
|
|
private DatanodeDescriptor getDatanodeByIndex(int index) {
|
|
|
int i = 0;
|
|
@@ -3899,7 +3085,7 @@ class FSNamesystem implements FSConstants {
|
|
|
if (blockTotal == -1 && blockSafe == -1) {
|
|
|
return true; // manual safe mode
|
|
|
}
|
|
|
- int activeBlocks = blocksMap.map.size();
|
|
|
+ int activeBlocks = blocksMap.size();
|
|
|
for(Iterator<Collection<Block>> it =
|
|
|
recentInvalidateSets.values().iterator(); it.hasNext();) {
|
|
|
activeBlocks -= it.next().size();
|
|
@@ -3978,7 +3164,7 @@ class FSNamesystem implements FSConstants {
|
|
|
void setBlockTotal() {
|
|
|
if (safeMode == null)
|
|
|
return;
|
|
|
- safeMode.setBlockTotal(blocksMap.map.size());
|
|
|
+ safeMode.setBlockTotal(blocksMap.size());
|
|
|
}
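Both call sites in this area switch from blocksMap.map.size() to blocksMap.size(), which suggests the extracted BlocksMap now hides its backing map behind an accessor roughly like the one below; this is an inference from the call sites, not code from this patch.

    // Assumed accessor on the extracted BlocksMap; inferred, not part of this patch.
    int size() {
      return map.size();
    }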
|
|
|
|
|
|
/**
|
|
@@ -4039,239 +3225,10 @@ class FSNamesystem implements FSConstants {
|
|
|
return getEditLog().getFsEditName();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * This class is used in the Namesystem's Jetty server to run fsck on the namenode
|
|
|
- * @author Milind Bhandarkar
|
|
|
- */
|
|
|
- public static class FsckServlet extends HttpServlet {
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- public void doGet(HttpServletRequest request,
|
|
|
- HttpServletResponse response
|
|
|
- ) throws ServletException, IOException {
|
|
|
- Map<String,String[]> pmap = request.getParameterMap();
|
|
|
- try {
|
|
|
- ServletContext context = getServletContext();
|
|
|
- NameNode nn = (NameNode) context.getAttribute("name.node");
|
|
|
- Configuration conf = (Configuration) context.getAttribute("name.conf");
|
|
|
- NamenodeFsck fscker = new NamenodeFsck(conf, nn, pmap, response);
|
|
|
- fscker.fsck();
|
|
|
- } catch (IOException ie) {
|
|
|
- StringUtils.stringifyException(ie);
|
|
|
- LOG.warn(ie);
|
|
|
- String errMsg = "Fsck on path " + pmap.get("path") + " failed.";
|
|
|
- response.sendError(HttpServletResponse.SC_GONE, errMsg);
|
|
|
- throw ie;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * This class is used in the Namesystem's Jetty server to retrieve a file.
|
|
|
- * Typically used by the Secondary NameNode to retrieve image and
|
|
|
- * edit file for periodic checkpointing.
|
|
|
- * @author Dhruba Borthakur
|
|
|
- */
|
|
|
- public static class GetImageServlet extends HttpServlet {
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- public void doGet(HttpServletRequest request,
|
|
|
- HttpServletResponse response
|
|
|
- ) throws ServletException, IOException {
|
|
|
- Map<String,String[]> pmap = request.getParameterMap();
|
|
|
- try {
|
|
|
- ServletContext context = getServletContext();
|
|
|
- NameNode nn = (NameNode) context.getAttribute("name.node");
|
|
|
- TransferFsImage ff = new TransferFsImage(pmap, request, response);
|
|
|
- if (ff.getImage()) {
|
|
|
- // send fsImage to Secondary
|
|
|
- TransferFsImage.getFileServer(response.getOutputStream(),
|
|
|
- nn.getFsImageName());
|
|
|
- } else if (ff.getEdit()) {
|
|
|
- // send old edits to Secondary
|
|
|
- TransferFsImage.getFileServer(response.getOutputStream(),
|
|
|
- nn.getFsEditName());
|
|
|
- } else if (ff.putImage()) {
|
|
|
- // issue an HTTP GET request to download the new fsimage
|
|
|
- TransferFsImage.getFileClient(ff.getInfoServer(), "getimage=1",
|
|
|
- nn.getFsImageNameCheckpoint());
|
|
|
- }
|
|
|
- } catch (IOException ie) {
|
|
|
- StringUtils.stringifyException(ie);
|
|
|
- LOG.warn(ie);
|
|
|
- String errMsg = "GetImage failed.";
|
|
|
- response.sendError(HttpServletResponse.SC_GONE, errMsg);
|
|
|
- throw ie;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Returns whether the given block is one pointed-to by a file.
|
|
|
*/
|
|
|
public boolean isValidBlock(Block b) {
|
|
|
return blocksMap.getINode(b) != null;
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * This class maintains the map from a block to its metadata.
|
|
|
- * A block's metadata currently includes the INode it belongs to and
|
|
|
- * the datanodes that store the block.
|
|
|
- */
|
|
|
- class BlocksMap {
|
|
|
-
|
|
|
- /**
|
|
|
- * Internal class for block metadata
|
|
|
- */
|
|
|
- private class BlockInfo {
|
|
|
- FSDirectory.INode inode;
|
|
|
-
|
|
|
- /** nodes could contain some null entries at the end, so
|
|
|
- * nodes.length >= number of datanodes.
|
|
|
- * if nodes != null then nodes[0] != null.
|
|
|
- */
|
|
|
- DatanodeDescriptor nodes[];
|
|
|
- Block block; //block that was inserted.
|
|
|
- }
|
|
|
-
|
|
|
- private class NodeIterator implements Iterator<DatanodeDescriptor> {
|
|
|
- NodeIterator(DatanodeDescriptor[] nodes) {
|
|
|
- arr = nodes;
|
|
|
- }
|
|
|
- DatanodeDescriptor[] arr;
|
|
|
- int nextIdx = 0;
|
|
|
-
|
|
|
- public boolean hasNext() {
|
|
|
- return arr != null && nextIdx < arr.length && arr[nextIdx] != null;
|
|
|
- }
|
|
|
-
|
|
|
- public DatanodeDescriptor next() {
|
|
|
- return arr[nextIdx++];
|
|
|
- }
|
|
|
-
|
|
|
- public void remove() {
|
|
|
- throw new UnsupportedOperationException("Sorry. can't remove.");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- Map<Block, BlockInfo> map = new HashMap<Block, BlockInfo>();
|
|
|
-
|
|
|
- /** add BlockInfo if mapping does not exist */
|
|
|
- private BlockInfo checkBlockInfo(Block b) {
|
|
|
- BlockInfo info = map.get(b);
|
|
|
- if (info == null) {
|
|
|
- info = new BlockInfo();
|
|
|
- info.block = b;
|
|
|
- map.put(b, info);
|
|
|
- }
|
|
|
- return info;
|
|
|
- }
|
|
|
-
|
|
|
- public FSDirectory.INode getINode(Block b) {
|
|
|
- BlockInfo info = map.get(b);
|
|
|
- return (info != null) ? info.inode : null;
|
|
|
- }
|
|
|
-
|
|
|
- public void addINode(Block b, FSDirectory.INode iNode) {
|
|
|
- BlockInfo info = checkBlockInfo(b);
|
|
|
- info.inode = iNode;
|
|
|
- }
|
|
|
-
|
|
|
- public void removeINode(Block b) {
|
|
|
- BlockInfo info = map.get(b);
|
|
|
- if (info != null) {
|
|
|
- info.inode = null;
|
|
|
- if (info.nodes == null) {
|
|
|
- map.remove(b);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /** Returns the block object if it exists in the map */
|
|
|
- public Block getStoredBlock(Block b) {
|
|
|
- BlockInfo info = map.get(b);
|
|
|
- return (info != null) ? info.block : null;
|
|
|
- }
|
|
|
-
|
|
|
- /** The returned Iterator does not support remove() */
|
|
|
- public Iterator<DatanodeDescriptor> nodeIterator(Block b) {
|
|
|
- BlockInfo info = map.get(b);
|
|
|
- return new NodeIterator((info != null) ? info.nodes : null);
|
|
|
- }
|
|
|
-
|
|
|
- /** counts number of containing nodes. Better than using iterator. */
|
|
|
- public int numNodes(Block b) {
|
|
|
- int count = 0;
|
|
|
- BlockInfo info = map.get(b);
|
|
|
- if (info != null && info.nodes != null) {
|
|
|
- count = info.nodes.length;
|
|
|
- while (info.nodes[ count-1 ] == null) // mostly false
|
|
|
- count--;
|
|
|
- }
|
|
|
- return count;
|
|
|
- }
|
|
|
-
|
|
|
- /** returns true if the node does not already exist and is added;
|
|
|
- * false if the node already exists.*/
|
|
|
- public boolean addNode(Block b,
|
|
|
- DatanodeDescriptor node,
|
|
|
- int replicationHint) {
|
|
|
- BlockInfo info = checkBlockInfo(b);
|
|
|
- if (info.nodes == null) {
|
|
|
- info.nodes = new DatanodeDescriptor[ replicationHint ];
|
|
|
- }
|
|
|
-
|
|
|
- DatanodeDescriptor[] arr = info.nodes;
|
|
|
- for(int i=0; i < arr.length; i++) {
|
|
|
- if (arr[i] == null) {
|
|
|
- arr[i] = node;
|
|
|
- return true;
|
|
|
- }
|
|
|
- if (arr[i] == node) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /* Not enough space left. Create a new array. Should normally
|
|
|
- * happen only when replication is manually increased by the user. */
|
|
|
- info.nodes = new DatanodeDescriptor[ arr.length + 1 ];
|
|
|
- for(int i=0; i < arr.length; i++) {
|
|
|
- info.nodes[i] = arr[i];
|
|
|
- }
|
|
|
- info.nodes[ arr.length ] = node;
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- public boolean removeNode(Block b, DatanodeDescriptor node) {
|
|
|
- BlockInfo info = map.get(b);
|
|
|
- if (info == null || info.nodes == null)
|
|
|
- return false;
|
|
|
-
|
|
|
- boolean removed = false;
|
|
|
- // swap lastNode and node's location. set lastNode to null.
|
|
|
- DatanodeDescriptor[] arr = info.nodes;
|
|
|
- int lastNode = -1;
|
|
|
- for(int i=arr.length-1; i >= 0; i--) {
|
|
|
- if (lastNode < 0 && arr[i] != null)
|
|
|
- lastNode = i;
|
|
|
- if (arr[i] == node) {
|
|
|
- arr[i] = arr[ lastNode ];
|
|
|
- arr[ lastNode ] = null;
|
|
|
- removed = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /*
|
|
|
- * if ((lastNode + 1) < arr.length/4) {
|
|
|
- * we could trim the array.
|
|
|
- * }
|
|
|
- */
|
|
|
- if (arr[0] == null) { // no datanodes left.
|
|
|
- info.nodes = null;
|
|
|
- if (info.inode == null) {
|
|
|
- map.remove(b);
|
|
|
- }
|
|
|
- }
|
|
|
- return removed;
|
|
|
- }
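removeNode() above uses a swap-with-the-last-live-entry trick so that non-null datanodes stay packed at the front of the array; the toy version below mirrors that move with strings and illustrative values.

    // Illustrative swap-remove over a packed array, mirroring removeNode() above.
    class SwapRemoveDemo {
      public static void main(String[] args) {
        String[] arr = {"dn1", "dn2", "dn3", null};  // one unused slot, as in BlocksMap
        int lastNode = 2;                            // index of the last non-null entry
        arr[0] = arr[lastNode];                      // remove "dn1": move the last entry up
        arr[lastNode] = null;                        // and clear its old slot
        System.out.println(java.util.Arrays.toString(arr));  // [dn3, dn2, null, null]
      }
    }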
|
|
|
- }
|
|
|
}
|