Browse Source

HADOOP-1448. In HDFS, randomize lists of non-local block locations returned to client. Contributed by Hairong.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@555373 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
bc4a29e7fd

+ 4 - 0
CHANGES.txt

@@ -296,6 +296,10 @@ Trunk (unreleased changes)
  91. HADOOP-1580.  Improve contrib/streaming so that subprocess exit
      status is displayed for errors.  (John Heidemann via cutting)
 
+ 92. HADOOP-1448.  In HDFS, randomize lists of non-local block
+     locations returned to client, so that load is better balanced.
+     (Hairong Kuang via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

+ 1 - 1
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -450,7 +450,7 @@ class FSNamesystem implements FSConstants {
     for (Iterator<LocatedBlock> it = blocks.getLocatedBlocks().iterator();
          it.hasNext();) {
       LocatedBlock block = (LocatedBlock) it.next();
-      clusterMap.sortByDistance(client, 
+      clusterMap.pseudoSortByDistance(client, 
                                 (DatanodeDescriptor[])(block.getLocations()));
     }
     return blocks;

+ 52 - 23
src/java/org/apache/hadoop/net/NetworkTopology.java

@@ -19,10 +19,8 @@ package org.apache.hadoop.net;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
-import java.util.Arrays;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -473,10 +471,6 @@ public class NetworkTopology {
       
     netlock.readLock().lock();
     try {
-      if (node1 == node2 || node1.equals(node2)) {
-        return true;
-      }
-        
       return node1.getParent()==node2.getParent();
     } finally {
       netlock.readLock().unlock();
@@ -592,25 +586,60 @@ public class NetworkTopology {
     return tree.toString();
   }
 
-  /* Set and used only inside sortByDistance. 
-   * This saves an allocation each time we sort.
+  /* swap two array items */
+  static private void swap(DatanodeDescriptor[] nodes, int i, int j) {
+    DatanodeDescriptor tempNode;
+    tempNode = nodes[j];
+    nodes[j] = nodes[i];
+    nodes[i] = tempNode;
+    
+  }
+  
+  /** Sort nodes array by their distances to <i>reader</i>
+   * It linearly scans the array, if a local node is found, swap it with
+   * the first element of the array.
+   * If a local rack node is found, swap it with the first element following
+   * the local node.
+   * If neither local node or local rack node is found, put a random replica
+   * location at postion 0.
+   * It leaves the rest nodes untouched.
    */
-  private static ThreadLocal<DatanodeDescriptor> distFrom = 
-    new ThreadLocal<DatanodeDescriptor>();
-  private final Comparator<DatanodeDescriptor> nodeDistanceComparator = 
-    new Comparator<DatanodeDescriptor>() {
-      public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
-        return getDistance(distFrom.get(), n1) - getDistance(distFrom.get(), n2);
+  public synchronized void pseudoSortByDistance(
+      DatanodeDescriptor reader, DatanodeDescriptor[] nodes ) {
+    int tempIndex = 0;
+    if (reader != null ) {
+      int localRackNode = -1;
+      //scan the array to find the local node & local rack node
+      for(int i=0; i<nodes.length; i++) {
+        if(tempIndex == 0 && reader == nodes[i]) { //local node
+          //swap the local node and the node at position 0
+          if( i != 0 ) {
+            swap(nodes, tempIndex, i);
+          }
+          tempIndex=1;
+          if(localRackNode != -1 ) {
+            if(localRackNode == 0) {
+              localRackNode = i;
+            }
+            break;
+          }
+        } else if(localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
+          //local rack
+          localRackNode = i;
+          if(tempIndex != 0 ) break;
+        }
       }
-    };
-      
-  /** Sorts nodes array by their distances to <i>reader</i>. */
-  public void sortByDistance(final DatanodeDescriptor reader,
-                             DatanodeDescriptor[] nodes) { 
-    if (reader != null && contains(reader)) {
-      distFrom.set(reader);
-      Arrays.sort(nodes, nodeDistanceComparator);
-      distFrom.set(null);
+
+      // swap the local rack node and the node at position tempIndex
+      if(localRackNode != -1 && localRackNode != tempIndex ) {
+        swap(nodes, tempIndex, localRackNode);
+        tempIndex++;
+      }
+    }
+    
+    // put a random node at position 0 if it is not a local/local-rack node
+    if(tempIndex == 0) {
+      swap(nodes, 0, r.nextInt(nodes.length));
     }
   }
 }

+ 9 - 4
src/test/org/apache/hadoop/dfs/TestReplication.java

@@ -99,10 +99,15 @@ public class TestReplication extends TestCase {
       }
       isOnSameRack = false;
       isNotOnSameRack = false;
-      for (int idy = 0; idy < datanodes.length-1; idy++) {
-        LOG.info("datanode "+ idy + ": "+ datanodes[idy].getName());
-        boolean onRack = datanodes[idy].getNetworkLocation().equals(
-                                                                    datanodes[idy+1].getNetworkLocation());
+      for (int i = 0; i < datanodes.length-1; i++) {
+        LOG.info("datanode "+ i + ": "+ datanodes[i].getName());
+        boolean onRack = false;
+        for( int j=i+1; j<datanodes.length; j++) {
+           if( datanodes[i].getNetworkLocation().equals(
+            datanodes[j].getNetworkLocation()) ) {
+             onRack = true;
+           }
+        }
         if (onRack) {
           isOnSameRack = true;
         }

+ 32 - 1
src/test/org/apache/hadoop/net/TestNetworkTopology.java

@@ -25,7 +25,7 @@ public class TestNetworkTopology extends TestCase {
     }
   }
   
-  public void testContains() {
+  public void testContains() throws Exception {
     for(int i=0; i<dataNodes.length; i++) {
       assertTrue(cluster.contains(dataNodes[i]));
     }
@@ -53,6 +53,37 @@ public class TestNetworkTopology extends TestCase {
     assertEquals(cluster.getDistance(dataNodes[0], dataNodes[6]), 6);
   }
 
+  public void testPseudoSortByDistance() throws Exception {
+    DatanodeDescriptor[] testNodes = new DatanodeDescriptor[3];
+    
+    // array contains both local node & local rack node
+    testNodes[0] = dataNodes[1];
+    testNodes[1] = dataNodes[2];
+    testNodes[2] = dataNodes[0];
+    cluster.pseudoSortByDistance(dataNodes[0], testNodes );
+    assertTrue(testNodes[0] == dataNodes[0]);
+    assertTrue(testNodes[1] == dataNodes[1]);
+    assertTrue(testNodes[2] == dataNodes[2]);
+
+    // array contains local node
+    testNodes[0] = dataNodes[1];
+    testNodes[1] = dataNodes[3];
+    testNodes[2] = dataNodes[0];
+    cluster.pseudoSortByDistance(dataNodes[0], testNodes );
+    assertTrue(testNodes[0] == dataNodes[0]);
+    assertTrue(testNodes[1] == dataNodes[1]);
+    assertTrue(testNodes[2] == dataNodes[3]);
+
+    // array contains local rack node
+    testNodes[0] = dataNodes[5];
+    testNodes[1] = dataNodes[3];
+    testNodes[2] = dataNodes[1];
+    cluster.pseudoSortByDistance(dataNodes[0], testNodes );
+    assertTrue(testNodes[0] == dataNodes[1]);
+    assertTrue(testNodes[1] == dataNodes[3]);
+    assertTrue(testNodes[2] == dataNodes[5]);
+  }
+  
   public void testRemove() throws Exception {
     for(int i=0; i<dataNodes.length; i++) {
       cluster.remove(dataNodes[i]);