Browse Source

Merge of -r 651005:651006 from trunk to 0.17 to fix HADOOP-3285.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.17@651009 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley 17 năm trước cách đây
mục cha
commit
817806a00a

+ 3 - 0
CHANGES.txt

@@ -577,6 +577,9 @@ Release 0.17.0 - Unreleased
     HADOOP-3286. Prevent collisions in gridmix output dirs by increasing the
     granularity of the timestamp. (Runping Qi via cdouglas)
 
+    HADOOP-3285. Fix input split locality when the splits align to
+    fs blocks. (omalley)
+
 Release 0.16.3 - 2008-04-16
 
   BUG FIXES

+ 12 - 0
src/java/org/apache/hadoop/fs/BlockLocation.java

@@ -177,4 +177,16 @@ public class BlockLocation implements Writable {
       hosts[i] = host.toString();
     }
   }
+  
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    result.append(offset);
+    result.append(',');
+    result.append(length);
+    for(String h: hosts) {
+      result.append(',');
+      result.append(h);
+    }
+    return result.toString();
+  }
 }

+ 9 - 4
src/java/org/apache/hadoop/mapred/FileInputFormat.java

@@ -279,12 +279,17 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
   protected int getBlockIndex(BlockLocation[] blkLocations, 
                               long offset) {
     for (int i = 0 ; i < blkLocations.length; i++) {
+      // is the offset inside this block?
       if ((blkLocations[i].getOffset() <= offset) &&
-        ((blkLocations[i].getOffset() + blkLocations[i].getLength()) >= 
-        offset))
-          return i;
+          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
+        return i;
+      }
     }
-    return 0;
+    BlockLocation last = blkLocations[blkLocations.length];
+    long fileLength = last.getOffset() + last.getLength() -1;
+    throw new IllegalArgumentException("Offset " + offset + 
+                                       " is outside of file (0.." +
+                                       fileLength + ")");
   }
 
   /**

+ 86 - 0
src/test/org/apache/hadoop/mapred/TestFileInputFormat.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+public class TestFileInputFormat extends TestCase {
+
+  public void testLocality() throws Exception {
+    JobConf conf = new JobConf();
+    MiniDFSCluster dfs = null;
+    try {
+      dfs = new MiniDFSCluster(conf, 4, true,
+                               new String[]{"/rack0", "/rack0", 
+                                             "/rack1", "/rack1"},
+                               new String[]{"host0", "host1", 
+                                            "host2", "host3"});
+      FileSystem fs = dfs.getFileSystem();
+      System.out.println("FileSystem " + fs.getUri());
+      Path path = new Path("/foo/bar");
+      // create a multi-block file on hdfs
+      DataOutputStream out = fs.create(path, true, 4096, 
+                                       (short) 2, 512, null);
+      for(int i=0; i < 1000; ++i) {
+        out.writeChars("Hello\n");
+      }
+      out.close();
+      System.out.println("Wrote file");
+
+      // split it using a file input format
+      TextInputFormat.addInputPath(conf, path);
+      TextInputFormat inFormat = new TextInputFormat();
+      inFormat.configure(conf);
+      InputSplit[] splits = inFormat.getSplits(conf, 1);
+      BlockLocation[] locations = 
+        fs.getFileBlockLocations(path, 0, fs.getFileStatus(path).getLen());
+      System.out.println("Made splits");
+
+      // make sure that each split is a block and the locations match
+      for(int i=0; i < splits.length; ++i) {
+        FileSplit fileSplit = (FileSplit) splits[i];
+        System.out.println("File split: " + fileSplit);
+        for (String h: fileSplit.getLocations()) {
+          System.out.println("Location: " + h);
+        }
+        System.out.println("Block: " + locations[i]);
+        assertEquals(locations[i].getOffset(), fileSplit.getStart());
+        assertEquals(locations[i].getLength(), fileSplit.getLength());
+        String[] blockLocs = locations[i].getHosts();
+        String[] splitLocs = fileSplit.getLocations();
+        assertEquals(2, blockLocs.length);
+        assertEquals(2, splitLocs.length);
+        assertTrue((blockLocs[0].equals(splitLocs[0]) && 
+                    blockLocs[1].equals(splitLocs[1])) ||
+                   (blockLocs[1].equals(splitLocs[0]) &&
+                    blockLocs[0].equals(splitLocs[1])));
+      }
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+}