瀏覽代碼

HADOOP-15292. Distcp's use of pread is slowing it down.
Contributed by Virajith Jalaparti.

Steve Loughran 7 年之前
父節點
當前提交
3bd6b1fd85

+ 16 - 8
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java

@@ -260,7 +260,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     boolean finished = false;
     try {
       inStream = getInputStream(source, context.getConfiguration());
-      int bytesRead = readBytes(inStream, buf, sourceOffset);
+      seekIfRequired(inStream, sourceOffset);
+      int bytesRead = readBytes(inStream, buf);
       while (bytesRead >= 0) {
         if (chunkLength > 0 &&
             (totalBytesRead + bytesRead) >= chunkLength) {
@@ -276,7 +277,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
         if (finished) {
           break;
         }
-        bytesRead = readBytes(inStream, buf, sourceOffset);
+        bytesRead = readBytes(inStream, buf);
       }
       outStream.close();
       outStream = null;
@@ -299,13 +300,20 @@ public class RetriableFileCopyCommand extends RetriableCommand {
     context.setStatus(message.toString());
   }
 
-  private static int readBytes(ThrottledInputStream inStream, byte buf[],
-      long position) throws IOException {
+  private static int readBytes(ThrottledInputStream inStream, byte buf[])
+      throws IOException {
+    try {
+      return inStream.read(buf);
+    } catch (IOException e) {
+      throw new CopyReadException(e);
+    }
+  }
+
+  private static void seekIfRequired(ThrottledInputStream inStream,
+      long sourceOffset) throws IOException {
     try {
-      if (position == 0) {
-        return inStream.read(buf);
-      } else {
-        return inStream.read(position, buf, 0, buf.length);
+      if (sourceOffset != inStream.getPos()) {
+        inStream.seek(sourceOffset);
       }
     } catch (IOException e) {
       throw new CopyReadException(e);

+ 27 - 21
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java

@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.tools.util;
 
-import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,7 +33,7 @@ import java.io.InputStream;
  * (Thus, while the read-rate might exceed the maximum for a given short interval,
  * the average tends towards the specified maximum, overall.)
  */
-public class ThrottledInputStream extends InputStream {
+public class ThrottledInputStream extends InputStream implements Seekable {
 
   private final InputStream rawStream;
   private final float maxBytesPerSec;
@@ -95,25 +95,6 @@ public class ThrottledInputStream extends InputStream {
     return readLen;
   }
 
-  /**
-   * Read bytes starting from the specified position. This requires rawStream is
-   * an instance of {@link PositionedReadable}.
-   */
-  public int read(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-    if (!(rawStream instanceof PositionedReadable)) {
-      throw new UnsupportedOperationException(
-          "positioned read is not supported by the internal stream");
-    }
-    throttle();
-    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
-        offset, length);
-    if (readLen != -1) {
-      bytesRead += readLen;
-    }
-    return readLen;
-  }
-
   private void throttle() throws IOException {
     while (getBytesPerSec() > maxBytesPerSec) {
       try {
@@ -165,4 +146,29 @@ public class ThrottledInputStream extends InputStream {
         ", totalSleepTime=" + totalSleepTime +
         '}';
   }
+
+  private void checkSeekable() throws IOException {
+    if (!(rawStream instanceof Seekable)) {
+      throw new UnsupportedOperationException(
+          "seek operations are unsupported by the internal stream");
+    }
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    checkSeekable();
+    ((Seekable) rawStream).seek(pos);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    checkSeekable();
+    return ((Seekable) rawStream).getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    checkSeekable();
+    return ((Seekable) rawStream).seekToNewSource(targetPos);
+  }
 }

+ 23 - 1
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.CopyListingFileStatus;
@@ -55,6 +56,10 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
 public class TestCopyMapper {
   private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
   private static List<Path> pathList = new ArrayList<Path>();
@@ -248,7 +253,11 @@ public class TestCopyMapper {
 
     // do the distcp again with -update and -append option
     CopyMapper copyMapper = new CopyMapper();
-    StubContext stubContext = new StubContext(getConfiguration(), null, 0);
+    Configuration conf = getConfiguration();
+    // set the buffer size to 1/10th the size of the file.
+    conf.setInt(DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(),
+        DEFAULT_FILE_SIZE/10);
+    StubContext stubContext = new StubContext(conf, null, 0);
     Mapper<Text, CopyListingFileStatus, Text, Text>.Context context =
         stubContext.getContext();
     // Enable append 
@@ -257,6 +266,10 @@ public class TestCopyMapper {
     copyMapper.setup(context);
 
     int numFiles = 0;
+    MetricsRecordBuilder rb =
+        getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+    String readCounter = "ReadsFromLocalClient";
+    long readsFromClient = getLongCounter(readCounter, rb);
     for (Path path: pathList) {
       if (fs.getFileStatus(path).isFile()) {
         numFiles++;
@@ -274,6 +287,15 @@ public class TestCopyMapper {
         .getValue());
     Assert.assertEquals(numFiles, stubContext.getReporter().
         getCounter(CopyMapper.Counter.COPY).getValue());
+    rb = getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+    /*
+     * added as part of HADOOP-15292 to ensure that multiple readBlock()
+     * operations are not performed to read a block from a single Datanode.
+     * assert assumes that there is only one block per file, and that the number
+     * of files appended to in appendSourceData() above is captured by the
+     * variable numFiles.
+     */
+    assertCounter(readCounter, readsFromClient + numFiles, rb);
   }
 
   private void testCopy(boolean preserveChecksum) throws Exception {