Browse Source

HDFS-2202. Add a new DFSAdmin command to set balancer bandwidth of datanodes without restarting. Contributed by Eric Payne

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security@1152403 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 14 năm trước cách đây
mục cha
commit
7af7e4ceda

+ 3 - 0
CHANGES.txt

@@ -4,6 +4,9 @@ Release 0.20.205.0 - unreleased
 
   NEW FEATURES
 
+    HDFS-2202. Add a new DFSAdmin command to set balancer bandwidth of
+    datanodes without restarting.  (Eric Payne via szetszwo)
+
   BUG FIXES
 
     MAPREDUCE-2651. Fix race condition in Linux task controller for

+ 13 - 1
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -949,6 +949,18 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   public void metaSave(String pathname) throws IOException {
     namenode.metaSave(pathname);
   }
+
+  /**
+   * Requests the namenode to tell all datanodes to use a new, non-persistent
+   * bandwidth value for dfs.balance.bandwidthPerSec.
+   * See {@link ClientProtocol#setBalancerBandwidth(long)} 
+   * for more details.
+   * 
+   * @see ClientProtocol#setBalancerBandwidth(long)
+   */
+  public void setBalancerBandwidth(long bandwidth) throws IOException {
+    namenode.setBalancerBandwidth(bandwidth);
+  }
     
   /**
    * @see ClientProtocol#finalizeUpgrade()
@@ -956,7 +968,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   public void finalizeUpgrade() throws IOException {
     namenode.finalizeUpgrade();
   }
-
+    
   /**
    * @see ClientProtocol#distributedUpgradeProgress(FSConstants.UpgradeAction)
    */

+ 13 - 0
src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -612,4 +612,17 @@ public class DistributedFileSystem extends FileSystem {
       throws IOException {
     dfs.cancelDelegationToken(token);
   }
+
+  /**
+   * Requests the namenode to tell all datanodes to use a new, non-persistent
+   * bandwidth value for dfs.balance.bandwidthPerSec.
+   * The bandwidth parameter is the max number of bytes per second of network
+   * bandwidth to be used by a datanode during balancing.
+   *
+   * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
+   * @throws IOException
+   */
+  public void setBalancerBandwidth(long bandwidth) throws IOException {
+    dfs.setBalancerBandwidth(bandwidth);
+  }
 }

+ 9 - 0
src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -436,6 +436,15 @@ public interface ClientProtocol extends VersionedProtocol {
    */
   public void metaSave(String filename) throws IOException;
 
+  /**
+   * Tell all datanodes to use a new, non-persistent bandwidth value for
+   * dfs.balance.bandwidthPerSec.
+   *
+   * @param bandwidth Blanacer bandwidth in bytes per second for this datanode.
+   * @throws IOException
+   */
+  public void setBalancerBandwidth(long bandwidth) throws IOException;
+
   /**
    * Get the file info for a specific file or directory.
    * @param src The string representation of the path to the file

+ 25 - 0
src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -90,6 +90,7 @@ import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
@@ -1055,6 +1056,19 @@ public class DataNode extends Configured
         blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
       }
       break;
+    case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+      LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
+      int vsn = ((BalancerBandwidthCommand) cmd).getBalancerBandwidthVersion();
+      if (vsn >= 1) {
+        long bandwidth = 
+                   ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
+        if (bandwidth > 0) {
+          DataXceiverServer dxcs =
+                       (DataXceiverServer) this.dataXceiverServer.getRunnable();
+          dxcs.balanceThrottler.setBandwidth(bandwidth);
+        }
+      }
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -1892,4 +1906,15 @@ public class DataNode extends Configured
     }
     return JSON.toString(info);
   }
+
+  /**
+   * Get current value of the max balancer bandwidth in bytes per second.
+   *
+   * @return bandwidth Blanacer bandwidth in bytes per second for this datanode.
+   */
+  public Long getBalancerBandwidth() {
+    DataXceiverServer dxcs =
+                       (DataXceiverServer) this.dataXceiverServer.getRunnable();
+    return dxcs.balanceThrottler.getBandwidth();
+  }
 }

+ 21 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java

@@ -92,6 +92,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
   protected boolean isAlive = false;
   protected boolean needKeyUpdate = false;
 
+  // A system administrator can tune the balancer bandwidth parameter
+  // (dfs.balance.bandwidthPerSec) dynamically by calling
+  // "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
+  // following 'bandwidth' variable gets updated with the new value for each
+  // node. Once the heartbeat command is issued to update the value on the
+  // specified datanode, this value will be set back to 0.
+  private long bandwidth;
+
   /** A queue of blocks to be replicated by this datanode */
   private BlockQueue replicateBlocks = new BlockQueue();
   /** A queue of blocks to be recovered by this datanode */
@@ -517,4 +525,17 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   } // End of class DecommissioningStatus
   
+  /**
+   * @return Blanacer bandwidth in bytes per second for this datanode.
+   */
+  public long getBalancerBandwidth() {
+    return this.bandwidth;
+  }
+  
+  /**
+   * @param bandwidth Blanacer bandwidth in bytes per second for this datanode.
+   */
+  public void setBalancerBandwidth(long bandwidth) {
+    this.bandwidth = bandwidth;
+  }
 }

+ 30 - 1
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -95,6 +95,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Server;
@@ -681,6 +682,28 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
     out.close();
   }
 
+  /**
+   * Tell all datanodes to use a new, non-persistent bandwidth value for
+   * dfs.balance.bandwidthPerSec.
+   *
+   * A system administrator can tune the balancer bandwidth parameter
+   * (dfs.balance.bandwidthPerSec) dynamically by calling
+   * "dfsadmin -setBalanacerBandwidth newbandwidth", at which point the
+   * following 'bandwidth' variable gets updated with the new value for each
+   * node. Once the heartbeat command is issued to update the value on the
+   * specified datanode, this value will be set back to 0.
+   *
+   * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
+   * @throws IOException
+   */
+  public void setBalancerBandwidth(long bandwidth) throws IOException {
+    synchronized(datanodeMap) {
+      for (DatanodeDescriptor nodeInfo : datanodeMap.values()) {
+        nodeInfo.setBalancerBandwidth(bandwidth);
+      }
+    }
+  }
+
   long getDefaultBlockSize() {
     return defaultBlockSize;
   }
@@ -2404,7 +2427,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
           return new DatanodeCommand[] {cmd};
         }
       
-        ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
+        ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
         //check pending replication
         cmd = nodeinfo.getReplicationCommand(
               maxReplicationStreams - xmitsInProgress);
@@ -2421,6 +2444,12 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
           cmds.add(new KeyUpdateCommand(accessTokenHandler.exportKeys()));
           nodeinfo.needKeyUpdate = false;
         }
+        // check for balancer bandwidth update
+        if (nodeinfo.getBalancerBandwidth() > 0) {
+          cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
+          // set back to 0 to indicate that datanode has been sent the new value
+          nodeinfo.setBalancerBandwidth(0);
+        }
         if (!cmds.isEmpty()) {
           return cmds.toArray(new DatanodeCommand[cmds.size()]);
         }

+ 10 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -830,6 +830,16 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     namesystem.metaSave(filename);
   }
 
+  /**
+   * Tell all datanodes to use a new, non-persistent bandwidth value for
+   * dfs.balance.bandwidthPerSec.
+   * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
+   * @throws IOException
+   */
+  public void setBalancerBandwidth(long bandwidth) throws IOException {
+    namesystem.setBalancerBandwidth(bandwidth);
+  }
+
   /** {@inheritDoc} */
   public ContentSummary getContentSummary(String path) throws IOException {
     return namesystem.getContentSummary(path);

+ 118 - 0
src/hdfs/org/apache/hadoop/hdfs/server/protocol/BalancerBandwidthCommand.java

@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+/*
+ * A system administrator can tune the balancer bandwidth parameter
+ * (dfs.balance.bandwidthPerSec) dynamically by calling
+ * "dfsadmin -setBalanacerBandwidth newbandwidth".
+ * This class is to define the command which sends the new bandwidth value to
+ * each datanode.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Balancer bandwidth command instructs each datanode to change its value for
+ * the max amount of network bandwidth it may use during the block balancing
+ * operation.
+ * 
+ * The Balancer Bandwidth Command contains the new bandwidth value as its
+ * payload. The bandwidth value is in bytes per second.
+ */
+public class BalancerBandwidthCommand extends DatanodeCommand {
+  public final static int BBC_VERSION = 1;
+  private final static long BBC_DEFAULTBANDWIDTH = 0L;
+
+  private long bandwidth;
+  private int version = BBC_VERSION;
+
+  /**
+   * Balancer Bandwidth Command constructor. Sets bandwidth to 0.
+   */
+  BalancerBandwidthCommand() {
+    this(BBC_DEFAULTBANDWIDTH);
+  }
+
+  /**
+   * Balancer Bandwidth Command constructor.
+   *
+   * @param bandwidth Blanacer bandwidth in bytes per second.
+   */
+  public BalancerBandwidthCommand(long bandwidth) {
+    super(DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE);
+    this.bandwidth = bandwidth;
+  }
+
+  /**
+   * Get current value of the balancer bandwidth version.
+   *
+   * @return version blanacer bandwidth command version
+   */
+  public int getBalancerBandwidthVersion() {
+    return this.version;
+  }
+
+  /**
+   * Get current value of the max balancer bandwidth in bytes per second.
+   *
+   * @return bandwidth Blanacer bandwidth in bytes per second for this datanode.
+   */
+  public long getBalancerBandwidthValue() {
+    return this.bandwidth;
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  static { // register a ctor
+    WritableFactories.setFactory(BalancerBandwidthCommand.class, new WritableFactory() {
+      public Writable newInstance() {
+        return new BalancerBandwidthCommand();
+      }
+    });
+  }
+
+  /**
+   * Writes the bandwidth payload to the Balancer Bandwidth Command packet.
+   * @param out DataOutput stream used for writing commands to the datanode.
+   * @throws IOException
+   */
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(this.version);
+    out.writeLong(this.bandwidth);
+  }
+
+  /**
+   * Reads the bandwidth payload from the Balancer Bandwidth Command packet.
+   * @param in DataInput stream used for reading commands to the datanode.
+   * @throws IOException
+   */
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.version = in.readInt();
+    this.bandwidth = in.readLong();
+  }
+}

+ 1 - 0
src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java

@@ -66,6 +66,7 @@ public interface DatanodeProtocol extends VersionedProtocol {
   final static int DNA_FINALIZE = 5;   // finalize previous upgrade
   final static int DNA_RECOVERBLOCK = 6;  // request a block recovery
   final static int DNA_ACCESSKEYUPDATE = 7;  // update access key
+  final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
 
   /** 
    * Register Datanode.

+ 57 - 1
src/hdfs/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -419,6 +419,39 @@ public class DFSAdmin extends FsShell {
     return exitCode;
   }
 
+  /**
+   * Command to ask the namenode to set the balancer bandwidth for all of the
+   * datanodes.
+   * Usage: java DFSAdmin -setBalancerBandwidth bandwidth
+   * @param argv List of of command line parameters.
+   * @param idx The index of the command that is being processed.
+   * @exception IOException 
+   */
+  public int setBalancerBandwidth(String[] argv, int idx) throws IOException {
+    long bandwidth;
+    int exitCode = -1;
+
+    try {
+      bandwidth = Long.parseLong(argv[idx]);
+    } catch (NumberFormatException nfe) {
+      System.err.println("NumberFormatException: " + nfe.getMessage());
+      System.err.println("Usage: java DFSAdmin"
+                  + " [-setBalancerBandwidth <bandwidth in bytes per second>]");
+      return exitCode;
+    }
+
+    if (!(fs instanceof DistributedFileSystem)) {
+      System.err.println("FileSystem is " + fs.getUri());
+      return exitCode;
+    }
+
+    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    dfs.setBalancerBandwidth(bandwidth);
+    exitCode = 0;
+   
+    return exitCode;
+  }
+
   private void printHelp(String cmd) {
     String summary = "hadoop dfsadmin is the command to execute DFS administrative commands.\n" +
       "The full syntax is: \n\n" +
@@ -432,6 +465,7 @@ public class DFSAdmin extends FsShell {
       "\t[-refreshServiceAcl]\n" +
       "\t[-refreshUserToGroupsMappings]\n" +
       "\t[refreshSuperUserGroupsConfiguration]\n" +
+      "\t[-setBalancerBandwidth <bandwidth>]\n" +
       "\t[-help [cmd]]\n";
 
     String report ="-report: \tReports basic filesystem information and statistics.\n";
@@ -486,7 +520,15 @@ public class DFSAdmin extends FsShell {
     
     String refreshSuperUserGroupsConfiguration = 
       "-refreshSuperUserGroupsConfiguration: Refresh superuser proxy groups mappings\n";
-    
+
+    String setBalancerBandwidth = "-setBalancerBandwidth <bandwidth>:\n" +
+      "\tChanges the network bandwidth used by each datanode during\n" +
+      "\tHDFS block balancing.\n\n" +
+      "\t\t<bandwidth> is the maximum number of bytes per second\n" +
+      "\t\tthat will be used by each datanode. This value overrides\n" +
+      "\t\tthe dfs.balance.bandwidthPerSec parameter.\n\n" +
+      "\t\t--- NOTE: The new value is not persistent on the DataNode.---\n";
+
     String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
       "\t\tis specified.\n";
 
@@ -518,6 +560,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshUserToGroupsMappings);
     } else if ("refreshSuperUserGroupsConfiguration".equals(cmd)) {
       System.out.println(refreshSuperUserGroupsConfiguration);
+    } else if ("setBalancerBandwidth".equals(cmd)) {
+      System.out.println(setBalancerBandwidth);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -536,6 +580,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshServiceAcl);
       System.out.println(refreshUserToGroupsMappings);
       System.out.println(refreshSuperUserGroupsConfiguration);
+      System.out.println(setBalancerBandwidth);
       System.out.println(help);
       System.out.println();
       ToolRunner.printGenericCommandUsage(System.out);
@@ -764,6 +809,9 @@ public class DFSAdmin extends FsShell {
     } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-refreshSuperUserGroupsConfiguration]");
+    } else if ("-setBalancerBandwidth".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                  + " [-setBalancerBandwidth <bandwidth in bytes per second>]");
     } else {
       System.err.println("Usage: java DFSAdmin");
       System.err.println("           [-report]");
@@ -780,6 +828,7 @@ public class DFSAdmin extends FsShell {
       System.err.println("           ["+ClearQuotaCommand.USAGE+"]");
       System.err.println("           ["+SetSpaceQuotaCommand.USAGE+"]");
       System.err.println("           ["+ClearSpaceQuotaCommand.USAGE+"]");
+      System.err.println("           [-setBalancerBandwidth <bandwidth in bytes per second>]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -851,6 +900,11 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-setBalancerBandwidth".equals(cmd)) {
+      if (argv.length != 2) {
+        printUsage(cmd);
+        return exitCode;
+      }
     }
     
     // initialize DFSAdmin
@@ -895,6 +949,8 @@ public class DFSAdmin extends FsShell {
         exitCode = refreshUserToGroupsMappings();
       } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
         exitCode = refreshSuperUserGroupsConfiguration();
+      } else if ("-setBalancerBandwidth".equals(cmd)) {
+        exitCode = setBalancerBandwidth(argv, i);
       } else if ("-help".equals(cmd)) {
         if (i < argv.length) {
           printHelp(argv[i]);

+ 93 - 0
src/test/org/apache/hadoop/hdfs/TestBalancerBandwidth.java

@@ -0,0 +1,93 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hdfs;
+
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
+/**
+ * This test ensures that the balancer bandwidth is dynamically adjusted
+ * correctly.
+ */
+public class TestBalancerBandwidth extends TestCase {
+  final static private Configuration conf = new Configuration();
+  final static private int NUM_OF_DATANODES = 2;
+  final static private int DEFAULT_BANDWIDTH = 1024*1024;
+  public static final Log LOG = LogFactory.getLog(TestBalancerBandwidth.class);
+
+  public void testBalancerBandwidth() throws Exception {
+    /* Set bandwidthPerSec to a low value of 1M bps. */
+    conf.setLong(
+        DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+        DEFAULT_BANDWIDTH);
+
+    /* Create and start cluster */
+    MiniDFSCluster cluster = 
+      new MiniDFSCluster(conf, NUM_OF_DATANODES, true, null);
+    try {
+      cluster.waitActive();
+
+      DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
+
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      // Ensure value from the configuration is reflected in the datanodes.
+      assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(0).getBalancerBandwidth());
+      assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(1).getBalancerBandwidth());
+
+      // Dynamically change balancer bandwidth and ensure the updated value
+      // is reflected on the datanodes.
+      long newBandwidth = 12 * DEFAULT_BANDWIDTH; // 12M bps
+      fs.setBalancerBandwidth(newBandwidth);
+
+      // Give it a few seconds to propogate new the value to the datanodes.
+      try {
+        Thread.sleep(5000);
+      } catch (Exception e) {}
+
+      assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth());
+      assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth());
+
+      // Dynamically change balancer bandwidth to 0. Balancer bandwidth on the
+      // datanodes should remain as it was.
+      fs.setBalancerBandwidth(0);
+
+      // Give it a few seconds to propogate new the value to the datanodes.
+      try {
+        Thread.sleep(5000);
+      } catch (Exception e) {}
+
+      assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth());
+      assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth());
+    }finally {
+      cluster.shutdown();
+    }
+  }
+ 
+  public static void main(String[] args) throws Exception {
+    new TestBalancerBandwidth().testBalancerBandwidth();
+  }
+}

+ 2 - 0
src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -215,6 +215,8 @@ public class TestDFSClientRetries extends TestCase {
 
     public void metaSave(String filename) throws IOException {}
 
+    public void setBalancerBandwidth(long bandwidth) throws IOException {}
+
     public HdfsFileStatus getFileInfo(String src) throws IOException { return null; }
 
     public ContentSummary getContentSummary(String path) throws IOException { return null; }