浏览代码

HDFS-8549. Abort the balancer if an upgrade is in progress.

(cherry picked from commit a7a7768341f1b7d3a8f2686e2f4d00c57f2e1d4f)
Andrew Wang 10 年之前
父节点
当前提交
9523e2c098

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -278,6 +278,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8568. TestClusterId#testFormatWithEmptyClusterIdOption is failing.
     (Rakesh R. via xyao)
 
+    HDFS-8549. Abort the balancer if an upgrade is in progress. (wang)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

+ 57 - 21
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -181,12 +181,19 @@ public class Balancer {
       + "\n\t[-include [-f <hosts-file> | <comma-separated list of hosts>]]"
       + "\tIncludes only the specified datanodes."
       + "\n\t[-idleiterations <idleiterations>]"
-      + "\tNumber of consecutive idle iterations (-1 for Infinite) before exit.";
-  
+      + "\tNumber of consecutive idle iterations (-1 for Infinite) before "
+      + "exit."
+      + "\n\t[-runDuringUpgrade]"
+      + "\tWhether to run the balancer during an ongoing HDFS upgrade."
+      + "This is usually not desired since it will not affect used space "
+      + "on over-utilized machines.";
+
   private final Dispatcher dispatcher;
+  private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
+  private final boolean runDuringUpgrade;
   private final double threshold;
-  
+
   // all data node lists
   private final Collection<Source> overUtilized = new LinkedList<Source>();
   private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
@@ -228,11 +235,13 @@ public class Balancer {
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
 
+    this.nnc = theblockpool;
     this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
         p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
         maxConcurrentMovesPerNode, conf);
     this.threshold = p.threshold;
     this.policy = p.policy;
+    this.runDuringUpgrade = p.runDuringUpgrade;
   }
   
   private static long getCapacity(DatanodeStorageReport report, StorageType t) {
@@ -294,7 +303,7 @@ public class Balancer {
           if (thresholdDiff <= 0) { // within threshold
             aboveAvgUtilized.add(s);
           } else {
-            overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
             overUtilized.add(s);
           }
           g = s;
@@ -303,7 +312,7 @@ public class Balancer {
           if (thresholdDiff <= 0) { // within threshold
             belowAvgUtilized.add(g);
           } else {
-            underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            underLoadedBytes += percentage2bytes(thresholdDiff, capacity);
             underUtilized.add(g);
           }
         }
@@ -325,17 +334,17 @@ public class Balancer {
   private static long computeMaxSize2Move(final long capacity, final long remaining,
       final double utilizationDiff, final double threshold) {
     final double diff = Math.min(threshold, Math.abs(utilizationDiff));
-    long maxSizeToMove = precentage2bytes(diff, capacity);
+    long maxSizeToMove = percentage2bytes(diff, capacity);
     if (utilizationDiff < 0) {
       maxSizeToMove = Math.min(remaining, maxSizeToMove);
     }
     return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
   }
 
-  private static long precentage2bytes(double precentage, long capacity) {
-    Preconditions.checkArgument(precentage >= 0,
-        "precentage = " + precentage + " < 0");
-    return (long)(precentage * capacity / 100.0);
+  private static long percentage2bytes(double percentage, long capacity) {
+    Preconditions.checkArgument(percentage >= 0, "percentage = %s < 0",
+        percentage);
+    return (long)(percentage * capacity / 100.0);
   }
 
   /* log the over utilized & under utilized nodes */
@@ -517,7 +526,13 @@ public class Balancer {
         LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
             + " to make the cluster balanced." );
       }
-      
+
+      // Should not run the balancer during an unfinalized upgrade, since moved
+      // blocks are not deleted on the source datanode.
+      if (!runDuringUpgrade && nnc.isUpgrading()) {
+        return newResult(ExitStatus.UNFINALIZED_UPGRADE, bytesLeftToMove, -1);
+      }
+
       /* Decide all the nodes that will participate in the block move and
        * the number of bytes that need to be moved from one node to another
        * in this iteration. Maximum bytes to be moved per node is
@@ -531,7 +546,7 @@ public class Balancer {
         LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) +
             " in this iteration");
       }
-      
+
       /* For each pair of <source, target>, start a thread that repeatedly 
        * decide a block to be moved and its proxy source, 
        * then initiates the move until all bytes are moved or no more block
@@ -635,7 +650,8 @@ public class Balancer {
     static final Parameters DEFAULT = new Parameters(
         BalancingPolicy.Node.INSTANCE, 10.0,
         NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
-        Collections.<String> emptySet(), Collections.<String> emptySet());
+        Collections.<String> emptySet(), Collections.<String> emptySet(),
+        false);
 
     final BalancingPolicy policy;
     final double threshold;
@@ -644,23 +660,34 @@ public class Balancer {
     Set<String> nodesToBeExcluded;
     //include only these nodes in balancing operations
     Set<String> nodesToBeIncluded;
+    /**
+     * Whether to run the balancer during upgrade.
+     */
+    final boolean runDuringUpgrade;
 
     Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
-        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded,
+        boolean runDuringUpgrade) {
       this.policy = policy;
       this.threshold = threshold;
       this.maxIdleIteration = maxIdleIteration;
       this.nodesToBeExcluded = nodesToBeExcluded;
       this.nodesToBeIncluded = nodesToBeIncluded;
+      this.runDuringUpgrade = runDuringUpgrade;
     }
 
     @Override
     public String toString() {
-      return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
-          + "[" + policy + ", threshold=" + threshold +
-          ", max idle iteration = " + maxIdleIteration +
-          ", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
-          ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
+      return String.format("%s.%s [%s,"
+              + " threshold = %s,"
+              + " max idle iteration = %s, "
+              + "number of nodes to be excluded = %s,"
+              + " number of nodes to be included = %s,"
+              + " run during upgrade = %s]",
+          Balancer.class.getSimpleName(), getClass().getSimpleName(),
+          policy, threshold, maxIdleIteration,
+          nodesToBeExcluded.size(), nodesToBeIncluded.size(),
+          runDuringUpgrade);
     }
   }
 
@@ -702,6 +729,7 @@ public class Balancer {
       int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration;
       Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
       Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
+      boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;
 
       if (args != null) {
         try {
@@ -757,9 +785,16 @@ public class Balancer {
               }
             } else if ("-idleiterations".equalsIgnoreCase(args[i])) {
               checkArgument(++i < args.length,
-                  "idleiterations value is missing: args = " + Arrays.toString(args));
+                  "idleiterations value is missing: args = " + Arrays
+                      .toString(args));
               maxIdleIteration = Integer.parseInt(args[i]);
               LOG.info("Using a idleiterations of " + maxIdleIteration);
+            } else if ("-runDuringUpgrade".equalsIgnoreCase(args[i])) {
+              runDuringUpgrade = true;
+              LOG.info("Will run the balancer even during an ongoing HDFS "
+                  + "upgrade. Most users will not want to run the balancer "
+                  + "during an upgrade since it will not affect used space "
+                  + "on over-utilized machines.");
             } else {
               throw new IllegalArgumentException("args = "
                   + Arrays.toString(args));
@@ -773,7 +808,8 @@ public class Balancer {
         }
       }
       
-      return new Parameters(policy, threshold, maxIdleIteration, nodesTobeExcluded, nodesTobeIncluded);
+      return new Parameters(policy, threshold, maxIdleIteration,
+          nodesTobeExcluded, nodesTobeIncluded, runDuringUpgrade);
     }
 
     private static void printUsage(PrintStream out) {

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java

@@ -29,7 +29,8 @@ public enum ExitStatus {
   NO_MOVE_PROGRESS(-3),
   IO_EXCEPTION(-4),
   ILLEGAL_ARGUMENTS(-5),
-  INTERRUPTED(-6);
+  INTERRUPTED(-6),
+  UNFINALIZED_UPGRADE(-7);
 
   private final int code;
 

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java

@@ -44,7 +44,9 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -164,6 +166,20 @@ public class NameNodeConnector implements Closeable {
     return namenode.getBlocks(datanode, size);
   }
 
+  /**
+   * @return true if an upgrade is in progress, false if not.
+   * @throws IOException
+   */
+  public boolean isUpgrading() throws IOException {
+    // fsimage upgrade
+    final boolean isUpgrade = !namenode.isUpgradeFinalized();
+    // rolling upgrade
+    RollingUpgradeInfo info = fs.rollingUpgrade(
+        HdfsConstants.RollingUpgradeAction.QUERY);
+    final boolean isRollingUpgrade = (info != null && !info.isFinalized());
+    return (isUpgrade || isRollingUpgrade);
+  }
+
   /** @return live datanode storage reports. */
   public DatanodeStorageReport[] getLiveDatanodeStorageReport()
       throws IOException {

+ 75 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
@@ -627,7 +628,8 @@ public class TestBalancer {
             Balancer.Parameters.DEFAULT.policy,
             Balancer.Parameters.DEFAULT.threshold,
             Balancer.Parameters.DEFAULT.maxIdleIteration,
-            nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded());
+            nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
+            false);
       }
 
       int expectedExcludedNodes = 0;
@@ -866,7 +868,8 @@ public class TestBalancer {
           Balancer.Parameters.DEFAULT.policy,
           Balancer.Parameters.DEFAULT.threshold,
           Balancer.Parameters.DEFAULT.maxIdleIteration,
-          datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded);
+          datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded,
+          false);
       final int r = Balancer.run(namenodes, p, conf);
       assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
     } finally {
@@ -1298,12 +1301,7 @@ public class TestBalancer {
       Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
 
       // Run Balancer
-      Balancer.Parameters p = new Balancer.Parameters(
-        Parameters.DEFAULT.policy,
-        Parameters.DEFAULT.threshold,
-        Balancer.Parameters.DEFAULT.maxIdleIteration,
-        Parameters.DEFAULT.nodesToBeExcluded,
-        Parameters.DEFAULT.nodesToBeIncluded);
+      final Balancer.Parameters p = Parameters.DEFAULT;
       final int r = Balancer.run(namenodes, p, conf);
 
       // Validate no RAM_DISK block should be moved
@@ -1317,6 +1315,75 @@ public class TestBalancer {
     }
   }
 
+  /**
+   * Check that the balancer exits when there is an unfinalized upgrade.
+   */
+  @Test(timeout=300000)
+  public void testBalancerDuringUpgrade() throws Exception {
+    final int SEED = 0xFADED;
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+
+    final int BLOCK_SIZE = 1024*1024;
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .numDataNodes(1)
+        .storageCapacities(new long[] { BLOCK_SIZE * 10 })
+        .storageTypes(new StorageType[] { DEFAULT })
+        .storagesPerDatanode(1)
+        .build();
+
+    try {
+      cluster.waitActive();
+      // Create a file on the single DN
+      final String METHOD_NAME = GenericTestUtils.getMethodName();
+      final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
+          (short) 1, SEED);
+
+      // Add another DN with the same capacity, cluster is now unbalanced
+      cluster.startDataNodes(conf, 1, true, null, null);
+      cluster.triggerHeartbeats();
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+      // Run balancer
+      final Balancer.Parameters p = Parameters.DEFAULT;
+
+      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+      fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
+      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+
+      // Rolling upgrade should abort the balancer
+      assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(),
+          Balancer.run(namenodes, p, conf));
+
+      // Should work with the -runDuringUpgrade flag.
+      final Balancer.Parameters runDuringUpgrade =
+          new Balancer.Parameters(Parameters.DEFAULT.policy,
+              Parameters.DEFAULT.threshold,
+              Parameters.DEFAULT.maxIdleIteration,
+              Parameters.DEFAULT.nodesToBeExcluded,
+              Parameters.DEFAULT.nodesToBeIncluded,
+              true);
+      assertEquals(ExitStatus.SUCCESS.getExitCode(),
+          Balancer.run(namenodes, runDuringUpgrade, conf));
+
+      // Finalize the rolling upgrade
+      fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
+
+      // Should also work after finalization.
+      assertEquals(ExitStatus.SUCCESS.getExitCode(),
+          Balancer.run(namenodes, p, conf));
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Test special case. Two replicas belong to same block should not in same node.
    * We have 2 nodes.