
HDFS-8818. Changes the global moveExecutor to per datanode executors and changes MAX_SIZE_TO_MOVE to be configurable.

Tsz-Wo Nicholas Sze, 9 years ago
parent commit ac8d153046
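
This commit replaces the Balancer's single global mover thread pool with per-datanode pools drawn from a shared thread budget, and turns the previously hard-coded 10 GB MAX_SIZE_TO_MOVE limit into the new dfs.balancer.max-size-to-move configuration key. As a minimal sketch (not part of the commit), the new key could be overridden programmatically before constructing the Balancer:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Illustrative only: raise the per-iteration move cap from the
    // 10 GB default to 50 GB.
    Configuration conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
        50L * 1024 * 1024 * 1024);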

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -429,6 +429,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8772. Fix TestStandbyIsHot#testDatanodeRestarts which occasionally fails.
     (Walter Su via wang).
 
+    HDFS-8818. Changes the global moveExecutor to per datanode executors and
+    changes MAX_SIZE_TO_MOVE to be configurable.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -347,6 +347,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000;
   public static final String  DFS_BALANCER_DISPATCHERTHREADS_KEY = "dfs.balancer.dispatcherThreads";
   public static final int     DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
+  public static final String  DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY = "dfs.balancer.max-size-to-move";
+  public static final long    DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT = 10L*1024*1024*1024;
+
 
   public static final String  DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
   public static final long    DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
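
The new default preserves the old behavior: the constant removed from Balancer.java below defined MAX_SIZE_TO_MOVE as 10*GB with GB = 1L << 30, the same 10 GiB. An illustrative sanity check:

    // Illustrative: the configurable default equals the old hard-coded limit.
    long GB = 1L << 30;  // as previously defined in Balancer.java
    assert 10L * 1024 * 1024 * 1024 == 10 * GB;  // both are 10737418240 bytes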

+ 34 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -34,6 +34,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -168,9 +169,6 @@ public class Balancer {
 
   static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
 
-  private static final long GB = 1L << 30; //1GB
-  private static final long MAX_SIZE_TO_MOVE = 10*GB;
-
   private static final String USAGE = "Usage: hdfs balancer"
       + "\n\t[-policy <policy>]\tthe balancing policy: "
       + BalancingPolicy.Node.INSTANCE.getName() + " or "
@@ -193,6 +191,7 @@ public class Balancer {
   private final BalancingPolicy policy;
   private final boolean runDuringUpgrade;
   private final double threshold;
+  private final long maxSizeToMove;
 
   // all data node lists
   private final Collection<Source> overUtilized = new LinkedList<Source>();
@@ -214,6 +213,24 @@ public class Balancer {
     }
   }
 
+  static long getLong(Configuration conf, String key, long defaultValue) {
+    final long v = conf.getLong(key, defaultValue);
+    LOG.info(key + " = " + v + " (default=" + defaultValue + ")");
+    if (v <= 0) {
+      throw new HadoopIllegalArgumentException(key + " = " + v  + " <= " + 0);
+    }
+    return v;
+  }
+
+  static int getInt(Configuration conf, String key, int defaultValue) {
+    final int v = conf.getInt(key, defaultValue);
+    LOG.info(key + " = " + v + " (default=" + defaultValue + ")");
+    if (v <= 0) {
+      throw new HadoopIllegalArgumentException(key + " = " + v  + " <= " + 0);
+    }
+    return v;
+  }
+
   /**
    * Construct a balancer.
    * Initialize balancer. It sets the value of the threshold, and 
@@ -222,16 +239,16 @@ public class Balancer {
    * when connection fails.
    */
   Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
-    final long movedWinWidth = conf.getLong(
+    final long movedWinWidth = getLong(conf,
         DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
         DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
-    final int moverThreads = conf.getInt(
+    final int moverThreads = getInt(conf,
         DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
         DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
-    final int dispatcherThreads = conf.getInt(
+    final int dispatcherThreads = getInt(conf,
         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
-    final int maxConcurrentMovesPerNode = conf.getInt(
+    final int maxConcurrentMovesPerNode = getInt(conf,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
 
@@ -242,6 +259,10 @@ public class Balancer {
     this.threshold = p.threshold;
     this.policy = p.policy;
     this.runDuringUpgrade = p.runDuringUpgrade;
+
+    this.maxSizeToMove = getLong(conf,
+        DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
+        DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT);
   }
   
   private static long getCapacity(DatanodeStorageReport report, StorageType t) {
@@ -295,7 +316,7 @@ public class Balancer {
         final double utilizationDiff = utilization - policy.getAvgUtilization(t);
         final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
         final long maxSize2Move = computeMaxSize2Move(capacity,
-            getRemaining(r, t), utilizationDiff, threshold);
+            getRemaining(r, t), utilizationDiff, threshold, maxSizeToMove);
 
         final StorageGroup g;
         if (utilizationDiff > 0) {
@@ -332,13 +353,13 @@ public class Balancer {
   }
 
   private static long computeMaxSize2Move(final long capacity, final long remaining,
-      final double utilizationDiff, final double threshold) {
+      final double utilizationDiff, final double threshold, final long max) {
     final double diff = Math.min(threshold, Math.abs(utilizationDiff));
     long maxSizeToMove = percentage2bytes(diff, capacity);
     if (utilizationDiff < 0) {
       maxSizeToMove = Math.min(remaining, maxSizeToMove);
     }
-    return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+    return Math.min(max, maxSizeToMove);
   }
 
   private static long percentage2bytes(double percentage, long capacity) {
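
The cap applied at the end of computeMaxSize2Move now comes from configuration rather than the removed constant. A worked example with assumed inputs, assuming percentage2bytes(p, c) computes p percent of capacity c:

    // Illustrative walk-through of the clamping logic.
    long capacity = 100L << 40;                        // 100 TiB of storage
    double threshold = 10.0, utilizationDiff = 15.0;   // percent
    long max = 10L << 30;                              // configured cap (default 10 GiB)
    double diff = Math.min(threshold, Math.abs(utilizationDiff));  // 10.0
    long maxSizeToMove = (long) (diff * capacity / 100.0);         // 10 TiB
    // utilizationDiff > 0: the node is over-utilized, so the remaining-space
    // clamp is skipped and the configured cap wins.
    System.out.println(Math.min(max, maxSizeToMove));  // 10737418240 bytes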
@@ -388,6 +409,7 @@ public class Balancer {
     /* first step: match each overUtilized datanode (source) to
      * one or more underUtilized datanodes (targets).
      */
+    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
     chooseStorageGroups(overUtilized, underUtilized, matcher);
     
     /* match each remaining overutilized datanode (source) to 
@@ -395,6 +417,7 @@ public class Balancer {
      * Note only overutilized datanodes that haven't had that max bytes to move
      * satisfied in step 1 are selected
      */
+    LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
     chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
 
     /* match each remaining underutilized datanode (target) to 
@@ -402,6 +425,7 @@ public class Balancer {
      * Note only underutilized datanodes that have not had that max bytes to
      * move satisfied in step 1 are selected.
      */
+    LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
     chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
   }
 

+ 83 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java

@@ -114,14 +114,39 @@ public class Dispatcher {
 
   private NetworkTopology cluster;
 
-  private final ExecutorService moveExecutor;
   private final ExecutorService dispatchExecutor;
 
+  private final Allocator moverThreadAllocator;
+
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
 
   private final int ioFileBufferSize;
 
+  static class Allocator {
+    private final int max;
+    private int count = 0;
+
+    Allocator(int max) {
+      this.max = max;
+    }
+
+    synchronized int allocate(int n) {
+      final int remaining = max - count;
+      if (remaining <= 0) {
+        return 0;
+      } else {
+        final int allocated = remaining < n? remaining: n;
+        count += allocated;
+        return allocated;
+      }
+    }
+
+    synchronized void reset() {
+      count = 0;
+    }
+  }
+
   private static class GlobalBlockMap {
     private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
 
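
The new Allocator is a synchronized budget for mover threads: each datanode that needs an executor takes up to maxConcurrentMovesPerNode threads from the shared budget, and reset() returns everything at the start of the next iteration. A minimal illustration of its semantics, with assumed values:

    // Illustrative: a budget of 8 threads handed out 5 at a time
    // (package-private, so callable from within the balancer package).
    Dispatcher.Allocator allocator = new Dispatcher.Allocator(8);
    int a = allocator.allocate(5);  // 5: full request granted
    int b = allocator.allocate(5);  // 3: only the remainder is granted
    int c = allocator.allocate(5);  // 0: budget exhausted, caller must skip
    allocator.reset();              // next iteration starts fresh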
@@ -287,9 +312,7 @@ public class Dispatcher {
 
     /** Dispatch the move to the proxy source & wait for the response. */
     private void dispatch() {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Start moving " + this);
-      }
+      LOG.info("Start moving " + this);
 
       Socket sock = new Socket();
       DataOutputStream out = null;
@@ -504,7 +527,7 @@ public class Dispatcher {
     private final List<PendingMove> pendings;
     private volatile boolean hasFailure = false;
     private volatile boolean hasSuccess = false;
-    private final int maxConcurrentMoves;
+    private ExecutorService moveExecutor;
 
     @Override
     public String toString() {
@@ -513,7 +536,6 @@ public class Dispatcher {
 
     private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
       this.datanode = datanode;
-      this.maxConcurrentMoves = maxConcurrentMoves;
       this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
     }
 
@@ -521,6 +543,21 @@ public class Dispatcher {
       return datanode;
     }
 
+    synchronized ExecutorService initMoveExecutor(int poolSize) {
+      return moveExecutor = Executors.newFixedThreadPool(poolSize);
+    }
+
+    synchronized ExecutorService getMoveExecutor() {
+      return moveExecutor;
+    }
+
+    synchronized void shutdownMoveExecutor() {
+      if (moveExecutor != null) {
+        moveExecutor.shutdown();
+        moveExecutor = null;
+      }
+    }
+
     private static <G extends StorageGroup> void put(StorageType storageType,
         G g, EnumMap<StorageType, G> map) {
       final StorageGroup existing = map.put(storageType, g);
@@ -541,6 +578,7 @@ public class Dispatcher {
 
     synchronized private void activateDelay(long delta) {
       delayUntil = Time.monotonicNow() + delta;
+      LOG.info(this + " activateDelay " + delta/1000.0 + " seconds");
     }
 
     synchronized private boolean isDelayActive() {
@@ -551,11 +589,6 @@ public class Dispatcher {
       return true;
     }
 
-    /** Check if the node can schedule more blocks to move */
-    synchronized boolean isPendingQNotFull() {
-      return pendings.size() < maxConcurrentMoves;
-    }
-
     /** Check if all the dispatched moves are done */
     synchronized boolean isPendingQEmpty() {
       return pendings.isEmpty();
@@ -563,7 +596,7 @@ public class Dispatcher {
 
     /** Add a scheduled block move to the node */
     synchronized boolean addPendingBlock(PendingMove pendingBlock) {
-      if (!isDelayActive() && isPendingQNotFull()) {
+      if (!isDelayActive()) {
         return pendings.add(pendingBlock);
       }
       return false;
@@ -621,6 +654,11 @@ public class Dispatcher {
     private long getBlockList() throws IOException {
       final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
       final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
+            + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
+            + ") returns " + newBlocks.getBlocks().length + " blocks.");
+      }
 
       long bytesReceived = 0;
       for (BlockWithLocations blk : newBlocks.getBlocks()) {
@@ -642,7 +680,9 @@ public class Dispatcher {
             }
           }
           if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
-            // filter bad candidates
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Add " + block + " to " + this);
+            }
             srcBlocks.add(block);
           }
         }
@@ -710,11 +750,9 @@ public class Dispatcher {
       }
     }
 
-    private static final int SOURCE_BLOCKS_MIN_SIZE = 5;
-
     /** @return if should fetch more blocks from namenode */
     private boolean shouldFetchMoreBlocks() {
-      return srcBlocks.size() < SOURCE_BLOCKS_MIN_SIZE && blocksToReceive > 0;
+      return blocksToReceive > 0;
     }
 
     private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
@@ -734,6 +772,11 @@ public class Dispatcher {
       int noPendingMoveIteration = 0;
       while (!isTimeUp && getScheduledSize() > 0
           && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + " blocksToReceive=" + blocksToReceive
+              + ", scheduledSize=" + getScheduledSize()
+              + ", srcBlocks#=" + srcBlocks.size());
+        }
         final PendingMove p = chooseNextMove();
         if (p != null) {
           // Reset no pending move counter
@@ -761,12 +804,16 @@ public class Dispatcher {
           // in case no blocks can be moved for source node's task,
           // jump out of while-loop after 5 iterations.
           if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
+            LOG.info("Failed to find a pending move "  + noPendingMoveIteration
+                + " times.  Skipping " + this);
             resetScheduledSize();
           }
         }
 
         // check if time is up or not
         if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
+          LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
+              + " seconds).  Skipping " + this);
           isTimeUp = true;
           continue;
         }
@@ -803,9 +850,9 @@ public class Dispatcher {
 
     this.cluster = NetworkTopology.getInstance(conf);
 
-    this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
     this.dispatchExecutor = dispatcherThreads == 0? null
         : Executors.newFixedThreadPool(dispatcherThreads);
+    this.moverThreadAllocator = new Allocator(moverThreads);
     this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
     this.saslClient = new SaslDataTransferClient(conf,
@@ -890,8 +937,22 @@ public class Dispatcher {
     return new DDatanode(datanode, maxConcurrentMovesPerNode);
   }
 
+
   public void executePendingMove(final PendingMove p) {
     // move the block
+    final DDatanode targetDn = p.target.getDDatanode();
+    ExecutorService moveExecutor = targetDn.getMoveExecutor();
+    if (moveExecutor == null) {
+      final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+      if (nThreads > 0) {
+        moveExecutor = targetDn.initMoveExecutor(nThreads);
+      }
+    }
+    if (moveExecutor == null) {
+      LOG.warn("No mover threads available: skip moving " + p);
+      return;
+    }
+
     moveExecutor.execute(new Runnable() {
       @Override
       public void run() {
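
executePendingMove now lazily creates each target datanode's executor on first use, sized by whatever the shared Allocator can still grant; once the budget is exhausted, moves to targets without an executor are skipped with a warning. Back-of-the-envelope arithmetic, assuming the defaults shown above (1000 mover threads) and the usual 5 concurrent moves per node:

    // Illustrative: how many distinct targets can get a pool per iteration.
    int moverThreads = 1000;              // dfs.balancer.moverThreads default
    int maxConcurrentMovesPerNode = 5;    // assumed default
    int targetsWithPools = moverThreads / maxConcurrentMovesPerNode;  // 200
    // A 201st distinct target would log
    // "No mover threads available: skip moving ..." until the next reset().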
@@ -1083,6 +1144,11 @@ public class Dispatcher {
     cluster = NetworkTopology.getInstance(conf);
     storageGroupMap.clear();
     sources.clear();
+
+    moverThreadAllocator.reset();
+    for(StorageGroup t : targets) {
+      t.getDDatanode().shutdownMoveExecutor();
+    }
     targets.clear();
     globalBlocks.removeAllButRetain(movedBlocks);
     movedBlocks.cleanup();
@@ -1104,7 +1170,6 @@ public class Dispatcher {
     if (dispatchExecutor != null) {
       dispatchExecutor.shutdownNow();
     }
-    moveExecutor.shutdownNow();
   }
 
   static class Util {

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java

@@ -77,6 +77,11 @@ public class MovedBlocks<L> {
     public long getNumBytes() {
       return block.getNumBytes();
     }
+
+    @Override
+    public String toString() {
+      return block + " size=" + getNumBytes();
+    }
   }
 
   private static final int CUR_WIN = 0;

+ 16 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -19,7 +19,13 @@ package org.apache.hadoop.hdfs.server.balancer;
 
 import static org.apache.hadoop.fs.StorageType.DEFAULT;
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -29,8 +35,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
-import java.net.URI;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -45,6 +51,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -59,12 +66,16 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
@@ -927,7 +938,7 @@ public class TestBalancer {
         new String[] {RACK0, RACK1});
   }
   
-  @Test(timeout=100000)
+  @Test(expected=HadoopIllegalArgumentException.class)
   public void testBalancerWithZeroThreadsForMove() throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
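
Since getInt now rejects non-positive values, setting the concurrent-moves key to 0 makes the Balancer constructor fail fast with HadoopIllegalArgumentException, which is why the test switches from a timeout to an expected exception. A minimal sketch of the failure path (illustrative; the rest of the test body is not shown above):

    // Illustrative: the validation inside the Balancer constructor that
    // now fails fast when the key is set to 0.
    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
    Balancer.getInt(conf,
        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
    // throws HadoopIllegalArgumentException:
    //   dfs.datanode.balance.max.concurrent.moves = 0 <= 0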