
HADOOP-2027. Return the number of bytes in each block in a file via a
single rpc to the namenode to speed up job planning. Contributed by
Lohit Vijaya Renu.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@639081 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley, 17 years ago
parent commit 4ae9adc2f6
27 changed files with 426 additions and 89 deletions
  1. + 6 - 2    CHANGES.txt
  2. + 2 - 1    src/examples/org/apache/hadoop/examples/RandomWriter.java
  3. + 41 - 5   src/java/org/apache/hadoop/dfs/DFSClient.java
  4. + 4 - 3    src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
  5. + 180 - 0  src/java/org/apache/hadoop/fs/BlockLocation.java
  6. + 36 - 5   src/java/org/apache/hadoop/fs/FileSystem.java
  7. + 5 - 5    src/java/org/apache/hadoop/fs/FilterFileSystem.java
  8. + 4 - 3    src/java/org/apache/hadoop/fs/FsShell.java
  9. + 18 - 7   src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
  10. + 17 - 8  src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
  11. + 23 - 6  src/java/org/apache/hadoop/mapred/FileInputFormat.java
  12. + 21 - 9  src/java/org/apache/hadoop/mapred/FileSplit.java
  13. + 6 - 4   src/java/org/apache/hadoop/mapred/MultiFileSplit.java
  14. + 2 - 2   src/java/org/apache/hadoop/util/CopyFiles.java
  15. + 4 - 2   src/test/org/apache/hadoop/dfs/DFSTestUtil.java
  16. + 11 - 7  src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java
  17. + 7 - 4   src/test/org/apache/hadoop/dfs/TestDecommission.java
  18. + 4 - 2   src/test/org/apache/hadoop/dfs/TestFileCreation.java
  19. + 4 - 2   src/test/org/apache/hadoop/dfs/TestFileStatus.java
  20. + 3 - 2   src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java
  21. + 3 - 1   src/test/org/apache/hadoop/dfs/TestSmallBlock.java
  22. + 14 - 1  src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
  23. + 2 - 2   src/test/org/apache/hadoop/io/FileBench.java
  24. + 1 - 1   src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
  25. + 4 - 2   src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
  26. + 2 - 1   src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java
  27. + 2 - 2   src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java

+ 6 - 2
CHANGES.txt

@@ -130,11 +130,15 @@ Trunk (unreleased changes)
     (Tsz Wo (Nicholas), SZE via dhruba)
 
     HADOOP-2606. ReplicationMonitor selects data-nodes to replicate directly
-		from needed replication blocks instead of looking up for the blocks for 
-		each live data-node. (shv)
+    from needed replication blocks instead of looking up for the blocks for 
+    each live data-node. (shv)
 
     HADOOP-2148. Eliminate redundant data-node blockMap lookups. (shv)
 
+    HADOOP-2027. Return the number of bytes in each block in a file
+    via a single rpc to the namenode to speed up job planning. 
+    (Lohit Vijaya Renu via omalley)
+
   BUG FIXES
 
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of

+ 2 - 1
src/examples/org/apache/hadoop/examples/RandomWriter.java

@@ -107,7 +107,8 @@ public class RandomWriter extends Configured implements Tool {
       InputSplit[] result = new InputSplit[numSplits];
       Path outDir = job.getOutputPath();
       for(int i=0; i < result.length; ++i) {
-        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, job);
+        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
+                                  (String[])null);
       }
       return result;
     }

+ 41 - 5
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -126,6 +126,7 @@ class DFSClient implements FSConstants {
     methodNameToPolicyMap.put("isDir", methodPolicy);
     methodNameToPolicyMap.put("getListing", methodPolicy);
     methodNameToPolicyMap.put("getHints", methodPolicy);
+    methodNameToPolicyMap.put("getBlockLocations", methodPolicy);
     methodNameToPolicyMap.put("renewLease", methodPolicy);
     methodNameToPolicyMap.put("getStats", methodPolicy);
     methodNameToPolicyMap.put("getDatanodeReport", methodPolicy);
@@ -242,6 +243,8 @@ class DFSClient implements FSConstants {
   }
     
   /**
+   *  @deprecated Use getBlockLocations instead
+   *
    * Get hints about the location of the indicated block(s).
    * 
    * getHints() returns a list of hostnames that store data for
@@ -253,25 +256,58 @@ class DFSClient implements FSConstants {
    * MapReduce system tries to schedule tasks on the same machines
    * as the data-block the task processes. 
    */
+  @Deprecated
   public String[][] getHints(String src, long start, long length) 
     throws IOException {
+    BlockLocation[] blkLocations = getBlockLocations(src, start, length);
+    if ((blkLocations == null) || (blkLocations.length == 0)) {
+      return new String[0][];
+    }
+    int blkCount = blkLocations.length;
+    String[][]hints = new String[blkCount][];
+    for (int i=0; i < blkCount ; i++) {
+      String[] hosts = blkLocations[i].getHosts();
+      hints[i] = new String[hosts.length];
+      hints[i] = hosts;
+    }
+    return hints;
+  }
+  
+  /**
+   * Get block location info about file
+   * 
+   * getBlockLocations() returns a list of hostnames that store 
+   * data for a specific file region.  It returns a set of hostnames
+   * for every block within the indicated region.
+   *
+   * This function is very useful when writing code that considers
+   * data-placement when performing operations.  For example, the
+   * MapReduce system tries to schedule tasks on the same machines
+   * as the data-block the task processes. 
+   */
+  public BlockLocation[] getBlockLocations(String src, long start, 
+    long length) throws IOException {
     LocatedBlocks blocks = namenode.getBlockLocations(src, start, length);
     if (blocks == null) {
-      return new String[0][];
+      return new BlockLocation[0];
     }
     int nrBlocks = blocks.locatedBlockCount();
-    String[][] hints = new String[nrBlocks][];
+    BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
     int idx = 0;
     for (LocatedBlock blk : blocks.getLocatedBlocks()) {
       assert idx < nrBlocks : "Incorrect index";
       DatanodeInfo[] locations = blk.getLocations();
-      hints[idx] = new String[locations.length];
+      String[] hosts = new String[locations.length];
+      String[] names = new String[locations.length];
       for (int hCnt = 0; hCnt < locations.length; hCnt++) {
-        hints[idx][hCnt] = locations[hCnt].getHostName();
+        hosts[hCnt] = locations[hCnt].getHostName();
+        names[hCnt] = locations[hCnt].getName();
       }
+      blkLocations[idx] = new BlockLocation(names, hosts, blk.getStartOffset(),
+                                            blk.getBlockSize());
       idx++;
     }
-    return hints;
+    return blkLocations;
   }
 
   public DFSInputStream open(String src) throws IOException {
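This is the core of the change: a single getBlockLocations() RPC to the namenode returns the offset, length, and host list for every block, which is exactly what job planning needs. A minimal caller sketch under this patch's API (the path is made up, and the sketch is placed in org.apache.hadoop.dfs because DFSClient is package-private):

```java
package org.apache.hadoop.dfs;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.fs.BlockLocation;

// Hypothetical caller: one namenode RPC yields size and placement per block.
class BlockInfoExample {
  static void dump(DFSClient dfs, String src) throws IOException {
    BlockLocation[] blocks = dfs.getBlockLocations(src, 0, Long.MAX_VALUE);
    long totalBytes = 0;
    for (BlockLocation blk : blocks) {
      totalBytes += blk.getLength();                 // bytes stored in this block
      System.out.println("offset=" + blk.getOffset()
          + " length=" + blk.getLength()
          + " hosts=" + Arrays.toString(blk.getHosts()));
    }
    System.out.println("bytes visible to the job planner: " + totalBytes);
  }
}
```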

+ 4 - 3
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -114,10 +114,11 @@ public class DistributedFileSystem extends FileSystem {
     }
     return result;
   }
+  
 
-  public String[][] getFileCacheHints(Path f, long start, long len)
-      throws IOException {
-    return dfs.getHints(getPathName(f), start, len);
+  public BlockLocation[] getFileBlockLocations(Path f, long start,
+      long len) throws IOException {
+    return dfs.getBlockLocations(getPathName(f), start, len);
   }
 
   public void setVerifyChecksum(boolean verifyChecksum) {

+ 180 - 0
src/java/org/apache/hadoop/fs/BlockLocation.java

@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.io.*;
+
+import java.io.*;
+
+/*
+ * A BlockLocation lists hosts, offset and length
+ * of block. 
+ * 
+ */
+public class BlockLocation implements Writable {
+
+  static {               // register a ctor
+    WritableFactories.setFactory
+      (BlockLocation.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new BlockLocation(); }
+       });
+  }
+
+  private String[] hosts; //hostnames of datanodes
+  private String[] names; //hostname:portNumber of datanodes
+  private long offset;  //offset of the of the block in the file
+  private long length;
+
+  /**
+   * Default Constructor
+   */
+  public BlockLocation() {
+    this(new String[0], new String[0],  0L, 0L);
+  }
+
+  /**
+   * Constructor with host, name, offset and length
+   */
+  public BlockLocation(String[] names, String[] hosts, long offset, 
+                       long length) {
+    if (names == null) {
+      this.names = new String[0];
+    } else {
+      this.names = names;
+    }
+    if (hosts == null) {
+      this.hosts = new String[0];
+    } else {
+      this.hosts = hosts;
+    }
+    this.offset = offset;
+    this.length = length;
+  }
+
+  /**
+   * Get the list of hosts (hostname) hosting this block
+   */
+  public String[] getHosts() throws IOException {
+    if ((hosts == null) || (hosts.length == 0)) {
+      return new String[0];
+    } else {
+      return hosts;
+    }
+  }
+
+  /**
+   * Get the list of names (hostname:port) hosting this block
+   */
+  public String[] getNames() throws IOException {
+    if ((names == null) || (names.length == 0)) {
+      return new String[0];
+    } else {
+      return this.names;
+    }
+  }
+  
+  /**
+   * Get the start offset of file associated with this block
+   */
+  public long getOffset() {
+    return offset;
+  }
+  
+  /**
+   * Get the length of the block
+   */
+  public long getLength() {
+    return length;
+  }
+  
+  /**
+   * Set the start offset of file associated with this block
+   */
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
+  /**
+   * Set the length of block
+   */
+  public void setLength(long length) {
+    this.length = length;
+  }
+
+  /**
+   * Set the hosts hosting this block
+   */
+  public void setHosts(String[] hosts) throws IOException {
+    if (hosts == null) {
+      this.hosts = new String[0];
+    } else {
+      this.hosts = hosts;
+    }
+  }
+
+  /**
+   * Set the names (host:port) hosting this block
+   */
+  public void setNames(String[] names) throws IOException {
+    if (names == null) {
+      this.names = new String[0];
+    } else {
+      this.names = names;
+    }
+  }
+
+  /**
+   * Implement write of Writable
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(offset);
+    out.writeLong(length);
+    out.writeInt(names.length);
+    for (int i=0; i < names.length; i++) {
+      Text name = new Text(names[i]);
+      name.write(out);
+    }
+    out.writeInt(hosts.length);
+    for (int i=0; i < hosts.length; i++) {
+      Text host = new Text(hosts[i]);
+      host.write(out);
+    }
+  }
+  
+  /**
+   * Implement readFields of Writable
+   */
+  public void readFields(DataInput in) throws IOException {
+    this.offset = in.readLong();
+    this.length = in.readLong();
+    int numNames = in.readInt();
+    this.names = new String[numNames];
+    for (int i = 0; i < numNames; i++) {
+      Text name = new Text();
+      name.readFields(in);
+      names[i] = name.toString();
+    }
+    int numHosts = in.readInt();
+    for (int i = 0; i < numHosts; i++) {
+      Text host = new Text();
+      host.readFields(in);
+      hosts[i] = host.toString();
+    }
+  }
+}
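The new public class in full. A tiny construction/accessor sketch, with made-up hosts and a 64 MB block for illustration:

```java
import org.apache.hadoop.fs.BlockLocation;

// names are host:port pairs, hosts are bare hostnames, and offset/length
// place the block within the file.
public class BlockLocationExample {
  public static void main(String[] args) throws Exception {
    String[] names = {"node1.example.com:50010", "node2.example.com:50010"};
    String[] hosts = {"node1.example.com", "node2.example.com"};
    BlockLocation blk = new BlockLocation(names, hosts, 0L, 67108864L); // 64 MB

    System.out.println("offset = " + blk.getOffset());              // 0
    System.out.println("length = " + blk.getLength());              // 67108864
    System.out.println("replicas = " + blk.getHosts().length);      // 2
  }
}
```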

+ 36 - 5
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -297,6 +297,8 @@ public abstract class FileSystem extends Configured implements Closeable {
   }
 
   /**
+   * @deprecated Use getFileBlockLocations() instead
+   *
    * Return a 2D array of size 1x1 or greater, containing hostnames 
    * where portions of the given file can be found.  For a nonexistent 
    * file or regions, null will be returned.
@@ -306,18 +308,47 @@ public abstract class FileSystem extends Configured implements Closeable {
    *
    * The FileSystem will simply return an elt containing 'localhost'.
    */
+  @Deprecated
   public String[][] getFileCacheHints(Path f, long start, long len)
-    throws IOException {
+      throws IOException {
+    BlockLocation[] blkLocations = getFileBlockLocations(f, start, len);
+    if ((blkLocations == null) || (blkLocations.length == 0)) {
+      return new String[0][];
+    }
+    int blkCount = blkLocations.length;
+    String[][] hints = new String[blkCount][];
+    for (int i=0; i < blkCount; i++) {
+      String[] hosts = blkLocations[i].getHosts();
+      hints[i] = new String[hosts.length];
+      hints[i] = hosts;
+    }
+    return hints;
+  }
+
+  /**
+   * Return an array containing hostnames, offset and size of 
+   * portions of the given file.  For a nonexistent 
+   * file or regions, null will be returned.
+   *
+   * This call is most helpful with DFS, where it returns 
+   * hostnames of machines that contain the given file.
+   *
+   * The FileSystem will simply return an elt containing 'localhost'.
+   */
+  public BlockLocation[] getFileBlockLocations(Path f, 
+    long start, long len) throws IOException {
     if (!exists(f)) {
       return null;
     } else {
-      String result[][] = new String[1][];
-      result[0] = new String[1];
-      result[0][0] = "localhost";
+      BlockLocation result[] = new BlockLocation[1];
+      String[] name = new String[1];
+      name[0] = "localhost:50010";
+      String[] host = new String[1];
+      host[0] = "localhost";
+      result[0] = new BlockLocation(name, host, 0, len);
       return result;
     }
   }
-
   /**
    * Opens an FSDataInputStream at the indicated Path.
    * @param f the file name to open
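The deprecated getFileCacheHints() is now just a shim over getFileBlockLocations(). Code still written against the old String[][] shape can do the same flattening by hand; a hedged sketch mirroring the deprecated default implementation above (the helper name is invented):

```java
import java.io.IOException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: flatten BlockLocation[] back into the old
// per-block host-hints array.
public class HintShim {
  public static String[][] toHints(FileSystem fs, Path f, long len) throws IOException {
    BlockLocation[] blocks = fs.getFileBlockLocations(f, 0, len);
    if (blocks == null || blocks.length == 0) {
      return new String[0][];      // matches the deprecated method's behavior
    }
    String[][] hints = new String[blocks.length][];
    for (int i = 0; i < blocks.length; i++) {
      hints[i] = blocks[i].getHosts();
    }
    return hints;
  }
}
```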

+ 5 - 5
src/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -82,8 +82,8 @@ public class FilterFileSystem extends FileSystem {
   }
 
   /**
-   * Return a 2D array of size 1x1 or greater, containing hostnames 
-   * where portions of the given file can be found.  For a nonexistent 
+   * Return an array containing hostnames, offset and size of 
+   * portions of the given file.  For a nonexistent 
    * file or regions, null will be returned.
    *
    * This call is most helpful with DFS, where it returns 
@@ -91,9 +91,9 @@ public class FilterFileSystem extends FileSystem {
    *
    * The FileSystem will simply return an elt containing 'localhost'.
    */
-  public String[][] getFileCacheHints(Path f, long start, long len)
-    throws IOException {
-    return fs.getFileCacheHints(f, start, len);
+  public BlockLocation[] getFileBlockLocations(Path f, long start,  
+    long len) throws IOException {
+      return fs.getFileBlockLocations(f, start, len);
   }
 
   /**

+ 4 - 3
src/java/org/apache/hadoop/fs/FsShell.java

@@ -498,10 +498,11 @@ public class FsShell extends Configured implements Tool {
       long len = fs.getFileStatus(f).getLen();
 
       for(boolean done = false; !done; ) {
-        String[][] locations = fs.getFileCacheHints(f, 0, len);
+        BlockLocation[] locations = fs.getFileBlockLocations(f, 0, len);
         int i = 0;
-        for(; i < locations.length && locations[i].length == rep; i++)
-          if (!printWarning && locations[i].length > rep) {
+        for(; i < locations.length && 
+          locations[i].getHosts().length == rep; i++)
+          if (!printWarning && locations[i].getHosts().length > rep) {
             System.out.println("\nWARNING: the waiting time may be long for "
                 + "DECREASING the number of replication.");
             printWarning = true;

+ 18 - 7
src/java/org/apache/hadoop/fs/InMemoryFileSystem.java

@@ -82,18 +82,29 @@ public class InMemoryFileSystem extends ChecksumFileSystem {
     }
 
     /**
-     * Return 1x1 'inmemory' cell if the file exists.
-     * Return null if otherwise.
-     */
-    public String[][] getFileCacheHints(Path f, long start, long len)
-      throws IOException {
+     * Return an array containing hostnames, offset and size.
+     *
+     * This call is most helpful with DFS, where it returns
+     * hostnames for machines that contain the given file.
+     
+     * The InMemoryFileSystem will simply return an elt 
+     * containing 'inmemory'
+     */ 
+    public BlockLocation[] getFileBlockLocations (Path f, 
+      long start, long len) throws IOException {
       if (!exists(f)) {
         return null;
       } else {
-        return new String[][] {{"inmemory"}};
+        BlockLocation result[] = new BlockLocation[1];
+        String[] name = new String[1];
+        name[0] = "inmemory:50010";
+        String[] host = new String[1];
+        host[0] = "inmemory";
+        result[0] = new BlockLocation(name, host, 0, len);
+        return result;
       }
     }
-
+    
     private class InMemoryInputStream extends FSInputStream {
       private DataInputBuffer din = new DataInputBuffer();
       private FileAttributes fAttr;

+ 17 - 8
src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.fs.BlockLocation;
 
 /**
  * A FileSystem backed by KFS.
@@ -367,14 +368,22 @@ public class KosmosFileSystem extends FileSystem {
      * Return null if the file doesn't exist; otherwise, get the
      * locations of the various chunks of the file file from KFS.
      */
-    public String[][] getFileCacheHints(Path f, long start, long len)
-	throws IOException {
-	if (!exists(f)) {
-	    return null;
-	}
-        String srep = makeAbsolute(f).toUri().getPath();
-        String[][] hints = kfsImpl.getDataLocation(srep, start, len);
-        return hints;
+    public BlockLocation[] getBlockLocations(Path f, long start, long len
+                                             ) throws IOException {
+      if (!exists(f)) {
+        return null;
+      }
+      String srep = makeAbsolute(f).toUri().getPath();
+      String[][] hints = kfsImpl.getDataLocation(srep, start, len);
+      BlockLocation[] result = new BlockLocation[hints.length];
+      long blockSize = getDefaultBlockSize();
+      long length = len;
+      for(int i=0; i < result.length; ++i) {
+        result[i] = new BlockLocation(null, hints[i], start, 
+                                      length < blockSize ? length : blockSize);
+        length -= blockSize;
+      }
+      return result;
     }
 
     public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
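KFS reports only host hints, so the per-block lengths here are approximated from the filesystem's default block size: each synthesized BlockLocation gets min(remaining length, blockSize). For instance, a 200 MB region against a 64 MB default block size yields lengths of 64, 64, 64, and 8 MB. A standalone restatement of that arithmetic (in the real code the hint count comes from kfsImpl.getDataLocation(); here it is derived with a ceiling division just to keep the sketch self-contained):

```java
// Illustrative re-statement of the length-splitting loop above.
public class KfsLengthSplit {
  public static void main(String[] args) {
    long blockSize = 64L * 1024 * 1024;               // default block size
    long length = 200L * 1024 * 1024;                 // region being located
    int hints = (int) ((length + blockSize - 1) / blockSize);  // 4 in this case
    for (int i = 0; i < hints; i++) {
      long thisBlock = length < blockSize ? length : blockSize;
      System.out.println("block " + i + " length = " + thisBlock);
      length -= blockSize;
    }
  }
}
```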

+ 23 - 6
src/java/org/apache/hadoop/mapred/FileInputFormat.java

@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.BlockLocation;
 
 /** 
  * A base class for file-based {@link InputFormat}.
@@ -178,23 +179,28 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
       Path file = files[i];
       FileSystem fs = file.getFileSystem(job);
       long length = fs.getLength(file);
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
       if ((length != 0) && isSplitable(fs, file)) { 
         long blockSize = fs.getBlockSize(file);
         long splitSize = computeSplitSize(goalSize, minSize, blockSize);
 
         long bytesRemaining = length;
         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-          splits.add(new FileSplit(file, length-bytesRemaining, splitSize,
-                                   job));
+          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
+          splits.add(new FileSplit(file, length-bytesRemaining, splitSize, 
+                                   blkLocations[blkIndex].getHosts()));
           bytesRemaining -= splitSize;
         }
         
         if (bytesRemaining != 0) {
-          splits.add(new FileSplit(file, length-bytesRemaining, 
-                                   bytesRemaining, job));
+          splits.add(new FileSplit(file, length-bytesRemaining, bytesRemaining, 
+                     blkLocations[blkLocations.length-1].getHosts()));
         }
-      } else {
-        splits.add(new FileSplit(file, 0, length, job));
+      } else if (length != 0) {
+        splits.add(new FileSplit(file, 0, length, blkLocations[0].getHosts()));
+      } else { 
+        //Create empty hosts array for zero length files
+        splits.add(new FileSplit(file, 0, length, new String[0]));
       }
     }
     LOG.debug("Total # of splits: " + splits.size());
@@ -205,4 +211,15 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
                                        long blockSize) {
     return Math.max(minSize, Math.min(goalSize, blockSize));
   }
+
+  protected int getBlockIndex(BlockLocation[] blkLocations, 
+                              long offset) {
+    for (int i = 0 ; i < blkLocations.length; i++) {
+      if ((blkLocations[i].getOffset() <= offset) &&
+        ((blkLocations[i].getOffset() + blkLocations[i].getLength()) >= 
+        offset))
+          return i;
+    }
+    return 0;
+  }
 }
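getBlockIndex() scans the block list for the block whose byte range contains a split's starting offset, so each split can be labeled with that block's hosts. A worked sketch with made-up offsets (only BlockLocation comes from this patch):

```java
import java.util.Arrays;
import org.apache.hadoop.fs.BlockLocation;

// Made-up 300-byte file stored as 128-byte blocks on three hosts.
public class BlockIndexExample {
  public static void main(String[] args) throws Exception {
    BlockLocation[] blks = {
      new BlockLocation(null, new String[]{"hostA"},   0L, 128L),
      new BlockLocation(null, new String[]{"hostB"}, 128L, 128L),
      new BlockLocation(null, new String[]{"hostC"}, 256L,  44L),
    };
    long splitStart = 200L;
    // Same containment test as getBlockIndex(): 128 <= 200 <= 256, so a
    // split starting at byte 200 gets labeled with hostB.
    for (int i = 0; i < blks.length; i++) {
      if (blks[i].getOffset() <= splitStart
          && blks[i].getOffset() + blks[i].getLength() >= splitStart) {
        System.out.println("split@" + splitStart + " -> block " + i
            + " hosts=" + Arrays.toString(blks[i].getHosts()));
        break;
      }
    }
  }
}
```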

+ 21 - 9
src/java/org/apache/hadoop/mapred/FileSplit.java

@@ -34,23 +34,35 @@ public class FileSplit implements InputSplit {
   private Path file;
   private long start;
   private long length;
-  private JobConf conf;
+  private String[] hosts;
   
   FileSplit() {}
 
   /** Constructs a split.
-   *
+   * @deprecated
    * @param file the file name
    * @param start the position of the first byte in the file to process
    * @param length the number of bytes in the file to process
    */
+  @Deprecated
   public FileSplit(Path file, long start, long length, JobConf conf) {
+    this(file, start, length, (String[])null);
+  }
+
+  /** Constructs a split with host information
+   *
+   * @param file the file name
+   * @param start the position of the first byte in the file to process
+   * @param length the number of bytes in the file to process
+   * @param hosts the list of hosts containing the block, possibly null
+   */
+  public FileSplit(Path file, long start, long length, String[] hosts) {
     this.file = file;
     this.start = start;
     this.length = length;
-    this.conf = conf;
+    this.hosts = hosts;
   }
-  
+
   /** @deprecated Call {@link #getPath()} instead. */
   public File getFile() { return new File(file.toString()); }
   
@@ -78,15 +90,15 @@ public class FileSplit implements InputSplit {
     file = new Path(UTF8.readString(in));
     start = in.readLong();
     length = in.readLong();
+    hosts = null;
   }
 
   public String[] getLocations() throws IOException {
-    String[][] hints = file.getFileSystem(conf).
-      getFileCacheHints(file, start, length);
-    if (hints != null && hints.length > 0) {
-      return hints[0];
+    if (this.hosts == null) {
+      return new String[]{};
+    } else {
+      return this.hosts;
     }
-    return new String[]{};
   }
   
 }
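A FileSplit now carries its hosts, so getLocations() is a plain field read instead of a per-split getFileCacheHints() lookup at scheduling time. A minimal construction sketch assuming this patch's new constructor (path and host names are made up):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

public class FileSplitExample {
  public static void main(String[] args) throws Exception {
    FileSplit split = new FileSplit(new Path("/data/input/part-00000"),
                                    0L, 64L * 1024 * 1024,
                                    new String[] {"node1.example.com",
                                                  "node2.example.com"});
    // Returns the hosts supplied by the InputFormat; no FileSystem call needed.
    for (String host : split.getLocations()) {
      System.out.println(host);
    }
  }
}
```

As the readFields() hunk shows, the hosts are not round-tripped; a deserialized split reports an empty location list, presumably because locality is consulted where splits are created rather than where they are executed.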

+ 6 - 4
src/java/org/apache/hadoop/mapred/MultiFileSplit.java

@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -89,10 +90,11 @@ public class MultiFileSplit implements InputSplit {
   public String[] getLocations() throws IOException {
     HashSet<String> hostSet = new HashSet<String>();
     for (Path file : paths) {
-      String[][] hints = FileSystem.get(job)
-      .getFileCacheHints(file, 0, FileSystem.get(job).getFileStatus(file).getLen());
-      if (hints != null && hints.length > 0) {
-        addToSet(hostSet, hints[0]);
+      BlockLocation[] blkLocations = FileSystem.get(job)
+        .getFileBlockLocations(file, 0, FileSystem.get(job)
+        .getFileStatus(file).getLen());
+      if (blkLocations != null && blkLocations.length > 0) {
+        addToSet(hostSet, blkLocations[0].getHosts());
       }
     }
     return hostSet.toArray(new String[hostSet.size()]);

+ 2 - 2
src/java/org/apache/hadoop/util/CopyFiles.java

@@ -191,7 +191,7 @@ public class CopyFiles implements Tool {
           // cut the last split and put this next file in the next split.
           if (acc + key.get() > targetsize && acc != 0) {
             long splitsize = last - pos;
-            splits.add(new FileSplit(src, pos, splitsize, job));
+            splits.add(new FileSplit(src, pos, splitsize, (String[])null));
             cbrem -= splitsize;
             pos = last;
             acc = 0L;
@@ -203,7 +203,7 @@ public class CopyFiles implements Tool {
         checkAndClose(sl);
       }
       if (cbrem != 0) {
-        splits.add(new FileSplit(src, pos, cbrem, job));
+        splits.add(new FileSplit(src, pos, cbrem, (String[])null));
       }
 
       return splits.toArray(new FileSplit[splits.size()]);

+ 4 - 2
src/test/org/apache/hadoop/dfs/DFSTestUtil.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
 
 /**
  */
@@ -191,9 +192,10 @@ public class DFSTestUtil extends TestCase {
     boolean good;
     do {
       good = true;
-      String locs[][] = fs.getFileCacheHints(fileName, 0, Long.MAX_VALUE);
+      BlockLocation locs[] = fs.getFileBlockLocations(fileName, 0,
+                                                      Long.MAX_VALUE);
       for (int j = 0; j < locs.length; j++) {
-        String[] loc = locs[j];
+        String[] loc = locs[j].getHosts();
         if (loc.length != replFactor) {
           System.out.println("File " + fileName + " has replication factor " +
               loc.length);

+ 11 - 7
src/test/org/apache/hadoop/dfs/TestDatanodeDeath.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.BlockLocation;
 
 /**
  * This class tests that a file need not be closed before its
@@ -139,12 +140,13 @@ public class TestDatanodeDeath extends TestCase {
         Thread.sleep(1000);
       } catch (InterruptedException e) {}
       done = true;
-      String[][] locations = fileSys.getFileCacheHints(name, start, end);
+      BlockLocation[] locations = fileSys.getFileBlockLocations(name, start, 
+                                                                end);
       if (locations.length < 1) {
         done = false;
         continue;
       }
-      if (locations[0].length < repl) {
+      if (locations[0].getHosts().length < repl) {
         done = false;
         continue;
       }
@@ -205,7 +207,9 @@ public class TestDatanodeDeath extends TestCase {
         Thread.sleep(1000);
       } catch (InterruptedException e) {}
       done = true;
-      String[][] locations = fileSys.getFileCacheHints(name, 0, filesize);
+      BlockLocation[] locations = fileSys.getFileBlockLocations(name, 0, 
+                                                                filesize);
+
       if (locations.length < numblocks) {
         if (attempt > 100) {
           System.out.println("File " + name + " has only " +
@@ -217,14 +221,14 @@ public class TestDatanodeDeath extends TestCase {
         continue;
       }
       for (int idx = 0; idx < locations.length; idx++) {
-        if (locations[idx].length < repl) {
+        if (locations[idx].getHosts().length < repl) {
           if (attempt > 100) {
             System.out.println("File " + name + " has " +
                                locations.length + " blocks: " +
                                " The " + idx + " block has only " +
-                               locations[idx].length + " replicas " +
-                               " but is expected to have " + repl +
-                               " replicas.");
+                               locations[idx].getHosts().length + 
+                               " replicas but is expected to have " 
+                               + repl + " replicas.");
           }
           done = false;
           break;

+ 7 - 4
src/test/org/apache/hadoop/dfs/TestDecommission.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
 
 /**
  * This class tests the decommissioning of nodes.
@@ -91,9 +92,10 @@ public class TestDecommission extends TestCase {
         Thread.sleep(1000);
       } catch (InterruptedException e) {}
       done = true;
-      String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+      BlockLocation[] locations = fileSys.getFileBlockLocations(name, 0, 
+                                                                fileSize);
       for (int idx = 0; idx < locations.length; idx++) {
-        if (locations[idx].length < repl) {
+        if (locations[idx].getHosts().length < repl) {
           done = false;
           break;
         }
@@ -103,9 +105,10 @@ public class TestDecommission extends TestCase {
 
   private void printFileLocations(FileSystem fileSys, Path name)
   throws IOException {
-    String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+    BlockLocation[] locations = fileSys.getFileBlockLocations(name, 0, 
+                                                              fileSize);
     for (int idx = 0; idx < locations.length; idx++) {
-      String[] loc = locations[idx];
+      String[] loc = locations[idx].getHosts();
       System.out.print("Block[" + idx + "] : ");
       for (int j = 0; j < loc.length; j++) {
         System.out.print(loc[j] + " ");

+ 4 - 2
src/test/org/apache/hadoop/dfs/TestFileCreation.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
+import org.apache.hadoop.fs.BlockLocation;
 
 
 /**
@@ -89,13 +90,14 @@ public class TestFileCreation extends TestCase {
         Thread.sleep(1000);
       } catch (InterruptedException e) {}
       done = true;
-      String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+      BlockLocation[] locations = fileSys.getFileBlockLocations(name, 0, 
+                                                                fileSize);
       if (locations.length < numBlocks) {
         done = false;
         continue;
       }
       for (int idx = 0; idx < locations.length; idx++) {
-        if (locations[idx].length < repl) {
+        if (locations[idx].getHosts().length < repl) {
           done = false;
           break;
         }

+ 4 - 2
src/test/org/apache/hadoop/dfs/TestFileStatus.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.BlockLocation;
 
 /**
  * This class tests the FileStatus API.
@@ -62,9 +63,10 @@ public class TestFileStatus extends TestCase {
         Thread.sleep(1000);
       } catch (InterruptedException e) {}
       done = true;
-      String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+      BlockLocation[] locations = fileSys.getFileBlockLocations(name, 0, 
+                                                                fileSize);
       for (int idx = 0; idx < locations.length; idx++) {
-        if (locations[idx].length < repl) {
+        if (locations[idx].getHosts().length < repl) {
           done = false;
           break;
         }

+ 3 - 2
src/test/org/apache/hadoop/dfs/TestSetrepIncreasing.java

@@ -22,6 +22,7 @@ import java.io.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.BlockLocation;
 
 public class TestSetrepIncreasing extends TestCase {
   static void setrep(int fromREP, int toREP, boolean simulatedStorage) throws IOException {
@@ -56,8 +57,8 @@ public class TestSetrepIncreasing extends TestCase {
       //get fs again since the old one may be closed
       fs = cluster.getFileSystem();
       long len = fs.getFileStatus(f).getLen();
-      for(String[] locations : fs.getFileCacheHints(f, 0, len)) {
-        assertTrue(locations.length == toREP);
+      for(BlockLocation locations : fs.getFileBlockLocations(f, 0, len)) {
+        assertTrue(locations.getHosts().length == toREP);
       }
       TestDFSShell.show("done setrep waiting: " + root);
     } finally {

+ 3 - 1
src/test/org/apache/hadoop/dfs/TestSmallBlock.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
 
 /**
  * This class tests the creation of files with block-size
@@ -58,7 +59,8 @@ public class TestSmallBlock extends TestCase {
   }
   
   private void checkFile(FileSystem fileSys, Path name) throws IOException {
-    String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
+    BlockLocation[] locations = fileSys.getFileBlockLocations(name, 0, 
+                                                              fileSize);
     assertEquals("Number of blocks", fileSize, locations.length);
     FSDataInputStream stm = fileSys.open(name);
     byte[] expected = new byte[fileSize];

+ 14 - 1
src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
 
 
 public class KFSEmulationImpl implements IFSImpl {
@@ -120,7 +121,19 @@ public class KFSEmulationImpl implements IFSImpl {
         return 1;
     }
     public String[][] getDataLocation(String path, long start, long len) throws IOException {
-        return localFS.getFileCacheHints(new Path(path), start, len);
+        BlockLocation[] blkLocations = 
+          localFS.getFileBlockLocations(new Path(path), start, len);
+          if ((blkLocations == null) || (blkLocations.length == 0)) {
+            return new String[0][];     
+          }
+          int blkCount = blkLocations.length;
+          String[][]hints = new String[blkCount][];
+          for (int i=0; i < blkCount ; i++) {
+            String[] hosts = blkLocations[i].getHosts();
+            hints[i] = new String[hosts.length];
+            hints[i] = hosts;
+          }
+          return hints;
     }
 
     public long getModificationTime(String path) throws IOException {

+ 2 - 2
src/test/org/apache/hadoop/io/FileBench.java

@@ -141,8 +141,8 @@ public class FileBench extends Configured implements Tool {
     final String fn = conf.get("test.filebench.name", "");
     Path pin = new Path(conf.getInputPaths()[0], fn);
     FileStatus in = pin.getFileSystem(conf).getFileStatus(pin);
-    RecordReader rr = inf.getRecordReader(
-        new FileSplit(pin, 0, in.getLen(), conf), conf, Reporter.NULL);
+    RecordReader rr = inf.getRecordReader(new FileSplit(pin, 0, in.getLen(), 
+                                          (String[])null), conf, Reporter.NULL);
     try {
       Object key = rr.createKey();
       Object val = rr.createValue();

+ 1 - 1
src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java

@@ -445,7 +445,7 @@ public class GenericMRLoadGenerator extends Configured implements Tool {
             SequenceFileInputFormat.class), job);
       IndirectSplit is = ((IndirectSplit)split);
       return indirIF.getRecordReader(new FileSplit(is.getPath(), 0,
-            is.getLength(), job),
+            is.getLength(), (String[])null),
           job, reporter);
     }
   }

+ 4 - 2
src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java

@@ -24,6 +24,7 @@ import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.*;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -158,8 +159,9 @@ public class TestRackAwareTaskPlacement extends TestCase {
     boolean isReplicationDone;
     
     do {
-      String[][] hints = fileSys.getFileCacheHints(name, 0, Long.MAX_VALUE);
-      if (hints[0].length == replication) {
+      BlockLocation[] hints = fileSys.getFileBlockLocations(name, 0, 
+                                                            Long.MAX_VALUE);
+      if (hints[0].getHosts().length == replication) {
         isReplicationDone = true;
       } else {
         isReplicationDone = false;  

+ 2 - 1
src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryInputFormat.java

@@ -71,7 +71,8 @@ public class TestSequenceFileAsBinaryInputFormat extends TestCase {
     DataInputBuffer buf = new DataInputBuffer();
     RecordReader<BytesWritable,BytesWritable> reader =
       bformat.getRecordReader(new FileSplit(file, 0,
-            fs.getFileStatus(file).getLen(), job), job, Reporter.NULL);
+                              fs.getFileStatus(file).getLen(), 
+                              (String[])null), job, Reporter.NULL);
     try {
       while (reader.next(bkey, bval)) {
         tkey.set(Integer.toString(r.nextInt(), 36));

+ 2 - 2
src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java

@@ -80,8 +80,8 @@ public class ThreadedMapBenchmark extends Configured implements Tool {
       InputSplit[] result = new InputSplit[numSplits];
       Path outDir = job.getOutputPath();
       for(int i=0; i < result.length; ++i) {
-        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
-                                  job);
+        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i),
+                                  0, 1, (String[])null);
       }
       return result;
     }