瀏覽代碼

HDFS-6300. Prevent multiple balancers from running simultaneously (Contributed by Rakesh R)

(cherry picked from commit 065d8f2a34296b566e7ca541a284f7991212f14c)
Vinayakumar B 10 年之前
父節點
當前提交
2adb1257b1

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -496,6 +496,9 @@ Release 2.7.1 - UNRELEASED
     HDFS-7916. 'reportBadBlocks' from datanodes to standby Node BPServiceActor
     goes for infinite loop (Rushabh S Shah  via kihwal)
 
+    HDFS-6300. Prevent multiple balancers from running simultaneously
+    (Rakesh R via vinayakumarb)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

+ 12 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java

@@ -220,12 +220,20 @@ public class NameNodeConnector implements Closeable {
    */
   private OutputStream checkAndMarkRunning() throws IOException {
     try {
-      final FSDataOutputStream out = fs.create(idPath);
+      if (fs.exists(idPath)) {
+        // try appending to it so that it will fail fast if another balancer is
+        // running.
+        IOUtils.closeStream(fs.append(idPath));
+        fs.delete(idPath, true);
+      }
+      final FSDataOutputStream fsout = fs.create(idPath, false);
+      // mark balancer idPath to be deleted during filesystem closure
+      fs.deleteOnExit(idPath);
       if (write2IdFile) {
-        out.writeBytes(InetAddress.getLocalHost().getHostName());
-        out.hflush();
+        fsout.writeBytes(InetAddress.getLocalHost().getHostName());
+        fsout.hflush();
       }
-      return out;
+      return fsout;
     } catch(RemoteException e) {
       if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
         return null;

+ 77 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -28,6 +28,7 @@ import static org.junit.Assume.assumeTrue;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.net.InetAddress;
 import java.net.URI;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -45,6 +46,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
@@ -1372,6 +1374,81 @@ public class TestBalancer {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Test running many balancer simultaneously.
+   *
+   * Case-1: First balancer is running. Now, running second one should get
+   * "Another balancer is running. Exiting.." IOException and fail immediately
+   *
+   * Case-2: When running second balancer 'balancer.id' file exists but the
+   * lease doesn't exists. Now, the second balancer should run successfully.
+   */
+  @Test(timeout = 100000)
+  public void testManyBalancerSimultaneously() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    // add an empty node with half of the capacities(4 * CAPACITY) & the same
+    // rack
+    long[] capacities = new long[] { 4 * CAPACITY };
+    String[] racks = new String[] { RACK0 };
+    long newCapacity = 2 * CAPACITY;
+    String newRack = RACK0;
+    LOG.info("capacities = " + long2String(capacities));
+    LOG.info("racks      = " + Arrays.asList(racks));
+    LOG.info("newCapacity= " + newCapacity);
+    LOG.info("newRack    = " + newRack);
+    LOG.info("useTool    = " + false);
+    assertEquals(capacities.length, racks.length);
+    int numOfDatanodes = capacities.length;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
+        .racks(racks).simulatedCapacities(capacities).build();
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+
+      long totalCapacity = sum(capacities);
+
+      // fill up the cluster to be 30% full
+      final long totalUsedSpace = totalCapacity * 3 / 10;
+      createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+          (short) numOfDatanodes, 0);
+      // start up an empty node with the same capacity and on the same rack
+      cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
+          new long[] { newCapacity });
+
+      // Case1: Simulate first balancer by creating 'balancer.id' file. It
+      // will keep this file until the balancing operation is completed.
+      FileSystem fs = cluster.getFileSystem(0);
+      final FSDataOutputStream out = fs
+          .create(Balancer.BALANCER_ID_PATH, false);
+      out.writeBytes(InetAddress.getLocalHost().getHostName());
+      out.hflush();
+      assertTrue("'balancer.id' file doesn't exist!",
+          fs.exists(Balancer.BALANCER_ID_PATH));
+
+      // start second balancer
+      final String[] args = { "-policy", "datanode" };
+      final Tool tool = new Cli();
+      tool.setConf(conf);
+      int exitCode = tool.run(args); // start balancing
+      assertEquals("Exit status code mismatches",
+          ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
+
+      // Case2: Release lease so that another balancer would be able to
+      // perform balancing.
+      out.close();
+      assertTrue("'balancer.id' file doesn't exist!",
+          fs.exists(Balancer.BALANCER_ID_PATH));
+      exitCode = tool.run(args); // start balancing
+      assertEquals("Exit status code mismatches",
+          ExitStatus.SUCCESS.getExitCode(), exitCode);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * @param args
    */