浏览代码

MAPREDUCE-3710. Improved FileInputFormat to return better locality for the last split. Contributed by Siddarth Seth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1235510 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 13 年之前
父节点
当前提交
dc615c312b

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -189,6 +189,9 @@ Release 0.23.1 - Unreleased
 
     MAPREDUCE-3692. yarn-resourcemanager out and log files can get big. (eli)
 
+    MAPREDUCE-3710. Improved FileInputFormat to return better locality for the
+    last split. (Siddarth Seth via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java

@@ -289,8 +289,10 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
         }
         
         if (bytesRemaining != 0) {
-          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 
-                     blkLocations[blkLocations.length-1].getHosts()));
+          String[] splitHosts = getSplitHosts(blkLocations, length
+              - bytesRemaining, bytesRemaining, clusterMap);
+          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
+              splitHosts));
         }
       } else if (length != 0) {
         String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java

@@ -286,8 +286,9 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
           }
 
           if (bytesRemaining != 0) {
+            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
-                       blkLocations[blkLocations.length-1].getHosts()));
+                       blkLocations[blkIndex].getHosts()));
           }
         } else { // not splitable
           splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));

+ 101 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java

@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.mapred;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 
@@ -32,6 +36,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Text;
 
+@SuppressWarnings("deprecation")
 public class TestFileInputFormat extends TestCase {
 
   Configuration conf = new Configuration();
@@ -186,6 +191,102 @@ public class TestFileInputFormat extends TestCase {
     assertEquals(splits.length, 2);
   }
 
+  @SuppressWarnings("rawtypes")
+  public void testLastInputSplitAtSplitBoundary() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(1024l * 1024 * 1024,
+        128l * 1024 * 1024);
+    JobConf job = new JobConf();
+    InputSplit[] splits = fif.getSplits(job, 8);
+    assertEquals(8, splits.length);
+    for (int i = 0; i < splits.length; i++) {
+      InputSplit split = splits[i];
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public void testLastInputSplitExceedingSplitBoundary() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(1027l * 1024 * 1024,
+        128l * 1024 * 1024);
+    JobConf job = new JobConf();
+    InputSplit[] splits = fif.getSplits(job, 8);
+    assertEquals(8, splits.length);
+    for (int i = 0; i < splits.length; i++) {
+      InputSplit split = splits[i];
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public void testLastInputSplitSingleSplit() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(100l * 1024 * 1024,
+        128l * 1024 * 1024);
+    JobConf job = new JobConf();
+    InputSplit[] splits = fif.getSplits(job, 1);
+    assertEquals(1, splits.length);
+    for (int i = 0; i < splits.length; i++) {
+      InputSplit split = splits[i];
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+
+  private class FileInputFormatForTest<K, V> extends FileInputFormat<K, V> {
+
+    long splitSize;
+    long length;
+
+    FileInputFormatForTest(long length, long splitSize) {
+      this.length = length;
+      this.splitSize = splitSize;
+    }
+
+    @Override
+    public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
+        Reporter reporter) throws IOException {
+      return null;
+    }
+
+    @Override
+    protected FileStatus[] listStatus(JobConf job) throws IOException {
+      FileStatus mockFileStatus = mock(FileStatus.class);
+      when(mockFileStatus.getBlockSize()).thenReturn(splitSize);
+      when(mockFileStatus.isDirectory()).thenReturn(false);
+      Path mockPath = mock(Path.class);
+      FileSystem mockFs = mock(FileSystem.class);
+
+      BlockLocation[] blockLocations = mockBlockLocations(length, splitSize);
+      when(mockFs.getFileBlockLocations(mockFileStatus, 0, length)).thenReturn(
+          blockLocations);
+      when(mockPath.getFileSystem(any(Configuration.class))).thenReturn(mockFs);
+
+      when(mockFileStatus.getPath()).thenReturn(mockPath);
+      when(mockFileStatus.getLen()).thenReturn(length);
+
+      FileStatus[] fs = new FileStatus[1];
+      fs[0] = mockFileStatus;
+      return fs;
+    }
+
+    @Override
+    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
+      return splitSize;
+    }
+
+    private BlockLocation[] mockBlockLocations(long size, long splitSize) {
+      int numLocations = (int) (size / splitSize);
+      if (size % splitSize != 0)
+        numLocations++;
+      BlockLocation[] blockLocations = new BlockLocation[numLocations];
+      for (int i = 0; i < numLocations; i++) {
+        String[] names = new String[] { "b" + i };
+        String[] hosts = new String[] { "host" + i };
+        blockLocations[i] = new BlockLocation(names, hosts, i * splitSize,
+            Math.min(splitSize, size - (splitSize * i)));
+      }
+      return blockLocations;
+    }
+  }
+
   static void writeFile(Configuration conf, Path name,
       short replication, int numBlocks) throws IOException {
     FileSystem fileSys = FileSystem.get(conf);

+ 111 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

@@ -19,7 +19,9 @@
 package org.apache.hadoop.mapreduce.lib.input;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -28,10 +30,15 @@ import static org.mockito.Mockito.*;
 import static org.apache.hadoop.test.MockitoMaker.*;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 public class TestFileInputFormat {
 
@@ -80,4 +87,108 @@ public class TestFileInputFormat {
     ispy.getSplits(job);
     verify(conf).setLong(FileInputFormat.NUM_INPUT_FILES, 1);
   }
+  
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testLastInputSplitAtSplitBoundary() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(1024l * 1024 * 1024,
+        128l * 1024 * 1024);
+    Configuration conf = new Configuration();
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getConfiguration()).thenReturn(conf);
+    List<InputSplit> splits = fif.getSplits(jobContext);
+    assertEquals(8, splits.size());
+    for (int i = 0 ; i < splits.size() ; i++) {
+      InputSplit split = splits.get(i);
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+  
+  @Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testLastInputSplitExceedingSplitBoundary() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(1027l * 1024 * 1024,
+        128l * 1024 * 1024);
+    Configuration conf = new Configuration();
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getConfiguration()).thenReturn(conf);
+    List<InputSplit> splits = fif.getSplits(jobContext);
+    assertEquals(8, splits.size());
+    for (int i = 0; i < splits.size(); i++) {
+      InputSplit split = splits.get(i);
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+
+  @Test
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testLastInputSplitSingleSplit() throws Exception {
+    FileInputFormat fif = new FileInputFormatForTest(100l * 1024 * 1024,
+        128l * 1024 * 1024);
+    Configuration conf = new Configuration();
+    JobContext jobContext = mock(JobContext.class);
+    when(jobContext.getConfiguration()).thenReturn(conf);
+    List<InputSplit> splits = fif.getSplits(jobContext);
+    assertEquals(1, splits.size());
+    for (int i = 0; i < splits.size(); i++) {
+      InputSplit split = splits.get(i);
+      assertEquals(("host" + i), split.getLocations()[0]);
+    }
+  }
+
+  private class FileInputFormatForTest<K, V> extends FileInputFormat<K, V> {
+
+    long splitSize;
+    long length;
+
+    FileInputFormatForTest(long length, long splitSize) {
+      this.length = length;
+      this.splitSize = splitSize;
+    }
+
+    @Override
+    public RecordReader<K, V> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    protected List<FileStatus> listStatus(JobContext job) throws IOException {
+      FileStatus mockFileStatus = mock(FileStatus.class);
+      when(mockFileStatus.getBlockSize()).thenReturn(splitSize);
+      Path mockPath = mock(Path.class);
+      FileSystem mockFs = mock(FileSystem.class);
+
+      BlockLocation[] blockLocations = mockBlockLocations(length, splitSize);
+      when(mockFs.getFileBlockLocations(mockFileStatus, 0, length)).thenReturn(
+          blockLocations);
+      when(mockPath.getFileSystem(any(Configuration.class))).thenReturn(mockFs);
+
+      when(mockFileStatus.getPath()).thenReturn(mockPath);
+      when(mockFileStatus.getLen()).thenReturn(length);
+
+      List<FileStatus> list = new ArrayList<FileStatus>();
+      list.add(mockFileStatus);
+      return list;
+    }
+
+    @Override
+    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
+      return splitSize;
+    }
+
+    private BlockLocation[] mockBlockLocations(long size, long splitSize) {
+      int numLocations = (int) (size / splitSize);
+      if (size % splitSize != 0)
+        numLocations++;
+      BlockLocation[] blockLocations = new BlockLocation[numLocations];
+      for (int i = 0; i < numLocations; i++) {
+        String[] names = new String[] { "b" + i };
+        String[] hosts = new String[] { "host" + i };
+        blockLocations[i] = new BlockLocation(names, hosts, i * splitSize,
+            Math.min(splitSize, size - (splitSize * i)));
+      }
+      return blockLocations;
+    }
+  }
 }