
merge r1535792 through r1541341 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1541342 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 11 years ago
parent
commit
ba98e8f737
47 changed files with 2073 additions and 326 deletions
  1. + 3 - 0    hadoop-common-project/hadoop-common/CHANGES.txt
  2. + 66 - 35   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java
  3. + 4 - 3    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java
  4. + 108 - 0   hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java
  5. + 1 - 1    hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSKerberosAuthenticationHandler.java
  6. + 1 - 1    hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSWithKerberos.java
  7. + 1 - 1    hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/TestDelegationTokenManagerService.java
  8. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
  9. + 8 - 0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  10. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
  11. + 7 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
  12. + 3 - 43   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  13. + 3 - 31   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  14. + 41 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  15. + 124 - 118  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
  16. + 21 - 3   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
  17. + 0 - 2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
  18. + 6 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
  19. + 2 - 2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  20. + 10 - 7   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
  21. + 9 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
  22. + 5 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
  23. + 8 - 12   hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
  24. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java
  25. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
  26. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
  27. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
  28. + 1 - 1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
  29. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
  30. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java
  31. + 8 - 1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
  32. + 1 - 5    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
  33. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java
  34. + 190 - 31  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
  35. + 50 - 0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
  36. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestXMLUtils.java
  37. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java
  38. + 1 - 2    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
  39. + 3 - 0    hadoop-mapreduce-project/CHANGES.txt
  40. + 97 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java
  41. + 89 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java
  42. + 90 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java
  43. + 220 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java
  44. + 416 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java
  45. + 461 - 0  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java
  46. + 3 - 0    hadoop-yarn-project/CHANGES.txt
  47. + 1 - 1    hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -277,6 +277,9 @@ Trunk (Unreleased)
     HADOOP-9740. Fix FsShell '-text' command to be able to read Avro
     files stored in HDFS and other filesystems. (Allan Yan via cutting)
 
+    HDFS-5471. CacheAdmin -listPools fails when user lacks permissions to view
+    all pools (Andrew Wang via Colin Patrick McCabe)
+
   OPTIMIZATIONS
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)

+ 66 - 35
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java

@@ -31,17 +31,33 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Stable
 public class BlockLocation {
   private String[] hosts; // Datanode hostnames
+  private String[] cachedHosts; // Datanode hostnames with a cached replica
   private String[] names; // Datanode IP:xferPort for accessing the block
   private String[] topologyPaths; // Full path name in network topology
   private long offset;  // Offset of the block in the file
   private long length;
   private boolean corrupt;
 
+  private static final String[] EMPTY_STR_ARRAY = new String[0];
+
   /**
    * Default Constructor
    */
   public BlockLocation() {
-    this(new String[0], new String[0],  0L, 0L);
+    this(EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, 0L, 0L);
+  }
+
+  /**
+   * Copy constructor
+   */
+  public BlockLocation(BlockLocation that) {
+    this.hosts = that.hosts;
+    this.cachedHosts = that.cachedHosts;
+    this.names = that.names;
+    this.topologyPaths = that.topologyPaths;
+    this.offset = that.offset;
+    this.length = that.length;
+    this.corrupt = that.corrupt;
   }
 
   /**
@@ -57,20 +73,7 @@ public class BlockLocation {
    */
   public BlockLocation(String[] names, String[] hosts, long offset, 
                        long length, boolean corrupt) {
-    if (names == null) {
-      this.names = new String[0];
-    } else {
-      this.names = names;
-    }
-    if (hosts == null) {
-      this.hosts = new String[0];
-    } else {
-      this.hosts = hosts;
-    }
-    this.offset = offset;
-    this.length = length;
-    this.topologyPaths = new String[0];
-    this.corrupt = corrupt;
+    this(names, hosts, null, offset, length, corrupt);
   }
 
   /**
@@ -87,34 +90,55 @@ public class BlockLocation {
    */
   public BlockLocation(String[] names, String[] hosts, String[] topologyPaths,
                        long offset, long length, boolean corrupt) {
-    this(names, hosts, offset, length, corrupt);
+    this(names, hosts, null, topologyPaths, offset, length, corrupt);
+  }
+
+  public BlockLocation(String[] names, String[] hosts, String[] cachedHosts,
+      String[] topologyPaths, long offset, long length, boolean corrupt) {
+    if (names == null) {
+      this.names = EMPTY_STR_ARRAY;
+    } else {
+      this.names = names;
+    }
+    if (hosts == null) {
+      this.hosts = EMPTY_STR_ARRAY;
+    } else {
+      this.hosts = hosts;
+    }
+    if (cachedHosts == null) {
+      this.cachedHosts = EMPTY_STR_ARRAY;
+    } else {
+      this.cachedHosts = cachedHosts;
+    }
     if (topologyPaths == null) {
-      this.topologyPaths = new String[0];
+      this.topologyPaths = EMPTY_STR_ARRAY;
     } else {
       this.topologyPaths = topologyPaths;
     }
+    this.offset = offset;
+    this.length = length;
+    this.corrupt = corrupt;
   }
 
   /**
    * Get the list of hosts (hostname) hosting this block
    */
   public String[] getHosts() throws IOException {
-    if (hosts == null || hosts.length == 0) {
-      return new String[0];
-    } else {
-      return hosts;
-    }
+    return hosts;
+  }
+
+  /**
+   * Get the list of hosts (hostname) hosting a cached replica of the block
+   */
+  public String[] getCachedHosts() {
+   return cachedHosts;
   }
 
   /**
    * Get the list of names (IP:xferPort) hosting this block
    */
   public String[] getNames() throws IOException {
-    if (names == null || names.length == 0) {
-      return new String[0];
-    } else {
-      return names;
-    }
+    return names;
   }
 
   /**
@@ -122,11 +146,7 @@ public class BlockLocation {
    * The last component of the path is the "name" (IP:xferPort).
    */
   public String[] getTopologyPaths() throws IOException {
-    if (topologyPaths == null || topologyPaths.length == 0) {
-      return new String[0];
-    } else {
-      return topologyPaths;
-    }
+    return topologyPaths;
   }
   
   /**
@@ -176,18 +196,29 @@ public class BlockLocation {
    */
   public void setHosts(String[] hosts) throws IOException {
     if (hosts == null) {
-      this.hosts = new String[0];
+      this.hosts = EMPTY_STR_ARRAY;
     } else {
       this.hosts = hosts;
     }
   }
 
+  /**
+   * Set the hosts hosting a cached replica of this block
+   */
+  public void setCachedHosts(String[] cachedHosts) {
+    if (cachedHosts == null) {
+      this.cachedHosts = EMPTY_STR_ARRAY;
+    } else {
+      this.cachedHosts = cachedHosts;
+    }
+  }
+
   /**
    * Set the names (host:port) hosting this block
    */
   public void setNames(String[] names) throws IOException {
     if (names == null) {
-      this.names = new String[0];
+      this.names = EMPTY_STR_ARRAY;
     } else {
       this.names = names;
     }
@@ -198,7 +229,7 @@ public class BlockLocation {
    */
   public void setTopologyPaths(String[] topologyPaths) throws IOException {
     if (topologyPaths == null) {
-      this.topologyPaths = new String[0];
+      this.topologyPaths = EMPTY_STR_ARRAY;
     } else {
       this.topologyPaths = topologyPaths;
     }
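
The reworked BlockLocation guarantees non-null arrays and carries the hosts that hold a cached replica of the block. A minimal usage sketch against this revision; the host names, addresses, and sizes are invented for illustration:

import org.apache.hadoop.fs.BlockLocation;

public class BlockLocationExample {
  public static void main(String[] args) {
    // New seven-argument constructor: names, hosts, cachedHosts,
    // topologyPaths, offset, length, corrupt. Null arrays are stored as
    // the shared empty array, so the getters never return null.
    BlockLocation loc = new BlockLocation(
        new String[] { "10.0.0.1:50010" },   // names (IP:xferPort)
        new String[] { "dn1.example.com" },  // hosts
        null,                                // cachedHosts -> empty array
        null,                                // topologyPaths -> empty array
        0L, 134217728L, false);

    // No null check is needed before using the result.
    System.out.println(loc.getCachedHosts().length);   // prints 0

    // The copy constructor carries the cached-host information along,
    // which is what HdfsBlockLocation now relies on.
    BlockLocation copy = new BlockLocation(loc);
    copy.setCachedHosts(new String[] { "dn1.example.com" });
    System.out.println(copy.getCachedHosts()[0]);
  }
}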

+ 4 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/IdNotFoundException.java → hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java

@@ -20,12 +20,13 @@ package org.apache.hadoop.fs;
 import java.io.IOException;
 
 /**
- * Exception corresponding to ID not found - EINVAL
+ * Thrown when the user makes a malformed request, for example missing required
+ * parameters or parameters that are not valid.
  */
-public class IdNotFoundException extends IOException {
+public class InvalidRequestException extends IOException {
   static final long serialVersionUID = 0L;
 
-  public IdNotFoundException(String str) {
+  public InvalidRequestException(String str) {
     super(str);
   }
 }

+ 108 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java

@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.junit.Test;
+
+public class TestBlockLocation {
+
+  private static final String[] EMPTY_STR_ARRAY = new String[0];
+
+  private static void checkBlockLocation(final BlockLocation loc)
+      throws Exception {
+    checkBlockLocation(loc, 0, 0, false);
+  }
+
+  private static void checkBlockLocation(final BlockLocation loc,
+      final long offset, final long length, final boolean corrupt)
+      throws Exception {
+    checkBlockLocation(loc, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY,
+        EMPTY_STR_ARRAY, offset, length, corrupt);
+  }
+
+  private static void checkBlockLocation(final BlockLocation loc,
+      String[] names, String[] hosts, String[] cachedHosts,
+      String[] topologyPaths, final long offset, final long length,
+      final boolean corrupt) throws Exception {
+    assertNotNull(loc.getHosts());
+    assertNotNull(loc.getCachedHosts());
+    assertNotNull(loc.getNames());
+    assertNotNull(loc.getTopologyPaths());
+
+    assertArrayEquals(hosts, loc.getHosts());
+    assertArrayEquals(cachedHosts, loc.getCachedHosts());
+    assertArrayEquals(names, loc.getNames());
+    assertArrayEquals(topologyPaths, loc.getTopologyPaths());
+
+    assertEquals(offset, loc.getOffset());
+    assertEquals(length, loc.getLength());
+    assertEquals(corrupt, loc.isCorrupt());
+  }
+
+  /**
+   * Call all the constructors and verify the delegation is working properly
+   */
+  @Test(timeout = 5000)
+  public void testBlockLocationConstructors() throws Exception {
+    //
+    BlockLocation loc;
+    loc = new BlockLocation();
+    checkBlockLocation(loc);
+    loc = new BlockLocation(null, null, 1, 2);
+    checkBlockLocation(loc, 1, 2, false);
+    loc = new BlockLocation(null, null, null, 1, 2);
+    checkBlockLocation(loc, 1, 2, false);
+    loc = new BlockLocation(null, null, null, 1, 2, true);
+    checkBlockLocation(loc, 1, 2, true);
+    loc = new BlockLocation(null, null, null, null, 1, 2, true);
+    checkBlockLocation(loc, 1, 2, true);
+  }
+
+  /**
+   * Call each of the setters and verify
+   */
+  @Test(timeout = 5000)
+  public void testBlockLocationSetters() throws Exception {
+    BlockLocation loc;
+    loc = new BlockLocation();
+    // Test that null sets the empty array
+    loc.setHosts(null);
+    loc.setCachedHosts(null);
+    loc.setNames(null);
+    loc.setTopologyPaths(null);
+    checkBlockLocation(loc);
+    // Test that not-null gets set properly
+    String[] names = new String[] { "name" };
+    String[] hosts = new String[] { "host" };
+    String[] cachedHosts = new String[] { "cachedHost" };
+    String[] topologyPaths = new String[] { "path" };
+    loc.setNames(names);
+    loc.setHosts(hosts);
+    loc.setCachedHosts(cachedHosts);
+    loc.setTopologyPaths(topologyPaths);
+    loc.setOffset(1);
+    loc.setLength(2);
+    loc.setCorrupt(true);
+    checkBlockLocation(loc, names, hosts, cachedHosts, topologyPaths, 1, 2,
+        true);
+  }
+}

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSKerberosAuthenticationHandler.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.fs.http.server;
 
-import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
 import org.apache.hadoop.fs.http.client.HttpFSKerberosAuthenticator;
@@ -36,6 +35,7 @@ import org.apache.hadoop.test.TestDir;
 import org.apache.hadoop.test.TestDirHelper;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSWithKerberos.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.fs.http.server;
 
-import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.DelegationTokenRenewer;
@@ -40,6 +39,7 @@ import org.apache.hadoop.test.TestJettyHelper;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.webapp.WebAppContext;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/lib/service/security/TestDelegationTokenManagerService.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.lib.service.security;
 
-import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
 import org.apache.hadoop.lib.server.Server;
@@ -30,6 +29,7 @@ import org.apache.hadoop.test.HTestCase;
 import org.apache.hadoop.test.TestDir;
 import org.apache.hadoop.test.TestDirHelper;
 import org.apache.hadoop.util.StringUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.net.InetAddress;

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java

@@ -26,8 +26,6 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentNavigableMap;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -49,6 +47,7 @@ import org.apache.hadoop.nfs.nfs3.response.CREATE3Response;
 import org.apache.hadoop.nfs.nfs3.response.READ3Response;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.SecurityHandler;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 

+ 8 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -190,6 +190,8 @@ Trunk (Unreleased)
 
     HDFS-5326. add modifyDirective to cacheAdmin.  (cmccabe)
 
+    HDFS-5450. Better API for getting the cached blocks locations. (wang)
+
   OPTIMIZATIONS
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 
@@ -468,6 +470,9 @@ Release 2.3.0 - UNRELEASED
     HDFS-5467. Remove tab characters in hdfs-default.xml.
     (Shinichi Yamashita via Andrew Wang)
 
+    HDFS-5495. Remove further JUnit3 usages from HDFS.
+    (Jarek Jarcec Cecho via wang)
+
   OPTIMIZATIONS
 
     HDFS-5239.  Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -526,6 +531,9 @@ Release 2.3.0 - UNRELEASED
 
     HDFS-5488. Clean up TestHftpURLTimeout. (Haohui Mai via jing9)
 
+    HDFS-5425. Renaming underconstruction file with snapshots can make NN failure on 
+    restart. (jing9 and Vinay)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java

@@ -37,8 +37,7 @@ public class HdfsBlockLocation extends BlockLocation {
   public HdfsBlockLocation(BlockLocation loc, LocatedBlock block) 
       throws IOException {
     // Initialize with data from passed in BlockLocation
-    super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), 
-        loc.getOffset(), loc.getLength(), loc.isCorrupt());
+    super(loc);
     this.block = block;
   }
   

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -436,7 +436,13 @@ public class DFSUtil {
                                      locations[hCnt].getNetworkLocation());
         racks[hCnt] = node.toString();
       }
-      blkLocations[idx] = new BlockLocation(xferAddrs, hosts, racks,
+      DatanodeInfo[] cachedLocations = blk.getCachedLocations();
+      String[] cachedHosts = new String[cachedLocations.length];
+      for (int i=0; i<cachedLocations.length; i++) {
+        cachedHosts[i] = cachedLocations[i].getHostName();
+      }
+      blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
+                                            racks,
                                             blk.getStartOffset(),
                                             blk.getBlockSize(),
                                             blk.isCorrupt());

+ 3 - 43
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -1109,21 +1109,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   public AddCachePoolResponseProto addCachePool(RpcController controller,
       AddCachePoolRequestProto request) throws ServiceException {
     try {
-      CachePoolInfo info =
-          new CachePoolInfo(request.getPoolName());
-      if (request.hasOwnerName()) {
-        info.setOwnerName(request.getOwnerName());
-      }
-      if (request.hasGroupName()) {
-        info.setGroupName(request.getGroupName());
-      }
-      if (request.hasMode()) {
-        info.setMode(new FsPermission((short)request.getMode()));
-      }
-      if (request.hasWeight()) {
-        info.setWeight(request.getWeight());
-      }
-      server.addCachePool(info);
+      server.addCachePool(PBHelper.convert(request.getInfo()));
       return AddCachePoolResponseProto.newBuilder().build();
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -1134,21 +1120,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   public ModifyCachePoolResponseProto modifyCachePool(RpcController controller,
       ModifyCachePoolRequestProto request) throws ServiceException {
     try {
-      CachePoolInfo info =
-          new CachePoolInfo(request.getPoolName());
-      if (request.hasOwnerName()) {
-        info.setOwnerName(request.getOwnerName());
-      }
-      if (request.hasGroupName()) {
-        info.setGroupName(request.getGroupName());
-      }
-      if (request.hasMode()) {
-        info.setMode(new FsPermission((short)request.getMode()));
-      }
-      if (request.hasWeight()) {
-        info.setWeight(request.getWeight());
-      }
-      server.modifyCachePool(info);
+      server.modifyCachePool(PBHelper.convert(request.getInfo()));
       return ModifyCachePoolResponseProto.newBuilder().build();
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -1179,19 +1151,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
         CachePoolInfo pool = iter.next();
         ListCachePoolsResponseElementProto.Builder elemBuilder = 
             ListCachePoolsResponseElementProto.newBuilder();
-        elemBuilder.setPoolName(pool.getPoolName());
-        if (pool.getOwnerName() != null) {
-          elemBuilder.setOwnerName(pool.getOwnerName());
-        }
-        if (pool.getGroupName() != null) {
-          elemBuilder.setGroupName(pool.getGroupName());
-        }
-        if (pool.getMode() != null) {
-          elemBuilder.setMode(pool.getMode().toShort());
-        }
-        if (pool.getWeight() != null) {
-          elemBuilder.setWeight(pool.getWeight());
-        }
+        elemBuilder.setInfo(PBHelper.convert(pool));
         responseBuilder.addElements(elemBuilder.build());
         prevPoolName = pool.getPoolName();
       }

+ 3 - 31
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -1112,19 +1112,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
   public void addCachePool(CachePoolInfo info) throws IOException {
     AddCachePoolRequestProto.Builder builder = 
         AddCachePoolRequestProto.newBuilder();
-    builder.setPoolName(info.getPoolName());
-    if (info.getOwnerName() != null) {
-      builder.setOwnerName(info.getOwnerName());
-    }
-    if (info.getGroupName() != null) {
-      builder.setGroupName(info.getGroupName());
-    }
-    if (info.getMode() != null) {
-      builder.setMode(info.getMode().toShort());
-    }
-    if (info.getWeight() != null) {
-      builder.setWeight(info.getWeight());
-    }
+    builder.setInfo(PBHelper.convert(info));
     try {
       rpcProxy.addCachePool(null, builder.build());
     } catch (ServiceException e) {
@@ -1136,19 +1124,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
   public void modifyCachePool(CachePoolInfo req) throws IOException {
     ModifyCachePoolRequestProto.Builder builder = 
         ModifyCachePoolRequestProto.newBuilder();
-    builder.setPoolName(req.getPoolName());
-    if (req.getOwnerName() != null) {
-      builder.setOwnerName(req.getOwnerName());
-    }
-    if (req.getGroupName() != null) {
-      builder.setGroupName(req.getGroupName());
-    }
-    if (req.getMode() != null) {
-      builder.setMode(req.getMode().toShort());
-    }
-    if (req.getWeight() != null) {
-      builder.setWeight(req.getWeight());
-    }
+    builder.setInfo(PBHelper.convert(req));
     try {
       rpcProxy.modifyCachePool(null, builder.build());
     } catch (ServiceException e) {
@@ -1178,11 +1154,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     @Override
     public CachePoolInfo get(int i) {
       ListCachePoolsResponseElementProto elem = proto.getElements(i);
-      return new CachePoolInfo(elem.getPoolName()).
-          setOwnerName(elem.getOwnerName()).
-          setGroupName(elem.getGroupName()).
-          setMode(new FsPermission((short)elem.getMode())).
-          setWeight(elem.getWeight());
+      return PBHelper.convert(elem.getInfo());
     }
 
     @Override

+ 41 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.protocolPB;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -35,6 +37,7 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
@@ -1710,6 +1714,43 @@ public class PBHelper {
     return builder.build();
   }
   
+  public static CachePoolInfoProto convert(CachePoolInfo info) {
+    CachePoolInfoProto.Builder builder = CachePoolInfoProto.newBuilder();
+    builder.setPoolName(info.getPoolName());
+    if (info.getOwnerName() != null) {
+      builder.setOwnerName(info.getOwnerName());
+    }
+    if (info.getGroupName() != null) {
+      builder.setGroupName(info.getGroupName());
+    }
+    if (info.getMode() != null) {
+      builder.setMode(info.getMode().toShort());
+    }
+    if (info.getWeight() != null) {
+      builder.setWeight(info.getWeight());
+    }
+    return builder.build();
+  }
+
+  public static CachePoolInfo convert (CachePoolInfoProto proto) {
+    // Pool name is a required field, the rest are optional
+    String poolName = checkNotNull(proto.getPoolName());
+    CachePoolInfo info = new CachePoolInfo(poolName);
+    if (proto.hasOwnerName()) {
+        info.setOwnerName(proto.getOwnerName());
+    }
+    if (proto.hasGroupName()) {
+      info.setGroupName(proto.getGroupName());
+    }
+    if (proto.hasMode()) {
+      info.setMode(new FsPermission((short)proto.getMode()));
+    }
+    if (proto.hasWeight()) {
+      info.setWeight(proto.getWeight());
+    }
+    return info;
+  }
+
   public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
     return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
   }
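
The pair of new PBHelper converters centralizes the CachePoolInfo field-by-field copying that the two translators previously duplicated. A hedged round-trip sketch; the pool name, owner, group, mode, and weight are made-up values:

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

public class CachePoolInfoRoundTrip {
  public static void main(String[] args) {
    // Illustrative pool settings; every field except the name is optional.
    CachePoolInfo info = new CachePoolInfo("pool1")
        .setOwnerName("andrew")
        .setGroupName("hadoop")
        .setMode(new FsPermission((short) 0755))
        .setWeight(100);

    // CachePoolInfo -> CachePoolInfoProto: only non-null fields are set.
    CachePoolInfoProto proto = PBHelper.convert(info);

    // CachePoolInfoProto -> CachePoolInfo: only fields present in the
    // proto are copied back; the pool name is required.
    CachePoolInfo roundTripped = PBHelper.convert(proto);
    System.out.println(roundTripped.getPoolName());   // pool1
  }
}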

+ 124 - 118
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java

@@ -43,7 +43,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.IdNotFoundException;
+import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -250,11 +250,87 @@ public final class CacheManager {
   private long getNextEntryId() throws IOException {
     assert namesystem.hasWriteLock();
     if (nextEntryId == Long.MAX_VALUE) {
-      throw new IOException("No more available IDs");
+      throw new IOException("No more available IDs.");
     }
     return nextEntryId++;
   }
 
+  // Helper getter / validation methods
+
+  private static void checkWritePermission(FSPermissionChecker pc,
+      CachePool pool) throws AccessControlException {
+    if ((pc != null)) {
+      pc.checkPermission(pool, FsAction.WRITE);
+    }
+  }
+
+  private static String validatePoolName(PathBasedCacheDirective directive)
+      throws InvalidRequestException {
+    String pool = directive.getPool();
+    if (pool == null) {
+      throw new InvalidRequestException("No pool specified.");
+    }
+    if (pool.isEmpty()) {
+      throw new InvalidRequestException("Invalid empty pool name.");
+    }
+    return pool;
+  }
+
+  private static String validatePath(PathBasedCacheDirective directive)
+      throws InvalidRequestException {
+    if (directive.getPath() == null) {
+      throw new InvalidRequestException("No path specified.");
+    }
+    String path = directive.getPath().toUri().getPath();
+    if (!DFSUtil.isValidName(path)) {
+      throw new InvalidRequestException("Invalid path '" + path + "'.");
+    }
+    return path;
+  }
+
+  private static short validateReplication(PathBasedCacheDirective directive,
+      short defaultValue) throws InvalidRequestException {
+    short repl = (directive.getReplication() != null)
+        ? directive.getReplication() : defaultValue;
+    if (repl <= 0) {
+      throw new InvalidRequestException("Invalid replication factor " + repl
+          + " <= 0");
+    }
+    return repl;
+  }
+
+  /**
+   * Get a PathBasedCacheEntry by ID, validating the ID and that the entry
+   * exists.
+   */
+  private PathBasedCacheEntry getById(long id) throws InvalidRequestException {
+    // Check for invalid IDs.
+    if (id <= 0) {
+      throw new InvalidRequestException("Invalid negative ID.");
+    }
+    // Find the entry.
+    PathBasedCacheEntry entry = entriesById.get(id);
+    if (entry == null) {
+      throw new InvalidRequestException("No directive with ID " + id
+          + " found.");
+    }
+    return entry;
+  }
+
+  /**
+   * Get a CachePool by name, validating that it exists.
+   */
+  private CachePool getCachePool(String poolName)
+      throws InvalidRequestException {
+    CachePool pool = cachePools.get(poolName);
+    if (pool == null) {
+      throw new InvalidRequestException("Unknown pool " + poolName);
+    }
+    return pool;
+  }
+
+  // RPC handlers
+
   private void addInternal(PathBasedCacheEntry entry) {
     entriesById.put(entry.getEntryId(), entry);
     String path = entry.getPath();
@@ -272,34 +348,10 @@ public final class CacheManager {
     assert namesystem.hasWriteLock();
     PathBasedCacheEntry entry;
     try {
-      if (directive.getPool() == null) {
-        throw new IdNotFoundException("addDirective: no pool was specified.");
-      }
-      if (directive.getPool().isEmpty()) {
-        throw new IdNotFoundException("addDirective: pool name was empty.");
-      }
-      CachePool pool = cachePools.get(directive.getPool());
-      if (pool == null) {
-        throw new IdNotFoundException("addDirective: no such pool as " +
-            directive.getPool());
-      }
-      if ((pc != null) && (!pc.checkPermission(pool, FsAction.WRITE))) {
-        throw new AccessControlException("addDirective: write " +
-            "permission denied for pool " + directive.getPool());
-      }
-      if (directive.getPath() == null) {
-        throw new IOException("addDirective: no path was specified.");
-      }
-      String path = directive.getPath().toUri().getPath();
-      if (!DFSUtil.isValidName(path)) {
-        throw new IOException("addDirective: path '" + path + "' is invalid.");
-      }
-      short replication = directive.getReplication() == null ? 
-          (short)1 : directive.getReplication();
-      if (replication <= 0) {
-        throw new IOException("addDirective: replication " + replication +
-            " is invalid.");
-      }
+      CachePool pool = getCachePool(validatePoolName(directive));
+      checkWritePermission(pc, pool);
+      String path = validatePath(directive);
+      short replication = validateReplication(directive, (short)1);
       long id;
       if (directive.getId() != null) {
         // We are loading an entry from the edit log.
@@ -312,10 +364,10 @@ public final class CacheManager {
       entry = new PathBasedCacheEntry(id, path, replication, pool);
       addInternal(entry);
     } catch (IOException e) {
-      LOG.warn("addDirective " + directive + ": failed.", e);
+      LOG.warn("addDirective of " + directive + " failed: ", e);
       throw e;
     }
-    LOG.info("addDirective " + directive + ": succeeded.");
+    LOG.info("addDirective of " + directive + " successful.");
     if (monitor != null) {
       monitor.kick();
     }
@@ -332,75 +384,43 @@ public final class CacheManager {
       // Check for invalid IDs.
       Long id = directive.getId();
       if (id == null) {
-        throw new IdNotFoundException("modifyDirective: " +
-            "no ID to modify was supplied.");
-      }
-      if (id <= 0) {
-        throw new IdNotFoundException("modifyDirective " + id +
-            ": invalid non-positive directive ID.");
-      }
-      // Find the entry.
-      PathBasedCacheEntry prevEntry = entriesById.get(id);
-      if (prevEntry == null) {
-        throw new IdNotFoundException("modifyDirective " + id +
-            ": id not found.");
-      }
-      if ((pc != null) &&
-          (!pc.checkPermission(prevEntry.getPool(), FsAction.WRITE))) {
-        throw new AccessControlException("modifyDirective " + id +
-            ": permission denied for initial pool " + prevEntry.getPool());
+        throw new InvalidRequestException("Must supply an ID.");
       }
+      PathBasedCacheEntry prevEntry = getById(id);
+      checkWritePermission(pc, prevEntry.getPool());
       String path = prevEntry.getPath();
       if (directive.getPath() != null) {
-        path = directive.getPath().toUri().getPath();
-        if (!DFSUtil.isValidName(path)) {
-          throw new IOException("modifyDirective " + id + ": new path " +
-              path + " is not valid.");
-        }
+        path = validatePath(directive);
       }
-      short replication = (directive.getReplication() != null) ?
-          directive.getReplication() : prevEntry.getReplication();
-      if (replication <= 0) {
-        throw new IOException("modifyDirective: replication " + replication +
-            " is invalid.");
+      short replication = prevEntry.getReplication();
+      if (directive.getReplication() != null) {
+        replication = validateReplication(directive, replication);
       }
       CachePool pool = prevEntry.getPool();
       if (directive.getPool() != null) {
-        pool = cachePools.get(directive.getPool());
-        if (pool == null) {
-          throw new IdNotFoundException("modifyDirective " + id +
-              ": pool " + directive.getPool() + " not found.");
-        }
-        if (directive.getPool().isEmpty()) {
-          throw new IdNotFoundException("modifyDirective: pool name was " +
-              "empty.");
-        }
-        if ((pc != null) &&
-            (!pc.checkPermission(pool, FsAction.WRITE))) {
-          throw new AccessControlException("modifyDirective " + id +
-              ": permission denied for target pool " + pool);
-        }
+        pool = getCachePool(validatePoolName(directive));
+        checkWritePermission(pc, pool);
       }
       removeInternal(prevEntry);
       PathBasedCacheEntry newEntry =
           new PathBasedCacheEntry(id, path, replication, pool);
       addInternal(newEntry);
     } catch (IOException e) {
-      LOG.warn("modifyDirective " + idString + ": failed.", e);
+      LOG.warn("modifyDirective of " + idString + " failed: ", e);
       throw e;
     }
-    LOG.info("modifyDirective " + idString + ": successfully applied " +
-        directive);
+    LOG.info("modifyDirective of " + idString + " successfully applied " +
+        directive + ".");
   }
 
   public void removeInternal(PathBasedCacheEntry existing)
-      throws IOException {
+      throws InvalidRequestException {
     assert namesystem.hasWriteLock();
     // Remove the corresponding entry in entriesByPath.
     String path = existing.getPath();
     List<PathBasedCacheEntry> entries = entriesByPath.get(path);
     if (entries == null || !entries.remove(existing)) {
-      throw new IdNotFoundException("removeInternal: failed to locate entry " +
+      throw new InvalidRequestException("Failed to locate entry " +
           existing.getEntryId() + " by path " + existing.getPath());
     }
     if (entries.size() == 0) {
@@ -413,32 +433,17 @@ public final class CacheManager {
       throws IOException {
     assert namesystem.hasWriteLock();
     try {
-      // Check for invalid IDs.
-      if (id <= 0) {
-        throw new IdNotFoundException("removeDirective " + id + ": invalid " +
-            "non-positive directive ID.");
-      }
-      // Find the entry.
-      PathBasedCacheEntry existing = entriesById.get(id);
-      if (existing == null) {
-        throw new IdNotFoundException("removeDirective " + id +
-            ": id not found.");
-      }
-      if ((pc != null) &&
-          (!pc.checkPermission(existing.getPool(), FsAction.WRITE))) {
-        throw new AccessControlException("removeDirective " + id +
-            ": write permission denied on pool " +
-            existing.getPool().getPoolName());
-      }
+      PathBasedCacheEntry existing = getById(id);
+      checkWritePermission(pc, existing.getPool());
       removeInternal(existing);
     } catch (IOException e) {
-      LOG.warn("removeDirective " + id + " failed.", e);
+      LOG.warn("removeDirective of " + id + " failed: ", e);
       throw e;
     }
     if (monitor != null) {
       monitor.kick();
     }
-    LOG.info("removeDirective " + id + ": succeeded.");
+    LOG.info("removeDirective of " + id + " successful.");
   }
 
   public BatchedListEntries<PathBasedCacheDirective> 
@@ -449,18 +454,13 @@ public final class CacheManager {
     final int NUM_PRE_ALLOCATED_ENTRIES = 16;
     String filterPath = null;
     if (filter.getId() != null) {
-      throw new IOException("we currently don't support filtering by ID");
+      throw new IOException("Filtering by ID is unsupported.");
     }
     if (filter.getPath() != null) {
-      filterPath = filter.getPath().toUri().getPath();
-      if (!DFSUtil.isValidName(filterPath)) {
-        throw new IOException("listPathBasedCacheDirectives: invalid " +
-            "path name '" + filterPath + "'");
-      }
+      filterPath = validatePath(filter);
     }
     if (filter.getReplication() != null) {
-      throw new IOException("we currently don't support filtering " +
-          "by replication");
+      throw new IOException("Filtering by replication is unsupported.");
     }
     ArrayList<PathBasedCacheDirective> replies =
         new ArrayList<PathBasedCacheDirective>(NUM_PRE_ALLOCATED_ENTRIES);
@@ -481,8 +481,15 @@ public final class CacheManager {
           !directive.getPath().toUri().getPath().equals(filterPath)) {
         continue;
       }
-      if ((pc == null) ||
-          (pc.checkPermission(curEntry.getPool(), FsAction.READ))) {
+      boolean hasPermission = true;
+      if (pc != null) {
+        try {
+          pc.checkPermission(curEntry.getPool(), FsAction.READ);
+        } catch (AccessControlException e) {
+          hasPermission = false;
+        }
+      }
+      if (hasPermission) {
         replies.add(cur.getValue().toDirective());
         numReplies++;
       }
@@ -505,12 +512,13 @@ public final class CacheManager {
     String poolName = info.getPoolName();
     CachePool pool = cachePools.get(poolName);
     if (pool != null) {
-      throw new IOException("cache pool " + poolName + " already exists.");
+      throw new InvalidRequestException("Cache pool " + poolName
+          + " already exists.");
     }
     pool = CachePool.createFromInfoAndDefaults(info);
     cachePools.put(pool.getPoolName(), pool);
-    LOG.info("created new cache pool " + pool);
-    return pool.getInfo(true);
+    LOG.info("Created new cache pool " + pool);
+    return pool.getInfo(null);
   }
 
   /**
@@ -528,7 +536,8 @@ public final class CacheManager {
     String poolName = info.getPoolName();
     CachePool pool = cachePools.get(poolName);
     if (pool == null) {
-      throw new IOException("cache pool " + poolName + " does not exist.");
+      throw new InvalidRequestException("Cache pool " + poolName
+          + " does not exist.");
     }
     StringBuilder bld = new StringBuilder();
     String prefix = "";
@@ -575,7 +584,8 @@ public final class CacheManager {
     CachePoolInfo.validateName(poolName);
     CachePool pool = cachePools.remove(poolName);
     if (pool == null) {
-      throw new IOException("can't remove non-existent cache pool " + poolName);
+      throw new InvalidRequestException(
+          "Cannot remove non-existent cache pool " + poolName);
     }
     
     // Remove entries using this pool
@@ -607,11 +617,7 @@ public final class CacheManager {
       if (numListed++ >= maxListCachePoolsResponses) {
         return new BatchedListEntries<CachePoolInfo>(results, true);
       }
-      if (pc == null) {
-        results.add(cur.getValue().getInfo(true));
-      } else {
-        results.add(cur.getValue().getInfo(pc));
-      }
+      results.add(cur.getValue().getInfo(pc));
     }
     return new BatchedListEntries<CachePoolInfo>(results, false);
   }
@@ -755,7 +761,7 @@ public final class CacheManager {
     Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
     out.writeInt(cachePools.size());
     for (CachePool pool: cachePools.values()) {
-      pool.getInfo(true).writeTo(out);
+      pool.getInfo(null).writeTo(out);
       counter.increment();
     }
     prog.endStep(Phase.SAVING_CHECKPOINT, step);
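
For callers, the visible effect of this refactor is that malformed directives and unknown pools now surface as InvalidRequestException rather than IdNotFoundException or a plain IOException. A minimal sketch of handling that distinction; the removeDirective method below is a hypothetical stand-in for the real RPC path into CacheManager:

import java.io.IOException;

import org.apache.hadoop.fs.InvalidRequestException;

public class CacheErrorHandling {
  // Hypothetical stand-in for whichever ClientProtocol call ends up in
  // CacheManager#removeDirective; here it always rejects the request.
  private static void removeDirective(long id) throws IOException {
    throw new InvalidRequestException("No directive with ID " + id + " found.");
  }

  static void removeQuietly(long id) {
    try {
      removeDirective(id);
    } catch (InvalidRequestException e) {
      // Bad input from the caller: unknown or invalid directive ID.
      System.err.println("Rejected request: " + e.getMessage());
    } catch (IOException e) {
      // Anything else is a genuine server-side failure and should propagate.
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    removeQuietly(-1);
  }
}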

+ 21 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.base.Preconditions;
@@ -162,7 +163,7 @@ public final class CachePool {
   }
   
   /**
-   * Get information about this cache pool.
+   * Get either full or partial information about this CachePool.
    *
    * @param fullInfo
    *          If true, only the name will be returned (i.e., what you 
@@ -170,7 +171,7 @@ public final class CachePool {
    * @return
    *          Cache pool information.
    */
-  public CachePoolInfo getInfo(boolean fullInfo) {
+  private CachePoolInfo getInfo(boolean fullInfo) {
     CachePoolInfo info = new CachePoolInfo(poolName);
     if (!fullInfo) {
       return info;
@@ -181,8 +182,25 @@ public final class CachePool {
         setWeight(weight);
   }
 
+  /**
+   * Returns a CachePoolInfo describing this CachePool based on the permissions
+   * of the calling user. Unprivileged users will see only minimal descriptive
+   * information about the pool.
+   * 
+   * @param pc Permission checker to be used to validate the user's permissions,
+   *          or null
+   * @return CachePoolInfo describing this CachePool
+   */
   public CachePoolInfo getInfo(FSPermissionChecker pc) {
-    return getInfo(pc.checkPermission(this, FsAction.READ)); 
+    boolean hasPermission = true;
+    if (pc != null) {
+      try {
+        pc.checkPermission(this, FsAction.READ);
+      } catch (AccessControlException e) {
+        hasPermission = false;
+      }
+    }
+    return getInfo(hasPermission);
   }
 
   public String toString() {

+ 0 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java

@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -84,7 +83,6 @@ import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Co
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import org.apache.hadoop.hdfs.util.Holder;
-import org.apache.jasper.tagplugins.jstl.core.Remove;
 
 import com.google.common.base.Joiner;
 

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java

@@ -824,7 +824,12 @@ public class FSImageFormat {
         final INodesInPath iip = fsDir.getLastINodeInPath(path);
         INodeFile oldnode = INodeFile.valueOf(iip.getINode(0), path);
         cons.setLocalName(oldnode.getLocalNameBytes());
-        cons.setParent(oldnode.getParent());
+        INodeReference parentRef = oldnode.getParentReference();
+        if (parentRef != null) {
+          cons.setParentReference(parentRef);
+        } else {
+          cons.setParent(oldnode.getParent());
+        }
 
         if (oldnode instanceof INodeFileWithSnapshot) {
           cons = new INodeFileUnderConstructionWithSnapshot(cons,

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2557,7 +2557,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       final INode[] inodes = analyzeFileState(
           src, fileId, clientName, previous, onRetryBlock).getINodes();
       final INodeFileUnderConstruction pendingFile =
-          (INodeFileUnderConstruction) inodes[inodes.length - 1];
+          (INodeFileUnderConstruction) inodes[inodes.length - 1].asFile();
 
       if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
         // This is a retry. Just return the last block if having locations.
@@ -2595,7 +2595,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
       INode[] inodes = inodesInPath.getINodes();
       final INodeFileUnderConstruction pendingFile =
-          (INodeFileUnderConstruction) inodes[inodes.length - 1];
+          (INodeFileUnderConstruction) inodes[inodes.length - 1].asFile();
 
       if (onRetryBlock[0] != null) {
         if (onRetryBlock[0].getLocations().length > 0) {

+ 10 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java

@@ -261,24 +261,27 @@ class FSPermissionChecker {
    *
    * @param pool CachePool being accessed
    * @param access type of action being performed on the cache pool
-   * @return if the pool can be accessed
+   * @throws AccessControlException if pool cannot be accessed
    */
-  public boolean checkPermission(CachePool pool, FsAction access) {
+  public void checkPermission(CachePool pool, FsAction access)
+      throws AccessControlException {
     FsPermission mode = pool.getMode();
     if (isSuperUser()) {
-      return true;
+      return;
     }
     if (user.equals(pool.getOwnerName())
         && mode.getUserAction().implies(access)) {
-      return true;
+      return;
     }
     if (groups.contains(pool.getGroupName())
         && mode.getGroupAction().implies(access)) {
-      return true;
+      return;
     }
     if (mode.getOtherAction().implies(access)) {
-      return true;
+      return;
     }
-    return false;
+    throw new AccessControlException("Permission denied while accessing pool "
+        + pool.getPoolName() + ": user " + user + " does not have "
+        + access.toString() + " permissions.");
   }
 }
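
Since checkPermission(CachePool, FsAction) is now void and signals denial with AccessControlException, call sites that still want a yes/no answer wrap it in a try/catch, as CacheManager and CachePool do above. A small adapter sketch of that caller-side pattern; it would have to live in the org.apache.hadoop.hdfs.server.namenode package because FSPermissionChecker is package-private:

package org.apache.hadoop.hdfs.server.namenode;

import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.security.AccessControlException;

class CachePoolAccess {
  /**
   * Adapter from the exception-based check back to a boolean answer,
   * mirroring the pattern now used in CacheManager and CachePool#getInfo.
   * A null checker is treated as "permission checking disabled".
   */
  static boolean canAccess(FSPermissionChecker pc, CachePool pool,
      FsAction access) {
    if (pc == null) {
      return true;
    }
    try {
      pc.checkPermission(pool, access);   // void; throws on denial
      return true;
    } catch (AccessControlException e) {
      return false;
    }
  }
}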

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java

@@ -595,7 +595,15 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
   public void replaceChild(final INode oldChild, final INode newChild,
       final INodeMap inodeMap) {
     super.replaceChild(oldChild, newChild, inodeMap);
-    diffs.replaceChild(ListType.CREATED, oldChild, newChild);
+    if (oldChild.getParentReference() != null && !newChild.isReference()) {
+      // oldChild is referred by a Reference node. Thus we are replacing the 
+      // referred inode, e.g., 
+      // INodeFileWithSnapshot -> INodeFileUnderConstructionWithSnapshot
+      // in this case, we do not need to update the diff list
+      return;
+    } else {
+      diffs.replaceChild(ListType.CREATED, oldChild, newChild);
+    }
   }
   
   /**

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java

@@ -395,6 +395,7 @@ public class JsonUtil {
     m.put("startOffset", locatedblock.getStartOffset());
     m.put("block", toJsonMap(locatedblock.getBlock()));
     m.put("locations", toJsonArray(locatedblock.getLocations()));
+    m.put("cachedLocations", toJsonArray(locatedblock.getCachedLocations()));
     return m;
   }
 
@@ -409,8 +410,11 @@ public class JsonUtil {
         (Object[])m.get("locations"));
     final long startOffset = (Long)m.get("startOffset");
     final boolean isCorrupt = (Boolean)m.get("isCorrupt");
+    final DatanodeInfo[] cachedLocations = toDatanodeInfoArray(
+        (Object[])m.get("cachedLocations"));
 
-    final LocatedBlock locatedblock = new LocatedBlock(b, locations, startOffset, isCorrupt);
+    final LocatedBlock locatedblock = new LocatedBlock(b, locations,
+        startOffset, isCorrupt, cachedLocations);
     locatedblock.setBlockToken(toBlockToken((Map<?, ?>)m.get("blockToken")));
     return locatedblock;
   }

+ 8 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto

@@ -407,23 +407,23 @@ message ListPathBasedCacheDirectivesResponseProto {
   required bool hasMore = 2;
 }
 
-message AddCachePoolRequestProto {
-  required string poolName = 1;
+message CachePoolInfoProto {
+  optional string poolName = 1;
   optional string ownerName = 2;
   optional string groupName = 3;
   optional int32 mode = 4;
   optional int32 weight = 5;
 }
 
+message AddCachePoolRequestProto {
+  required CachePoolInfoProto info = 1;
+}
+
 message AddCachePoolResponseProto { // void response
 }
 
 message ModifyCachePoolRequestProto {
-  required string poolName = 1;
-  optional string ownerName = 2;
-  optional string groupName = 3;
-  optional int32 mode = 4;
-  optional int32 weight = 5;
+  required CachePoolInfoProto info = 1;
 }
 
 message ModifyCachePoolResponseProto { // void response
@@ -446,11 +446,7 @@ message ListCachePoolsResponseProto {
 }
 
 message ListCachePoolsResponseElementProto {
-  required string poolName = 1;
-  required string ownerName = 2;
-  required string groupName = 3;
-  required int32 mode = 4;
-  required int32 weight = 5;
+  required CachePoolInfoProto info = 1;
 }
 
 message GetFileLinkInfoRequestProto {
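
With the pool attributes folded into the shared CachePoolInfoProto message, clients build the add/modify requests through a single nested info field. A hedged sketch of constructing the new AddCachePoolRequestProto from the generated protobuf classes; the pool name and mode are illustrative:

import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;

public class AddCachePoolRequestExample {
  public static void main(String[] args) {
    // All pool attributes travel inside the shared CachePoolInfoProto;
    // only the illustrative pool name and mode (octal 0755) are set here.
    CachePoolInfoProto info = CachePoolInfoProto.newBuilder()
        .setPoolName("pool1")
        .setMode(0755)
        .build();

    // The request wraps the single required 'info' field that replaces
    // the former five per-attribute fields.
    AddCachePoolRequestProto request = AddCachePoolRequestProto.newBuilder()
        .setInfo(info)
        .build();

    System.out.println(request.getInfo().getPoolName());   // pool1
  }
}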

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAbandonBlock.java

@@ -21,8 +21,6 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java

@@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java

@@ -23,8 +23,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +31,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.util.ThreadUtil;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java

@@ -20,12 +20,11 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java

@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.hdfs;
 
-import junit.framework.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.FileInputStream;

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java

@@ -32,8 +32,6 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -64,6 +62,7 @@ import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java

@@ -22,8 +22,6 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.TreeMap;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +37,7 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static junit.framework.Assert.assertTrue;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock.Mlocker;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@@ -87,6 +88,8 @@ public class TestFsDatasetCache {
   private static DatanodeProtocolClientSideTranslatorPB spyNN;
   private static PageRounder rounder = new PageRounder();
 
+  private Mlocker mlocker;
+
   @Before
   public void setUp() throws Exception {
     assumeTrue(!Path.WINDOWS);
@@ -110,6 +113,8 @@ public class TestFsDatasetCache {
     fsd = dn.getFSDataset();
 
     spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
+    // Save the current mlocker and replace it at the end of the test
+    mlocker = MappableBlock.mlocker;
   }
 
   @After
@@ -120,6 +125,8 @@ public class TestFsDatasetCache {
     if (cluster != null) {
       cluster.shutdown();
     }
+    // Restore the original mlocker
+    MappableBlock.mlocker = mlocker;
   }
 
   private static void setHeartbeatResponse(DatanodeCommand[] cmds)
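
The new setUp/tearDown code above saves MappableBlock.mlocker and puts it back after each test, so a test can temporarily install a stub mlocker without affecting later tests. A hedged sketch of that pattern (the Mlocker interface and the static field come from this diff; the no-op body and the surrounding test code are assumptions):

    // Sketch: swap in a no-op mlocker for one test, then restore the saved instance,
    // mirroring the save in setUp() and the restore in tearDown() above.
    MappableBlock.Mlocker saved = MappableBlock.mlocker;
    try {
      MappableBlock.mlocker = new MappableBlock.Mlocker() {
        @Override
        public void mlock(MappedByteBuffer mmap, long length) throws IOException {
          // no-op: pretend the pages were locked so the test never calls the real mlock(2)
        }
      };
      // ... exercise caching behaviour here ...
    } finally {
      MappableBlock.mlocker = saved;
    }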

+ 1 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java

@@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -34,12 +32,10 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceScanner;
 import static org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.SLEEP_PERIOD_MS;
 import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
+import org.junit.Assert;
 import org.junit.Test;
-import org.junit.Ignore;
 import static org.junit.Assert.fail;
 
 

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java

@@ -23,12 +23,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_AVAILABLE_SPACE_
 import java.util.ArrayList;
 import java.util.List;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 

+ 190 - 31
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java

@@ -17,42 +17,47 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.nio.MappedByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
-import org.apache.hadoop.fs.IdNotFoundException;
+import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.security.AccessControlException;
@@ -77,6 +82,15 @@ public class TestPathBasedCacheRequests {
   static private DistributedFileSystem dfs;
   static private NamenodeProtocols proto;
 
+  static {
+    MappableBlock.mlocker = new MappableBlock.Mlocker() {
+      @Override
+      public void mlock(MappedByteBuffer mmap, long length) throws IOException {
+        // Stubbed out for testing
+      }
+    };
+  }
+
   @Before
   public void setup() throws Exception {
     conf = new HdfsConfiguration();
@@ -187,15 +201,15 @@ public class TestPathBasedCacheRequests {
       fail("expected to get an exception when " +
           "removing a non-existent pool.");
     } catch (IOException ioe) {
-      GenericTestUtils.assertExceptionContains("can't remove " +
+      GenericTestUtils.assertExceptionContains("Cannot remove " +
           "non-existent cache pool", ioe);
     }
     try {
       dfs.removeCachePool(poolName);
-      Assert.fail("expected to get an exception when " +
+      fail("expected to get an exception when " +
           "removing a non-existent pool.");
     } catch (IOException ioe) {
-      GenericTestUtils.assertExceptionContains("can't remove " +
+      GenericTestUtils.assertExceptionContains("Cannot remove " +
           "non-existent cache pool", ioe);
     }
     try {
@@ -272,18 +286,18 @@ public class TestPathBasedCacheRequests {
 
     try {
       proto.removeCachePool("pool99");
-      Assert.fail("expected to get an exception when " +
+      fail("expected to get an exception when " +
           "removing a non-existent pool.");
     } catch (IOException ioe) {
-      GenericTestUtils.assertExceptionContains("can't remove non-existent",
+      GenericTestUtils.assertExceptionContains("Cannot remove non-existent",
           ioe);
     }
     try {
       proto.removeCachePool(poolName);
-      Assert.fail("expected to get an exception when " +
+      fail("expected to get an exception when " +
           "removing a non-existent pool.");
     } catch (IOException ioe) {
-      GenericTestUtils.assertExceptionContains("can't remove non-existent",
+      GenericTestUtils.assertExceptionContains("Cannot remove non-existent",
           ioe);
     }
 
@@ -351,8 +365,8 @@ public class TestPathBasedCacheRequests {
           setPool("no_such_pool").
           build());
       fail("expected an error when adding to a non-existent pool.");
-    } catch (IdNotFoundException ioe) {
-      GenericTestUtils.assertExceptionContains("no such pool as", ioe);
+    } catch (InvalidRequestException ioe) {
+      GenericTestUtils.assertExceptionContains("Unknown pool", ioe);
     }
 
     try {
@@ -364,7 +378,7 @@ public class TestPathBasedCacheRequests {
           "mode 0 (no permissions for anyone).");
     } catch (AccessControlException e) {
       GenericTestUtils.
-          assertExceptionContains("permission denied for pool", e);
+          assertExceptionContains("Permission denied while accessing pool", e);
     }
 
     try {
@@ -384,10 +398,10 @@ public class TestPathBasedCacheRequests {
           setReplication((short)1).
           setPool("").
           build());
-      Assert.fail("expected an error when adding a PathBasedCache " +
+      fail("expected an error when adding a PathBasedCache " +
           "directive with an empty pool name.");
-    } catch (IdNotFoundException e) {
-      GenericTestUtils.assertExceptionContains("pool name was empty", e);
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("Invalid empty pool name", e);
     }
 
     long deltaId = addAsUnprivileged(delta);
@@ -405,7 +419,7 @@ public class TestPathBasedCacheRequests {
     validateListAll(iter, alphaId, alphaId2, betaId, deltaId, relativeId );
     iter = dfs.listPathBasedCacheDirectives(
         new PathBasedCacheDirective.Builder().setPool("pool3").build());
-    Assert.assertFalse(iter.hasNext());
+    assertFalse(iter.hasNext());
     iter = dfs.listPathBasedCacheDirectives(
         new PathBasedCacheDirective.Builder().setPool("pool1").build());
     validateListAll(iter, alphaId, alphaId2, deltaId, relativeId );
@@ -416,27 +430,27 @@ public class TestPathBasedCacheRequests {
     dfs.removePathBasedCacheDirective(betaId);
     iter = dfs.listPathBasedCacheDirectives(
         new PathBasedCacheDirective.Builder().setPool("pool2").build());
-    Assert.assertFalse(iter.hasNext());
+    assertFalse(iter.hasNext());
 
     try {
       dfs.removePathBasedCacheDirective(betaId);
-      Assert.fail("expected an error when removing a non-existent ID");
-    } catch (IdNotFoundException e) {
-      GenericTestUtils.assertExceptionContains("id not found", e);
+      fail("expected an error when removing a non-existent ID");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("No directive with ID", e);
     }
 
     try {
       proto.removePathBasedCacheDirective(-42l);
-      Assert.fail("expected an error when removing a negative ID");
-    } catch (IdNotFoundException e) {
+      fail("expected an error when removing a negative ID");
+    } catch (InvalidRequestException e) {
       GenericTestUtils.assertExceptionContains(
-          "invalid non-positive directive ID", e);
+          "Invalid negative ID", e);
     }
     try {
       proto.removePathBasedCacheDirective(43l);
-      Assert.fail("expected an error when removing a non-existent ID");
-    } catch (IdNotFoundException e) {
-      GenericTestUtils.assertExceptionContains("id not found", e);
+      fail("expected an error when removing a non-existent ID");
+    } catch (InvalidRequestException e) {
+      GenericTestUtils.assertExceptionContains("No directive with ID", e);
     }
 
     dfs.removePathBasedCacheDirective(alphaId);
@@ -529,6 +543,14 @@ public class TestPathBasedCacheRequests {
     assertFalse("Unexpected # of cache directives found", dit.hasNext());
   }
 
+  /**
+   * Wait for the NameNode to have an expected number of cached blocks
+   * and replicas.
+   * @param nn NameNode
+   * @param expectedCachedBlocks number of cached blocks to wait for
+   * @param expectedCachedReplicas number of cached replicas to wait for
+   * @throws Exception if the expected counts are not reached before the timeout
+   */
   private static void waitForCachedBlocks(NameNode nn,
       final int expectedCachedBlocks, final int expectedCachedReplicas) 
           throws Exception {
@@ -569,6 +591,37 @@ public class TestPathBasedCacheRequests {
     }, 500, 60000);
   }
 
+  private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
+      final List<Path> paths, final int expectedBlocks,
+      final int expectedReplicas)
+      throws Exception {
+    int numCachedBlocks = 0;
+    int numCachedReplicas = 0;
+    for (Path p: paths) {
+      final FileStatus f = dfs.getFileStatus(p);
+      final long len = f.getLen();
+      final long blockSize = f.getBlockSize();
+      // round it up to full blocks
+      final long numBlocks = (len + blockSize - 1) / blockSize;
+      BlockLocation[] locs = dfs.getFileBlockLocations(p, 0, len);
+      assertEquals("Unexpected number of block locations for path " + p,
+          numBlocks, locs.length);
+      for (BlockLocation l: locs) {
+        if (l.getCachedHosts().length > 0) {
+          numCachedBlocks++;
+        }
+        numCachedReplicas += l.getCachedHosts().length;
+      }
+    }
+    LOG.info("Found " + numCachedBlocks + " of " + expectedBlocks + " blocks");
+    LOG.info("Found " + numCachedReplicas + " of " + expectedReplicas
+        + " replicas");
+    assertEquals("Unexpected number of cached blocks", expectedBlocks,
+        numCachedBlocks);
+    assertEquals("Unexpected number of cached replicas", expectedReplicas,
+        numCachedReplicas);
+  }
+
   private static final long BLOCK_SIZE = 512;
   private static final int NUM_DATANODES = 4;
 
@@ -745,4 +798,110 @@ public class TestPathBasedCacheRequests {
     }
   }
 
+  /**
+   * Tests stepping the cache replication factor up and down, checking the
+   * number of cached replicas and blocks as well as the advertised locations.
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testReplicationFactor() throws Exception {
+    Assume.assumeTrue(canTestDatanodeCaching());
+    HdfsConfiguration conf = createCachingConf();
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      NameNode namenode = cluster.getNameNode();
+      // Create the pool
+      final String pool = "friendlyPool";
+      dfs.addCachePool(new CachePoolInfo(pool));
+      // Create some test files
+      final List<Path> paths = new LinkedList<Path>();
+      paths.add(new Path("/foo/bar"));
+      paths.add(new Path("/foo/baz"));
+      paths.add(new Path("/foo2/bar2"));
+      paths.add(new Path("/foo2/baz2"));
+      dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+      dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+      final int numBlocksPerFile = 2;
+      for (Path path : paths) {
+        FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+            (int)BLOCK_SIZE, (short)3, false);
+      }
+      waitForCachedBlocks(namenode, 0, 0);
+      checkNumCachedReplicas(dfs, paths, 0, 0);
+      // cache directory
+      long id = dfs.addPathBasedCacheDirective(
+          new PathBasedCacheDirective.Builder().
+            setPath(new Path("/foo")).
+            setReplication((short)1).
+            setPool(pool).
+            build());
+      waitForCachedBlocks(namenode, 4, 4);
+      checkNumCachedReplicas(dfs, paths, 4, 4);
+      // step up the replication factor
+      for (int i=2; i<=3; i++) {
+        dfs.modifyPathBasedCacheDirective(
+            new PathBasedCacheDirective.Builder().
+            setId(id).
+            setReplication((short)i).
+            build());
+        waitForCachedBlocks(namenode, 4, 4*i);
+        checkNumCachedReplicas(dfs, paths, 4, 4*i);
+      }
+      // step it down
+      for (int i=2; i>=1; i--) {
+        dfs.modifyPathBasedCacheDirective(
+            new PathBasedCacheDirective.Builder().
+            setId(id).
+            setReplication((short)i).
+            build());
+        waitForCachedBlocks(namenode, 4, 4*i);
+        checkNumCachedReplicas(dfs, paths, 4, 4*i);
+      }
+      // remove and watch numCached go to 0
+      dfs.removePathBasedCacheDirective(id);
+      waitForCachedBlocks(namenode, 0, 0);
+      checkNumCachedReplicas(dfs, paths, 0, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testListCachePoolPermissions() throws Exception {
+    final UserGroupInformation myUser = UserGroupInformation
+        .createRemoteUser("myuser");
+    final DistributedFileSystem myDfs = 
+        (DistributedFileSystem)DFSTestUtil.getFileSystemAs(myUser, conf);
+    final String poolName = "poolparty";
+    dfs.addCachePool(new CachePoolInfo(poolName)
+        .setMode(new FsPermission((short)0700)));
+    // Should only see partial info
+    RemoteIterator<CachePoolInfo> it = myDfs.listCachePools();
+    CachePoolInfo info = it.next();
+    assertFalse(it.hasNext());
+    assertEquals("Expected pool name", poolName, info.getPoolName());
+    assertNull("Unexpected owner name", info.getOwnerName());
+    assertNull("Unexpected group name", info.getGroupName());
+    assertNull("Unexpected mode", info.getMode());
+    assertNull("Unexpected weight", info.getWeight());
+    // Modify the pool so myuser is now the owner
+    dfs.modifyCachePool(new CachePoolInfo(poolName)
+        .setOwnerName(myUser.getShortUserName())
+        .setWeight(99));
+    // Should see full info
+    it = myDfs.listCachePools();
+    info = it.next();
+    assertFalse(it.hasNext());
+    assertEquals("Expected pool name", poolName, info.getPoolName());
+    assertEquals("Mismatched owner name", myUser.getShortUserName(),
+        info.getOwnerName());
+    assertNotNull("Expected group name", info.getGroupName());
+    assertEquals("Mismatched mode", (short) 0700,
+        info.getMode().toShort());
+    assertEquals("Mismatched weight", 99, (int)info.getWeight());
+  }
 }
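
Taken together, the new tests walk the whole client-side caching flow: create a pool, add a path-based directive, change its replication, and remove it. A condensed, hedged sketch of that flow follows; every call appears in testReplicationFactor above, while the running cluster and the dfs handle are assumed:

    // Hedged sketch of the flow exercised by testReplicationFactor.
    dfs.addCachePool(new CachePoolInfo("friendlyPool"));
    long id = dfs.addPathBasedCacheDirective(
        new PathBasedCacheDirective.Builder()
            .setPath(new Path("/foo"))
            .setReplication((short) 1)
            .setPool("friendlyPool")
            .build());
    // Raise the cache replication in place instead of recreating the directive.
    dfs.modifyPathBasedCacheDirective(
        new PathBasedCacheDirective.Builder()
            .setId(id)
            .setReplication((short) 3)
            .build());
    // Removing the directive lets the cached replica count drain back to zero.
    dfs.removePathBasedCacheDirective(id);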

+ 50 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java

@@ -29,6 +29,7 @@ import static org.mockito.Mockito.spy;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 
@@ -40,9 +41,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
@@ -102,6 +106,7 @@ public class TestRenameWithSnapshots {
   
   @Before
   public void setUp() throws Exception {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL).format(true)
         .build();
     cluster.waitActive();
@@ -2289,4 +2294,49 @@ public class TestRenameWithSnapshots {
     assertEquals(0, diff.getChildrenDiff().getList(ListType.DELETED).size());
     assertEquals(0, diff.getChildrenDiff().getList(ListType.CREATED).size());
   }
+
+  /**
+   * Renaming an under-construction file that is in a snapshot should not fail
+   * NN restart after a checkpoint. Unit test for HDFS-5425.
+   */
+  @Test
+  public void testRenameUCFileInSnapshot() throws Exception {
+    final Path test = new Path("/test");
+    final Path foo = new Path(test, "foo");
+    final Path bar = new Path(foo, "bar");
+    hdfs.mkdirs(foo);
+    // create a file and keep it as underconstruction.
+    hdfs.create(bar);
+    SnapshotTestHelper.createSnapshot(hdfs, test, "s0");
+    // rename bar --> bar2
+    final Path bar2 = new Path(foo, "bar2");
+    hdfs.rename(bar, bar2);
+
+    // save namespace and restart
+    restartClusterAndCheckImage(true);
+  }
+  
+  /**
+   * Similar to testRenameUCFileInSnapshot, but do the renaming first and then
+   * append to the file without closing it. Unit test for HDFS-5425.
+   */
+  @Test
+  public void testAppendFileAfterRenameInSnapshot() throws Exception {
+    final Path test = new Path("/test");
+    final Path foo = new Path(test, "foo");
+    final Path bar = new Path(foo, "bar");
+    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPL, SEED);
+    SnapshotTestHelper.createSnapshot(hdfs, test, "s0");
+    // rename bar --> bar2
+    final Path bar2 = new Path(foo, "bar2");
+    hdfs.rename(bar, bar2);
+    // append file and keep it as underconstruction.
+    FSDataOutputStream out = hdfs.append(bar2);
+    out.writeByte(0);
+    ((DFSOutputStream) out.getWrappedStream()).hsync(
+        EnumSet.of(SyncFlag.UPDATE_LENGTH));
+
+    // save namespace and restart
+    restartClusterAndCheckImage(true);
+  }
 }

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestXMLUtils.java

@@ -17,9 +17,8 @@
  */
 package org.apache.hadoop.hdfs.util;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.hdfs.util.XMLUtils.UnmanglingError;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestXMLUtils {

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestURLConnectionFactory.java

@@ -22,9 +22,8 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.List;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java

@@ -26,8 +26,6 @@ import static org.junit.Assert.fail;
 import java.util.HashMap;
 import java.util.Map;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -164,6 +164,9 @@ Release 2.3.0 - UNRELEASED
 
     MAPREDUCE-4421. Run MapReduce framework via the distributed cache (jlowe)
 
+    MAPREDUCE-1176. FixedLengthInputFormat and FixedLengthRecordReader
+    (Mariappan Asokan and BitsOfInfo via Sandy Ryza)
+
   OPTIMIZATIONS
 
     MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

+ 97 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java

@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+/**
+ * FixedLengthInputFormat is an input format used to read input files
+ * which contain fixed length records.  The content of a record need not be
+ * text.  It can be arbitrary binary data.  Users must configure the record
+ * length property by calling:
+ * FixedLengthInputFormat.setRecordLength(conf, recordLength);<br><br> or
+ * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
+ * <br><br>
+ * @see FixedLengthRecordReader
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FixedLengthInputFormat
+    extends FileInputFormat<LongWritable, BytesWritable>
+    implements JobConfigurable {
+
+  private CompressionCodecFactory compressionCodecs = null;
+  
+  public static final String FIXED_RECORD_LENGTH =
+      "fixedlengthinputformat.record.length"; 
+
+  /**
+   * Set the length of each record
+   * @param conf configuration
+   * @param recordLength the length of a record
+   */
+  public static void setRecordLength(Configuration conf, int recordLength) {
+    conf.setInt(FIXED_RECORD_LENGTH, recordLength);
+  }
+
+  /**
+   * Get record length value
+   * @param conf configuration
+   * @return the record length, zero means none was set
+   */
+  public static int getRecordLength(Configuration conf) {
+    return conf.getInt(FIXED_RECORD_LENGTH, 0);
+  }
+
+  @Override
+  public void configure(JobConf conf) {
+    compressionCodecs = new CompressionCodecFactory(conf);
+  }
+
+  @Override
+  public RecordReader<LongWritable, BytesWritable>
+      getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter)
+      throws IOException {
+    reporter.setStatus(genericSplit.toString());
+    int recordLength = getRecordLength(job);
+    if (recordLength <= 0) {
+      throw new IOException("Fixed record length " + recordLength
+          + " is invalid.  It should be set to a value greater than zero");
+    }
+    return new FixedLengthRecordReader(job, (FileSplit)genericSplit,
+                                       recordLength);
+  }
+
+  @Override
+  protected boolean isSplitable(FileSystem fs, Path file) {
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    return(null == codec);
+  }
+
+}
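
A hedged example of wiring the old-API format into a job, following the class javadoc above; the 20-byte record length and the input path are illustrative values, not part of this change:

    // Sketch: configuring the mapred (old API) FixedLengthInputFormat on a JobConf.
    JobConf job = new JobConf(new Configuration());
    FixedLengthInputFormat.setRecordLength(job, 20);   // or job.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 20)
    job.setInputFormat(FixedLengthInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path("/data/fixed-records"));
    // Each call to the mapper then receives the record offset (LongWritable) as the key
    // and the raw record bytes (BytesWritable) as the value.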

+ 89 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java

@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * A reader to read fixed length records from a split.  Record offset is
+ * returned as key and the record as bytes is returned in value.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FixedLengthRecordReader
+    implements RecordReader<LongWritable, BytesWritable> {
+
+  private int recordLength;
+  // Make use of the new API implementation to avoid code duplication.
+  private org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader reader;
+
+  public FixedLengthRecordReader(Configuration job, FileSplit split,
+                                 int recordLength) throws IOException {
+    this.recordLength = recordLength;
+    reader = new org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader(
+        recordLength);
+    reader.initialize(job, split.getStart(), split.getLength(),
+        split.getPath());
+  }
+
+  @Override
+  public LongWritable createKey() {
+    return new LongWritable();
+  }
+  
+  @Override
+  public BytesWritable createValue() {
+    return new BytesWritable(new byte[recordLength]);
+  }
+  
+  @Override
+  public synchronized boolean next(LongWritable key, BytesWritable value)
+      throws IOException {
+    boolean dataRead = reader.nextKeyValue();
+    if (dataRead) {
+      LongWritable newKey = reader.getCurrentKey();
+      BytesWritable newValue = reader.getCurrentValue();
+      key.set(newKey.get());
+      value.set(newValue);
+    }
+    return dataRead;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return reader.getProgress();
+  }
+  
+  @Override
+  public synchronized long getPos() throws IOException {
+    return reader.getPos();
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }    
+
+}

+ 90 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java

@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * FixedLengthInputFormat is an input format used to read input files
+ * which contain fixed length records.  The content of a record need not be
+ * text.  It can be arbitrary binary data.  Users must configure the record
+ * length property by calling:
+ * FixedLengthInputFormat.setRecordLength(conf, recordLength);<br><br> or
+ * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
+ * <br><br>
+ * @see FixedLengthRecordReader
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FixedLengthInputFormat
+    extends FileInputFormat<LongWritable, BytesWritable> {
+
+  public static final String FIXED_RECORD_LENGTH =
+      "fixedlengthinputformat.record.length"; 
+
+  /**
+   * Set the length of each record
+   * @param conf configuration
+   * @param recordLength the length of a record
+   */
+  public static void setRecordLength(Configuration conf, int recordLength) {
+    conf.setInt(FIXED_RECORD_LENGTH, recordLength);
+  }
+
+  /**
+   * Get record length value
+   * @param conf configuration
+   * @return the record length, zero means none was set
+   */
+  public static int getRecordLength(Configuration conf) {
+    return conf.getInt(FIXED_RECORD_LENGTH, 0);
+  }
+
+  @Override
+  public RecordReader<LongWritable, BytesWritable>
+      createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    int recordLength = getRecordLength(context.getConfiguration());
+    if (recordLength <= 0) {
+      throw new IOException("Fixed record length " + recordLength
+          + " is invalid.  It should be set to a value greater than zero");
+    }
+    return new FixedLengthRecordReader(recordLength);
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    final CompressionCodec codec = 
+        new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+    return (null == codec);
+  } 
+
+}
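
The same configuration with the new mapreduce API; again a hedged sketch, with the record length and path invented for illustration and the Job factory method assumed to be available on this branch:

    // Sketch: using the mapreduce (new API) FixedLengthInputFormat.
    Configuration conf = new Configuration();
    FixedLengthInputFormat.setRecordLength(conf, 20);
    Job job = Job.getInstance(conf, "fixed-length-read");
    job.setInputFormatClass(FixedLengthInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/data/fixed-records"));
    // The mapper sees LongWritable byte offsets as keys and BytesWritable records as values.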

+ 220 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java

@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+/**
+ * A reader to read fixed length records from a split.  Record offset is
+ * returned as key and the record as bytes is returned in value.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FixedLengthRecordReader
+    extends RecordReader<LongWritable, BytesWritable> {
+  private static final Log LOG 
+      = LogFactory.getLog(FixedLengthRecordReader.class);
+
+  private int recordLength;
+  private long start;
+  private long pos;
+  private long end;
+  private long  numRecordsRemainingInSplit;
+  private FSDataInputStream fileIn;
+  private Seekable filePosition;
+  private LongWritable key;
+  private BytesWritable value;
+  private boolean isCompressedInput;
+  private Decompressor decompressor;
+  private InputStream inputStream;
+
+  public FixedLengthRecordReader(int recordLength) {
+    this.recordLength = recordLength;
+  }
+
+  @Override
+  public void initialize(InputSplit genericSplit,
+                         TaskAttemptContext context) throws IOException {
+    FileSplit split = (FileSplit) genericSplit;
+    Configuration job = context.getConfiguration();
+    final Path file = split.getPath();
+    initialize(job, split.getStart(), split.getLength(), file);
+  }
+
+  // This is also called from the old FixedLengthRecordReader API implementation
+  public void initialize(Configuration job, long splitStart, long splitLength,
+                         Path file) throws IOException {
+    start = splitStart;
+    end = start + splitLength;
+    long partialRecordLength = start % recordLength;
+    long numBytesToSkip = 0;
+    if (partialRecordLength != 0) {
+      numBytesToSkip = recordLength - partialRecordLength;
+    }
+
+    // open the file and seek to the start of the split
+    final FileSystem fs = file.getFileSystem(job);
+    fileIn = fs.open(file);
+
+    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
+    if (null != codec) {
+      isCompressedInput = true;	
+      decompressor = CodecPool.getDecompressor(codec);
+      CompressionInputStream cIn
+          = codec.createInputStream(fileIn, decompressor);
+      filePosition = cIn;
+      inputStream = cIn;
+      numRecordsRemainingInSplit = Long.MAX_VALUE;
+      LOG.info(
+          "Compressed input; cannot compute number of records in the split");
+    } else {
+      fileIn.seek(start);
+      filePosition = fileIn;
+      inputStream = fileIn;
+      long splitSize = end - start - numBytesToSkip;
+      numRecordsRemainingInSplit = (splitSize + recordLength - 1)/recordLength;
+      if (numRecordsRemainingInSplit < 0) {
+        numRecordsRemainingInSplit = 0;
+      }
+      LOG.info("Expecting " + numRecordsRemainingInSplit
+          + " records each with a length of " + recordLength
+          + " bytes in the split with an effective size of "
+          + splitSize + " bytes");
+    }
+    if (numBytesToSkip != 0) {
+      start += inputStream.skip(numBytesToSkip);
+    }
+    this.pos = start;
+  }
+
+  @Override
+  public synchronized boolean nextKeyValue() throws IOException {
+    if (key == null) {
+      key = new LongWritable();
+    }
+    if (value == null) {
+      value = new BytesWritable(new byte[recordLength]);
+    }
+    boolean dataRead = false;
+    value.setSize(recordLength);
+    byte[] record = value.getBytes();
+    if (numRecordsRemainingInSplit > 0) {
+      key.set(pos);
+      int offset = 0;
+      int numBytesToRead = recordLength;
+      int numBytesRead = 0;
+      while (numBytesToRead > 0) {
+        numBytesRead = inputStream.read(record, offset, numBytesToRead);
+        if (numBytesRead == -1) {
+          // EOF
+          break;
+        }
+        offset += numBytesRead;
+        numBytesToRead -= numBytesRead;
+      }
+      numBytesRead = recordLength - numBytesToRead;
+      pos += numBytesRead;
+      if (numBytesRead > 0) {
+        dataRead = true;
+        if (numBytesRead >= recordLength) {
+          if (!isCompressedInput) {
+            numRecordsRemainingInSplit--;
+          }
+        } else {
+          throw new IOException("Partial record(length = " + numBytesRead
+              + ") found at the end of split.");
+        }
+      } else {
+        numRecordsRemainingInSplit = 0L; // End of input.
+      }
+    }
+    return dataRead;
+  }
+
+  @Override
+  public LongWritable getCurrentKey() {
+    return key;
+  }
+
+  @Override
+  public BytesWritable getCurrentValue() {
+    return value;
+  }
+
+  @Override
+  public synchronized float getProgress() throws IOException {
+    if (start == end) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
+    }
+  }
+  
+  @Override
+  public synchronized void close() throws IOException {
+    try {
+      if (inputStream != null) {
+        inputStream.close();
+        inputStream = null;
+      }
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
+      }
+    }
+  }
+
+  // This is called from the old FixedLengthRecordReader API implementation.
+  public long getPos() {
+    return pos;
+  }
+
+  private long getFilePosition() throws IOException {
+    long retVal;
+    if (isCompressedInput && null != filePosition) {
+      retVal = filePosition.getPos();
+    } else {
+      retVal = pos;
+    }
+    return retVal;
+  }
+
+}
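
The initialize() method above aligns each split to a record boundary before reading. A small worked example of that arithmetic, using invented numbers:

    // Worked example of the alignment math in initialize() (illustrative values).
    int  recordLength = 100;
    long splitStart   = 1030;                                   // split begins mid-record
    long splitLength  = 1000;
    long partial      = splitStart % recordLength;              // 30
    long skip         = (partial == 0) ? 0 : recordLength - partial;   // 70 bytes to the boundary
    long effective    = splitLength - skip;                     // 930 bytes owned by this split
    long numRecords   = (effective + recordLength - 1) / recordLength; // ceil -> 10 records
    // So the reader starts at byte 1100 and expects 10 records; the last one begins before
    // the split end and is read to completion past it, exactly as nextKeyValue() does above.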

+ 416 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java

@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+public class TestFixedLengthInputFormat {
+
+  private static Log LOG;
+  private static Configuration defaultConf;
+  private static FileSystem localFs; 
+  private static Path workDir;
+  private static Reporter voidReporter;
+  
+  // some chars for the record data
+  private static char[] chars;
+  private static Random charRand;
+
+  @BeforeClass
+  public static void onlyOnce() {
+    try {
+      LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName());
+      defaultConf = new Configuration();
+      defaultConf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+      voidReporter = Reporter.NULL;
+      // our set of chars
+      chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)"
+          + "(*&^%$#@!-=><?:\"{}][';/.,']").toCharArray();
+      workDir = 
+          new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+          "TestKeyValueFixedLengthInputFormat");
+      charRand = new Random();
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  /**
+   * 20 random tests of various record, file, and split sizes.  All tests have
+   * uncompressed file as input.
+   */
+  @Test (timeout=500000)
+  public void testFormat() throws IOException {
+    runRandomTests(null);
+  }
+
+  /**
+   * 20 random tests of various record, file, and split sizes.  All tests have
+   * compressed file as input.
+   */
+  @Test (timeout=500000)
+  public void testFormatCompressedIn() throws IOException {
+    runRandomTests(new GzipCodec());
+  }
+
+  /**
+   * Test with no record length set.
+   */
+  @Test (timeout=5000)
+  public void testNoRecordLength() throws IOException {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, new String("testFormat.txt"));
+    createFile(file, null, 10, 10);
+    // Set the fixed length record length config property 
+    Configuration testConf = new Configuration(defaultConf);
+    JobConf job = new JobConf(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.configure(job);
+    InputSplit splits[] = format.getSplits(job, 1);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.getRecordReader(split, job, voidReporter);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for not setting record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with record length set to 0
+   */
+  @Test (timeout=5000)
+  public void testZeroRecordLength() throws IOException {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, new String("testFormat.txt"));
+    createFile(file, null, 10, 10);
+    // Set the fixed length record length config property 
+    Configuration testConf = new Configuration(defaultConf);
+    JobConf job = new JobConf(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.setRecordLength(job, 0);
+    format.configure(job);
+    InputSplit splits[] = format.getSplits(job, 1);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        RecordReader<LongWritable, BytesWritable> reader = 
+                             format.getRecordReader(split, job, voidReporter);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for zero record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with record length set to a negative value
+   */
+  @Test (timeout=5000)
+  public void testNegativeRecordLength() throws IOException {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, new String("testFormat.txt"));
+    createFile(file, null, 10, 10);
+    // Set the fixed length record length config property 
+    Configuration testConf = new Configuration(defaultConf);
+    JobConf job = new JobConf(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.setRecordLength(job, -10);
+    format.configure(job);
+    InputSplit splits[] = format.getSplits(job, 1);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.getRecordReader(split, job, voidReporter);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for negative record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with partial record at the end of a compressed input file.
+   */
+  @Test (timeout=5000)
+  public void testPartialRecordCompressedIn() throws IOException {
+    CompressionCodec gzip = new GzipCodec();
+    runPartialRecordTest(gzip);
+  }
+
+  /**
+   * Test with partial record at the end of an uncompressed input file.
+   */
+  @Test (timeout=5000)
+  public void testPartialRecordUncompressedIn() throws IOException {
+    runPartialRecordTest(null);
+  }
+
+  /**
+   * Test using the gzip codec with two input files.
+   */
+  @Test (timeout=5000)
+  public void testGzipWithTwoInputs() throws IOException {
+    CompressionCodec gzip = new GzipCodec();
+    localFs.delete(workDir, true);
+    // Create files with fixed length records with 5 byte long records.
+    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, 
+        "one  two  threefour five six  seveneightnine ten  ");
+    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+        "ten  nine eightsevensix  five four threetwo  one  ");
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    format.setRecordLength(defaultConf, 5);
+    JobConf job = new JobConf(defaultConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    ReflectionUtils.setConf(gzip, job);
+    format.configure(job);
+    InputSplit[] splits = format.getSplits(job, 100);
+    assertEquals("compressed splits == 2", 2, splits.length);
+    FileSplit tmp = (FileSplit) splits[0];
+    if (tmp.getPath().getName().equals("part2.txt.gz")) {
+      splits[0] = splits[1];
+      splits[1] = tmp;
+    }
+    List<String> results = readSplit(format, splits[0], job);
+    assertEquals("splits[0] length", 10, results.size());
+    assertEquals("splits[0][5]", "six  ", results.get(5));
+    results = readSplit(format, splits[1], job);
+    assertEquals("splits[1] length", 10, results.size());
+    assertEquals("splits[1][0]", "ten  ", results.get(0));
+    assertEquals("splits[1][1]", "nine ", results.get(1));
+  }
+
+  // Create a file containing fixed length records with random data
+  private ArrayList<String> createFile(Path targetFile, CompressionCodec codec,
+                                       int recordLen,
+                                       int numRecords) throws IOException {
+    ArrayList<String> recordList = new ArrayList<String>(numRecords);
+    OutputStream ostream = localFs.create(targetFile);
+    if (codec != null) {
+      ostream = codec.createOutputStream(ostream);
+    }
+    Writer writer = new OutputStreamWriter(ostream);
+    try {
+      StringBuffer sb = new StringBuffer();
+      for (int i = 0; i < numRecords; i++) {
+        for (int j = 0; j < recordLen; j++) {
+          sb.append(chars[charRand.nextInt(chars.length)]);
+        }
+        String recordData = sb.toString();
+        recordList.add(recordData);
+        writer.write(recordData);
+        sb.setLength(0);
+      }
+    } finally {
+      writer.close();
+    }
+    return recordList;
+  }
+
+  private void runRandomTests(CompressionCodec codec) throws IOException {
+    StringBuilder fileName = new StringBuilder("testFormat.txt");
+    if (codec != null) {
+      fileName.append(".gz");
+    }
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, fileName.toString());
+    int seed = new Random().nextInt();
+    LOG.info("Seed = " + seed);
+    Random random = new Random(seed);
+    int MAX_TESTS = 20;
+    LongWritable key = new LongWritable();
+    BytesWritable value = new BytesWritable();
+
+    for (int i = 0; i < MAX_TESTS; i++) {
+      LOG.info("----------------------------------------------------------");
+      // Maximum total records of 999
+      int totalRecords = random.nextInt(999)+1;
+      // Test an empty file
+      if (i == 8) {
+         totalRecords = 0;
+      }
+      // Maximum bytes in a record of 100K
+      int recordLength = random.nextInt(1024*100)+1;
+      // For the 11th test, force a record length of 1
+      if (i == 10) {
+        recordLength = 1;
+      }
+      // The total bytes in the test file
+      int fileSize = (totalRecords * recordLength);
+      LOG.info("totalRecords=" + totalRecords + " recordLength="
+          + recordLength);
+      // Create the test file
+      ArrayList<String> recordList
+          = createFile(file, codec, recordLength, totalRecords);
+      assertTrue(localFs.exists(file));
+      // Set the fixed length record length config property 
+      Configuration testConf = new Configuration(defaultConf);
+      FixedLengthInputFormat.setRecordLength(testConf, recordLength);
+
+      int numSplits = 1;
+      // Arbitrarily set number of splits.
+      if (i > 0) {
+        if (i == (MAX_TESTS-1)) {
+          // Test a split size that is less than record len
+          numSplits = (int)(fileSize/Math.floor(recordLength/2));
+        } else {
+          if (MAX_TESTS % i == 0) {
+            // Let us create a split size that is forced to be 
+            // smaller than the end file itself, (ensures 1+ splits)
+            numSplits = fileSize/(fileSize - random.nextInt(fileSize));
+          } else {
+            // Just pick a random split size with no upper bound 
+            numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE));
+          }
+        }
+        LOG.info("Number of splits set to: " + numSplits);
+      }
+
+      // Create the job, and setup the input path
+      JobConf job = new JobConf(testConf);
+      FileInputFormat.setInputPaths(job, workDir);
+      // Try splitting the file in a variety of sizes
+      FixedLengthInputFormat format = new FixedLengthInputFormat();
+      format.configure(job);
+      InputSplit splits[] = format.getSplits(job, numSplits);
+      LOG.info("Actual number of splits = " + splits.length);
+      // Test combined split lengths = total file size
+      long recordOffset = 0;
+      int recordNumber = 0;
+      for (InputSplit split : splits) {
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.getRecordReader(split, job, voidReporter);
+        Class<?> clazz = reader.getClass();
+        assertEquals("RecordReader class should be FixedLengthRecordReader:", 
+            FixedLengthRecordReader.class, clazz);
+        // Plow through the records in this split
+        while (reader.next(key, value)) {
+          assertEquals("Checking key", (long)(recordNumber*recordLength),
+              key.get());
+          String valueString =
+              new String(value.getBytes(), 0, value.getLength());
+          assertEquals("Checking record length:", recordLength,
+              value.getLength());
+          assertTrue("Checking for more records than expected:",
+              recordNumber < totalRecords);
+          String origRecord = recordList.get(recordNumber);
+          assertEquals("Checking record content:", origRecord, valueString);
+          recordNumber++;
+        }
+        reader.close();
+      }
+      assertEquals("Total original records should be total read records:",
+          recordList.size(), recordNumber);
+    }
+  }
+
+  private static void writeFile(FileSystem fs, Path name, 
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    stm.write(contents.getBytes());
+    stm.close();
+  }
+
+  private static List<String> readSplit(FixedLengthInputFormat format, 
+                                        InputSplit split, 
+                                        JobConf job) throws IOException {
+    List<String> result = new ArrayList<String>();
+    RecordReader<LongWritable, BytesWritable> reader =
+        format.getRecordReader(split, job, voidReporter);
+    LongWritable key = reader.createKey();
+    BytesWritable value = reader.createValue();
+    while (reader.next(key, value)) {
+      result.add(new String(value.getBytes(), 0, value.getLength()));
+    }
+    reader.close();
+    return result;
+  }
+
+  private void runPartialRecordTest(CompressionCodec codec) throws IOException {
+    localFs.delete(workDir, true);
+    // Create a file of fixed length, 5-byte records with a
+    // partial record at the end.
+    StringBuilder fileName = new StringBuilder("testFormat.txt");
+    if (codec != null) {
+      fileName.append(".gz");
+    }
+    writeFile(localFs, new Path(workDir, fileName.toString()), codec,
+        "one  two  threefour five six  seveneightnine ten");
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    FixedLengthInputFormat.setRecordLength(defaultConf, 5);
+    JobConf job = new JobConf(defaultConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    if (codec != null) {
+      ReflectionUtils.setConf(codec, job);
+    }
+    format.configure(job);
+    InputSplit[] splits = format.getSplits(job, 100);
+    if (codec != null) {
+      assertEquals("compressed splits == 1", 1, splits.length);
+    }
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        readSplit(format, split, job);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for partial record:", exceptionThrown);
+  }
+
+}
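For reference, wiring the old ("mapred") FixedLengthInputFormat into a real job uses the same calls this test exercises. The sketch below is illustrative only: the helper class name, the 20-byte record length, and the input path are assumptions, not part of this change.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FixedLengthInputFormat;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical helper: builds a JobConf that reads fixed 20-byte records.
public class FixedLengthJobConfSketch {
  public static JobConf newConf(Path input) {
    JobConf conf = new JobConf();
    // Every record in the input is exactly 20 bytes (assumed for illustration).
    FixedLengthInputFormat.setRecordLength(conf, 20);
    conf.setInputFormat(FixedLengthInputFormat.class);
    FileInputFormat.setInputPaths(conf, input);
    // The record reader then emits LongWritable byte offsets as keys and
    // BytesWritable values whose length equals the configured record length,
    // which is what the assertions in the test above verify.
    return conf;
  }
}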

+ 461 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java

@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestFixedLengthInputFormat {
+
+  private static Log LOG;
+  private static Configuration defaultConf;
+  private static FileSystem localFs;
+  private static Path workDir;
+
+  // some chars for the record data
+  private static char[] chars;
+  private static Random charRand;
+
+  @BeforeClass
+  public static void onlyOnce() {
+    try {
+      LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName());
+      defaultConf = new Configuration();
+      defaultConf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+      // our set of chars
+      chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)"
+          + "(*&^%$#@!-=><?:\"{}][';/.,']").toCharArray();
+      workDir =
+          new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+          "TestFixedLengthInputFormat");
+      charRand = new Random();
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  /**
+   * 20 random tests of various record, file, and split sizes.  All tests use
+   * an uncompressed file as input.
+   */
+  @Test (timeout=500000)
+  public void testFormat() throws Exception {
+    runRandomTests(null);
+  }
+
+  /**
+   * 20 random tests of various record, file, and split sizes.  All tests use
+   * a compressed file as input.
+   */
+  @Test (timeout=500000)
+  public void testFormatCompressedIn() throws Exception {
+    runRandomTests(new GzipCodec());
+  }
+
+  /**
+   * Test with no record length set.
+   */
+  @Test (timeout=5000)
+  public void testNoRecordLength() throws Exception {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, "testFormat.txt");
+    createFile(file, null, 10, 10);
+    // Deliberately do not set the record length config property
+    Configuration testConf = new Configuration(defaultConf);
+    Job job = Job.getInstance(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    List<InputSplit> splits = format.getSplits(job);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        TaskAttemptContext context = MapReduceTestUtil.
+            createDummyMapTaskAttemptContext(job.getConfiguration());
+        RecordReader<LongWritable, BytesWritable> reader =
+            format.createRecordReader(split, context);
+        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
+            mcontext =
+            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
+            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
+            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+        reader.initialize(split, mcontext);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for not setting record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with record length set to 0.
+   */
+  @Test (timeout=5000)
+  public void testZeroRecordLength() throws Exception {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, "testFormat.txt");
+    createFile(file, null, 10, 10);
+    // Set the fixed length record length config property 
+    Configuration testConf = new Configuration(defaultConf);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    FixedLengthInputFormat.setRecordLength(testConf, 0);
+    Job job = Job.getInstance(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    List<InputSplit> splits = format.getSplits(job);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        TaskAttemptContext context =
+            MapReduceTestUtil.createDummyMapTaskAttemptContext(
+            job.getConfiguration());
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.createRecordReader(split, context);
+        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
+            mcontext =
+            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
+            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
+            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+        reader.initialize(split, mcontext);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for zero record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with record length set to a negative value.
+   */
+  @Test (timeout=5000)
+  public void testNegativeRecordLength() throws Exception {
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, "testFormat.txt");
+    createFile(file, null, 10, 10);
+    // Set the fixed length record length config property 
+    Configuration testConf = new Configuration(defaultConf);
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    FixedLengthInputFormat.setRecordLength(testConf, -10);
+    Job job = Job.getInstance(testConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    List<InputSplit> splits = format.getSplits(job);
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        TaskAttemptContext context = MapReduceTestUtil.
+            createDummyMapTaskAttemptContext(job.getConfiguration());
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.createRecordReader(split, context);
+        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
+            mcontext =
+            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
+            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
+            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+        reader.initialize(split, mcontext);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for negative record length:", exceptionThrown);
+  }
+
+  /**
+   * Test with partial record at the end of a compressed input file.
+   */
+  @Test (timeout=5000)
+  public void testPartialRecordCompressedIn() throws Exception {
+    CompressionCodec gzip = new GzipCodec();
+    runPartialRecordTest(gzip);
+  }
+
+  /**
+   * Test with partial record at the end of an uncompressed input file.
+   */
+  @Test (timeout=5000)
+  public void testPartialRecordUncompressedIn() throws Exception {
+    runPartialRecordTest(null);
+  }
+
+  /**
+   * Test using the gzip codec with two input files.
+   */
+  @Test (timeout=5000)
+  public void testGzipWithTwoInputs() throws Exception {
+    CompressionCodec gzip = new GzipCodec();
+    localFs.delete(workDir, true);
+    // Create files of fixed length, 5-byte records.
+    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, 
+        "one  two  threefour five six  seveneightnine ten  ");
+    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+        "ten  nine eightsevensix  five four threetwo  one  ");
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    FixedLengthInputFormat.setRecordLength(defaultConf, 5);
+    ReflectionUtils.setConf(gzip, defaultConf);
+    Job job = Job.getInstance(defaultConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    List<InputSplit> splits = format.getSplits(job);
+    assertEquals("compressed splits == 2", 2, splits.size());
+    FileSplit tmp = (FileSplit) splits.get(0);
+    if (tmp.getPath().getName().equals("part2.txt.gz")) {
+      splits.set(0, splits.get(1));
+      splits.set(1, tmp);
+    }
+    List<String> results = readSplit(format, splits.get(0), job);
+    assertEquals("splits[0] length", 10, results.size());
+    assertEquals("splits[0][5]", "six  ", results.get(5));
+    results = readSplit(format, splits.get(1), job);
+    assertEquals("splits[1] length", 10, results.size());
+    assertEquals("splits[1][0]", "ten  ", results.get(0));
+    assertEquals("splits[1][1]", "nine ", results.get(1));
+  }
+
+  // Create a file containing fixed length records with random data
+  private ArrayList<String> createFile(Path targetFile, CompressionCodec codec,
+                                       int recordLen,
+                                       int numRecords) throws IOException {
+    ArrayList<String> recordList = new ArrayList<String>(numRecords);
+    OutputStream ostream = localFs.create(targetFile);
+    if (codec != null) {
+      ostream = codec.createOutputStream(ostream);
+    }
+    Writer writer = new OutputStreamWriter(ostream);
+    try {
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < numRecords; i++) {
+        for (int j = 0; j < recordLen; j++) {
+          sb.append(chars[charRand.nextInt(chars.length)]);
+        }
+        String recordData = sb.toString();
+        recordList.add(recordData);
+        writer.write(recordData);
+        sb.setLength(0);
+      }
+    } finally {
+      writer.close();
+    }
+    return recordList;
+  }
+
+  private void runRandomTests(CompressionCodec codec) throws Exception {
+    StringBuilder fileName = new StringBuilder("testFormat.txt");
+    if (codec != null) {
+      fileName.append(".gz");
+    }
+    localFs.delete(workDir, true);
+    Path file = new Path(workDir, fileName.toString());
+    int seed = new Random().nextInt();
+    LOG.info("Seed = " + seed);
+    Random random = new Random(seed);
+    int MAX_TESTS = 20;
+    LongWritable key;
+    BytesWritable value;
+
+    for (int i = 0; i < MAX_TESTS; i++) {
+      LOG.info("----------------------------------------------------------");
+      // Maximum total records of 999
+      int totalRecords = random.nextInt(999)+1;
+      // Test an empty file
+      if (i == 8) {
+         totalRecords = 0;
+      }
+      // Maximum bytes in a record of 100K
+      int recordLength = random.nextInt(1024*100)+1;
+      // For the 11th test, force a record length of 1
+      if (i == 10) {
+        recordLength = 1;
+      }
+      // The total bytes in the test file
+      int fileSize = (totalRecords * recordLength);
+      LOG.info("totalRecords=" + totalRecords + " recordLength="
+          + recordLength);
+      // Create the test file
+      ArrayList<String> recordList =
+          createFile(file, codec, recordLength, totalRecords);
+      assertTrue(localFs.exists(file));
+      // Set the fixed length record length config property 
+      Configuration testConf = new Configuration(defaultConf);
+      FixedLengthInputFormat.setRecordLength(testConf, recordLength);
+
+      int numSplits = 1;
+      // Arbitrarily set number of splits.
+      if (i > 0) {
+        if (i == (MAX_TESTS-1)) {
+          // Test a split size that is less than record len
+          numSplits = fileSize / Math.max(1, recordLength / 2);
+        } else {
+          if (MAX_TESTS % i == 0) {
+            // Create a split size that is forced to be smaller
+            // than the file itself (ensures 1+ splits)
+            numSplits = fileSize/(fileSize - random.nextInt(fileSize));
+          } else {
+            // Just pick a random split size with no upper bound 
+            numSplits = Math.max(1,
+                fileSize / (random.nextInt(Integer.MAX_VALUE) + 1));
+          }
+        }
+        LOG.info("Number of splits set to: " + numSplits);
+      }
+      testConf.setLong("mapreduce.input.fileinputformat.split.maxsize", 
+          (long)(fileSize/numSplits));
+
+      // Create the job, and setup the input path
+      Job job = Job.getInstance(testConf);
+      FileInputFormat.setInputPaths(job, workDir);
+      // Try splitting the file in a variety of sizes
+      FixedLengthInputFormat format = new FixedLengthInputFormat();
+      List<InputSplit> splits = format.getSplits(job);
+      LOG.info("Actual number of splits = " + splits.size());
+      // Read every record from every split and verify it against the
+      // original data
+      int recordNumber = 0;
+      for (InputSplit split : splits) {
+        TaskAttemptContext context = MapReduceTestUtil.
+            createDummyMapTaskAttemptContext(job.getConfiguration());
+        RecordReader<LongWritable, BytesWritable> reader = 
+            format.createRecordReader(split, context);
+        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
+            mcontext =
+            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
+            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
+            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+        reader.initialize(split, mcontext);
+        Class<?> clazz = reader.getClass();
+        assertEquals("RecordReader class should be FixedLengthRecordReader:", 
+            FixedLengthRecordReader.class, clazz);
+        // Plow through the records in this split
+        while (reader.nextKeyValue()) {
+          key = reader.getCurrentKey();
+          value = reader.getCurrentValue();
+          assertEquals("Checking key", (long)(recordNumber*recordLength),
+              key.get());
+          String valueString = new String(value.getBytes(), 0,
+              value.getLength());
+          assertEquals("Checking record length:", recordLength,
+              value.getLength());
+          assertTrue("Checking for more records than expected:",
+              recordNumber < totalRecords);
+          String origRecord = recordList.get(recordNumber);
+          assertEquals("Checking record content:", origRecord, valueString);
+          recordNumber++;
+        }
+        reader.close();
+      }
+      assertEquals("Total original records should be total read records:",
+          recordList.size(), recordNumber);
+    }
+  }
+
+  private static void writeFile(FileSystem fs, Path name, 
+                                CompressionCodec codec,
+                                String contents) throws IOException {
+    OutputStream stm;
+    if (codec == null) {
+      stm = fs.create(name);
+    } else {
+      stm = codec.createOutputStream(fs.create(name));
+    }
+    stm.write(contents.getBytes());
+    stm.close();
+  }
+
+  private static List<String> readSplit(FixedLengthInputFormat format, 
+                                        InputSplit split, 
+                                        Job job) throws Exception {
+    List<String> result = new ArrayList<String>();
+    TaskAttemptContext context = MapReduceTestUtil.
+        createDummyMapTaskAttemptContext(job.getConfiguration());
+    RecordReader<LongWritable, BytesWritable> reader =
+        format.createRecordReader(split, context);
+    MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
+        mcontext =
+        new MapContextImpl<LongWritable, BytesWritable, LongWritable,
+        BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
+        reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+    reader.initialize(split, mcontext);
+    LongWritable key;
+    BytesWritable value;
+    while (reader.nextKeyValue()) {
+      key = reader.getCurrentKey();
+      value = reader.getCurrentValue();
+      result.add(new String(value.getBytes(), 0, value.getLength()));
+    }
+    reader.close();
+    return result;
+  }
+
+  private void runPartialRecordTest(CompressionCodec codec) throws Exception {
+    localFs.delete(workDir, true);
+    // Create a file of fixed length, 5-byte records with a
+    // partial record at the end.
+    StringBuilder fileName = new StringBuilder("testFormat.txt");
+    if (codec != null) {
+      fileName.append(".gz");
+      ReflectionUtils.setConf(codec, defaultConf);
+    }
+    writeFile(localFs, new Path(workDir, fileName.toString()), codec,
+        "one  two  threefour five six  seveneightnine ten");
+    FixedLengthInputFormat format = new FixedLengthInputFormat();
+    FixedLengthInputFormat.setRecordLength(defaultConf, 5);
+    Job job = Job.getInstance(defaultConf);
+    FileInputFormat.setInputPaths(job, workDir);
+    List<InputSplit> splits = format.getSplits(job);
+    if (codec != null) {
+      assertEquals("compressed splits == 1", 1, splits.size());
+    }
+    boolean exceptionThrown = false;
+    for (InputSplit split : splits) {
+      try {
+        readSplit(format, split, job);
+      } catch(IOException ioe) {
+        exceptionThrown = true;
+        LOG.info("Exception message:" + ioe.getMessage());
+      }
+    }
+    assertTrue("Exception for partial record:", exceptionThrown);
+  }
+
+}
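The new ("mapreduce") FixedLengthInputFormat tested above is configured the same way on a Job. Again a minimal, illustrative sketch; the helper class name, job name, 20-byte record length, and input path are assumptions, not part of this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;

// Hypothetical helper: builds a Job that reads fixed 20-byte records.
public class FixedLengthJobSketch {
  public static Job newJob(Path input) throws Exception {
    Configuration conf = new Configuration();
    // Every record in the input is exactly 20 bytes (assumed for illustration).
    FixedLengthInputFormat.setRecordLength(conf, 20);
    Job job = Job.getInstance(conf, "fixed-length-read");
    job.setInputFormatClass(FixedLengthInputFormat.class);
    FileInputFormat.setInputPaths(job, input);
    // Keys are LongWritable byte offsets and values are BytesWritable records
    // of exactly the configured length; a trailing partial record makes the
    // reader throw an IOException, as the partial-record tests above assert.
    return job;
  }
}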

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -124,6 +124,9 @@ Release 2.3.0 - UNRELEASED
     YARN-1395. Distributed shell application master launched with debug flag can
     hang waiting for external ls process. (cnauroth)
 
+    YARN-1400. yarn.cmd uses HADOOP_RESOURCEMANAGER_OPTS. Should be
+    YARN_RESOURCEMANAGER_OPTS. (Raja Aluri via cnauroth)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd

@@ -180,7 +180,7 @@ goto :eof
 :resourcemanager
   set CLASSPATH=%CLASSPATH%;%YARN_CONF_DIR%\rm-config\log4j.properties
   set CLASS=org.apache.hadoop.yarn.server.resourcemanager.ResourceManager
-  set YARN_OPTS=%YARN_OPTS% %HADOOP_RESOURCEMANAGER_OPTS%
+  set YARN_OPTS=%YARN_OPTS% %YARN_RESOURCEMANAGER_OPTS%
   if defined YARN_RESOURCEMANAGER_HEAPSIZE (
     set JAVA_HEAP_MAX=-Xmx%YARN_RESOURCEMANAGER_HEAPSIZE%m
   )