Browse Source

Merging r1543613 through r1543709 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1543710 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 11 years ago
parent
commit
59393e0162
43 changed files with 671 additions and 692 deletions
  1. 2 2
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 2 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java
  3. 11 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java
  4. 0 69
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectCompressor.java
  5. 35 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressionCodec.java
  6. 30 42
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java
  7. 9 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
  8. 10 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
  9. 72 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java
  10. 1 107
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java
  11. 82 103
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java
  12. 12 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java
  13. 49 12
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
  14. 53 1
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java
  15. 33 125
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java
  16. 8 1
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  17. 1 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathBasedCacheDirective.java
  18. 9 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
  19. 3 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  20. 9 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
  21. 9 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  22. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
  23. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
  24. 7 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  25. 9 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
  26. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java
  27. 9 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java
  28. 26 44
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
  29. 14 35
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
  30. 17 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestJMXGet.java
  31. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
  32. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
  33. 7 0
      hadoop-yarn-project/CHANGES.txt
  34. 93 79
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  35. 9 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  36. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
  37. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java
  38. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java
  39. 6 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
  40. 7 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
  41. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
  42. 5 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
  43. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ResourceManagerRest.apt.vm

+ 2 - 2
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -387,8 +387,8 @@ Release 2.3.0 - UNRELEASED
 
     HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
 
-    HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V via
-    acmurthy)
+   HADOOP-10047. Add a direct-buffer based apis for compression. (Gopal V
+   via acmurthy)
 
   BUG FIXES
 

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ReadaheadPool.java

@@ -203,8 +203,8 @@ public class ReadaheadPool {
       // It's also possible that we'll end up requesting readahead on some
       // other FD, which may be wasted work, but won't cause a problem.
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, off, len,
-            NativeIO.POSIX.POSIX_FADV_WILLNEED);
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
+            fd, off, len, NativeIO.POSIX.POSIX_FADV_WILLNEED);
       } catch (IOException ioe) {
         if (canceled) {
           // no big deal - the reader canceled the request and closed

+ 11 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DefaultCodec.java

@@ -28,11 +28,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class DefaultCodec implements Configurable, CompressionCodec {
+public class DefaultCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
   private static final Log LOG = LogFactory.getLog(DefaultCodec.class);
   
   Configuration conf;
@@ -103,6 +104,15 @@ public class DefaultCodec implements Configurable, CompressionCodec {
     return ZlibFactory.getZlibDecompressor(conf);
   }
   
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return ZlibFactory.getZlibDirectDecompressor(conf);
+  }
+  
+  
   @Override
   public String getDefaultExtension() {
     return ".deflate";

+ 0 - 69
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectCompressor.java

@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io.compress;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
-public interface DirectCompressor extends Compressor {
-  /**
-   * Example usage
-   * <pre> {@code
-   * private void compress(DirectCompressor comp, ByteBufferProducer in, ByteBufferConsumer out) throws IOException {
-   *    ByteBuffer outBB = ByteBuffer.allocateDirect(64*1024);
-   *    outBB.clear();
-   *    // returns inBB.remaining() &gt; 0 || inBB == null 
-   *    // if you do a inBB.put(), remember to do a inBB.flip()
-   *    ByteBuffer inBB = in.get();       
-   *    while(!comp.finished()) {
-   *      comp.compress(outBB, inBB);
-   *      if(outBB.remaining() == 0) {
-   *        // flush when the buffer only when it is full
-   *        outBB.flip();          
-   *        // has to consume the buffer, because it is reused
-   *        out.put(outBB);
-   *        outBB.clear();
-   *      }
-   *      if(inBB != null &amp;&amp; inBB.remaining() == 0) {
-   *        inBB = in.get();
-   *        if(inBB == null) {
-   *          // EOF
-   *          comp.finish();
-   *        }
-   *      }
-   *    }
-   *    
-   *    if(outBB.position() &gt; 0) {
-   *      outBB.flip();
-   *      out.put(outBB);
-   *      outBB.clear();
-   *    }
-   *  }
-   * } </pre>
-   * @param dst Destination {@link ByteBuffer} for storing the results into. Requires dst.remaining() to be > 0
-   * @param src Source {@link ByteBuffer} for reading from. This can be null or src.remaining() > 0
-   * @return bytes stored into dst
-   * @throws IOException if compression fails
-   */
-	public int compress(ByteBuffer dst, ByteBuffer src) throws IOException;
-}

+ 35 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressionCodec.java

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io.compress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This class encapsulates a codec which can decompress direct bytebuffers.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface DirectDecompressionCodec extends CompressionCodec {
+  /**
+   * Create a new {@link DirectDecompressor} for use by this {@link DirectDecompressionCodec}.
+   * 
+   * @return a new direct decompressor for use by this codec
+   */
+  DirectDecompressor createDirectDecompressor();
+}

+ 30 - 42
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DirectDecompressor.java

@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,49 +23,37 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-
+/**
+ * Specification of a direct ByteBuffer 'de-compressor'. 
+ */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public interface DirectDecompressor extends Decompressor {
-  /**
-   * Example usage
+public interface DirectDecompressor {
+  /*
+   * This exposes a direct interface for record decompression with direct byte
+   * buffers.
+   * 
+   * The decompress() function need not always consume the buffers provided,
+   * it will need to be called multiple times to decompress an entire buffer 
+   * and the object will hold the compression context internally.
+   * 
+   * Codecs such as {@link SnappyCodec} may or may not support partial
+   * decompression of buffers and will need enough space in the destination
+   * buffer to decompress an entire block.
+   * 
+   * The operation is modelled around dst.put(src);
+   * 
+   * The end result will move src.position() by the bytes-read and
+   * dst.position() by the bytes-written. It should not modify the src.limit()
+   * or dst.limit() to maintain consistency of operation between codecs.
+   * 
+   * @param src Source direct {@link ByteBuffer} for reading from. Requires src
+   * != null and src.remaining() > 0
+   * 
+   * @param dst Destination direct {@link ByteBuffer} for storing the results
+   * into. Requires dst != null and dst.remaining() to be > 0
    * 
-   * <pre>{@code
-   * private void decompress(DirectDecompressor decomp, ByteBufferProducer in, ByteBufferConsumer out) throws IOException {
-   *    ByteBuffer outBB = ByteBuffer.allocate(64*1024);
-   *    outBB.clear();
-   *    // returns inBB.remaining() &gt; 0 || inBB == null 
-   *    // if you do a inBB.put(), remember to do a inBB.flip()
-   *    ByteBuffer inBB = in.get();
-   *    if(inBB == null) {
-   *      // no data at all?
-   *    }
-   *    while(!decomp.finished()) {
-   *      decomp.decompress(outBB, inBB);
-   *      if(outBB.remaining() == 0) {
-   *        // flush when the buffer is full
-   *        outBB.flip();
-   *        // has to consume the buffer, because it is reused
-   *        out.put(outBB);
-   *        outBB.clear();
-   *      }
-   *      if(inBB != null &amp;&amp; inBB.remaining() == 0) {
-   *        // inBB = null for EOF
-   *        inBB = in.get();
-   *      }
-   *    }
-   *    
-   *    if(outBB.position() &gt; 0) {
-   *      outBB.flip();
-   *      out.put(outBB);
-   *      outBB.clear();
-   *    }
-   *  }
-   * }</pre>
-   * @param dst Destination {@link ByteBuffer} for storing the results into. Requires dst.remaining() to be > 0
-   * @param src Source {@link ByteBuffer} for reading from. This can be null or src.remaining() > 0
-   * @return bytes stored into dst (dst.postion += more)
-   * @throws IOException if compression fails   
+   * @throws IOException if compression fails
    */
-	public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException;
+  public void decompress(ByteBuffer src, ByteBuffer dst) throws IOException;
 }

+ 9 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java

@@ -25,6 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.zlib.*;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
+
 import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
 
 /**
@@ -218,6 +220,13 @@ public class GzipCodec extends DefaultCodec {
       ? GzipZlibDecompressor.class
       : BuiltInGzipDecompressor.class;
   }
+    
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return ZlibFactory.isNativeZlibLoaded(conf) 
+        ? new ZlibDecompressor.ZlibDirectDecompressor(
+          ZlibDecompressor.CompressionHeader.AUTODETECT_GZIP_ZLIB, 0) : null;
+  }
 
   @Override
   public String getDefaultExtension() {

+ 10 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java

@@ -26,13 +26,14 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
 import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
  * This class creates snappy compressors/decompressors.
  */
-public class SnappyCodec implements Configurable, CompressionCodec {
+public class SnappyCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
   Configuration conf;
 
   /**
@@ -203,6 +204,14 @@ public class SnappyCodec implements Configurable, CompressionCodec {
         CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT);
     return new SnappyDecompressor(bufferSize);
   }
+  
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public DirectDecompressor createDirectDecompressor() {
+    return isNativeCodeLoaded() ? new SnappyDirectDecompressor() : null;
+  }
 
   /**
    * Get the default filename extension for this kind of compression.

+ 72 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java

@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 /**
@@ -282,4 +283,75 @@ public class SnappyDecompressor implements Decompressor {
   private native static void initIDs();
 
   private native int decompressBytesDirect();
+  
+  int decompressDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
+    assert (this instanceof SnappyDirectDecompressor);
+    
+    ByteBuffer presliced = dst;
+    if (dst.position() > 0) {
+      presliced = dst;
+      dst = dst.slice();
+    }
+
+    Buffer originalCompressed = compressedDirectBuf;
+    Buffer originalUncompressed = uncompressedDirectBuf;
+    int originalBufferSize = directBufferSize;
+    compressedDirectBuf = src.slice();
+    compressedDirectBufLen = src.remaining();
+    uncompressedDirectBuf = dst;
+    directBufferSize = dst.remaining();
+    int n = 0;
+    try {
+      n = decompressBytesDirect();
+      presliced.position(presliced.position() + n);
+      // SNAPPY always consumes the whole buffer or throws an exception
+      src.position(src.limit());
+      finished = true;
+    } finally {
+      compressedDirectBuf = originalCompressed;
+      uncompressedDirectBuf = originalUncompressed;
+      compressedDirectBufLen = 0;
+      directBufferSize = originalBufferSize;
+    }
+    return n;
+  }
+  
+  public static class SnappyDirectDecompressor extends SnappyDecompressor implements
+      DirectDecompressor {
+    
+    @Override
+    public boolean finished() {
+      return (endOfInput && super.finished());
+    }
+
+    @Override
+    public void reset() {
+      super.reset();
+      endOfInput = true;
+    }
+
+    private boolean endOfInput;
+
+    @Override
+    public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
+        throws IOException {
+      assert dst.isDirect() : "dst.isDirect()";
+      assert src.isDirect() : "src.isDirect()";
+      assert dst.remaining() > 0 : "dst.remaining() > 0";
+      this.decompressDirect(src, dst);
+      endOfInput = !src.hasRemaining();
+    }
+
+    @Override
+    public synchronized void setDictionary(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+
+    @Override
+    public synchronized int decompress(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
+  }
 }

+ 1 - 107
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java

@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.DirectCompressor;
 import org.apache.hadoop.util.NativeCodeLoader;
 
 import org.apache.commons.logging.Log;
@@ -36,7 +35,7 @@ import org.apache.commons.logging.LogFactory;
  * http://www.zlib.net/
  * 
  */
-public class ZlibCompressor implements Compressor,DirectCompressor {
+public class ZlibCompressor implements Compressor {
 
   private static final Log LOG = LogFactory.getLog(ZlibCompressor.class);
 
@@ -421,7 +420,6 @@ public class ZlibCompressor implements Compressor,DirectCompressor {
     compressedDirectBuf.limit(directBufferSize);
     compressedDirectBuf.position(directBufferSize);
     userBufOff = userBufLen = 0;
-    userBuf = null;
   }
   
   @Override
@@ -437,110 +435,6 @@ public class ZlibCompressor implements Compressor,DirectCompressor {
       throw new NullPointerException();
   }
   
-  private int put(ByteBuffer dst, ByteBuffer src) {
-    // this will lop off data from src[pos:limit] into dst[pos:limit]
-    int l1 = src.remaining();
-    int l2 = dst.remaining();
-    int pos1 = src.position();
-    int pos2 = dst.position();
-    int len = Math.min(l1, l2);
-
-    if (len == 0) {
-      return 0;
-    }
-
-    ByteBuffer slice = src.slice();
-    slice.limit(len);
-    dst.put(slice);
-    src.position(pos1 + len);
-    return len;
-  }
-
-  public int compress(ByteBuffer dst, ByteBuffer src) throws IOException {
-    assert dst.remaining() > 0 : "dst.remaining() == 0";
-    int n = 0;
-    
-    /* fast path for clean state and direct buffers */
-    /* TODO: reset should free userBuf? */
-    if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) {
-      /*
-       * TODO: fix these assumptions in inflateDirect(), eventually by allowing
-       * it to read position()/limit() directly
-       */
-      boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.capacity() >= directBufferSize);
-      boolean cleanState = (keepUncompressedBuf == false && uncompressedDirectBufLen == 0 && compressedDirectBuf.remaining() == 0);
-      /* use the buffers directly */
-      if(cleanDst && cleanState) {
-        Buffer originalCompressed = compressedDirectBuf;
-        Buffer originalUncompressed = uncompressedDirectBuf;
-        int originalBufferSize = directBufferSize;
-        uncompressedDirectBuf = src;
-        uncompressedDirectBufOff = src.position();
-        uncompressedDirectBufLen = src.remaining();
-        compressedDirectBuf = dst;
-        directBufferSize = dst.remaining();
-        // Compress data
-        n = deflateBytesDirect();
-        // we move dst.position() forward, not limit() 
-        // unlike the local buffer case, which moves it when we put() into the dst
-        dst.position(n);
-        if(uncompressedDirectBufLen > 0) {
-          src.position(uncompressedDirectBufOff);
-        } else {
-          src.position(src.limit());
-        }
-        compressedDirectBuf = originalCompressed;
-        uncompressedDirectBuf = originalUncompressed;
-        uncompressedDirectBufOff = 0;
-        uncompressedDirectBufLen = 0;
-        directBufferSize = originalBufferSize;
-        return n;
-      }
-    }
-    
-    // Check if there is compressed data
-    if (compressedDirectBuf.remaining() > 0) {
-      n = put(dst, (ByteBuffer) compressedDirectBuf);
-    }
-
-    if (dst.remaining() == 0) {
-      return n;
-    } else {
-      needsInput();
-
-      // if we have drained userBuf, read from src (ideally, do not mix buffer
-      // modes, but sometimes you can)
-      if (userBufLen == 0 && src != null && src.remaining() > 0) {
-        put((ByteBuffer) uncompressedDirectBuf, src);
-        uncompressedDirectBufLen = uncompressedDirectBuf.position();
-      }
-
-      // Re-initialize the zlib's output direct buffer
-      compressedDirectBuf.rewind();
-      compressedDirectBuf.limit(directBufferSize);
-
-      // Compress data
-      int more = deflateBytesDirect();
-
-      compressedDirectBuf.limit(more);
-
-      // Check if zlib consumed all input buffer
-      // set keepUncompressedBuf properly
-      if (uncompressedDirectBufLen <= 0) { // zlib consumed all input buffer
-        keepUncompressedBuf = false;
-        uncompressedDirectBuf.clear();
-        uncompressedDirectBufOff = 0;
-        uncompressedDirectBufLen = 0;
-      } else { // zlib did not consume all input buffer
-        keepUncompressedBuf = true;
-      }
-
-      // fill the dst buffer from compressedDirectBuf
-      int fill = put(dst, ((ByteBuffer) compressedDirectBuf));
-      return n + fill;
-    }
-  }
-  
   private native static void initIDs();
   private native static long init(int level, int strategy, int windowBits);
   private native static void setDictionary(long strm, byte[] b, int off,

+ 82 - 103
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java

@@ -32,7 +32,7 @@ import org.apache.hadoop.util.NativeCodeLoader;
  * http://www.zlib.net/
  * 
  */
-public class ZlibDecompressor implements Decompressor,DirectDecompressor {
+public class ZlibDecompressor implements Decompressor {
   private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
   
   // HACK - Use this as a global lock in the JNI layer
@@ -107,7 +107,7 @@ public class ZlibDecompressor implements Decompressor,DirectDecompressor {
    */
   public ZlibDecompressor(CompressionHeader header, int directBufferSize) {
     this.header = header;
-    this.directBufferSize = directBufferSize;
+    this.directBufferSize = directBufferSize;    
     compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
     uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
     uncompressedDirectBuf.position(directBufferSize);
@@ -281,7 +281,6 @@ public class ZlibDecompressor implements Decompressor,DirectDecompressor {
     uncompressedDirectBuf.limit(directBufferSize);
     uncompressedDirectBuf.position(directBufferSize);
     userBufOff = userBufLen = 0;
-    userBuf = null;
   }
 
   @Override
@@ -301,117 +300,97 @@ public class ZlibDecompressor implements Decompressor,DirectDecompressor {
     if (stream == 0)
       throw new NullPointerException();
   }
+  
+  private native static void initIDs();
+  private native static long init(int windowBits);
+  private native static void setDictionary(long strm, byte[] b, int off,
+                                           int len);
+  private native int inflateBytesDirect();
+  private native static long getBytesRead(long strm);
+  private native static long getBytesWritten(long strm);
+  private native static int getRemaining(long strm);
+  private native static void reset(long strm);
+  private native static void end(long strm);
     
-  private int put(ByteBuffer dst, ByteBuffer src) {
-    // this will lop off data from src[pos:limit] into dst[pos:limit], using the
-    // min() of both remaining()
-    int l1 = src.remaining();
-    int l2 = dst.remaining();
-    int pos1 = src.position();
-    int pos2 = dst.position();
-    int len = Math.min(l1, l2);
-
-    if (len == 0) {
-      return 0;
+  int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
+    assert (this instanceof ZlibDirectDecompressor);
+    
+    ByteBuffer presliced = dst;
+    if (dst.position() > 0) {
+      presliced = dst;
+      dst = dst.slice();
     }
 
-    ByteBuffer slice = src.slice();
-    slice.limit(len);
-    dst.put(slice);
-    src.position(pos1 + len);
-    return len;
+    Buffer originalCompressed = compressedDirectBuf;
+    Buffer originalUncompressed = uncompressedDirectBuf;
+    int originalBufferSize = directBufferSize;
+    compressedDirectBuf = src;
+    compressedDirectBufOff = src.position();
+    compressedDirectBufLen = src.remaining();
+    uncompressedDirectBuf = dst;
+    directBufferSize = dst.remaining();
+    int n = 0;
+    try {
+      n = inflateBytesDirect();
+      presliced.position(presliced.position() + n);
+      if (compressedDirectBufLen > 0) {
+        src.position(compressedDirectBufOff);
+      } else {
+        src.position(src.limit());
+      }
+    } finally {
+      compressedDirectBuf = originalCompressed;
+      uncompressedDirectBuf = originalUncompressed;
+      compressedDirectBufOff = 0;
+      compressedDirectBufLen = 0;
+      directBufferSize = originalBufferSize;
+    }
+    return n;
   }
+  
+  public static class ZlibDirectDecompressor 
+      extends ZlibDecompressor implements DirectDecompressor {
+    public ZlibDirectDecompressor() {
+      super(CompressionHeader.DEFAULT_HEADER, 0);
+    }
 
-  public int decompress(ByteBuffer dst, ByteBuffer src) throws IOException {
-    assert dst.remaining() > 0 : "dst.remaining == 0";
-    int n = 0;
+    public ZlibDirectDecompressor(CompressionHeader header, int directBufferSize) {
+      super(header, directBufferSize);
+    }
     
-    /* fast path for clean state and direct buffers */
-    if((src != null && src.isDirect()) && dst.isDirect() && userBuf == null) {
-      /*
-       * TODO: fix these assumptions in inflateDirect(), eventually by allowing
-       * it to read position()/limit() directly
-       */
-      boolean cleanDst = (dst.position() == 0 && dst.remaining() == dst.capacity() && dst.remaining() >= directBufferSize);
-      boolean cleanState = (compressedDirectBufLen == 0 && uncompressedDirectBuf.remaining() == 0);
-      /* use the buffers directly */
-      if(cleanDst && cleanState) {
-        Buffer originalCompressed = compressedDirectBuf;
-        Buffer originalUncompressed = uncompressedDirectBuf;
-        int originalBufferSize = directBufferSize;
-        compressedDirectBuf = src;
-        compressedDirectBufOff = src.position();
-        compressedDirectBufLen = src.remaining();
-        uncompressedDirectBuf = dst;
-        directBufferSize = dst.remaining();
-        // Compress data
-        n = inflateBytesDirect();
-        dst.position(n);
-        if(compressedDirectBufLen > 0) {
-          src.position(compressedDirectBufOff);
-        } else {
-          src.position(src.limit());
-        }
-        compressedDirectBuf = originalCompressed;
-        uncompressedDirectBuf = originalUncompressed;        
-        compressedDirectBufOff = 0;
-        compressedDirectBufLen = 0;
-        directBufferSize = originalBufferSize;
-        return n;
-      }
+    @Override
+    public boolean finished() {
+      return (endOfInput && super.finished());
     }
     
-    // Check if there is compressed data
-    if (uncompressedDirectBuf.remaining() > 0) {
-      n = put(dst, (ByteBuffer) uncompressedDirectBuf);
+    @Override
+    public void reset() {
+      super.reset();
+      endOfInput = true;
+    }
+    
+    private boolean endOfInput;
+
+    @Override
+    public synchronized void decompress(ByteBuffer src, ByteBuffer dst)
+        throws IOException {
+      assert dst.isDirect() : "dst.isDirect()";
+      assert src.isDirect() : "src.isDirect()";
+      assert dst.remaining() > 0 : "dst.remaining() > 0";      
+      this.inflateDirect(src, dst);
+      endOfInput = !src.hasRemaining();
     }
 
-    if (dst.remaining() == 0) {
-      return n;
-    } else {
-      if (needsInput()) {
-        // this does not update buffers if we have no userBuf
-        if (userBufLen <= 0) {
-          compressedDirectBufOff = 0;
-          compressedDirectBufLen = 0;
-          compressedDirectBuf.rewind().limit(directBufferSize);
-        }
-        if (src != null) {
-          assert src.remaining() > 0 : "src.remaining() == 0";
-        }
-      }
-
-      // if we have drained userBuf, read from src (ideally, do not mix buffer
-      // modes, but sometimes you can)
-      if (userBufLen == 0 && src != null && src.remaining() > 0) {
-        compressedDirectBufLen += put(((ByteBuffer) compressedDirectBuf), src);
-      }
-      
-      // Re-initialize the zlib's output direct buffer
-      uncompressedDirectBuf.rewind();
-      uncompressedDirectBuf.limit(directBufferSize);
-
-      // Compress data
-      int more = inflateBytesDirect();
-
-      uncompressedDirectBuf.limit(more);
+    @Override
+    public synchronized void setDictionary(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
+    }
 
-      // Get atmost 'len' bytes
-      int fill = put(dst, ((ByteBuffer) uncompressedDirectBuf));
-      return n + fill;
+    @Override
+    public synchronized int decompress(byte[] b, int off, int len) {
+      throw new UnsupportedOperationException(
+          "byte[] arrays are not supported for DirectDecompressor");
     }
   }
-
-  
-  
-  private native static void initIDs();
-  private native static long init(int windowBits);
-  private native static void setDictionary(long strm, byte[] b, int off,
-                                           int len);
-  private native int inflateBytesDirect();
-  private native static long getBytesRead(long strm);
-  private native static long getBytesWritten(long strm);
-  private native static int getRemaining(long strm);
-  private native static void reset(long strm);
-  private native static void end(long strm);
 }

+ 12 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibFactory.java

@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.util.NativeCodeLoader;
@@ -116,6 +117,17 @@ public class ZlibFactory {
     return (isNativeZlibLoaded(conf)) ? 
       new ZlibDecompressor() : new BuiltInZlibInflater(); 
   }
+  
+  /**
+   * Return the appropriate implementation of the zlib direct decompressor. 
+   * 
+   * @param conf configuration
+   * @return the appropriate implementation of the zlib decompressor.
+   */
+  public static DirectDecompressor getZlibDirectDecompressor(Configuration conf) {
+    return (isNativeZlibLoaded(conf)) ? 
+      new ZlibDecompressor.ZlibDirectDecompressor() : null; 
+  }
 
   public static void setCompressionStrategy(Configuration conf,
       CompressionStrategy strategy) {

+ 49 - 12
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java

@@ -98,9 +98,6 @@ public class NativeIO {
 
     private static final Log LOG = LogFactory.getLog(NativeIO.class);
 
-    @VisibleForTesting
-    public static CacheTracker cacheTracker = null;
-    
     private static boolean nativeLoaded = false;
     private static boolean fadvisePossible = true;
     private static boolean syncFileRangePossible = true;
@@ -111,18 +108,61 @@ public class NativeIO {
 
     private static long cacheTimeout = -1;
 
-    public static interface CacheTracker {
-      public void fadvise(String identifier, long offset, long len, int flags);
+    private static CacheManipulator cacheManipulator = new CacheManipulator();
+
+    public static CacheManipulator getCacheManipulator() {
+      return cacheManipulator;
     }
 
-    public static CacheManipulator cacheManipulator = new CacheManipulator();
+    public static void setCacheManipulator(CacheManipulator cacheManipulator) {
+      POSIX.cacheManipulator = cacheManipulator;
+    }
 
+    /**
+     * Used to manipulate the operating system cache.
+     */
     @VisibleForTesting
     public static class CacheManipulator {
       public void mlock(String identifier, ByteBuffer buffer,
           long len) throws IOException {
         POSIX.mlock(buffer, len);
       }
+
+      public long getMemlockLimit() {
+        return NativeIO.getMemlockLimit();
+      }
+
+      public long getOperatingSystemPageSize() {
+        return NativeIO.getOperatingSystemPageSize();
+      }
+
+      public void posixFadviseIfPossible(String identifier,
+        FileDescriptor fd, long offset, long len, int flags)
+            throws NativeIOException {
+        NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
+            len, flags);
+      }
+    }
+
+    /**
+     * A CacheManipulator used for testing which does not actually call mlock.
+     * This allows many tests to be run even when the operating system does not
+     * allow mlock, or only allows limited mlocking.
+     */
+    @VisibleForTesting
+    public static class NoMlockCacheManipulator extends CacheManipulator {
+      public void mlock(String identifier, ByteBuffer buffer,
+          long len) throws IOException {
+        LOG.info("mlocking " + identifier);
+      }
+
+      public long getMemlockLimit() {
+        return 1125899906842624L;
+      }
+
+      public long getOperatingSystemPageSize() {
+        return 4096;
+      }
     }
 
     static {
@@ -207,12 +247,9 @@ public class NativeIO {
      *
      * @throws NativeIOException if there is an error with the syscall
      */
-    public static void posixFadviseIfPossible(String identifier,
+    static void posixFadviseIfPossible(String identifier,
         FileDescriptor fd, long offset, long len, int flags)
         throws NativeIOException {
-      if (cacheTracker != null) {
-        cacheTracker.fadvise(identifier, offset, len, flags);
-      }
       if (nativeLoaded && fadvisePossible) {
         try {
           posix_fadvise(fd, offset, len, flags);
@@ -566,7 +603,7 @@ public class NativeIO {
    *         Long.MAX_VALUE if there is no limit;
    *         The number of bytes that can be locked into memory otherwise.
    */
-  public static long getMemlockLimit() {
+  static long getMemlockLimit() {
     return isAvailable() ? getMemlockLimit0() : 0;
   }
 
@@ -575,7 +612,7 @@ public class NativeIO {
   /**
    * @return the operating system's page size.
    */
-  public static long getOperatingSystemPageSize() {
+  static long getOperatingSystemPageSize() {
     try {
       Field f = Unsafe.class.getDeclaredField("theUnsafe");
       f.setAccessible(true);

+ 53 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java

@@ -29,6 +29,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.hadoop.io.DataInputBuffer;
@@ -38,6 +39,7 @@ import org.apache.hadoop.io.compress.BlockDecompressorStream;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -147,7 +149,7 @@ public class TestSnappyCompressorDecompressor {
       fail("testSnappyCompressorCompressAIOBException ex error !!!");
     }
   }
-
+  
   @Test
   public void testSnappyDecompressorCompressAIOBException() {
     try {
@@ -275,6 +277,56 @@ public class TestSnappyCompressorDecompressor {
       fail("testSnappyBlockCompression ex error !!!");
     }
   }
+  
+  private void compressDecompressLoop(int rawDataSize) throws IOException {
+    byte[] rawData = BytesGenerator.get(rawDataSize);    
+    byte[] compressedResult = new byte[rawDataSize+20];
+    int directBufferSize = Math.max(rawDataSize*2, 64*1024);    
+    SnappyCompressor compressor = new SnappyCompressor(directBufferSize);
+    compressor.setInput(rawData, 0, rawDataSize);
+    int compressedSize = compressor.compress(compressedResult, 0, compressedResult.length);
+    SnappyDirectDecompressor decompressor = new SnappyDirectDecompressor();
+   
+    ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize);
+    ByteBuffer outBuf = ByteBuffer.allocateDirect(rawDataSize);
+
+    inBuf.put(compressedResult, 0, compressedSize);
+    inBuf.flip();    
+
+    ByteBuffer expected = ByteBuffer.wrap(rawData);
+    
+    outBuf.clear();
+    while(!decompressor.finished()) {
+      decompressor.decompress(inBuf, outBuf);
+      if (outBuf.remaining() == 0) {
+        outBuf.flip();
+        while (outBuf.remaining() > 0) {        
+          assertEquals(expected.get(), outBuf.get());
+        }
+        outBuf.clear();
+      }
+    }
+    outBuf.flip();
+    while (outBuf.remaining() > 0) {        
+      assertEquals(expected.get(), outBuf.get());
+    }
+    outBuf.clear();
+    
+    assertEquals(0, expected.remaining());
+  }
+  
+  @Test
+  public void testSnappyDirectBlockCompression() {
+    int[] size = { 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };    
+    assumeTrue(SnappyCodec.isNativeCodeLoaded());
+    try {
+      for (int i = 0; i < size.length; i++) {
+        compressDecompressLoop(size[i]);
+      }
+    } catch (IOException ex) {
+      fail("testSnappyDirectBlockCompression ex !!!" + ex);
+    }
+  }
 
   @Test
   public void testSnappyCompressorDecopressorLogicWithCompressionStreams() {

+ 33 - 125
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java

@@ -20,13 +20,12 @@ package org.apache.hadoop.io.compress.zlib;
 import static org.junit.Assert.*;
 import static org.junit.Assume.*;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.Console;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Random;
+import java.util.zip.DeflaterOutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -38,12 +37,10 @@ import org.apache.hadoop.io.compress.DecompressorStream;
 import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
-import org.apache.log4j.ConsoleAppender;
+import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
 import org.junit.Before;
 import org.junit.Test;
-
-import sun.util.logging.resources.logging;
-
 import com.google.common.collect.ImmutableSet;
 
 public class TestZlibCompressorDecompressor {
@@ -159,143 +156,54 @@ public class TestZlibCompressorDecompressor {
     }
   }
   
-  private void compressDecompressLoop(int rawDataSize, int inSize, int outSize)
-      throws IOException {
+  
+  private void compressDecompressLoop(int rawDataSize) throws IOException {
     byte[] rawData = null;
     rawData = generate(rawDataSize);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    ByteBuffer inBuf = ByteBuffer.allocateDirect(inSize);
-    ByteBuffer outBuf = ByteBuffer.allocateDirect(outSize);
-    ZlibCompressor compressor = new ZlibCompressor();
-    ZlibDecompressor decompressor = new ZlibDecompressor();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(rawDataSize+12);
+    DeflaterOutputStream dos = new DeflaterOutputStream(baos);
+    dos.write(rawData);
+    dos.flush();
+    dos.close();
+    byte[] compressedResult = baos.toByteArray();
+    int compressedSize = compressedResult.length;
+    ZlibDirectDecompressor decompressor = new ZlibDirectDecompressor();
+   
+    ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize);
+    ByteBuffer outBuf = ByteBuffer.allocateDirect(rawDataSize);
+
+    inBuf.put(compressedResult, 0, compressedSize);
+    inBuf.flip();    
+
+    ByteBuffer expected = ByteBuffer.wrap(rawData);
+    
     outBuf.clear();
-    /* compression loop */
-    int off = 0;
-    int len = rawDataSize;
-    int min = Math.min(inBuf.remaining(), len);
-    if (min > 0) {
-      inBuf.put(rawData, off, min);
-    }
-    inBuf.flip();
-    len -= min;
-    off += min;
-    while (!compressor.finished()) {
-      compressor.compress(outBuf, inBuf);
+    while(!decompressor.finished()) {
+      decompressor.decompress(inBuf, outBuf);
       if (outBuf.remaining() == 0) {
-        // flush when the buffer is full
         outBuf.flip();
-        while (outBuf.remaining() > 0) {
-          baos.write(outBuf.get());
+        while (outBuf.remaining() > 0) {        
+          assertEquals(expected.get(), outBuf.get());
         }
         outBuf.clear();
       }
-      if (inBuf != null && inBuf.remaining() == 0) {
-        inBuf.clear();
-        if (len > 0) {
-          min = Math.min(inBuf.remaining(), len);
-          inBuf.put(rawData, off, min);
-          inBuf.flip();
-          len -= min;
-          off += min;
-        } else {
-          inBuf = null;
-          compressor.finish();
-        }
-      }
     }
-
     outBuf.flip();
-    if (outBuf.remaining() > 0) {
-      while (outBuf.remaining() > 0) {
-        baos.write(outBuf.get());
-      }
-      outBuf.clear();
+    while (outBuf.remaining() > 0) {        
+      assertEquals(expected.get(), outBuf.get());
     }
-
-    compressor.end();
-
-    byte[] compressed = baos.toByteArray();
-    ByteBuffer expected = ByteBuffer.wrap(rawData);
     outBuf.clear();
-    inBuf = ByteBuffer.allocateDirect(inSize);
-    inBuf.clear();
-
-    // zlib always has header
-    if (compressed.length != 0) {
-      off = 0;
-      len = compressed.length;
-      min = Math.min(inBuf.remaining(), len);
-      inBuf.put(compressed, off, min);
-      inBuf.flip();
-      len -= min;
-      off += min;
-      while (!decompressor.finished()) {
-        decompressor.decompress(outBuf, inBuf);
-        if (outBuf.remaining() == 0) {
-          outBuf.flip();
-          while (outBuf.remaining() > 0) {
-            assertEquals(expected.get(), outBuf.get());
-          }
-          outBuf.clear();
-        }
-
-        if (inBuf != null && inBuf.remaining() == 0) {
-          inBuf.clear();
-          if (len > 0) {
-            min = Math.min(inBuf.remaining(), len);
-            inBuf.put(compressed, off, min);
-            inBuf.flip();
-            len -= min;
-            off += min;
-          }
-        }
-      }
-    }
-
-    outBuf.flip();
-    if (outBuf.remaining() > 0) {
-      while (outBuf.remaining() > 0) {
-        assertEquals(expected.get(), outBuf.get());
-      }
-      outBuf.clear();
-    }
-
+    
     assertEquals(0, expected.remaining());
   }
-  
+
   @Test
   public void testZlibDirectCompressDecompress() {
-    int[] size = { 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 256 * 1024,
-        1024 * 1024 };
+    int[] size = { 1, 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };
+    assumeTrue(NativeCodeLoader.isNativeCodeLoaded());
     try {
-      // 0-2 bytes results in sizeof(outBuf) > sizeof(inBuf)
-      compressDecompressLoop(0, 4096, 4096);
-      compressDecompressLoop(0, 1, 1);
-      compressDecompressLoop(1, 1, 2);
-      compressDecompressLoop(1, 2, 1);
-      compressDecompressLoop(2, 3, 2);
-
       for (int i = 0; i < size.length; i++) {
-        compressDecompressLoop(size[i], 4096, 4096);
-        compressDecompressLoop(size[i], 1, 1);
-        compressDecompressLoop(size[i], 1, 2);
-        compressDecompressLoop(size[i], 2, 1);
-        compressDecompressLoop(size[i], 3, 2);
-        compressDecompressLoop(size[i], size[i], 4096);
-        compressDecompressLoop(size[i], size[i] - 1, 4096);
-        compressDecompressLoop(size[i], size[i] + 1, 4096);
-        compressDecompressLoop(size[i], 4096, size[i]);
-        compressDecompressLoop(size[i], 4096, size[i] - 1);
-        compressDecompressLoop(size[i], 4096, size[i] + 1);
-        compressDecompressLoop(size[i], size[i] - 1, size[i] - 1);
-
-        compressDecompressLoop(size[i], size[i] / 2, 4096);
-        compressDecompressLoop(size[i], size[i] / 2 - 1, 4096);
-        compressDecompressLoop(size[i], size[i] / 2 + 1, 4096);
-        compressDecompressLoop(size[i], 4096, size[i] / 2);
-        compressDecompressLoop(size[i], 4096, size[i] / 2 - 1);
-        compressDecompressLoop(size[i], 4096, size[i] / 2 + 1);
-        compressDecompressLoop(size[i], size[i] / 2 - 1, size[i] / 2 - 1);
+        compressDecompressLoop(size[i]);
       }
     } catch (IOException ex) {
       fail("testZlibDirectCompressDecompress ex !!!" + ex);

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -199,6 +199,9 @@ Trunk (Unreleased)
 
     HDFS-5366. recaching improvements (cmccabe)
 
+    HDFS-5511. improve CacheManipulator interface to allow better unit testing
+    (cmccabe)
+
   OPTIMIZATIONS
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 
@@ -379,7 +382,9 @@ Trunk (Unreleased)
     nextEntryId (cmccabe)
 
     HDFS-5512. CacheAdmin -listPools fails with NPE when user lacks permissions
-    to view all pools (awang via cmccabe)
+    to view all pools (wang via cmccabe)
+
+    HDFS-5513. CacheAdmin commands fail when using . as the path. (wang)
 
 Release 2.3.0 - UNRELEASED
 
@@ -506,6 +511,8 @@ Release 2.3.0 - UNRELEASED
 
     HDFS-5073. TestListCorruptFileBlocks fails intermittently. (Arpit Agarwal)
 
+    HDFS-1386. TestJMXGet fails in jdk7 (jeagles)
+
   OPTIMIZATIONS
 
     HDFS-5239.  Allow FSNamesystem lock fairness to be configurable (daryn)

+ 1 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathBasedCacheDirective.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
-import java.net.URI;
-
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -61,9 +59,7 @@ public class PathBasedCacheDirective {
      */
     public Builder(PathBasedCacheDirective directive) {
       this.id = directive.getId();
-      // deep-copy URI
-      URI uri = directive.getPath().toUri();
-      this.path = new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());
+      this.path = directive.getPath();
       this.replication = directive.getReplication();
       this.pool = directive.getPool();
     }

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java

@@ -24,6 +24,8 @@ import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.management.ObjectName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -61,6 +63,7 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
   private JournalNodeRpcServer rpcServer;
   private JournalNodeHttpServer httpServer;
   private Map<String, Journal> journalsById = Maps.newHashMap();
+  private ObjectName journalNodeInfoBeanName;
 
   private File localDir;
 
@@ -181,6 +184,11 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
     for (Journal j : journalsById.values()) {
       IOUtils.cleanup(LOG, j);
     }
+
+    if (journalNodeInfoBeanName != null) {
+      MBeans.unregister(journalNodeInfoBeanName);
+      journalNodeInfoBeanName = null;
+    }
   }
 
   /**
@@ -256,7 +264,7 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
    * Register JournalNodeMXBean
    */
   private void registerJNMXBean() {
-    MBeans.register("JournalNode", "JournalNodeInfo", this);
+    journalNodeInfoBeanName = MBeans.register("JournalNode", "JournalNodeInfo", this);
   }
   
   private class ErrorReporter implements StorageErrorReporter {

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -664,8 +664,9 @@ class BlockReceiver implements Closeable {
         //                     
         long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
         if (dropPos > 0 && dropCacheBehindWrites) {
-          NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
-              outFd, 0, dropPos, NativeIO.POSIX.POSIX_FADV_DONTNEED);
+          NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+              block.getBlockName(), outFd, 0, dropPos,
+              NativeIO.POSIX.POSIX_FADV_DONTNEED);
         }
         lastCacheManagementOffset = offsetInBlock;
       }

+ 9 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -375,8 +375,9 @@ class BlockSender implements java.io.Closeable {
         ((dropCacheBehindAllReads) ||
          (dropCacheBehindLargeReads && isLongRead()))) {
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
-            blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+            block.getBlockName(), blockInFd, lastCacheDropOffset,
+            offset - lastCacheDropOffset,
             NativeIO.POSIX.POSIX_FADV_DONTNEED);
       } catch (Exception e) {
         LOG.warn("Unable to drop cache on file close", e);
@@ -674,8 +675,9 @@ class BlockSender implements java.io.Closeable {
 
     if (isLongRead() && blockInFd != null) {
       // Advise that this file descriptor will be accessed sequentially.
-      NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
-          blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
+      NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+          block.getBlockName(), blockInFd, 0, 0,
+          NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
     }
     
     // Trigger readahead of beginning of file if configured.
@@ -761,9 +763,9 @@ class BlockSender implements java.io.Closeable {
       long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
       if (offset >= nextCacheDropOffset) {
         long dropLength = offset - lastCacheDropOffset;
-        NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
-            blockInFd, lastCacheDropOffset, dropLength,
-            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+            block.getBlockName(), blockInFd, lastCacheDropOffset,
+            dropLength, NativeIO.POSIX.POSIX_FADV_DONTNEED);
         lastCacheDropOffset = offset;
       }
     }

+ 9 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -95,6 +95,8 @@ import java.security.PrivilegedExceptionAction;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.management.ObjectName;
+
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
@@ -209,6 +211,7 @@ public class DataNode extends Configured
   private boolean connectToDnViaHostname;
   ReadaheadPool readaheadPool;
   private final boolean getHdfsBlockLocationsEnabled;
+  private ObjectName dataNodeInfoBeanName;
 
   /**
    * Create the DataNode given a configuration, an array of dataDirs,
@@ -658,7 +661,7 @@ public class DataNode extends Configured
             " size (%s) is greater than zero and native code is not available.",
             DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
       }
-      long ulimit = NativeIO.getMemlockLimit();
+      long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
       if (dnConf.maxLockedMemory > ulimit) {
       throw new RuntimeException(String.format(
           "Cannot start datanode because the configured max locked memory" +
@@ -890,7 +893,7 @@ public class DataNode extends Configured
   }
   
   private void registerMXBean() {
-    MBeans.register("DataNode", "DataNodeInfo", this);
+    dataNodeInfoBeanName = MBeans.register("DataNode", "DataNodeInfo", this);
   }
   
   @VisibleForTesting
@@ -1215,6 +1218,10 @@ public class DataNode extends Configured
     if (metrics != null) {
       metrics.shutdown();
     }
+    if (dataNodeInfoBeanName != null) {
+      MBeans.unregister(dataNodeInfoBeanName);
+      dataNodeInfoBeanName = null;
+    }
   }
   
   

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java

@@ -163,7 +163,8 @@ public class FsDatasetCache {
   private final UsedBytesCount usedBytesCount;
 
   public static class PageRounder {
-    private final long osPageSize = NativeIO.getOperatingSystemPageSize();
+    private final long osPageSize =
+        NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
 
     /**
      * Round up a number to the operating system page size.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java

@@ -82,7 +82,7 @@ public class MappableBlock implements Closeable {
         throw new IOException("Block InputStream has no FileChannel.");
       }
       mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
-      NativeIO.POSIX.cacheManipulator.mlock(blockFileName, mmap, length);
+      NativeIO.POSIX.getCacheManipulator().mlock(blockFileName, mmap, length);
       verifyChecksum(length, metaIn, blockChannel, blockFileName);
       mappableBlock = new MappableBlock(mmap, length);
     } finally {

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -5513,6 +5513,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
   
   private ObjectName mbeanName;
+  private ObjectName mxbeanName;
 
   /**
    * Register the FSNamesystem MBean using the name
@@ -5536,6 +5537,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   void shutdown() {
     if (mbeanName != null) {
       MBeans.unregister(mbeanName);
+      mbeanName = null;
+    }
+    if (mxbeanName != null) {
+      MBeans.unregister(mxbeanName);
+      mxbeanName = null;
     }
     if (dir != null) {
       dir.shutdown();
@@ -6345,7 +6351,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * Register NameNodeMXBean
    */
   private void registerMXBean() {
-    MBeans.register("NameNode", "NameNodeInfo", this);
+    mxbeanName = MBeans.register("NameNode", "NameNodeInfo", this);
   }
 
   /**

+ 9 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -26,6 +26,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+
+import javax.management.ObjectName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -261,6 +264,7 @@ public class NameNode implements NameNodeStatusMXBean {
   private NameNodeRpcServer rpcServer;
 
   private JvmPauseMonitor pauseMonitor;
+  private ObjectName nameNodeStatusBeanName;
   
   /** Format a new filesystem.  Destroys any filesystem that may already
    * exist at this location.  **/
@@ -745,6 +749,10 @@ public class NameNode implements NameNodeStatusMXBean {
       if (namesystem != null) {
         namesystem.shutdown();
       }
+      if (nameNodeStatusBeanName != null) {
+        MBeans.unregister(nameNodeStatusBeanName);
+        nameNodeStatusBeanName = null;
+      }
     }
   }
 
@@ -1414,7 +1422,7 @@ public class NameNode implements NameNodeStatusMXBean {
    * Register NameNodeStatusMXBean
    */
   private void registerNNSMXBean() {
-    MBeans.register("NameNode", "NameNodeStatus", this);
+    nameNodeStatusBeanName = MBeans.register("NameNode", "NameNodeStatus", this);
   }
 
   @Override // NameNodeStatusMXBean

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java

@@ -113,7 +113,8 @@ public class TestDatanodeConfig {
   @Test(timeout=60000)
   public void testMemlockLimit() throws Exception {
     assumeTrue(NativeIO.isAvailable());
-    final long memlockLimit = NativeIO.getMemlockLimit();
+    final long memlockLimit =
+        NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
 
     // Can't increase the memlock limit past the maximum.
     assumeTrue(memlockLimit != Long.MAX_VALUE);

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestCachingStrategy.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.FileDescriptor;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
@@ -36,7 +37,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIOException;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -54,7 +56,7 @@ public class TestCachingStrategy {
     EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
 
     // Track calls to posix_fadvise.
-    NativeIO.POSIX.cacheTracker = tracker;
+    NativeIO.POSIX.setCacheManipulator(tracker);
     
     // Normally, we wait for a few megabytes of data to be read or written 
     // before dropping the cache.  This is to avoid an excessive number of
@@ -106,12 +108,13 @@ public class TestCachingStrategy {
     }
   }
 
-  private static class TestRecordingCacheTracker implements CacheTracker {
+  private static class TestRecordingCacheTracker extends CacheManipulator {
     private final Map<String, Stats> map = new TreeMap<String, Stats>();
 
     @Override
-    synchronized public void fadvise(String name,
-        long offset, long len, int flags) {
+    public void posixFadviseIfPossible(String name,
+      FileDescriptor fd, long offset, long len, int flags)
+          throws NativeIOException {
       if ((len < 0) || (len > Integer.MAX_VALUE)) {
         throw new RuntimeException("invalid length of " + len +
             " passed to posixFadviseIfPossible");
@@ -126,6 +129,7 @@ public class TestCachingStrategy {
         map.put(name, stats);
       }
       stats.fadvise((int)offset, (int)len, flags);
+      super.posixFadviseIfPossible(name, fd, offset, len, flags);
     }
 
     synchronized void clear() {

+ 26 - 44
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.MetricsAsserts;
@@ -99,7 +100,6 @@ public class TestFsDatasetCache {
   @Before
   public void setUp() throws Exception {
     assumeTrue(!Path.WINDOWS);
-    assumeTrue(NativeIO.getMemlockLimit() >= CACHE_CAPACITY);
     conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
@@ -122,18 +122,8 @@ public class TestFsDatasetCache {
 
     spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
 
-    prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
-
-    // Save the current CacheManipulator and replace it at the end of the test
-    // Stub out mlock calls to avoid failing when not enough memory is lockable
-    // by the operating system.
-    NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
-      @Override
-      public void mlock(String identifier,
-          ByteBuffer mmap, long length) throws IOException {
-        LOG.info("mlocking " + identifier);
-      }
-    };
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
   }
 
   @After
@@ -145,7 +135,7 @@ public class TestFsDatasetCache {
       cluster.shutdown();
     }
     // Restore the original CacheManipulator
-    NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
+    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
   }
 
   private static void setHeartbeatResponse(DatanodeCommand[] cmds)
@@ -222,7 +212,8 @@ public class TestFsDatasetCache {
           if (tries++ > 10) {
             LOG.info("verifyExpectedCacheUsage: expected " +
                 expected + ", got " + curDnCacheUsed + "; " +
-                "memlock limit = " + NativeIO.getMemlockLimit() +
+                "memlock limit = " +
+                NativeIO.POSIX.getCacheManipulator().getMemlockLimit() +
                 ".  Waiting...");
           }
           return false;
@@ -297,40 +288,31 @@ public class TestFsDatasetCache {
    */
   @Test(timeout=600000)
   public void testCacheAndUncacheBlockWithRetries() throws Exception {
-    CacheManipulator prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
-    
-    try {
-      NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
-        private final Set<String> seenIdentifiers = new HashSet<String>();
-        
-        @Override
-        public void mlock(String identifier,
-            ByteBuffer mmap, long length) throws IOException {
-          if (seenIdentifiers.contains(identifier)) {
-            // mlock succeeds the second time.
-            LOG.info("mlocking " + identifier);
-            return;
-          }
-          seenIdentifiers.add(identifier);
-          throw new IOException("injecting IOException during mlock of " +
-              identifier);
+    // We don't have to save the previous cacheManipulator
+    // because it will be reinstalled by the @After function.
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() {
+      private final Set<String> seenIdentifiers = new HashSet<String>();
+      
+      @Override
+      public void mlock(String identifier,
+          ByteBuffer mmap, long length) throws IOException {
+        if (seenIdentifiers.contains(identifier)) {
+          // mlock succeeds the second time.
+          LOG.info("mlocking " + identifier);
+          return;
         }
-      };
-      testCacheAndUncacheBlock();
-    } finally {
-      NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
-    }
+        seenIdentifiers.add(identifier);
+        throw new IOException("injecting IOException during mlock of " +
+            identifier);
+      }
+    });
+    testCacheAndUncacheBlock();
   }
 
   @Test(timeout=600000)
   public void testFilesExceedMaxLockedMemory() throws Exception {
     LOG.info("beginning testFilesExceedMaxLockedMemory");
 
-    // We don't want to deal with page rounding issues, so skip this
-    // test if page size is weird
-    long osPageSize = NativeIO.getOperatingSystemPageSize();
-    assumeTrue(osPageSize == 4096); 
-
     // Create some test files that will exceed total cache capacity
     final int numFiles = 5;
     final long fileSize = 15000;
@@ -411,7 +393,7 @@ public class TestFsDatasetCache {
     assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
     assertEquals("Unexpected amount of cache used", current, cacheUsed);
 
-    NativeIO.POSIX.cacheManipulator = new NativeIO.POSIX.CacheManipulator() {
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator() {
       @Override
       public void mlock(String identifier,
           ByteBuffer mmap, long length) throws IOException {
@@ -422,7 +404,7 @@ public class TestFsDatasetCache {
           Assert.fail();
         }
       }
-    };
+    });
     // Starting caching each block in succession.  The usedBytes amount
     // should increase, even though caching doesn't complete on any of them.
     for (int i=0; i<NUM_BLOCKS; i++) {

+ 14 - 35
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java

@@ -31,9 +31,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.nio.MappedByteBuffer;
-import java.security.PrivilegedExceptionAction;
 import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -59,16 +58,15 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.GSet;
 import org.junit.After;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -100,18 +98,8 @@ public class TestPathBasedCacheRequests {
     cluster.waitActive();
     dfs = cluster.getFileSystem();
     proto = cluster.getNameNodeRpc();
-    prevCacheManipulator = NativeIO.POSIX.cacheManipulator;
-
-    // Save the current CacheManipulator and replace it at the end of the test
-    // Stub out mlock calls to avoid failing when not enough memory is lockable
-    // by the operating system.
-    NativeIO.POSIX.cacheManipulator = new CacheManipulator() {
-      @Override
-      public void mlock(String identifier,
-          ByteBuffer mmap, long length) throws IOException {
-        LOG.info("mlocking " + identifier);
-      }
-    };
+    prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+    NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
   }
 
   @After
@@ -120,7 +108,7 @@ public class TestPathBasedCacheRequests {
       cluster.shutdown();
     }
     // Restore the original CacheManipulator
-    NativeIO.POSIX.cacheManipulator = prevCacheManipulator;
+    NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
   }
 
   @Test(timeout=60000)
@@ -482,6 +470,15 @@ public class TestPathBasedCacheRequests {
     dfs.removePathBasedCacheDirective(relativeId);
     iter = dfs.listPathBasedCacheDirectives(null);
     assertFalse(iter.hasNext());
+
+    // Verify that PBCDs with path "." work correctly
+    PathBasedCacheDirective directive =
+        new PathBasedCacheDirective.Builder().setPath(new Path("."))
+            .setPool("pool1").build();
+    long id = dfs.addPathBasedCacheDirective(directive);
+    dfs.modifyPathBasedCacheDirective(new PathBasedCacheDirective.Builder(
+        directive).setId(id).setReplication((short)2).build());
+    dfs.removePathBasedCacheDirective(id);
   }
 
   @Test(timeout=60000)
@@ -647,20 +644,6 @@ public class TestPathBasedCacheRequests {
   // Most Linux installs will allow non-root users to lock 64KB.
   private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
 
-  /**
-   * Return true if we can test DN caching.
-   */
-  private static boolean canTestDatanodeCaching() {
-    if (!NativeIO.isAvailable()) {
-      // Need NativeIO in order to cache blocks on the DN.
-      return false;
-    }
-    if (NativeIO.getMemlockLimit() < CACHE_CAPACITY) {
-      return false;
-    }
-    return true;
-  }
-
   private static HdfsConfiguration createCachingConf() {
     HdfsConfiguration conf = new HdfsConfiguration();
     conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -674,7 +657,6 @@ public class TestPathBasedCacheRequests {
 
   @Test(timeout=120000)
   public void testWaitForCachedReplicas() throws Exception {
-    Assume.assumeTrue(canTestDatanodeCaching());
     HdfsConfiguration conf = createCachingConf();
     FileSystemTestHelper helper = new FileSystemTestHelper();
     MiniDFSCluster cluster =
@@ -732,7 +714,6 @@ public class TestPathBasedCacheRequests {
   @Test(timeout=120000)
   public void testAddingPathBasedCacheDirectivesWhenCachingIsDisabled()
       throws Exception {
-    Assume.assumeTrue(canTestDatanodeCaching());
     HdfsConfiguration conf = createCachingConf();
     conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
     MiniDFSCluster cluster =
@@ -780,7 +761,6 @@ public class TestPathBasedCacheRequests {
 
   @Test(timeout=120000)
   public void testWaitForCachedReplicasInDirectory() throws Exception {
-    Assume.assumeTrue(canTestDatanodeCaching());
     HdfsConfiguration conf = createCachingConf();
     MiniDFSCluster cluster =
       new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
@@ -832,7 +812,6 @@ public class TestPathBasedCacheRequests {
    */
   @Test(timeout=120000)
   public void testReplicationFactor() throws Exception {
-    Assume.assumeTrue(canTestDatanodeCaching());
     HdfsConfiguration conf = createCachingConf();
     MiniDFSCluster cluster =
       new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();

+ 17 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestJMXGet.java

@@ -28,7 +28,12 @@ import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
 import java.util.Random;
+import java.util.Set;
+
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -92,9 +97,8 @@ public class TestJMXGet {
         fileSize, fileSize, blockSize, (short) 2, seed);
 
     JMXGet jmx = new JMXGet();
-    //jmx.setService("*"); // list all hadoop services
-    //jmx.init();
-    //jmx = new JMXGet();
+    String serviceName = "NameNode";
+    jmx.setService(serviceName);
     jmx.init(); // default lists namenode mbeans only
     assertTrue("error printAllValues", checkPrintAllValues(jmx));
 
@@ -107,6 +111,10 @@ public class TestJMXGet {
         jmx.getValue("NumOpenConnections")));
 
     cluster.shutdown();
+    MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
+    ObjectName query = new ObjectName("Hadoop:service=" + serviceName + ",*");
+    Set<ObjectName> names = mbsc.queryNames(query, null);
+    assertTrue("No beans should be registered for " + serviceName, names.isEmpty());
   }
   
   private static boolean checkPrintAllValues(JMXGet jmx) throws Exception {
@@ -140,13 +148,15 @@ public class TestJMXGet {
         fileSize, fileSize, blockSize, (short) 2, seed);
 
     JMXGet jmx = new JMXGet();
-    //jmx.setService("*"); // list all hadoop services
-    //jmx.init();
-    //jmx = new JMXGet();
-    jmx.setService("DataNode");
+    String serviceName = "DataNode";
+    jmx.setService(serviceName);
     jmx.init();
     assertEquals(fileSize, Integer.parseInt(jmx.getValue("BytesWritten")));
 
     cluster.shutdown();
+    MBeanServerConnection mbsc = ManagementFactory.getPlatformMBeanServer();
+    ObjectName query = new ObjectName("Hadoop:service=" + serviceName + ",*");
+    Set<ObjectName> names = mbsc.queryNames(query, null);
+    assertTrue("No beans should be registered for " + serviceName, names.isEmpty());
   }
 }

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java

@@ -69,7 +69,7 @@ public class FadvisedChunkedFile extends ChunkedFile {
     }
     if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(identifier,
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
             fd,
             getStartOffset(), getEndOffset() - getStartOffset(),
             NativeIO.POSIX.POSIX_FADV_DONTNEED);

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java

@@ -79,7 +79,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
   public void transferSuccessful() {
     if (manageOsCache && getCount() > 0) {
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(identifier,
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
            fd, getPosition(), getCount(),
            NativeIO.POSIX.POSIX_FADV_DONTNEED);
       } catch (Throwable t) {

+ 7 - 0
hadoop-yarn-project/CHANGES.txt

@@ -156,6 +156,10 @@ Release 2.3.0 - UNRELEASED
     YARN-1419. TestFifoScheduler.testAppAttemptMetrics fails intermittently
     under jdk7 (Jonathan Eagles via jlowe)
 
+    YARN-744. Race condition in ApplicationMasterService.allocate .. It might
+    process same allocate request twice resulting in additional containers
+    getting allocated. (Omkar Vinit Joshi via bikas)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -239,6 +243,9 @@ Release 2.2.1 - UNRELEASED
     YARN-1381. Same relaxLocality appears twice in exception message of
     AMRMClientImpl#checkLocalityRelaxationConflict() (Ted Yu via Sandy Ryza)
 
+    YARN-1407. RM Web UI and REST APIs should uniformly use
+    YarnApplicationState (Sandy Ryza)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

+ 93 - 79
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -72,7 +72,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
@@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -97,8 +95,8 @@ public class ApplicationMasterService extends AbstractService implements
   private Server server;
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
-  private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap =
-      new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>();
+  private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
+      new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
   private final AllocateResponse resync =
       recordFactory.newRecordInstance(AllocateResponse.class);
   private final RMContext rmContext;
@@ -217,21 +215,19 @@ public class ApplicationMasterService extends AbstractService implements
     ApplicationAttemptId applicationAttemptId = authorizeRequest();
 
     ApplicationId appID = applicationAttemptId.getApplicationId();
-    AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
-    if (lastResponse == null) {
-      String message = "Application doesn't exist in cache "
-          + applicationAttemptId;
-      LOG.error(message);
+    AllocateResponseLock lock = responseMap.get(applicationAttemptId);
+    if (lock == null) {
       RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID).getUser(),
-          AuditConstants.REGISTER_AM, message, "ApplicationMasterService",
+          AuditConstants.REGISTER_AM, "Application doesn't exist in cache "
+              + applicationAttemptId, "ApplicationMasterService",
           "Error in registering application master", appID,
           applicationAttemptId);
-      throw RPCUtil.getRemoteException(message);
+      throwApplicationDoesNotExistInCacheException(applicationAttemptId);
     }
 
     // Allow only one thread in AM to do registerApp at a time.
-    synchronized (lastResponse) {
-
+    synchronized (lock) {
+      AllocateResponse lastResponse = lock.getAllocateResponse();
       if (hasApplicationMasterRegistered(applicationAttemptId)) {
         String message =
             "Application Master is already registered : "
@@ -251,7 +247,7 @@ public class ApplicationMasterService extends AbstractService implements
       // Setting the response id to 0 to identify if the
       // application master is register for the respective attemptid
       lastResponse.setResponseId(0);
-      responseMap.put(applicationAttemptId, lastResponse);
+      lock.setAllocateResponse(lastResponse);
       LOG.info("AM registration " + applicationAttemptId);
       this.rmContext
         .getDispatcher()
@@ -286,17 +282,14 @@ public class ApplicationMasterService extends AbstractService implements
 
     ApplicationAttemptId applicationAttemptId = authorizeRequest();
 
-    AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
-    if (lastResponse == null) {
-      String message = "Application doesn't exist in cache "
-          + applicationAttemptId;
-      LOG.error(message);
-      throw RPCUtil.getRemoteException(message);
+    AllocateResponseLock lock = responseMap.get(applicationAttemptId);
+    if (lock == null) {
+      throwApplicationDoesNotExistInCacheException(applicationAttemptId);
     }
 
     // Allow only one thread in AM to do finishApp at a time.
-    synchronized (lastResponse) {
-
+    synchronized (lock) {
+      
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
 
       rmContext.getDispatcher().getEventHandler().handle(
@@ -313,6 +306,15 @@ public class ApplicationMasterService extends AbstractService implements
     }
   }
 
+  private void throwApplicationDoesNotExistInCacheException(
+      ApplicationAttemptId appAttemptId)
+      throws InvalidApplicationMasterRequestException {
+    String message = "Application doesn't exist in cache "
+        + appAttemptId;
+    LOG.error(message);
+    throw new InvalidApplicationMasterRequestException(message);
+  }
+  
   /**
    * @param appAttemptId
    * @return true if application is registered for the respective attemptid
@@ -320,10 +322,11 @@ public class ApplicationMasterService extends AbstractService implements
   public boolean hasApplicationMasterRegistered(
       ApplicationAttemptId appAttemptId) {
     boolean hasApplicationMasterRegistered = false;
-    AllocateResponse lastResponse = responseMap.get(appAttemptId);
+    AllocateResponseLock lastResponse = responseMap.get(appAttemptId);
     if (lastResponse != null) {
       synchronized (lastResponse) {
-        if (lastResponse.getResponseId() >= 0) {
+        if (lastResponse.getAllocateResponse() != null
+            && lastResponse.getAllocateResponse().getResponseId() >= 0) {
           hasApplicationMasterRegistered = true;
         }
       }
@@ -340,38 +343,38 @@ public class ApplicationMasterService extends AbstractService implements
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
     /* check if its in cache */
-    AllocateResponse lastResponse = responseMap.get(appAttemptId);
-    if (lastResponse == null) {
+    AllocateResponseLock lock = responseMap.get(appAttemptId);
+    if (lock == null) {
       LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
       return resync;
     }
-    
-    if (!hasApplicationMasterRegistered(appAttemptId)) {
-      String message =
-          "Application Master is trying to allocate before registering for: "
-              + appAttemptId.getApplicationId();
-      LOG.error(message);
-      RMAuditLogger.logFailure(
-        this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
-          .getUser(), AuditConstants.REGISTER_AM, "",
-        "ApplicationMasterService", message, appAttemptId.getApplicationId(),
-        appAttemptId);
-      throw new InvalidApplicationMasterRequestException(message);
-    }
+    synchronized (lock) {
+      AllocateResponse lastResponse = lock.getAllocateResponse();
+      if (!hasApplicationMasterRegistered(appAttemptId)) {
+        String message =
+            "Application Master is trying to allocate before registering for: "
+                + appAttemptId.getApplicationId();
+        LOG.error(message);
+        RMAuditLogger.logFailure(
+            this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
+                .getUser(), AuditConstants.REGISTER_AM, "",
+            "ApplicationMasterService", message,
+            appAttemptId.getApplicationId(),
+            appAttemptId);
+        throw new InvalidApplicationMasterRequestException(message);
+      }
 
-    if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
-      /* old heartbeat */
-      return lastResponse;
-    } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
-      LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
-      // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
-      // Reboot is not useful since after AM reboots, it will send register and 
-      // get an exception. Might as well throw an exception here.
-      return resync;
-    } 
-    
-    // Allow only one thread in AM to do heartbeat at a time.
-    synchronized (lastResponse) {
+      if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
+        /* old heartbeat */
+        return lastResponse;
+      } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
+        LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
+        // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
+        // Reboot is not useful since after AM reboots, it will send register
+        // and
+        // get an exception. Might as well throw an exception here.
+        return resync;
+      }
 
       // Send the status update to the appAttempt.
       this.rmContext.getDispatcher().getEventHandler().handle(
@@ -380,15 +383,16 @@ public class ApplicationMasterService extends AbstractService implements
 
       List<ResourceRequest> ask = request.getAskList();
       List<ContainerId> release = request.getReleaseList();
-      
-      ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();
-      List<String> blacklistAdditions = 
-          (blacklistRequest != null) ? 
+
+      ResourceBlacklistRequest blacklistRequest =
+          request.getResourceBlacklistRequest();
+      List<String> blacklistAdditions =
+          (blacklistRequest != null) ?
               blacklistRequest.getBlacklistAdditions() : null;
-      List<String> blacklistRemovals = 
-          (blacklistRequest != null) ? 
+      List<String> blacklistRemovals =
+          (blacklistRequest != null) ?
               blacklistRequest.getBlacklistRemovals() : null;
-      
+
       // sanity check
       try {
         RMServerUtils.validateResourceRequests(ask,
@@ -443,7 +447,7 @@ public class ApplicationMasterService extends AbstractService implements
               rmNode.getTotalCapability(), numContainers,
               rmNode.getHealthReport(),
               rmNode.getLastHealthReportTime());
-          
+
           updatedNodeReports.add(report);
         }
         allocateResponse.setUpdatedNodes(updatedNodeReports);
@@ -454,11 +458,12 @@ public class ApplicationMasterService extends AbstractService implements
           .pullJustFinishedContainers());
       allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
       allocateResponse.setAvailableResources(allocation.getResourceLimit());
-      
+
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
-   
+
       // add preemption to the allocateResponse message (if any)
-      allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
+      allocateResponse
+          .setPreemptionMessage(generatePreemptionMessage(allocation));
 
       // Adding NMTokens for allocated containers.
       if (!allocation.getContainers().isEmpty()) {
@@ -466,21 +471,14 @@ public class ApplicationMasterService extends AbstractService implements
             .createAndGetNMTokens(app.getUser(), appAttemptId,
                 allocation.getContainers()));
       }
-
-      // before returning response, verify in sync
-      AllocateResponse oldResponse =
-          responseMap.put(appAttemptId, allocateResponse);
-      if (oldResponse == null) {
-        // appAttempt got unregistered, remove it back out
-        responseMap.remove(appAttemptId);
-        String message = "App Attempt removed from the cache during allocate"
-            + appAttemptId;
-        LOG.error(message);
-        return resync;
-      }
-
+      /*
+       * As we are updating the response inside the lock object so we don't
+       * need to worry about unregister call occurring in between (which
+       * removes the lock object).
+       */
+      lock.setAllocateResponse(allocateResponse);
       return allocateResponse;
-    }
+    }    
   }
   
   private PreemptionMessage generatePreemptionMessage(Allocation allocation){
@@ -542,7 +540,7 @@ public class ApplicationMasterService extends AbstractService implements
     // attemptID get registered
     response.setResponseId(-1);
     LOG.info("Registering app attempt : " + attemptId);
-    responseMap.put(attemptId, response);
+    responseMap.put(attemptId, new AllocateResponseLock(response));
     rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
   }
 
@@ -564,4 +562,20 @@ public class ApplicationMasterService extends AbstractService implements
     }
     super.serviceStop();
   }
-}
+  
+  public static class AllocateResponseLock {
+    private AllocateResponse response;
+    
+    public AllocateResponseLock(AllocateResponse response) {
+      this.response = response;
+    }
+    
+    public synchronized AllocateResponse getAllocateResponse() {
+      return response;
+    }
+    
+    public synchronized void setAllocateResponse(AllocateResponse response) {
+      this.response = response;
+    }
+  }
+}

+ 9 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -653,7 +654,14 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
     this.readLock.lock();
     try {
-      return scheduler.getAppResourceUsageReport(this.getAppAttemptId());
+      ApplicationResourceUsageReport report =
+          scheduler.getAppResourceUsageReport(this.getAppAttemptId());
+      if (report == null) {
+        Resource none = Resource.newInstance(0, 0);
+        report = ApplicationResourceUsageReport.newInstance(0, 0, none, none,
+            none);
+      }
+      return report;
     } finally {
       this.readLock.unlock();
     }

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java

@@ -29,9 +29,9 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
@@ -65,18 +65,18 @@ class AppsBlock extends HtmlBlock {
             th(".progress", "Progress").
             th(".ui", "Tracking UI")._()._().
         tbody();
-    Collection<RMAppState> reqAppStates = null;
+    Collection<YarnApplicationState> reqAppStates = null;
     String reqStateString = $(APP_STATE);
     if (reqStateString != null && !reqStateString.isEmpty()) {
       String[] appStateStrings = reqStateString.split(",");
-      reqAppStates = new HashSet<RMAppState>(appStateStrings.length);
+      reqAppStates = new HashSet<YarnApplicationState>(appStateStrings.length);
       for(String stateString : appStateStrings) {
-        reqAppStates.add(RMAppState.valueOf(stateString));
+        reqAppStates.add(YarnApplicationState.valueOf(stateString));
       }
     }
     StringBuilder appsTableData = new StringBuilder("[\n");
     for (RMApp app : apps.values()) {
-      if (reqAppStates != null && !reqAppStates.contains(app.getState())) {
+      if (reqAppStates != null && !reqAppStates.contains(app.createApplicationState())) {
         continue;
       }
       AppInfo appInfo = new AppInfo(app, true);

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java

@@ -30,10 +30,10 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
@@ -77,13 +77,13 @@ public class FairSchedulerAppsBlock extends HtmlBlock {
             th(".progress", "Progress").
             th(".ui", "Tracking UI")._()._().
         tbody();
-    Collection<RMAppState> reqAppStates = null;
+    Collection<YarnApplicationState> reqAppStates = null;
     String reqStateString = $(APP_STATE);
     if (reqStateString != null && !reqStateString.isEmpty()) {
       String[] appStateStrings = reqStateString.split(",");
-      reqAppStates = new HashSet<RMAppState>(appStateStrings.length);
+      reqAppStates = new HashSet<YarnApplicationState>(appStateStrings.length);
       for(String stateString : appStateStrings) {
-        reqAppStates.add(RMAppState.valueOf(stateString));
+        reqAppStates.add(YarnApplicationState.valueOf(stateString));
       }
     }
     StringBuilder appsTableData = new StringBuilder("[\n");

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java

@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.webapp;
 
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.LI;
@@ -38,7 +38,7 @@ public class NavBlock extends HtmlBlock {
           li().a(url("apps"), "Applications").
             ul();
     subAppsList.li()._();
-    for (RMAppState state : RMAppState.values()) {
+    for (YarnApplicationState state : YarnApplicationState.values()) {
       subAppsList.
               li().a(url("apps", state.toString()), state.toString())._();
     }

+ 6 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java

@@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp;
 import static org.apache.hadoop.yarn.util.StringHelper.join;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.QUEUE_NAME;
 
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
@@ -62,12 +62,11 @@ public class RmController extends Controller {
   public void scheduler() {
     // limit applications to those in states relevant to scheduling
     set(YarnWebParams.APP_STATE, StringHelper.cjoin(
-        RMAppState.NEW.toString(),
-        RMAppState.NEW_SAVING.toString(),
-        RMAppState.SUBMITTED.toString(),
-        RMAppState.ACCEPTED.toString(),
-        RMAppState.RUNNING.toString(),
-        RMAppState.FINISHING.toString()));
+        YarnApplicationState.NEW.toString(),
+        YarnApplicationState.NEW_SAVING.toString(),
+        YarnApplicationState.SUBMITTED.toString(),
+        YarnApplicationState.ACCEPTED.toString(),
+        YarnApplicationState.RUNNING.toString()));
 
     ResourceManager rm = getInstance(ResourceManager.class);
     ResourceScheduler rs = rm.getResourceScheduler();

+ 7 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -58,7 +59,7 @@ public class AppInfo {
   protected String user;
   protected String name;
   protected String queue;
-  protected RMAppState state;
+  protected YarnApplicationState state;
   protected FinalApplicationStatus finalStatus;
   protected float progress;
   protected String trackingUI;
@@ -88,12 +89,12 @@ public class AppInfo {
 
     if (app != null) {
       String trackingUrl = app.getTrackingUrl();
-      this.state = app.getState();
+      this.state = app.createApplicationState();
       this.trackingUrlIsNotReady = trackingUrl == null || trackingUrl.isEmpty()
-          || RMAppState.NEW == this.state
-          || RMAppState.NEW_SAVING == this.state
-          || RMAppState.SUBMITTED == this.state
-          || RMAppState.ACCEPTED == this.state;
+          || YarnApplicationState.NEW == this.state
+          || YarnApplicationState.NEW_SAVING == this.state
+          || YarnApplicationState.SUBMITTED == this.state
+          || YarnApplicationState.ACCEPTED == this.state;
       this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED" : (app
           .getFinishTime() == 0 ? "ApplicationMaster" : "History");
       if (!trackingUrlIsNotReady) {

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java

@@ -168,8 +168,8 @@ public abstract class MockAsm extends MockApps {
     final long start = 123456 + i * 1000;
     final long finish = 234567 + i * 1000;
     final String type = YarnConfiguration.DEFAULT_APPLICATION_TYPE;
-    RMAppState[] allStates = RMAppState.values();
-    final RMAppState state = allStates[i % allStates.length];
+    YarnApplicationState[] allStates = YarnApplicationState.values();
+    final YarnApplicationState state = allStates[i % allStates.length];
     final int maxAppAttempts = i % 1000;
     return new ApplicationBase() {
       @Override
@@ -210,7 +210,7 @@ public abstract class MockAsm extends MockApps {
         return null;
       }
       @Override
-      public RMAppState getState() {
+      public YarnApplicationState createApplicationState() {
         return state;
       }
       @Override

+ 5 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -39,7 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.MockAsm;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -93,12 +93,14 @@ public class TestRMWebApp {
       }
     });
     RmView rmViewInstance = injector.getInstance(RmView.class);
-    rmViewInstance.set(YarnWebParams.APP_STATE, RMAppState.RUNNING.toString());
+    rmViewInstance.set(YarnWebParams.APP_STATE,
+        YarnApplicationState.RUNNING.toString());
     rmViewInstance.render();
     WebAppTests.flushOutput(injector);
 
     rmViewInstance.set(YarnWebParams.APP_STATE, StringHelper.cjoin(
-        RMAppState.ACCEPTED.toString(), RMAppState.RUNNING.toString()));
+        YarnApplicationState.ACCEPTED.toString(),
+        YarnApplicationState.RUNNING.toString()));
     rmViewInstance.render();
     WebAppTests.flushOutput(injector);
   }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ResourceManagerRest.apt.vm

@@ -1445,7 +1445,7 @@ _01_000001</amContainerLogs>
 *---------------+--------------+--------------------------------+
 | queue | string  | The queue the application was submitted to|
 *---------------+--------------+--------------------------------+
-| state         | string | The application state according to the ResourceManager - valid values are: NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED|
+| state         | string | The application state according to the ResourceManager - valid values are members of the YarnApplicationState enum: NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED|
 *---------------+--------------+--------------------------------+
 | finalStatus | string | The final status of the application if finished - reported by the application itself - valid values are: UNDEFINED, SUCCEEDED, FAILED, KILLED|
 *---------------+--------------+--------------------------------+