浏览代码

HADOOP-11203. Allow ditscp to accept bandwitdh in fraction MegaBytes. Contributed by Raju Bairishetti

Amareshwari Sriramadasu 10 年之前
父节点
当前提交
8ef07f767f

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -53,6 +53,9 @@ Trunk (Unreleased)
 
   IMPROVEMENTS
 
+    HADOOP-11203. Allow ditscp to accept bandwitdh in fraction MegaBytes
+    (Raju Bairishetti via amareshwari)
+
     HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution
     not covered (Eric Charles via bobby)
 

+ 1 - 1
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java

@@ -30,7 +30,7 @@ public class DistCpConstants {
   public static final int DEFAULT_MAPS = 20;
 
   /* Default bandwidth if none specified */
-  public static final int DEFAULT_BANDWIDTH_MB = 100;
+  public static final float DEFAULT_BANDWIDTH_MB = 100;
 
   /* Default strategy for copying. Implementation looked up
      from distcp-default.xml

+ 3 - 2
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java

@@ -174,10 +174,11 @@ public enum DistCpOptionSwitch {
               "copied to <= n bytes")),
 
   /**
-   * Specify bandwidth per map in MB
+   * Specify bandwidth per map in MB, accepts bandwidth as a fraction
    */
   BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
-      new Option("bandwidth", true, "Specify bandwidth per map in MB")),
+      new Option("bandwidth", true, "Specify bandwidth per map in MB,"
+          + " accepts bandwidth as a fraction.")),
 
   /**
    * Path containing a list of strings, which when found in the path of

+ 3 - 3
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java

@@ -47,7 +47,7 @@ public class DistCpOptions {
   public static final int maxNumListstatusThreads = 40;
   private int numListstatusThreads = 0;  // Indicates that flag is not set.
   private int maxMaps = DistCpConstants.DEFAULT_MAPS;
-  private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
+  private float mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
 
   private String sslConfigurationFile;
 
@@ -366,7 +366,7 @@ public class DistCpOptions {
    *
    * @return Bandwidth in MB
    */
-  public int getMapBandwidth() {
+  public float getMapBandwidth() {
     return mapBandwidth;
   }
 
@@ -375,7 +375,7 @@ public class DistCpOptions {
    *
    * @param mapBandwidth - per map bandwidth
    */
-  public void setMapBandwidth(int mapBandwidth) {
+  public void setMapBandwidth(float mapBandwidth) {
     assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)";
     this.mapBandwidth = mapBandwidth;
   }

+ 1 - 1
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java

@@ -293,7 +293,7 @@ public class OptionsParser {
                                      DistCpOptions option) {
     if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
       try {
-        Integer mapBandwidth = Integer.parseInt(
+        Float mapBandwidth = Float.parseFloat(
             getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
         if (mapBandwidth <= 0) {
           throw new IllegalArgumentException("Bandwidth specified is not " +

+ 16 - 1
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java

@@ -62,6 +62,8 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     BYTESEXPECTED,// Number of bytes expected to be copied.
     BYTESFAILED,  // Number of bytes that failed to be copied.
     BYTESSKIPPED, // Number of bytes that were skipped from copy.
+    SLEEP_TIME_MS, // Time map slept while trying to honor bandwidth cap.
+    BANDWIDTH_IN_BYTES, // Effective transfer rate in B/s.
   }
 
   /**
@@ -85,7 +87,9 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
   private EnumSet<FileAttribute> preserve = EnumSet.noneOf(FileAttribute.class);
 
   private FileSystem targetFS = null;
-  private Path    targetWorkPath = null;
+  private Path targetWorkPath = null;
+  private long startEpoch;
+  private long totalBytesCopied = 0;
 
   /**
    * Implementation of the Mapper::setup() method. This extracts the DistCp-
@@ -118,6 +122,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
       initializeSSLConf(context);
     }
+    startEpoch = System.currentTimeMillis();
   }
 
   /**
@@ -288,6 +293,7 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
     incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
     incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
     incrementCounter(context, Counter.COPY, 1);
+    totalBytesCopied += bytesCopied;
   }
 
   private void createTargetDirsWithRetry(String description,
@@ -373,4 +379,13 @@ public class CopyMapper extends Mapper<Text, CopyListingFileStatus, Text, Text>
       return false;
     }
   }
+
+  @Override
+  protected void cleanup(Context context)
+      throws IOException, InterruptedException {
+    super.cleanup(context);
+    long secs = (System.currentTimeMillis() - startEpoch) / 1000;
+    incrementCounter(context, Counter.BANDWIDTH_IN_BYTES,
+        totalBytesCopied / ((secs == 0 ? 1 : secs)));
+  }
 }

+ 1 - 1
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java

@@ -293,7 +293,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
       Configuration conf) throws IOException {
     try {
       FileSystem fs = path.getFileSystem(conf);
-      long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
+      float bandwidthMB = conf.getFloat(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
               DistCpConstants.DEFAULT_BANDWIDTH_MB);
       FSDataInputStream in = fs.open(path);
       return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024);

+ 3 - 3
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java

@@ -39,7 +39,7 @@ import com.google.common.base.Preconditions;
 public class ThrottledInputStream extends InputStream {
 
   private final InputStream rawStream;
-  private final long maxBytesPerSec;
+  private final float maxBytesPerSec;
   private final long startTime = System.currentTimeMillis();
 
   private long bytesRead = 0;
@@ -51,8 +51,8 @@ public class ThrottledInputStream extends InputStream {
     this(rawStream, Long.MAX_VALUE);
   }
 
-  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
-    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; 
+  public ThrottledInputStream(InputStream rawStream, float maxBytesPerSec) {
+    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
     this.rawStream = rawStream;
     this.maxBytesPerSec = maxBytesPerSec;
   }

+ 9 - 7
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java

@@ -32,6 +32,8 @@ import java.util.NoSuchElementException;
 
 public class TestOptionsParser {
 
+  private static final float DELTA = 0.001f;
+
   @Test
   public void testParseIgnoreFailure() {
     DistCpOptions options = OptionsParser.parse(new String[] {
@@ -104,14 +106,14 @@ public class TestOptionsParser {
     DistCpOptions options = OptionsParser.parse(new String[] {
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/target/"});
-    Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB);
+    Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA);
 
     options = OptionsParser.parse(new String[] {
         "-bandwidth",
-        "11",
+        "11.2",
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/target/"});
-    Assert.assertEquals(options.getMapBandwidth(), 11);
+    Assert.assertEquals(options.getMapBandwidth(), 11.2, DELTA);
   }
 
   @Test(expected=IllegalArgumentException.class)
@@ -585,8 +587,8 @@ public class TestOptionsParser {
     options.appendToConf(conf);
     Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false));
     Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false));
-    Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1),
-        DistCpConstants.DEFAULT_BANDWIDTH_MB);
+    Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1),
+        DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA);
 
     conf = new Configuration();
     Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
@@ -597,14 +599,14 @@ public class TestOptionsParser {
         "-delete",
         "-pu",
         "-bandwidth",
-        "11",
+        "11.2",
         "hdfs://localhost:8020/source/first",
         "hdfs://localhost:8020/target/"});
     options.appendToConf(conf);
     Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false));
     Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false));
     Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U");
-    Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11);
+    Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11.2, DELTA);
   }
 
   @Test