Bläddra i källkod

HDFS-17216. Distcp: When handle the small files, the bandwidth parameter will be invalid, fix this bug. (#6138)

xiaojunxiang 1 år sedan
förälder
incheckning
8528d5783d

+ 6 - 0
hadoop-tools/hadoop-distcp/pom.xml

@@ -114,6 +114,12 @@
       <artifactId>assertj-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

+ 4 - 5
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java

@@ -120,12 +120,11 @@ public class ThrottledInputStream extends InputStream implements Seekable {
    * @return Read rate, in bytes/sec.
    */
   public long getBytesPerSec() {
-    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
-    if (elapsed == 0) {
-      return bytesRead;
-    } else {
-      return bytesRead / elapsed;
+    if (bytesRead == 0){  // nothing read yet, so the rate is 0
+      return 0;
     }
+    float elapsed = (System.currentTimeMillis() - startTime) / 1000.0f;  // seconds, sub-second precision
+    return (long) (bytesRead / elapsed);  // NOTE(review): elapsed == 0 gives Infinity, cast yields Long.MAX_VALUE
   }
 
   /**

+ 71 - 4
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestThrottledInputStream.java

@@ -22,6 +22,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.assertThat;
 import org.junit.Test;
 
 import java.io.*;
@@ -43,7 +45,9 @@ public class TestThrottledInputStream {
       tmpFile.deleteOnExit();
       outFile.deleteOnExit();
 
-      long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.BUFFER);
+      // Correction: we should use CB.ONE_C mode to calculate the maxBandwidth,
+      // because CB.ONE_C's speed is the lowest.
+      long maxBandwidth = copyAndAssert(tmpFile, outFile, 0, 1, -1, CB.ONE_C);
 
       copyAndAssert(tmpFile, outFile, maxBandwidth, 20, 0, CB.BUFFER);
 /*
@@ -90,10 +94,16 @@ public class TestThrottledInputStream {
       }
 
       LOG.info("{}", in);
+      /*
+        Call in.getBytesPerSec() only once and reuse the result:
+        every call returns a different value because time keeps passing,
+        and the time spent executing the test itself
+        magnifies the error between successive readings.
+      */
       bandwidth = in.getBytesPerSec();
       Assert.assertEquals(in.getTotalBytesRead(), tmpFile.length());
-      Assert.assertTrue(in.getBytesPerSec() > maxBandwidth / (factor * 1.2));
-      Assert.assertTrue(in.getTotalSleepTime() >  sleepTime || in.getBytesPerSec() <= maxBPS);
+      Assert.assertTrue(bandwidth > maxBandwidth / (factor * 1.2));
+      Assert.assertTrue(in.getTotalSleepTime() >  sleepTime || bandwidth <= maxBPS);
     } finally {
       IOUtils.closeStream(in);
       IOUtils.closeStream(out);
@@ -154,4 +164,61 @@ public class TestThrottledInputStream {
       IOUtils.closeStream(out);
     }
   }
-}
+
+  /**
+   * Distcp: When handle the small files,
+   * the bandwidth parameter will be invalid, fix this bug
+   */
+  @Test
+  public void testFixThrottleInvalid() {
+    int testFileCnt = 100;
+    int fileSize = 19;
+    int bandwidth= 20;
+    File[] srcFiles = new File[testFileCnt];
+    File destFile;
+    try {
+      destFile = createFile(testFileCnt * 100 * 1024);
+      destFile.deleteOnExit();
+
+      // create srcFile
+      for (int i = 0; i < srcFiles.length; i++) {
+        srcFiles[i] = createFile(fileSize * 1024);
+        srcFiles[i].deleteOnExit();
+      }
+
+      long begin = System.currentTimeMillis();
+      LOG.info("begin: " + begin);
+
+      // copy srcFiles
+      for (File srcFile : srcFiles) {
+        LOG.info("fileLength: " + srcFiles.length);
+        copyAndAssert(srcFile, destFile, bandwidth * 1024 * 1024);
+      }
+
+      // Check whether the speed limit is successfully limited
+      long end = System.currentTimeMillis();
+      LOG.info("end: " + end);
+      assertThat((int) (end - begin) / 1000,
+          greaterThanOrEqualTo(testFileCnt * fileSize / bandwidth));
+    } catch (IOException e) {
+      LOG.error("Exception encountered ", e);
+    }
+  }
+
+  /**
+   * Copies {@code tmpFile} to {@code outFile} through a
+   * {@code ThrottledInputStream} limited to {@code maxBPS} and checks the
+   * result.
+   *
+   * <p>Asserts that the stream reports as many bytes read as the source file
+   * is long, and that the rate it reports after the copy is below
+   * {@code maxBPS}.
+   *
+   * @param tmpFile source file to read
+   * @param outFile destination file to write
+   * @param maxBPS bandwidth limit passed to the throttled stream
+   * @throws IOException if opening or copying either file fails
+   */
+  private void copyAndAssert(File tmpFile, File outFile, long maxBPS)
+      throws IOException {
+    ThrottledInputStream in = null;
+    OutputStream out = null;
+    try {
+      // Open both streams inside the try so that the input stream is still
+      // closed when opening the output file fails.
+      in = new ThrottledInputStream(new FileInputStream(tmpFile), maxBPS);
+      out = new FileOutputStream(outFile);
+      copyBytes(in, out, BUFF_SIZE);
+      LOG.info("{}", in);
+      // JUnit expects (expected, actual).
+      Assert.assertEquals(tmpFile.length(), in.getTotalBytesRead());
+
+      // Read the rate once; it changes on every call as time passes.
+      long bytesPerSec = in.getBytesPerSec();
+      Assert.assertTrue("bytesPerSec " + bytesPerSec + " should be below " + maxBPS,
+          bytesPerSec < maxBPS);
+    } finally {
+      IOUtils.closeStream(in);
+      IOUtils.closeStream(out);
+    }
+  }
+}