|
@@ -23,6 +23,8 @@ import java.util.Comparator;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Random;
|
|
import java.util.Random;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
|
|
+import java.util.concurrent.locks.ReadWriteLock;
|
|
|
|
+import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -224,7 +226,7 @@ public class NetworkTopology {
|
|
} // end of remove
|
|
} // end of remove
|
|
|
|
|
|
/** Given a node's string representation, return a reference to the node */
|
|
/** Given a node's string representation, return a reference to the node */
|
|
- Node getLoc(String loc) {
|
|
|
|
|
|
+ private Node getLoc(String loc) {
|
|
if (loc == null || loc.length() == 0) return this;
|
|
if (loc == null || loc.length() == 0) return this;
|
|
|
|
|
|
String[] path = loc.split(PATH_SEPARATOR_STR, 2);
|
|
String[] path = loc.split(PATH_SEPARATOR_STR, 2);
|
|
@@ -300,8 +302,10 @@ public class NetworkTopology {
|
|
|
|
|
|
InnerNode clusterMap = new InnerNode(InnerNode.ROOT); // the root
|
|
InnerNode clusterMap = new InnerNode(InnerNode.ROOT); // the root
|
|
private int numOfRacks = 0; // rack counter
|
|
private int numOfRacks = 0; // rack counter
|
|
|
|
+ private ReadWriteLock netlock;
|
|
|
|
|
|
public NetworkTopology() {
|
|
public NetworkTopology() {
|
|
|
|
+ netlock = new ReentrantReadWriteLock();
|
|
}
|
|
}
|
|
|
|
|
|
/** Add a data node
|
|
/** Add a data node
|
|
@@ -310,21 +314,26 @@ public class NetworkTopology {
|
|
* data node to be added
|
|
* data node to be added
|
|
* @exception IllegalArgumentException if add a data node to a leave
|
|
* @exception IllegalArgumentException if add a data node to a leave
|
|
*/
|
|
*/
|
|
- public synchronized void add(DatanodeDescriptor node) {
|
|
|
|
|
|
+ public void add(DatanodeDescriptor node) {
|
|
if (node==null) return;
|
|
if (node==null) return;
|
|
|
|
+ netlock.writeLock().lock();
|
|
LOG.info("Adding a new node: "+node.getPath());
|
|
LOG.info("Adding a new node: "+node.getPath());
|
|
- Node rack = getNode(node.getNetworkLocation());
|
|
|
|
- if (rack != null && !(rack instanceof InnerNode)) {
|
|
|
|
- throw new IllegalArgumentException("Unexpected data node "
|
|
|
|
- + node.toString()
|
|
|
|
- + " at an illegal network location");
|
|
|
|
- }
|
|
|
|
- if (clusterMap.add(node)) {
|
|
|
|
- if (rack == null) {
|
|
|
|
- numOfRacks++;
|
|
|
|
|
|
+ try {
|
|
|
|
+ Node rack = getNode(node.getNetworkLocation());
|
|
|
|
+ if (rack != null && !(rack instanceof InnerNode)) {
|
|
|
|
+ throw new IllegalArgumentException("Unexpected data node "
|
|
|
|
+ + node.toString()
|
|
|
|
+ + " at an illegal network location");
|
|
|
|
+ }
|
|
|
|
+ if (clusterMap.add(node)) {
|
|
|
|
+ if (rack == null) {
|
|
|
|
+ numOfRacks++;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ LOG.debug("NetworkTopology became:\n" + this.toString());
|
|
|
|
+ } finally {
|
|
|
|
+ netlock.writeLock().unlock();
|
|
}
|
|
}
|
|
- LOG.debug("NetworkTopology became:\n" + this.toString());
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/** Remove a data node
|
|
/** Remove a data node
|
|
@@ -332,16 +341,21 @@ public class NetworkTopology {
|
|
* @param node
|
|
* @param node
|
|
* data node to be removed
|
|
* data node to be removed
|
|
*/
|
|
*/
|
|
- public synchronized void remove(DatanodeDescriptor node) {
|
|
|
|
|
|
+ public void remove(DatanodeDescriptor node) {
|
|
if (node==null) return;
|
|
if (node==null) return;
|
|
|
|
+ netlock.writeLock().lock();
|
|
LOG.info("Removing a node: "+node.getPath());
|
|
LOG.info("Removing a node: "+node.getPath());
|
|
- if (clusterMap.remove(node)) {
|
|
|
|
- InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
|
|
|
|
- if (rack == null) {
|
|
|
|
- numOfRacks--;
|
|
|
|
|
|
+ try {
|
|
|
|
+ if (clusterMap.remove(node)) {
|
|
|
|
+ InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
|
|
|
|
+ if (rack == null) {
|
|
|
|
+ numOfRacks--;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ LOG.debug("NetworkTopology became:\n" + this.toString());
|
|
|
|
+ } finally {
|
|
|
|
+ netlock.writeLock().unlock();
|
|
}
|
|
}
|
|
- LOG.debug("NetworkTopology became:\n" + this.toString());
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/** Check if the tree contains data node <i>node</i>
|
|
/** Check if the tree contains data node <i>node</i>
|
|
@@ -350,13 +364,18 @@ public class NetworkTopology {
|
|
* a data node
|
|
* a data node
|
|
* @return true if <i>node</i> is already in the tree; false otherwise
|
|
* @return true if <i>node</i> is already in the tree; false otherwise
|
|
*/
|
|
*/
|
|
- public synchronized boolean contains(DatanodeDescriptor node) {
|
|
|
|
|
|
+ public boolean contains(DatanodeDescriptor node) {
|
|
if (node == null) return false;
|
|
if (node == null) return false;
|
|
- Node parent = node.getParent();
|
|
|
|
- for(int level=node.getLevel(); parent!=null&&level>0;
|
|
|
|
- parent=parent.getParent(), level--) {
|
|
|
|
- if (parent == clusterMap)
|
|
|
|
- return true;
|
|
|
|
|
|
+ netlock.readLock().lock();
|
|
|
|
+ try {
|
|
|
|
+ Node parent = node.getParent();
|
|
|
|
+ for(int level=node.getLevel(); parent!=null&&level>0;
|
|
|
|
+ parent=parent.getParent(), level--) {
|
|
|
|
+ if (parent == clusterMap)
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ netlock.readLock().unlock();
|
|
}
|
|
}
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
@@ -367,7 +386,7 @@ public class NetworkTopology {
|
|
* a path-like string representation of a node
|
|
* a path-like string representation of a node
|
|
* @return a reference to the node; null if the node is not in the tree
|
|
* @return a reference to the node; null if the node is not in the tree
|
|
*/
|
|
*/
|
|
- public synchronized Node getNode(String loc) {
|
|
|
|
|
|
+ private Node getNode(String loc) {
|
|
loc = NodeBase.normalize(loc);
|
|
loc = NodeBase.normalize(loc);
|
|
if (!NodeBase.ROOT.equals(loc))
|
|
if (!NodeBase.ROOT.equals(loc))
|
|
loc = loc.substring(1);
|
|
loc = loc.substring(1);
|
|
@@ -375,13 +394,23 @@ public class NetworkTopology {
|
|
}
|
|
}
|
|
|
|
|
|
/** Return the total number of racks */
|
|
/** Return the total number of racks */
|
|
- public synchronized int getNumOfRacks() {
|
|
|
|
- return numOfRacks;
|
|
|
|
|
|
+ public int getNumOfRacks() {
|
|
|
|
+ netlock.readLock().lock();
|
|
|
|
+ try {
|
|
|
|
+ return numOfRacks;
|
|
|
|
+ } finally {
|
|
|
|
+ netlock.readLock().unlock();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/** Return the total number of data nodes */
|
|
/** Return the total number of data nodes */
|
|
- public synchronized int getNumOfLeaves() {
|
|
|
|
- return clusterMap.getNumOfLeaves();
|
|
|
|
|
|
+ public int getNumOfLeaves() {
|
|
|
|
+ netlock.readLock().lock();
|
|
|
|
+ try {
|
|
|
|
+ return clusterMap.getNumOfLeaves();
|
|
|
|
+ } finally {
|
|
|
|
+ netlock.readLock().unlock();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/** Return the distance between two data nodes
|
|
/** Return the distance between two data nodes
|
|
@@ -397,24 +426,28 @@ public class NetworkTopology {
|
|
if (node1 == node2) {
|
|
if (node1 == node2) {
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
- int i;
|
|
|
|
Node n1=node1, n2=node2;
|
|
Node n1=node1, n2=node2;
|
|
- int level1=node1.getLevel(), level2=node2.getLevel();
|
|
|
|
int dis = 0;
|
|
int dis = 0;
|
|
- while(n1!=null && level1>level2) {
|
|
|
|
- n1 = n1.getParent();
|
|
|
|
- level1--;
|
|
|
|
- dis++;
|
|
|
|
- }
|
|
|
|
- while(n2!=null && level2>level1) {
|
|
|
|
- n2 = n2.getParent();
|
|
|
|
- level2--;
|
|
|
|
- dis++;
|
|
|
|
- }
|
|
|
|
- while(n1!=null && n2!=null && n1.getParent()!=n2.getParent()) {
|
|
|
|
- n1=n1.getParent();
|
|
|
|
- n2=n2.getParent();
|
|
|
|
- dis+=2;
|
|
|
|
|
|
+ netlock.readLock().lock();
|
|
|
|
+ try {
|
|
|
|
+ int level1=node1.getLevel(), level2=node2.getLevel();
|
|
|
|
+ while(n1!=null && level1>level2) {
|
|
|
|
+ n1 = n1.getParent();
|
|
|
|
+ level1--;
|
|
|
|
+ dis++;
|
|
|
|
+ }
|
|
|
|
+ while(n2!=null && level2>level1) {
|
|
|
|
+ n2 = n2.getParent();
|
|
|
|
+ level2--;
|
|
|
|
+ dis++;
|
|
|
|
+ }
|
|
|
|
+ while(n1!=null && n2!=null && n1.getParent()!=n2.getParent()) {
|
|
|
|
+ n1=n1.getParent();
|
|
|
|
+ n2=n2.getParent();
|
|
|
|
+ dis+=2;
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ netlock.readLock().unlock();
|
|
}
|
|
}
|
|
if (n1==null) {
|
|
if (n1==null) {
|
|
LOG.warn("The cluster does not contain data node: "+node1.getPath());
|
|
LOG.warn("The cluster does not contain data node: "+node1.getPath());
|
|
@@ -440,11 +473,16 @@ public class NetworkTopology {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
- if (node1 == node2 || node1.equals(node2)) {
|
|
|
|
- return true;
|
|
|
|
- }
|
|
|
|
|
|
+ netlock.readLock().lock();
|
|
|
|
+ try {
|
|
|
|
+ if (node1 == node2 || node1.equals(node2)) {
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
|
|
- return node1.getParent()==node2.getParent();
|
|
|
|
|
|
+ return node1.getParent()==node2.getParent();
|
|
|
|
+ } finally {
|
|
|
|
+ netlock.readLock().unlock();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
final private static Random r = new Random();
|
|
final private static Random r = new Random();
|
|
@@ -455,10 +493,15 @@ public class NetworkTopology {
|
|
* @return the choosen data node
|
|
* @return the choosen data node
|
|
*/
|
|
*/
|
|
public DatanodeDescriptor chooseRandom(String scope) {
|
|
public DatanodeDescriptor chooseRandom(String scope) {
|
|
- if (scope.startsWith("~")) {
|
|
|
|
- return chooseRandom(NodeBase.ROOT, scope.substring(1));
|
|
|
|
- } else {
|
|
|
|
- return chooseRandom(scope, null);
|
|
|
|
|
|
+ netlock.readLock().lock();
|
|
|
|
+ try {
|
|
|
|
+ if (scope.startsWith("~")) {
|
|
|
|
+ return chooseRandom(NodeBase.ROOT, scope.substring(1));
|
|
|
|
+ } else {
|
|
|
|
+ return chooseRandom(scope, null);
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ netlock.readLock().unlock();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -507,22 +550,27 @@ public class NetworkTopology {
|
|
}
|
|
}
|
|
scope = NodeBase.normalize(scope);
|
|
scope = NodeBase.normalize(scope);
|
|
int count=0; // the number of nodes in both scope & excludedNodes
|
|
int count=0; // the number of nodes in both scope & excludedNodes
|
|
- for(DatanodeDescriptor node:excludedNodes) {
|
|
|
|
- if ((node.getPath()+NodeBase.PATH_SEPARATOR_STR).
|
|
|
|
- startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) {
|
|
|
|
- count++;
|
|
|
|
|
|
+ netlock.readLock().lock();
|
|
|
|
+ try {
|
|
|
|
+ for(DatanodeDescriptor node:excludedNodes) {
|
|
|
|
+ if ((node.getPath()+NodeBase.PATH_SEPARATOR_STR).
|
|
|
|
+ startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) {
|
|
|
|
+ count++;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
- Node n=getNode(scope);
|
|
|
|
- int scopeNodeCount=1;
|
|
|
|
- if (n instanceof InnerNode) {
|
|
|
|
- scopeNodeCount=((InnerNode)n).getNumOfLeaves();
|
|
|
|
- }
|
|
|
|
- if (isExcluded) {
|
|
|
|
- return clusterMap.getNumOfLeaves()-
|
|
|
|
- scopeNodeCount-excludedNodes.size()+count;
|
|
|
|
- } else {
|
|
|
|
- return scopeNodeCount-count;
|
|
|
|
|
|
+ Node n=getNode(scope);
|
|
|
|
+ int scopeNodeCount=1;
|
|
|
|
+ if (n instanceof InnerNode) {
|
|
|
|
+ scopeNodeCount=((InnerNode)n).getNumOfLeaves();
|
|
|
|
+ }
|
|
|
|
+ if (isExcluded) {
|
|
|
|
+ return clusterMap.getNumOfLeaves()-
|
|
|
|
+ scopeNodeCount-excludedNodes.size()+count;
|
|
|
|
+ } else {
|
|
|
|
+ return scopeNodeCount-count;
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ netlock.readLock().unlock();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -549,21 +597,22 @@ public class NetworkTopology {
|
|
/* Set and used only inside sortByDistance.
|
|
/* Set and used only inside sortByDistance.
|
|
* This saves an allocation each time we sort.
|
|
* This saves an allocation each time we sort.
|
|
*/
|
|
*/
|
|
- private DatanodeDescriptor distFrom = null;
|
|
|
|
|
|
+ private static ThreadLocal<DatanodeDescriptor> distFrom =
|
|
|
|
+ new ThreadLocal<DatanodeDescriptor>();
|
|
private final Comparator<DatanodeDescriptor> nodeDistanceComparator =
|
|
private final Comparator<DatanodeDescriptor> nodeDistanceComparator =
|
|
new Comparator<DatanodeDescriptor>() {
|
|
new Comparator<DatanodeDescriptor>() {
|
|
public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
|
|
public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
|
|
- return getDistance(distFrom, n1) - getDistance(distFrom, n2);
|
|
|
|
|
|
+ return getDistance(distFrom.get(), n1) - getDistance(distFrom.get(), n2);
|
|
}
|
|
}
|
|
};
|
|
};
|
|
|
|
|
|
/** Sorts nodes array by their distances to <i>reader</i>. */
|
|
/** Sorts nodes array by their distances to <i>reader</i>. */
|
|
- public synchronized void sortByDistance(final DatanodeDescriptor reader,
|
|
|
|
- DatanodeDescriptor[] nodes) {
|
|
|
|
|
|
+ public void sortByDistance(final DatanodeDescriptor reader,
|
|
|
|
+ DatanodeDescriptor[] nodes) {
|
|
if (reader != null && contains(reader)) {
|
|
if (reader != null && contains(reader)) {
|
|
- distFrom = reader;
|
|
|
|
|
|
+ distFrom.set(reader);
|
|
Arrays.sort(nodes, nodeDistanceComparator);
|
|
Arrays.sort(nodes, nodeDistanceComparator);
|
|
- distFrom = null;
|
|
|
|
|
|
+ distFrom.set(null);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|