
HADOOP-725. Optimize DFS block placement algorithm. Contributed by Milind.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@477400 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent commit 26ea824410

+ 3 - 0
CHANGES.txt

@@ -91,6 +91,9 @@ Trunk (unreleased changes)
 27. HADOOP-652.  In DFS, when a file is deleted, the block count is
     now decremented.  (Vladimir Krokhmalyov via cutting)
 
+28. HADOOP-725.  In DFS, optimize block placement algorithm,
+    previously a performance bottleneck.  (Milind Bhandarkar via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

+ 3 - 11
src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java

@@ -30,6 +30,9 @@ import java.util.*;
 class DatanodeDescriptor extends DatanodeInfo {
 
   private volatile Collection<Block> blocks = new TreeSet<Block>();
+  // isAlive == heartbeats.contains(this)
+  // This is an optimization, because contains takes O(n) time on Arraylist
+  protected boolean isAlive = false;
 
   DatanodeDescriptor() {
     super();
@@ -81,17 +84,6 @@ class DatanodeDescriptor extends DatanodeInfo {
     this.xceiverCount = xceiverCount;
   }
   
-  /**
-   * Verify whether the node is dead.
-   * 
-   * A data node is considered dead if its last heartbeat was received
-   * EXPIRE_INTERVAL msecs ago.
-   */
-  boolean isDead() {
-    return getLastUpdate() < 
-              System.currentTimeMillis() - FSConstants.EXPIRE_INTERVAL;
-  }
-
   Block[] getBlocks() {
     return (Block[]) blocks.toArray(new Block[blocks.size()]);
   }
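
The isAlive flag added above caches whether the node is currently present in the namesystem's heartbeats list, which this patch turns into an ArrayList; without the flag, every heartbeat and removal would need an O(n) ArrayList.contains() scan just to decide whether the node is already registered. A minimal sketch of the pattern outside Hadoop, using hypothetical Node and Registry classes rather than the real DatanodeDescriptor/FSNamesystem:

    import java.util.ArrayList;
    import java.util.List;

    class Node {
        // Invariant: isAlive == (the registry's heartbeats list contains this node)
        boolean isAlive = false;
    }

    class Registry {
        private final List<Node> heartbeats = new ArrayList<Node>();

        void recordHeartbeat(Node n) {
            if (!n.isAlive) {            // O(1) instead of heartbeats.contains(n)
                heartbeats.add(n);
                n.isAlive = true;
            }
        }

        void removeNode(Node n) {
            if (n.isAlive) {             // scan the list only when the node is known to be in it
                heartbeats.remove(n);
                n.isAlive = false;
            }
        }
    }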

+ 2 - 1
src/java/org/apache/hadoop/dfs/DatanodeID.java

@@ -82,7 +82,8 @@ public class DatanodeID implements WritableComparable {
   }
   
   public boolean equals( Object to ) {
-    return (this.compareTo( to ) != 0);
+    return (name.equals(((DatanodeID)to).getName()) &&
+        storageID.equals(((DatanodeID)to).getStorageID()));
   }
   
   public int hashCode() {
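
The old equals() returned compareTo(to) != 0, i.e. true exactly when the two IDs differ, so equality tests and hash-based lookups on DatanodeID were inverted. The replacement compares name and storageID directly; for hashed collections to behave, hashCode() must be built from the same fields. A hedged sketch of that contract with a hypothetical NodeID class (not the actual Hadoop implementation):

    class NodeID {
        private final String name;
        private final String storageID;

        NodeID(String name, String storageID) {
            this.name = name;
            this.storageID = storageID;
        }

        public boolean equals(Object to) {
            if (this == to) return true;
            if (!(to instanceof NodeID)) return false;
            NodeID other = (NodeID) to;
            return name.equals(other.name) && storageID.equals(other.storageID);
        }

        public int hashCode() {
            // derived from the same fields as equals(), as the contract requires
            return 31 * name.hashCode() + storageID.hashCode();
        }
    }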

+ 0 - 1
src/java/org/apache/hadoop/dfs/FSConstants.java

@@ -103,7 +103,6 @@ public interface FSConstants {
     // Timeouts, constants
     //
     public static long HEARTBEAT_INTERVAL = 3;
-    public static long EXPIRE_INTERVAL = 10 * 60 * 1000;
     public static long BLOCKREPORT_INTERVAL = 60 * 60 * 1000;
     public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
     public static final long LEASE_HARDLIMIT_PERIOD = 60 * LEASE_SOFTLIMIT_PERIOD;

+ 140 - 178
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -122,6 +122,9 @@ class FSNamesystem implements FSConstants {
     // Stats on overall usage
     //
     long totalCapacity = 0, totalRemaining = 0;
+    
+    // total number of connections per live datanode
+    int totalLoad = 0;
 
     
     //
@@ -136,26 +139,13 @@ class FSNamesystem implements FSConstants {
     Random r = new Random();
 
     /**
-     * Stores a set of DatanodeDescriptor objects, sorted by heartbeat.
+     * Stores a set of DatanodeDescriptor objects.
      * This is a subset of {@link #datanodeMap}, containing nodes that are 
      * considered alive.
      * The {@link HeartbeatMonitor} periodically checks for outdated entries,
-     * and removes them from the set.
-     */
-    TreeSet<DatanodeDescriptor> heartbeats = 
-      new TreeSet<DatanodeDescriptor>(
-        new Comparator<DatanodeDescriptor>() {
-          public int compare(DatanodeDescriptor d1, DatanodeDescriptor d2) {
-            long lu1 = d1.getLastUpdate();
-            long lu2 = d2.getLastUpdate();
-            if (lu1 < lu2)
-              return -1;
-            if (lu1 > lu2)
-              return 1;
-            return d1.getStorageID().compareTo(d2.getStorageID());
-          }
-        }
-      );
+     * and removes them from the list.
+     */
+    ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
 
     //
     // Store set of Blocks that need to be replicated 1 or more times.
@@ -189,8 +179,11 @@ class FSNamesystem implements FSConstants {
     private int maxReplicationStreams;
     // MIN_REPLICATION is how many copies we need in place or else we disallow the write
     private int minReplication;
-    // HEARTBEAT_RECHECK is how often a datanode sends its hearbeat
-    private int heartBeatRecheck;
+    // heartbeatRecheckInterval is how often namenode checks for expired datanodes
+    private long heartbeatRecheckInterval;
+    // heartbeatExpireInterval is how long namenode waits for datanode to report
+    // heartbeat
+    private long heartbeatExpireInterval;
 
     public static FSNamesystem fsNamesystemObject;
     private String localMachine;
@@ -222,7 +215,10 @@ class FSNamesystem implements FSConstants {
               + " must be less than dfs.replication.max = " 
               + maxReplication );
         this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
-        this.heartBeatRecheck= 1000;
+        long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
+        this.heartbeatRecheckInterval = 5 * 60 * 1000; // 5 minutes
+        this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
+            10 * heartbeatInterval;
 
         this.localMachine = addr.getHostName();
         this.port = addr.getPort();
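
With the defaults shown here, a datanode is declared dead only after a generous window: dfs.heartbeat.interval defaults to 3 seconds, so heartbeatExpireInterval = 2 * 300,000 ms + 10 * 3,000 ms = 630,000 ms, roughly 10.5 minutes without a heartbeat. That is close to the fixed 10-minute EXPIRE_INTERVAL removed from FSConstants, but now derived from the configured heartbeat rate.
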
@@ -1206,7 +1202,7 @@ class FSNamesystem implements FSConstants {
         // The same datanode has been just restarted to serve the same data 
         // storage. We do not need to remove old data blocks, the delta will  
         // be calculated on the next block report from the datanode
-        NameNode.stateChangeLog.debug(
+        NameNode.stateChangeLog.info(
             "BLOCK* NameSystem.registerDatanode: "
             + "node restarted." );
         return;
@@ -1249,6 +1245,8 @@ class FSNamesystem implements FSConstants {
       }
       // register new datanode
       DatanodeDescriptor nodeDescr = new DatanodeDescriptor( nodeReg );
+      // unless we get a heartbeat from this datanode, we will not mark it Alive
+      nodeDescr.isAlive = false;
       unprotectedAddDatanode( nodeDescr );
       getEditLog().logAddDatanode( nodeDescr );
       return;
@@ -1283,6 +1281,11 @@ class FSNamesystem implements FSConstants {
       return newID;
     }
     
+    private boolean isDatanodeDead(DatanodeDescriptor node) {
+      return (node.getLastUpdate() <
+          (System.currentTimeMillis() - heartbeatExpireInterval));
+    }
+    
     /**
      * The given node has reported in.  This method should:
      * 1) Record the heartbeat, so the datanode isn't timed out
@@ -1303,19 +1306,36 @@ class FSNamesystem implements FSConstants {
       synchronized (heartbeats) {
         synchronized (datanodeMap) {
           DatanodeDescriptor nodeinfo = getDatanode( nodeID );
-          needBlockReport = nodeinfo.isDead(); 
+          needBlockReport = isDatanodeDead(nodeinfo); 
           
           if (nodeinfo == null) 
             // We do not accept unregistered guests
             throw new UnregisteredDatanodeException( nodeID );
-          removeHeartbeat(nodeinfo);
+          if (nodeinfo.isAlive) {
+            updateStats(nodeinfo, false);
+          }
           nodeinfo.updateHeartbeat(capacity, remaining, xceiverCount);
-          addHeartbeat(nodeinfo);
+          updateStats(nodeinfo, true);
+          if (!nodeinfo.isAlive) {
+            heartbeats.add(nodeinfo);
+            nodeinfo.isAlive = true;
+          }
         }
       }
       return needBlockReport;
     }
 
+    private void updateStats(DatanodeDescriptor node, boolean isAdded) {
+      if (isAdded) {
+        totalCapacity += node.getCapacity();
+        totalRemaining += node.getRemaining();
+        totalLoad += node.getXceiverCount();
+      } else {
+        totalCapacity -= node.getCapacity();
+        totalRemaining -= node.getRemaining();
+        totalLoad -= node.getXceiverCount();
+      }
+    }
     /**
      * Periodically calls heartbeatCheck().
      */
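
The new updateStats() keeps the cluster-wide totals (capacity, remaining, and the new totalLoad connection count) current by retiring a node's previous contribution before its heartbeat values are refreshed and adding the new one afterwards. That makes the average load later used by block placement a constant-time division instead of a pass over every live node. A small sketch of the idea with hypothetical names, not the actual FSNamesystem code:

    class ClusterStats {
        static class Node {
            long capacity, remaining;
            int xceiverCount;
        }

        private long totalCapacity, totalRemaining;
        private int totalLoad;     // sum of xceiver counts over live nodes

        // Refreshing a node: retire its old contribution, update the fields, add the new one.
        void refresh(Node n, long capacity, long remaining, int xceivers) {
            updateStats(n, false);
            n.capacity = capacity;
            n.remaining = remaining;
            n.xceiverCount = xceivers;
            updateStats(n, true);
        }

        void updateStats(Node n, boolean isAdded) {
            int sign = isAdded ? 1 : -1;
            totalCapacity += sign * n.capacity;
            totalRemaining += sign * n.remaining;
            totalLoad += sign * n.xceiverCount;
        }

        double avgLoad(int liveNodes) {
            return liveNodes == 0 ? 0.0 : (double) totalLoad / liveNodes;
        }
    }
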
@@ -1326,7 +1346,7 @@ class FSNamesystem implements FSConstants {
             while (fsRunning) {
                 heartbeatCheck();
                 try {
-                    Thread.sleep(heartBeatRecheck);
+                    Thread.sleep(heartbeatRecheckInterval);
                 } catch (InterruptedException ie) {
                 }
             }
@@ -1355,7 +1375,11 @@ class FSNamesystem implements FSConstants {
    * @author hairong
    */
     private void removeDatanode( DatanodeDescriptor nodeInfo ) {
-      removeHeartbeat(nodeInfo);
+      if (nodeInfo.isAlive) {
+        updateStats(nodeInfo, false);
+        heartbeats.remove(nodeInfo);
+        nodeInfo.isAlive = false;
+      }
 
       Block deadblocks[] = nodeInfo.getBlocks();
       if( deadblocks != null )
@@ -1380,17 +1404,6 @@ class FSNamesystem implements FSConstants {
           + "node " + nodeDescr.getName() + " is added to datanodeMap." );
     }
 
-    private void addHeartbeat( DatanodeDescriptor nodeDescr ) {
-      heartbeats.add(nodeDescr);
-      totalCapacity += nodeDescr.capacity;
-      totalRemaining += nodeDescr.remaining;
-    }
-    
-    private void removeHeartbeat( DatanodeDescriptor nodeDescr ) {
-      totalCapacity -= nodeDescr.getCapacity();
-      totalRemaining -= nodeDescr.getRemaining();
-      heartbeats.remove(nodeDescr);
-    }
     
     /**
      * Physically remove node from datanodeMap.
@@ -1412,18 +1425,30 @@ class FSNamesystem implements FSConstants {
     /**
      * Check if there are any expired heartbeats, and if so,
      * whether any blocks have to be re-replicated.
-     */
-    synchronized void heartbeatCheck() {
-      synchronized (heartbeats) {
-        DatanodeDescriptor nodeInfo = null;
-
-        while ((heartbeats.size() > 0) &&
-               ((nodeInfo = heartbeats.first()) != null) &&
-               (nodeInfo.isDead())) {
-          NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
-              + "lost heartbeat from " + nodeInfo.getName());
-          removeDatanode( nodeInfo );
+     * While removing dead datanodes, make sure that only one datanode is marked
+     * dead at a time within the synchronized section. Otherwise, a cascading
+     * effect causes more datanodes to be declared dead.
+     */
+    void heartbeatCheck() {
+      boolean allAlive = false;
+      while (!allAlive) {
+        boolean foundDead = false;
+        synchronized(this) {
+          synchronized (heartbeats) {
+            for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
+            it.hasNext();) {
+              DatanodeDescriptor nodeInfo = it.next();
+              if (isDatanodeDead(nodeInfo)) {
+                NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
+                    + "lost heartbeat from " + nodeInfo.getName());
+                removeDatanode( nodeInfo );
+                foundDead = true;
+                break;
+              }
+            }
+          }
         }
+        allAlive = ! foundDead;
       }
     }
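
The rewritten heartbeatCheck() releases and re-acquires the locks between removals: removing a dead node (and queueing its blocks for re-replication) can take a while, and holding the namesystem lock across several removals would block incoming heartbeats and make otherwise healthy nodes look dead too. A minimal sketch of the one-removal-per-lock-acquisition pattern, with hypothetical Node and removeNode names:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    class HeartbeatMonitor {
        static class Node {
            long lastUpdate;
        }

        private final List<Node> heartbeats = new ArrayList<Node>();

        private boolean isDead(Node n, long expireMillis) {
            return n.lastUpdate < System.currentTimeMillis() - expireMillis;
        }

        private void removeNode(Node n) {
            heartbeats.remove(n);
            // re-replication of the node's blocks would be queued here
        }

        void heartbeatCheck(long expireMillis) {
            boolean allAlive = false;
            while (!allAlive) {
                boolean foundDead = false;
                synchronized (this) {               // locks are dropped after each removal,
                    synchronized (heartbeats) {     // letting heartbeats arrive in between
                        for (Iterator<Node> it = heartbeats.iterator(); it.hasNext();) {
                            Node n = it.next();
                            if (isDead(n, expireMillis)) {
                                removeNode(n);
                                foundDead = true;
                                break;              // at most one removal per lock acquisition
                            }
                        }
                    }
                }
                allAlive = !foundDead;
            }
        }
    }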
     
@@ -1744,7 +1769,7 @@ class FSNamesystem implements FSConstants {
         synchronized (datanodeMap) {
           for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext(); ) {
             DatanodeDescriptor node = it.next();
-            if( node.isDead() )
+            if( isDatanodeDead(node))
               dead.add( node );
             else
               live.add( node );
@@ -1946,151 +1971,88 @@ class FSNamesystem implements FSConstants {
                             Collection<DatanodeDescriptor> forbiddenNodes,
                             UTF8 clientMachine, 
                             long blockSize) {
+        Collection<DatanodeDescriptor> targets = new ArrayList<DatanodeDescriptor>();
+        
         if (desiredReplicates > heartbeats.size()) {
           LOG.warn("Replication requested of "+desiredReplicates
                       +" is larger than cluster size ("+heartbeats.size()
                       +"). Using cluster size.");
           desiredReplicates  = heartbeats.size();
+          if (desiredReplicates == 0) {
+            LOG.warn("While choosing target, totalMachines is " + desiredReplicates);
+          }
         }
-
-        Collection<DatanodeDescriptor> alreadyChosen;
-        alreadyChosen = new TreeSet<DatanodeDescriptor>();
-        Collection<DatanodeDescriptor> targets = new ArrayList<DatanodeDescriptor>();
-
-        for (int i = 0; i < desiredReplicates; i++) {
-            DatanodeDescriptor target = chooseTarget(forbiddenNodes, alreadyChosen, 
-                                               clientMachine, blockSize);
-            if (target == null)
-              break; // calling chooseTarget again won't help
-            targets.add(target);
-            alreadyChosen.add(target);
-        }
-        return (DatanodeDescriptor[]) targets.toArray(new DatanodeDescriptor[targets.size()]);
-    }
-
-    /**
-     * Choose a target from available machines, excepting the
-     * given ones.
-     *
-     * Right now it chooses randomly from available boxes.  In future could 
-     * choose according to capacity and load-balancing needs (or even 
-     * network-topology, to avoid inter-switch traffic).
-     * @param forbidden1 DatanodeDescriptor targets not allowed, null allowed.
-     * @param forbidden2 DatanodeDescriptor targets not allowed, null allowed.
-     * @return DatanodeDescriptor instance to use or null if something went wrong
-     * (a log message is emitted if null is returned).
-     */
-    DatanodeDescriptor chooseTarget(Collection<DatanodeDescriptor> forbidden1, 
-                                    Collection<DatanodeDescriptor> forbidden2, 
-                                    UTF8 clientMachine, 
-                                    long blockSize) {
-        //
-        // Check if there are any available targets at all
-        //
-        int totalMachines = heartbeats.size();
-        if (totalMachines == 0) {
-            LOG.warn("While choosing target, totalMachines is " + totalMachines);
-            return null;
-        }
-
-        //
-        // Build a map of forbidden hostnames from the two forbidden sets.
-        //
-        Collection<DatanodeDescriptor> forbiddenMachines = new TreeSet();
-        if (forbidden1 != null) {
-            for (Iterator<DatanodeDescriptor> it = forbidden1.iterator(); it.hasNext(); ) {
-                DatanodeDescriptor cur = it.next();
-                forbiddenMachines.add(cur);
-            }
-        }
-        if (forbidden2 != null) {
-            for (Iterator<DatanodeDescriptor> it = forbidden2.iterator(); it.hasNext(); ) {
-                DatanodeDescriptor cur = it.next();
-                forbiddenMachines.add(cur);
-            }
-        }
-
-        double avgLoad = 0.0;
-        //
-        // Build list of machines we can actually choose from
-        //
-        List<DatanodeDescriptor> targetList = new ArrayList<DatanodeDescriptor>();
-        for (Iterator<DatanodeDescriptor> it = heartbeats.iterator(); it.hasNext(); ) {
-            DatanodeDescriptor node = it.next();
-            if (! forbiddenMachines.contains(node)) {
-                targetList.add(node);
-                avgLoad += node.getXceiverCount();
-            }
-        }
-        if (targetList.size() > 0) { avgLoad = avgLoad/targetList.size(); }
-        
-        Collections.shuffle(targetList);
         
-        //
-        // Now pick one
-        //
-        if (targetList.size() > 0) {
-            //
-            // If the requester's machine is in the targetList, 
-            // and it's got the capacity, pick it.
-            //
-            if (clientMachine != null && clientMachine.getLength() > 0) {
-                for (Iterator<DatanodeDescriptor> it = targetList.iterator(); it.hasNext(); ) {
-                    DatanodeDescriptor node = it.next();
-                    if (clientMachine.toString().equals(node.getHost())) {
-                        if ((node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
-                            (node.getXceiverCount() <= (2.0 * avgLoad))) {
-                            return node;
-                        }
-                    }
-                }
-            }
-
-            //
-            // Otherwise, choose node according to target capacity
-            //
-            for (Iterator<DatanodeDescriptor> it = targetList.iterator(); it.hasNext(); ) {
-                DatanodeDescriptor node = it.next();
+        double avgLoad = 0.0;
+        if (heartbeats.size() != 0) {
+          avgLoad = (double) totalLoad / heartbeats.size();
+        }
+        // choose local replica first
+        if (desiredReplicates != 0) {
+          // make sure that the client machine is not forbidden
+          if (clientMachine != null && clientMachine.getLength() > 0) {
+            for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
+                 it.hasNext();) {
+              DatanodeDescriptor node = it.next();
+              if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
+                  clientMachine.toString().equals(node.getHost())) {
                 if ((node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
                     (node.getXceiverCount() <= (2.0 * avgLoad))) {
-                    return node;
+                  targets.add(node);
+                  desiredReplicates--;
+                  break;
                 }
+              }
             }
+          }
+        }
 
-            //
-            // If we are still not able to find a good node, then see if
-            // we can pick the clientmachine itself.
-            //
-            if (clientMachine != null && clientMachine.getLength() > 0) {
-                for (Iterator<DatanodeDescriptor> it = targetList.iterator(); it.hasNext(); ) {
-                    DatanodeDescriptor node = it.next();
-                    if (clientMachine.toString().equals(node.getHost()) &&
-                        node.getRemaining() >= blockSize) {
-                        return node;
-                    }
-                }
+        for (int i = 0; i < desiredReplicates; i++) {
+          DatanodeDescriptor target = null;
+          //
+          // Otherwise, choose node according to target capacity
+          //
+          int nNodes = heartbeats.size();
+          int idx = r.nextInt(nNodes);
+          int rejected = 0;
+          while (target == null && rejected < nNodes ) {
+            DatanodeDescriptor node = heartbeats.get(idx);
+            if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
+                !targets.contains(node) &&
+                (node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
+                (node.getXceiverCount() <= (2.0 * avgLoad))) {
+              target = node;
+              break;
+            } else {
+              idx = (idx+1) % nNodes;
+              rejected++;
             }
-
-            //
-            // That should do the trick.  But we might not be able
-            // to pick any node if the target was out of bytes.  As
-            // a last resort, pick the first valid one we can find.
-            //
-            for (Iterator<DatanodeDescriptor> it = targetList.iterator(); it.hasNext(); ) {
-                DatanodeDescriptor node = it.next();
-                if (node.getRemaining() >= blockSize) {
-                    return node;
-                }
+          }
+          if (target == null) {
+            idx = r.nextInt(nNodes);
+            rejected = 0;
+            while (target == null && rejected < nNodes ) {
+              DatanodeDescriptor node = heartbeats.get(idx);
+              if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
+                  !targets.contains(node) &&
+                  node.getRemaining() >= blockSize) {
+                target = node;
+                break;
+              } else {
+                idx = (idx + 1) % nNodes;
+                rejected++;
+              }
             }
+          }
+          
+          if (target == null) {
             LOG.warn("Could not find any nodes with sufficient capacity");
-            return null;
-        } else {
-            LOG.warn("Zero targets found, forbidden1.size=" +
-                ( forbidden1 != null ? forbidden1.size() : 0 ) +
-                " forbidden2.size()=" +
-                ( forbidden2 != null ? forbidden2.size() : 0 ));
-            return null;
+            break; // making one more pass over heartbeats would not help
+          }
+          targets.add(target);
         }
+        
+        return (DatanodeDescriptor[]) targets.toArray(new DatanodeDescriptor[targets.size()]);
     }
 
     /**

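The rewritten chooseTargets() drops the per-call copy-and-shuffle of the live-node set and instead walks the heartbeats ArrayList circularly from a random start index, trying the strict test first (space for several blocks and load at most twice the cluster average, which is now totalLoad / heartbeats.size()) and falling back to a space-only test. A standalone sketch of that selection loop with hypothetical names; the client-local preference and forbidden-node filtering of the real method are omitted, and MIN_BLOCKS_FOR_WRITE = 5 is assumed:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    class TargetChooser {
        static class Node {
            long remaining;      // bytes free
            int xceiverCount;    // active connections
        }

        private static final int MIN_BLOCKS_FOR_WRITE = 5;   // assumed constant
        private final Random r = new Random();

        /** Pick up to desiredReplicates distinct nodes from the live list. */
        List<Node> chooseTargets(List<Node> live, int desiredReplicates,
                                 long blockSize, double avgLoad) {
            List<Node> targets = new ArrayList<Node>();
            for (int i = 0; i < desiredReplicates && !live.isEmpty(); i++) {
                Node target = pick(live, targets, blockSize, avgLoad, true);
                if (target == null) {
                    // relax the load and headroom requirements, insist only on space
                    target = pick(live, targets, blockSize, avgLoad, false);
                }
                if (target == null) {
                    break;   // another pass over the same list would not help
                }
                targets.add(target);
            }
            return targets;
        }

        private Node pick(List<Node> live, List<Node> chosen,
                          long blockSize, double avgLoad, boolean strict) {
            int nNodes = live.size();
            int idx = r.nextInt(nNodes);           // random start, then walk circularly
            for (int rejected = 0; rejected < nNodes; rejected++) {
                Node node = live.get(idx);
                boolean ok = strict
                    ? node.remaining >= blockSize * MIN_BLOCKS_FOR_WRITE
                          && node.xceiverCount <= 2.0 * avgLoad
                    : node.remaining >= blockSize;
                if (ok && !chosen.contains(node)) {
                    return node;
                }
                idx = (idx + 1) % nNodes;
            }
            return null;
        }
    }
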
+ 105 - 0
src/test/org/apache/hadoop/dfs/TestReplication.java

@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Random;
+import java.net.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests the replication of a DFS file.
+ * @author Milind Bhandarkar
+ */
+public class TestReplication extends TestCase {
+  static final long seed = 0xDEADBEEFL;
+  static final int blockSize = 8192;
+  static final int fileSize = 16384;
+  static final int numDatanodes = 4;
+
+  private void writeFile(FileSystem fileSys, Path name, int repl)
+  throws IOException {
+    // create and write a file that contains three blocks of data
+    FSOutputStream stm = fileSys.createRaw(name, true, (short)repl,
+        (long)blockSize);
+    byte[] buffer = new byte[fileSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(buffer);
+    stm.write(buffer);
+    stm.close();
+  }
+  
+  
+  private void checkFile(FileSystem fileSys, Path name, int repl)
+  throws IOException {
+    String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+    for (int idx = 0; idx < locations.length; idx++) {
+      assertEquals("Number of replicas for block" + idx,
+          Math.min(numDatanodes, repl), locations[idx].length);  
+    }
+  }
+  
+  private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+    assertTrue(fileSys.exists(name));
+    fileSys.delete(name);
+    assertTrue(!fileSys.exists(name));
+  }
+  
+  /**
+   * Tests replication in DFS.
+   */
+  public void testReplication() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false);
+    // Now wait for 15 seconds to give datanodes chance to register
+    // themselves and to report heartbeat
+    try {
+      Thread.sleep(15000L);
+    } catch (InterruptedException e) {
+      // nothing
+    }
+    InetSocketAddress addr = new InetSocketAddress("localhost", 65312);
+    DFSClient client = new DFSClient(addr, conf);
+    DatanodeInfo[] info = client.datanodeReport();
+    assertEquals("Number of Datanodes ", numDatanodes, info.length);
+    FileSystem fileSys = cluster.getFileSystem();
+    try {
+      Path file1 = new Path("smallblocktest.dat");
+      writeFile(fileSys, file1, 3);
+      checkFile(fileSys, file1, 3);
+      cleanupFile(fileSys, file1);
+      writeFile(fileSys, file1, 10);
+      checkFile(fileSys, file1, 10);
+      cleanupFile(fileSys, file1);
+      writeFile(fileSys, file1, 4);
+      checkFile(fileSys, file1, 4);
+      cleanupFile(fileSys, file1);
+      writeFile(fileSys, file1, 1);
+      checkFile(fileSys, file1, 1);
+      cleanupFile(fileSys, file1);
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+}