
Merge branch 'trunk' into rewrite-junit5-hdfs

Siyao Meng, 3 years ago
parent
commit
4900d04823
38 changed files with 1158 additions and 262 deletions
  1. +18 -5  .github/pull_request_template.md
  2. +1 -1  LICENSE-binary
  3. +0 -11  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java
  4. +3 -2  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java
  5. +261 -0  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java
  6. +5 -1  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
  7. +111 -24  hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
  8. +28 -3  hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
  9. +5 -4  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
  10. +15 -12  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.cc
  11. +0 -102  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.h
  12. +8 -5  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_builder_test.cc
  13. +3 -1  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_config_connect_bugs.cc
  14. +16 -13  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc
  15. +19 -0  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/utils/CMakeLists.txt
  16. +69 -0  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/utils/temp-dir.cc
  17. +53 -0  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/utils/temp-dir.h
  18. +63 -0  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/utils/temp-file.cc
  19. +56 -0  hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/utils/temp-file.h
  20. +1 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
  21. +14 -10  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
  22. +1 -1  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
  23. +11 -7  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
  24. +94 -15  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
  25. +1 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatusWithBackoffMonitor.java
  26. +1 -1  hadoop-project/pom.xml
  27. +14 -6  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
  28. +47 -24  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
  29. +5 -2  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java
  30. +3 -1  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java
  31. +3 -3  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
  32. +2 -2  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
  33. +5 -1  hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
  34. +10 -0  hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
  35. +112 -3  hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
  36. +28 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  37. +14 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
  38. +58 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

+ 18 - 5
.github/pull_request_template.md

@@ -1,6 +1,19 @@
-## NOTICE
+<!--
+  Thanks for sending a pull request!
+    1. If this is your first time, please read our contributor guidelines: https://cwiki.apache.org/confluence/display/HADOOP/How+To+Contribute
+    2. Make sure your PR title starts with JIRA issue id, e.g., 'HADOOP-17799. Your PR title ...'.
+-->
+
+### Description of PR
+
+
+### How was this patch tested?
+
+
+### For code changes:
+
+- [ ] Does the title or this PR starts with the corresponding JIRA issue id (e.g. 'HADOOP-17799. Your PR title ...')?
+- [ ] Object storage: have the integration tests been executed and the endpoint declared according to the connector-specific documentation?
+- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
+- [ ] If applicable, have you updated the `LICENSE`, `LICENSE-binary`, `NOTICE-binary` files?
 
-Please create an issue in ASF JIRA before opening a pull request,
-and you need to set the title of the pull request which starts with
-the corresponding JIRA issue number. (e.g. HADOOP-XXXXX. Fix a typo in YYY.)
-For more details, please see https://cwiki.apache.org/confluence/display/HADOOP/How+To+Contribute

+ 1 - 1
LICENSE-binary

@@ -214,7 +214,7 @@ com.aliyun:aliyun-java-sdk-core:3.4.0
 com.aliyun:aliyun-java-sdk-ecs:4.2.0
 com.aliyun:aliyun-java-sdk-ram:3.0.0
 com.aliyun:aliyun-java-sdk-sts:3.0.0
-com.aliyun.oss:aliyun-sdk-oss:3.4.1
+com.aliyun.oss:aliyun-sdk-oss:3.13.0
 com.amazonaws:aws-java-sdk-bundle:1.11.901
 com.cedarsoftware:java-util:1.9.0
 com.cedarsoftware:json-io:2.5.1

+ 0 - 11
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java

@@ -36,8 +36,6 @@ import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -47,7 +45,6 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.hadoop.util.MergeSort;
 import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.Time;
@@ -1180,14 +1177,6 @@ public class SequenceFile {
           new Metadata() : metadataOption.getValue();
       this.compress = compressionTypeOption.getValue();
       final CompressionCodec codec = compressionTypeOption.getCodec();
-      if (codec != null &&
-          (codec instanceof GzipCodec) &&
-          !NativeCodeLoader.isNativeCodeLoaded() &&
-          !ZlibFactory.isNativeZlibLoaded(conf)) {
-        throw new IllegalArgumentException("SequenceFile doesn't work with " +
-                                           "GzipCodec without native-hadoop " +
-                                           "code!");
-      }
       this.syncInterval = (syncIntervalOption == null) ?
           SYNC_INTERVAL :
           syncIntervalOption.getValue();
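
A minimal sketch (not part of this patch, with a hypothetical local path): with the guard above removed, a SequenceFile can now be written with GzipCodec even when native-hadoop/zlib is not loaded, because GzipCodec supplies the pure-Java BuiltInGzipCompressor added in this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;

public class GzipSequenceFileSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GzipCodec codec = new GzipCodec();
    codec.setConf(conf);
    // Hypothetical local path; any writable FileSystem location works.
    Path path = new Path("file:///tmp/gzip-demo.seq");
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(path),
        SequenceFile.Writer.keyClass(IntWritable.class),
        SequenceFile.Writer.valueClass(Text.class),
        SequenceFile.Writer.compression(
            SequenceFile.CompressionType.BLOCK, codec))) {
      // Before this change, this code path threw IllegalArgumentException
      // when native-hadoop was missing.
      writer.append(new IntWritable(1), new Text("hello"));
    }
  }
}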

+ 3 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java

@@ -29,6 +29,7 @@ import java.util.zip.GZIPOutputStream;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor;
 import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor;
 import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
@@ -154,14 +155,14 @@ public class GzipCodec extends DefaultCodec {
   public Compressor createCompressor() {
     return (ZlibFactory.isNativeZlibLoaded(conf))
       ? new GzipZlibCompressor(conf)
-      : null;
+      : new BuiltInGzipCompressor(conf);
   }
 
   @Override
   public Class<? extends Compressor> getCompressorType() {
     return ZlibFactory.isNativeZlibLoaded(conf)
       ? GzipZlibCompressor.class
-      : null;
+      : BuiltInGzipCompressor.class;
   }
 
   @Override
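
A hedged sketch of the new fallback behaviour (the class names are from this patch; the standalone check itself is illustrative): with native zlib disabled, createCompressor() now returns a BuiltInGzipCompressor instead of null.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;

public class GzipFallbackCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Test-only hook (also used by TestCodec below) to force the
    // "no native libs" code path.
    ZlibFactory.setNativeZlibLoaded(false);

    GzipCodec codec = new GzipCodec();
    codec.setConf(conf);

    Compressor compressor = codec.createCompressor();
    // Previously this was null, which broke CodecPool users; now it is the
    // pure-Java gzip compressor.
    System.out.println(compressor.getClass().getSimpleName());
    System.out.println(compressor instanceof BuiltInGzipCompressor); // true
  }
}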

+ 261 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipCompressor.java

@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.zlib;
+
+import java.io.IOException;
+import java.util.zip.Checksum;
+import java.util.zip.Deflater;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.DoNotPool;
+import org.apache.hadoop.util.DataChecksum;
+
+/**
+ * A {@link Compressor} based on the popular gzip compressed file format.
+ * http://www.gzip.org/
+ */
+@DoNotPool
+public class BuiltInGzipCompressor implements Compressor {
+
+  /**
+   * Fixed ten-byte gzip header. See {@link GZIPOutputStream}'s source for
+   * details.
+   */
+  private static final byte[] GZIP_HEADER = new byte[]{
+      0x1f, (byte) 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+
+  // The trailer will be overwritten based on crc and output size.
+  private static final byte[] GZIP_TRAILER = new byte[]{
+      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+
+  private static final int GZIP_HEADER_LEN = GZIP_HEADER.length;
+  private static final int GZIP_TRAILER_LEN = GZIP_TRAILER.length;
+
+  private Deflater deflater;
+
+  private int headerOff = 0;
+  private int trailerOff = 0;
+
+  private int numExtraBytesWritten = 0;
+
+  private int currentBufLen = 0;
+  private int accuBufLen = 0;
+
+  private final Checksum crc = DataChecksum.newCrc32();
+
+  private BuiltInGzipDecompressor.GzipStateLabel state;
+
+  public BuiltInGzipCompressor(Configuration conf) {
+    init(conf);
+  }
+
+  @Override
+  public boolean finished() {
+    // Only if the trailer is also written, it is thought as finished.
+    return state == BuiltInGzipDecompressor.GzipStateLabel.FINISHED && deflater.finished();
+  }
+
+  @Override
+  public boolean needsInput() {
+    return deflater.needsInput() && state != BuiltInGzipDecompressor.GzipStateLabel.TRAILER_CRC;
+  }
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    if (finished()) {
+      throw new IOException("compress called on finished compressor");
+    }
+
+    int compressedBytesWritten = 0;
+
+    if (currentBufLen <= 0) {
+      return compressedBytesWritten;
+    }
+
+    // If we are not within uncompressed data yet, output the header.
+    if (state == BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC) {
+      int outputHeaderSize = writeHeader(b, off, len);
+      numExtraBytesWritten += outputHeaderSize;
+
+      compressedBytesWritten += outputHeaderSize;
+
+      if (outputHeaderSize == len) {
+        return compressedBytesWritten;
+      }
+
+      off += outputHeaderSize;
+      len -= outputHeaderSize;
+    }
+
+    if (state == BuiltInGzipDecompressor.GzipStateLabel.INFLATE_STREAM) {
+      // now compress it into b[]
+      int deflated = deflater.deflate(b, off, len);
+
+      compressedBytesWritten += deflated;
+      off += deflated;
+      len -= deflated;
+
+      // All current input are processed. And `finished` is called. Going to output trailer.
+      if (deflater.finished()) {
+        state = BuiltInGzipDecompressor.GzipStateLabel.TRAILER_CRC;
+        fillTrailer();
+      } else {
+        return compressedBytesWritten;
+      }
+    }
+
+    if (state == BuiltInGzipDecompressor.GzipStateLabel.TRAILER_CRC) {
+      int outputTrailerSize = writeTrailer(b, off, len);
+      numExtraBytesWritten += outputTrailerSize;
+      compressedBytesWritten += outputTrailerSize;
+    }
+
+    return compressedBytesWritten;
+  }
+
+  @Override
+  public long getBytesRead() {
+    return deflater.getTotalIn();
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return numExtraBytesWritten + deflater.getTotalOut();
+  }
+
+  @Override
+  public void end() {
+    deflater.end();
+  }
+
+  @Override
+  public void finish() {
+    deflater.finish();
+  }
+
+  private void init(Configuration conf) {
+    ZlibCompressor.CompressionLevel level = ZlibFactory.getCompressionLevel(conf);
+    ZlibCompressor.CompressionStrategy strategy = ZlibFactory.getCompressionStrategy(conf);
+
+    // 'true' (nowrap) => Deflater will handle raw deflate stream only
+    deflater = new Deflater(level.compressionLevel(), true);
+    deflater.setStrategy(strategy.compressionStrategy());
+
+    state = BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC;
+  }
+
+  @Override
+  public void reinit(Configuration conf) {
+    init(conf);
+    numExtraBytesWritten = 0;
+    currentBufLen = 0;
+    headerOff = 0;
+    trailerOff = 0;
+    crc.reset();
+    accuBufLen = 0;
+  }
+
+  @Override
+  public void reset() {
+    deflater.reset();
+    state = BuiltInGzipDecompressor.GzipStateLabel.HEADER_BASIC;
+    numExtraBytesWritten = 0;
+    currentBufLen = 0;
+    headerOff = 0;
+    trailerOff = 0;
+    crc.reset();
+    accuBufLen = 0;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    deflater.setDictionary(b, off, len);
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    deflater.setInput(b, off, len);
+    crc.update(b, off, len);  // CRC-32 is on uncompressed data
+    currentBufLen = len;
+    accuBufLen += currentBufLen;
+  }
+
+  private int writeHeader(byte[] b, int off, int len) {
+    if (len <= 0) {
+      return 0;
+    }
+
+    int n = Math.min(len, GZIP_HEADER_LEN - headerOff);
+    System.arraycopy(GZIP_HEADER, headerOff, b, off, n);
+    headerOff += n;
+
+    // Completes header output.
+    if (headerOff == GZIP_HEADER_LEN) {
+      state = BuiltInGzipDecompressor.GzipStateLabel.INFLATE_STREAM;
+    }
+
+    return n;
+  }
+
+  private void fillTrailer() {
+    if (state == BuiltInGzipDecompressor.GzipStateLabel.TRAILER_CRC) {
+      int streamCrc = (int) crc.getValue();
+      GZIP_TRAILER[0] = (byte) (streamCrc & 0x000000ff);
+      GZIP_TRAILER[1] = (byte) ((streamCrc & 0x0000ff00) >> 8);
+      GZIP_TRAILER[2] = (byte) ((streamCrc & 0x00ff0000) >> 16);
+      GZIP_TRAILER[3] = (byte) ((streamCrc & 0xff000000) >> 24);
+
+      GZIP_TRAILER[4] = (byte) (accuBufLen & 0x000000ff);
+      GZIP_TRAILER[5] = (byte) ((accuBufLen & 0x0000ff00) >> 8);
+      GZIP_TRAILER[6] = (byte) ((accuBufLen & 0x00ff0000) >> 16);
+      GZIP_TRAILER[7] = (byte) ((accuBufLen & 0xff000000) >> 24);
+
+      crc.reset();
+      accuBufLen = 0;
+    }
+  }
+
+  private int writeTrailer(byte[] b, int off, int len) {
+    if (len <= 0) {
+      return 0;
+    }
+
+    int n = Math.min(len, GZIP_TRAILER_LEN - trailerOff);
+    System.arraycopy(GZIP_TRAILER, trailerOff, b, off, n);
+    trailerOff += n;
+
+    if (trailerOff == GZIP_TRAILER_LEN) {
+      state = BuiltInGzipDecompressor.GzipStateLabel.FINISHED;
+      currentBufLen = 0;
+      headerOff = 0;
+      trailerOff = 0;
+    }
+
+    return n;
+  }
+}
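
The compressor is a small state machine: compress() first emits the fixed 10-byte gzip header, then raw deflate output (the Deflater is opened with nowrap=true), and finally the 8-byte CRC-32/size trailer filled in by fillTrailer(). A minimal round-trip sketch (not part of the patch) that drives the Compressor API directly and checks the framing with the JDK's GZIPInputStream:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor;

public class BuiltInGzipCompressorRoundTrip {
  public static void main(String[] args) throws Exception {
    byte[] input = "hello, pure-Java gzip".getBytes(StandardCharsets.UTF_8);

    Compressor compressor = new BuiltInGzipCompressor(new Configuration());
    compressor.setInput(input, 0, input.length);
    compressor.finish();

    // Repeated compress() calls emit the header, the deflate data, and
    // finally the trailer.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    byte[] buf = new byte[64];
    while (!compressor.finished()) {
      int n = compressor.compress(buf, 0, buf.length);
      compressed.write(buf, 0, n);
    }
    compressor.end();

    // The JDK's gzip reader should accept the stream and recover the input.
    ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
    try (GZIPInputStream in = new GZIPInputStream(
        new ByteArrayInputStream(compressed.toByteArray()))) {
      int r;
      while ((r = in.read(buf)) != -1) {
        decompressed.write(buf, 0, r);
      }
    }
    System.out.println(
        new String(decompressed.toByteArray(), StandardCharsets.UTF_8));
  }
}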

+ 5 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java

@@ -68,7 +68,7 @@ public class BuiltInGzipDecompressor implements Decompressor {
    * (Technically, the private variables localBuf through hasHeaderCRC are
    * also part of the state, so this enum is merely the label for it.)
    */
-  private enum GzipStateLabel {
+  public enum GzipStateLabel {
     /**
      * Immediately prior to or (strictly) within the 10-byte basic gzip header.
      */
@@ -93,6 +93,10 @@ public class BuiltInGzipDecompressor implements Decompressor {
      * Immediately prior to or within the main compressed (deflate) data stream.
      */
     DEFLATE_STREAM,
+    /**
+     * Immediately prior to or within the main uncompressed (inflate) data stream.
+     */
+    INFLATE_STREAM,
     /**
      * Immediately prior to or (strictly) within the 4-byte uncompressed CRC.
      */

+ 111 - 24
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java

@@ -64,6 +64,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
+import org.apache.hadoop.io.compress.zlib.BuiltInGzipCompressor;
 import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
 import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
 import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater;
@@ -101,6 +102,14 @@ public class TestCodec {
 
   @Test
   public void testGzipCodec() throws IOException {
+    Configuration conf = new Configuration();
+    if (ZlibFactory.isNativeZlibLoaded(conf)) {
+      codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
+      codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
+    }
+    // without hadoop-native installed.
+    ZlibFactory.setNativeZlibLoaded(false);
+    assertFalse(ZlibFactory.isNativeZlibLoaded(conf));
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.GzipCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
   }
@@ -467,15 +476,15 @@ public class TestCodec {
                             "org.apache.hadoop.io.compress.GzipCodec");
       codecTestWithNOCompression(conf,
                          "org.apache.hadoop.io.compress.DefaultCodec");
-    } else {
-      LOG.warn("testCodecInitWithCompressionLevel for native skipped"
-               + ": native libs not loaded");
     }
     conf = new Configuration();
     // don't use native libs
     ZlibFactory.setNativeZlibLoaded(false);
+    LOG.info("testCodecInitWithCompressionLevel without native libs");
     codecTestWithNOCompression( conf,
                          "org.apache.hadoop.io.compress.DefaultCodec");
+    codecTestWithNOCompression(conf,
+            "org.apache.hadoop.io.compress.GzipCodec");
   }
 
   @Test
@@ -550,6 +559,23 @@ public class TestCodec {
     sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DeflateCodec", 1000000);
   }
 
+  @Test
+  public void testSequenceFileGzipCodec() throws IOException, ClassNotFoundException,
+          InstantiationException, IllegalAccessException {
+    Configuration conf = new Configuration();
+    if (ZlibFactory.isNativeZlibLoaded(conf)) {
+      sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.GzipCodec", 5);
+      sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.GzipCodec", 100);
+      sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.GzipCodec", 1000000);
+    }
+    // without hadoop-native installed.
+    ZlibFactory.setNativeZlibLoaded(false);
+    assertFalse(ZlibFactory.isNativeZlibLoaded(conf));
+    sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.GzipCodec", 5);
+    sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.GzipCodec", 100);
+    sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.GzipCodec", 1000000);
+  }
+
   private static void sequenceFileCodecTest(Configuration conf, int lines, 
                                 String codecClass, int blockSize) 
     throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
@@ -853,16 +879,16 @@ public class TestCodec {
     // and try to read it back via the regular GZIPInputStream.
 
     // Use native libs per the parameter
-    Configuration conf = new Configuration();
+    Configuration hadoopConf = new Configuration();
     if (useNative) {
-      assumeTrue(ZlibFactory.isNativeZlibLoaded(conf));
+      assumeTrue(ZlibFactory.isNativeZlibLoaded(hadoopConf));
     } else {
       assertFalse("ZlibFactory is using native libs against request",
-          ZlibFactory.isNativeZlibLoaded(conf));
+          ZlibFactory.isNativeZlibLoaded(hadoopConf));
     }
 
     // Ensure that the CodecPool has a BuiltInZlibDeflater in it.
-    Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
+    Compressor zlibCompressor = ZlibFactory.getZlibCompressor(hadoopConf);
     assertNotNull("zlibCompressor is null!", zlibCompressor);
     assertTrue("ZlibFactory returned unexpected deflator",
           useNative ? zlibCompressor instanceof ZlibCompressor
@@ -871,37 +897,47 @@ public class TestCodec {
     CodecPool.returnCompressor(zlibCompressor);
 
     // Create a GZIP text file via the Compressor interface.
-    CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
+    CompressionCodecFactory ccf = new CompressionCodecFactory(hadoopConf);
     CompressionCodec codec = ccf.getCodec(new Path("foo.gz"));
     assertTrue("Codec for .gz file is not GzipCodec", 
                codec instanceof GzipCodec);
 
-    final String msg = "This is the message we are going to compress.";
     final String fileName = new Path(GenericTestUtils.getTempPath(
         "testGzipCodecWrite.txt.gz")).toString();
 
     BufferedWriter w = null;
     Compressor gzipCompressor = CodecPool.getCompressor(codec);
-    if (null != gzipCompressor) {
-      // If it gives us back a Compressor, we should be able to use this
-      // to write files we can then read back with Java's gzip tools.
-      OutputStream os = new CompressorStream(new FileOutputStream(fileName),
-          gzipCompressor);
-      w = new BufferedWriter(new OutputStreamWriter(os));
-      w.write(msg);
-      w.close();
-      CodecPool.returnCompressor(gzipCompressor);
-
-      verifyGzipFile(fileName, msg);
+    // When it gives us back a Compressor, we should be able to use this
+    // to write files we can then read back with Java's gzip tools.
+    OutputStream os = new CompressorStream(new FileOutputStream(fileName),
+        gzipCompressor);
+
+    // Call `write` multiple times.
+    int bufferSize = 10000;
+    char[] inputBuffer = new char[bufferSize];
+    Random rand = new Random();
+    for (int i = 0; i < bufferSize; i++) {
+      inputBuffer[i] = (char) ('a' + rand.nextInt(26));
+    }
+    w = new BufferedWriter(new OutputStreamWriter(os), bufferSize);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 10; i++) {
+      w.write(inputBuffer);
+      sb.append(inputBuffer);
     }
+    w.close();
+    CodecPool.returnCompressor(gzipCompressor);
+    verifyGzipFile(fileName, sb.toString());
+
 
     // Create a gzip text file via codec.getOutputStream().
     w = new BufferedWriter(new OutputStreamWriter(
-        codec.createOutputStream(new FileOutputStream(fileName))));
-    w.write(msg);
+            codec.createOutputStream(new FileOutputStream(fileName))));
+    for (int i = 0; i < 10; i++) {
+      w.write(inputBuffer);
+    }
     w.close();
-
-    verifyGzipFile(fileName, msg);
+    verifyGzipFile(fileName, sb.toString());
   }
 
   @Test
@@ -915,6 +951,57 @@ public class TestCodec {
   public void testGzipNativeCodecWrite() throws IOException {
     testGzipCodecWrite(true);
   }
+
+  @Test
+  public void testCodecPoolAndGzipCompressor() {
+    // BuiltInZlibDeflater should not be used as the GzipCodec compressor.
+    // Assert that this is the case.
+
+    // Don't use native libs for this test.
+    Configuration conf = new Configuration();
+    ZlibFactory.setNativeZlibLoaded(false);
+    assertFalse("ZlibFactory is using native libs against request",
+            ZlibFactory.isNativeZlibLoaded(conf));
+
+    // This should give us a BuiltInZlibDeflater.
+    Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
+    assertNotNull("zlibCompressor is null!", zlibCompressor);
+    assertTrue("ZlibFactory returned unexpected deflator",
+            zlibCompressor instanceof BuiltInZlibDeflater);
+    // its createOutputStream() just wraps the existing stream in a
+    // java.util.zip.GZIPOutputStream.
+    CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
+    CompressionCodec codec = ccf.getCodec(new Path("foo.gz"));
+    assertTrue("Codec for .gz file is not GzipCodec",
+            codec instanceof GzipCodec);
+
+    // make sure we don't get a null compressor
+    Compressor codecCompressor = codec.createCompressor();
+    if (null == codecCompressor) {
+      fail("Got null codecCompressor");
+    }
+
+    // Asking the CodecPool for a compressor for GzipCodec
+    // should not return null
+    Compressor poolCompressor = CodecPool.getCompressor(codec);
+    if (null == poolCompressor) {
+      fail("Got null poolCompressor");
+    }
+    // return a couple compressors
+    CodecPool.returnCompressor(zlibCompressor);
+    CodecPool.returnCompressor(poolCompressor);
+    Compressor poolCompressor2 = CodecPool.getCompressor(codec);
+    if (poolCompressor.getClass() == BuiltInGzipCompressor.class) {
+      if (poolCompressor == poolCompressor2) {
+        fail("Reused java gzip compressor in pool");
+      }
+    } else {
+      if (poolCompressor != poolCompressor2) {
+        fail("Did not reuse native gzip compressor in pool");
+      }
+    }
+  }
+
   @Test
   public void testCodecPoolAndGzipDecompressor() {
     // BuiltInZlibInflater should not be used as the GzipCodec decompressor.

+ 28 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java

@@ -418,6 +418,28 @@ public abstract class GenericTestUtils {
   public static void waitFor(final Supplier<Boolean> check,
       final long checkEveryMillis, final long waitForMillis)
       throws TimeoutException, InterruptedException {
+    waitFor(check, checkEveryMillis, waitForMillis, null);
+  }
+
+  /**
+   * Wait for the specified test to return true. The test will be performed
+   * initially and then every {@code checkEveryMillis} until at least
+   * {@code waitForMillis} time has expired. If {@code check} is null or
+   * {@code waitForMillis} is less than {@code checkEveryMillis} this method
+   * will throw an {@link IllegalArgumentException}.
+   *
+   * @param check the test to perform.
+   * @param checkEveryMillis how often to perform the test.
+   * @param waitForMillis the amount of time after which no more tests will be
+   * performed.
+   * @param errorMsg error message to provide in TimeoutException.
+   * @throws TimeoutException if the test does not return true in the allotted
+   * time.
+   * @throws InterruptedException if the method is interrupted while waiting.
+   */
+  public static void waitFor(final Supplier<Boolean> check,
+      final long checkEveryMillis, final long waitForMillis,
+      final String errorMsg) throws TimeoutException, InterruptedException {
     Objects.requireNonNull(check, ERROR_MISSING_ARGUMENT);
     if (waitForMillis < checkEveryMillis) {
       throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
@@ -432,9 +454,12 @@ public abstract class GenericTestUtils {
     }
 
     if (!result) {
-      throw new TimeoutException("Timed out waiting for condition. " +
-          "Thread diagnostics:\n" +
-          TimedOutTestsListener.buildThreadDiagnosticString());
+      final String exceptionErrorMsg = "Timed out waiting for condition. "
+          + (org.apache.commons.lang3.StringUtils.isNotEmpty(errorMsg)
+          ? "Error Message: " + errorMsg : "")
+          + "\nThread diagnostics:\n" +
+          TimedOutTestsListener.buildThreadDiagnosticString();
+      throw new TimeoutException(exceptionErrorMsg);
     }
   }
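
A short usage sketch for the new overload (the background task below is illustrative): on timeout, the supplied message is included in the TimeoutException alongside the usual thread diagnostics.

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.test.GenericTestUtils;

public class WaitForWithMessageSketch {
  public static void main(String[] args) throws Exception {
    AtomicBoolean ready = new AtomicBoolean(false);
    new Thread(() -> {
      try {
        Thread.sleep(500);   // simulate slow background work
      } catch (InterruptedException ignored) {
      }
      ready.set(true);
    }).start();

    // Poll every 100 ms for at most 10 s; the message is only used if the
    // condition never becomes true.
    GenericTestUtils.waitFor(ready::get, 100, 10_000,
        "background thread never set the flag");
    System.out.println("condition met");
  }
}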
 

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt

@@ -66,6 +66,7 @@ endfunction(add_memcheck_test)
 #
 
 add_subdirectory(x-platform)
+add_subdirectory(utils)
 
 add_executable(uri_test uri_test.cc)
 target_link_libraries(uri_test common gmock_main ${CMAKE_THREAD_LIBS_INIT})
@@ -96,12 +97,12 @@ add_executable(node_exclusion_test node_exclusion_test.cc)
 target_link_libraries(node_exclusion_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(node_exclusion node_exclusion_test)
 
-add_executable(configuration_test $<TARGET_OBJECTS:x_platform_obj> configuration_test.cc)
+add_executable(configuration_test $<TARGET_OBJECTS:test_utils> configuration_test.cc)
 target_include_directories(configuration_test PRIVATE ../lib)
 target_link_libraries(configuration_test common gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(configuration configuration_test)
 
-add_executable(hdfs_configuration_test $<TARGET_OBJECTS:x_platform_obj> hdfs_configuration_test.cc)
+add_executable(hdfs_configuration_test $<TARGET_OBJECTS:test_utils> hdfs_configuration_test.cc)
 target_include_directories(hdfs_configuration_test PRIVATE ../lib)
 target_link_libraries(hdfs_configuration_test common gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(hdfs_configuration hdfs_configuration_test)
@@ -110,7 +111,7 @@ add_executable(hdfspp_errors_test hdfspp_errors.cc)
 target_link_libraries(hdfspp_errors_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(hdfspp_errors hdfspp_errors_test)
 
-add_executable(hdfs_builder_test $<TARGET_OBJECTS:x_platform_obj> hdfs_builder_test.cc)
+add_executable(hdfs_builder_test $<TARGET_OBJECTS:test_utils> hdfs_builder_test.cc)
 target_include_directories(hdfs_builder_test PRIVATE ../lib)
 target_link_libraries(hdfs_builder_test test_common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(hdfs_builder_test hdfs_builder_test)
@@ -128,7 +129,7 @@ add_executable(user_lock_test user_lock_test.cc)
 target_link_libraries(user_lock_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(user_lock user_lock_test)
 
-add_executable(hdfs_config_connect_bugs_test $<TARGET_OBJECTS:x_platform_obj> hdfs_config_connect_bugs.cc)
+add_executable(hdfs_config_connect_bugs_test $<TARGET_OBJECTS:test_utils> hdfs_config_connect_bugs.cc)
 target_include_directories(hdfs_config_connect_bugs_test PRIVATE ../lib)
 target_link_libraries(hdfs_config_connect_bugs_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(hdfs_config_connect_bugs hdfs_config_connect_bugs_test)

+ 15 - 12
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.cc

@@ -19,6 +19,9 @@
 #include "configuration_test.h"
 #include "common/configuration.h"
 #include "common/configuration_loader.h"
+#include "utils/temp-file.h"
+#include "utils/temp-dir.h"
+
 #include <gmock/gmock.h>
 #include <cstdio>
 #include <fstream>
@@ -298,7 +301,7 @@ TEST(ConfigurationTest, TestFileReads)
 {
   // Single stream
   {
-    TempFile tempFile;
+    TestUtils::TempFile tempFile;
     writeSimpleConfig(tempFile.GetFileName(), "key1", "value1");
 
     ConfigurationLoader config_loader;
@@ -311,7 +314,7 @@ TEST(ConfigurationTest, TestFileReads)
 
   // Multiple files
   {
-    TempFile tempFile;
+    TestUtils::TempFile tempFile;
     writeSimpleConfig(tempFile.GetFileName(), "key1", "value1");
 
     ConfigurationLoader loader;
@@ -320,7 +323,7 @@ TEST(ConfigurationTest, TestFileReads)
     ASSERT_TRUE(config && "Parse first stream");
     EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
 
-    TempFile tempFile2;
+    TestUtils::TempFile tempFile2;
     writeSimpleConfig(tempFile2.GetFileName(), "key2", "value2");
     optional<Configuration> config2 =
         loader.OverlayResourceFile(*config, tempFile2.GetFileName());
@@ -331,7 +334,7 @@ TEST(ConfigurationTest, TestFileReads)
 
   // Try to add a directory
   {
-    TempDir tempDir;
+    TestUtils::TempDir tempDir;
 
     ConfigurationLoader config_loader;
     config_loader.ClearSearchPath();
@@ -351,14 +354,14 @@ TEST(ConfigurationTest, TestFileReads)
 
   // Search path
   {
-    TempDir tempDir1;
-    TempFile tempFile1(tempDir1.GetPath() + "/file1.xml");
+    TestUtils::TempDir tempDir1;
+    TestUtils::TempFile tempFile1(tempDir1.GetPath() + "/file1.xml");
     writeSimpleConfig(tempFile1.GetFileName(), "key1", "value1");
-    TempDir tempDir2;
-    TempFile tempFile2(tempDir2.GetPath() + "/file2.xml");
+    TestUtils::TempDir tempDir2;
+    TestUtils::TempFile tempFile2(tempDir2.GetPath() + "/file2.xml");
     writeSimpleConfig(tempFile2.GetFileName(), "key2", "value2");
-    TempDir tempDir3;
-    TempFile tempFile3(tempDir3.GetPath() + "/file3.xml");
+    TestUtils::TempDir tempDir3;
+    TestUtils::TempFile tempFile3(tempDir3.GetPath() + "/file3.xml");
     writeSimpleConfig(tempFile3.GetFileName(), "key3", "value3");
 
     ConfigurationLoader loader;
@@ -379,8 +382,8 @@ TEST(ConfigurationTest, TestFileReads)
 TEST(ConfigurationTest, TestDefaultConfigs) {
   // Search path
   {
-    TempDir tempDir;
-    TempFile coreSite(tempDir.GetPath() + "/core-site.xml");
+    TestUtils::TempDir tempDir;
+    TestUtils::TempFile coreSite(tempDir.GetPath() + "/core-site.xml");
     writeSimpleConfig(coreSite.GetFileName(), "key1", "value1");
 
     ConfigurationLoader loader;

+ 0 - 102
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.h

@@ -23,7 +23,6 @@
 #include "hdfspp/config_parser.h"
 #include "common/configuration.h"
 #include "common/configuration_loader.h"
-#include "x-platform/syscall.h"
 
 #include <cstdio>
 #include <fstream>
@@ -32,8 +31,6 @@
 #include <utility>
 #include <vector>
 
-#include <ftw.h>
-#include <unistd.h>
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
@@ -114,105 +111,6 @@ void writeDamagedConfig(const std::string& filename, Args... args) {
   out.open(filename);
   out << stream.rdbuf();
 }
-
-// TempDir: is deleted on destruction
-class TempFile {
- public:
-  TempFile() {
-    std::vector<char> tmp_buf(filename_.begin(), filename_.end());
-    fd_ = XPlatform::Syscall::CreateAndOpenTempFile(tmp_buf);
-    EXPECT_NE(fd_, -1);
-    filename_.assign(tmp_buf.data());
-  }
-
-  TempFile(std::string fn) : filename_(std::move(fn)) {}
-
-  TempFile(const TempFile& other) = default;
-
-  TempFile(TempFile&& other) noexcept
-      : filename_{std::move(other.filename_)}, fd_{other.fd_} {}
-
-  TempFile& operator=(const TempFile& other) {
-    if (&other != this) {
-      filename_ = other.filename_;
-      fd_ = other.fd_;
-    }
-    return *this;
-  }
-
-  TempFile& operator=(TempFile&& other) noexcept {
-    if (&other != this) {
-      filename_ = std::move(other.filename_);
-      fd_ = other.fd_;
-    }
-    return *this;
-  }
-
-  [[nodiscard]] const std::string& GetFileName() const { return filename_; }
-
-  ~TempFile() {
-    if (-1 != fd_) {
-      EXPECT_NE(XPlatform::Syscall::CloseFile(fd_), -1);
-    }
-
-    unlink(filename_.c_str());
-  }
-
- private:
-  std::string filename_{"/tmp/test_XXXXXXXXXX"};
-  int fd_{-1};
-};
-
-// Callback to remove a directory in the nftw visitor
-int nftw_remove(const char *fpath, const struct stat *sb, int typeflag, struct FTW *ftwbuf)
-{
-  (void)sb; (void)typeflag; (void)ftwbuf;
-
-  int rv = remove(fpath);
-  EXPECT_EQ(0, rv);
-  return rv;
-}
-
-// TempDir: is created in ctor and recursively deletes in dtor
-class TempDir {
- public:
-  TempDir() {
-    std::vector<char> path_pattern(path_.begin(), path_.end());
-    is_path_init_ = XPlatform::Syscall::CreateTempDir(path_pattern);
-    EXPECT_TRUE(is_path_init_);
-    path_.assign(path_pattern.data());
-  }
-
-  TempDir(const TempDir& other) = default;
-
-  TempDir(TempDir&& other) noexcept : path_{std::move(other.path_)} {}
-
-  TempDir& operator=(const TempDir& other) {
-    if (&other != this) {
-      path_ = other.path_;
-    }
-    return *this;
-  }
-
-  TempDir& operator=(TempDir&& other) noexcept {
-    if (&other != this) {
-      path_ = std::move(other.path_);
-    }
-    return *this;
-  }
-
-  [[nodiscard]] const std::string& GetPath() const { return path_; }
-
-  ~TempDir() {
-    if (is_path_init_) {
-      nftw(path_.c_str(), nftw_remove, 64, FTW_DEPTH | FTW_PHYS);
-    }
-  }
-
- private:
-  std::string path_{"/tmp/test_dir_XXXXXXXXXX"};
-  bool is_path_init_{false};
-};
 }
 
 #endif

+ 8 - 5
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_builder_test.cc

@@ -18,6 +18,9 @@
 
 #include "hdfspp/hdfs_ext.h"
 #include "configuration_test.h"
+#include "utils/temp-file.h"
+#include "utils/temp-dir.h"
+
 #include <gmock/gmock.h>
 #include <google/protobuf/stubs/common.h>
 
@@ -27,7 +30,7 @@ using namespace hdfs;
 
 TEST(HdfsBuilderTest, TestStubBuilder) {
   {
-    TempDir tempDir1;
+    TestUtils::TempDir tempDir1;
 
     hdfsBuilder *builder =
         hdfsNewBuilderFromDirectory(tempDir1.GetPath().c_str());
@@ -44,8 +47,8 @@ TEST(HdfsBuilderTest, TestRead)
 {
   // Reading string values
   {
-    TempDir tempDir1;
-    TempFile tempFile1(tempDir1.GetPath() + "/core-site.xml");
+    TestUtils::TempDir tempDir1;
+    TestUtils::TempFile tempFile1(tempDir1.GetPath() + "/core-site.xml");
     writeSimpleConfig(tempFile1.GetFileName(), "key1", "value1");
 
     hdfsBuilder *builder =
@@ -68,8 +71,8 @@ TEST(HdfsBuilderTest, TestRead)
 
   // Reading int values
   {
-    TempDir tempDir1;
-    TempFile tempFile1(tempDir1.GetPath() + "/core-site.xml");
+    TestUtils::TempDir tempDir1;
+    TestUtils::TempFile tempFile1(tempDir1.GetPath() + "/core-site.xml");
     writeSimpleConfig(tempFile1.GetFileName(), "key1", "100");
 
     hdfsBuilder *builder =

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_config_connect_bugs.cc

@@ -27,6 +27,8 @@
 #include <chrono>
 #include <exception>
 
+#include "utils/temp-dir.h"
+
 static const char *hdfs_11294_core_site_txt =
 "<configuration>\n"
 "  <property name=\"fs.defaultFS\" value=\"hdfs://NAMESERVICE1\"/>\n"
@@ -78,7 +80,7 @@ namespace hdfs {
 // Make sure we can set up a mini-cluster and connect to it
 TEST(ConfigConnectBugs, Test_HDFS_11294) {
   // Directory for hdfs config
-  TempDir td;
+  TestUtils::TempDir td;
 
   const std::string &tempDirPath = td.GetPath();
   const std::string coreSitePath = tempDirPath + "/core-site.xml";

+ 16 - 13
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc

@@ -18,6 +18,9 @@
 
 #include "common/hdfs_configuration.h"
 #include "configuration_test.h"
+#include "utils/temp-file.h"
+#include "utils/temp-dir.h"
+
 #include <gmock/gmock.h>
 #include <iostream>
 
@@ -70,10 +73,10 @@ TEST(HdfsConfigurationTest, TestSetOptions)
 TEST(HdfsConfigurationTest, TestDefaultConfigs) {
   // Search path
   {
-    TempDir tempDir;
-    TempFile coreSite(tempDir.GetPath() + "/core-site.xml");
+    TestUtils::TempDir tempDir;
+    TestUtils::TempFile coreSite(tempDir.GetPath() + "/core-site.xml");
     writeSimpleConfig(coreSite.GetFileName(), "key1", "value1");
-    TempFile hdfsSite(tempDir.GetPath() + "/hdfs-site.xml");
+    TestUtils::TempFile hdfsSite(tempDir.GetPath() + "/hdfs-site.xml");
     writeSimpleConfig(hdfsSite.GetFileName(), "key2", "value2");
 
     ConfigurationLoader loader;
@@ -87,8 +90,8 @@ TEST(HdfsConfigurationTest, TestDefaultConfigs) {
 
   // Only core-site.xml available
   {
-    TempDir tempDir;
-    TempFile coreSite(tempDir.GetPath() + "/core-site.xml");
+    TestUtils::TempDir tempDir;
+    TestUtils::TempFile coreSite(tempDir.GetPath() + "/core-site.xml");
     writeSimpleConfig(coreSite.GetFileName(), "key1", "value1");
 
     ConfigurationLoader loader;
@@ -101,8 +104,8 @@ TEST(HdfsConfigurationTest, TestDefaultConfigs) {
 
   // Only hdfs-site available
   {
-    TempDir tempDir;
-    TempFile hdfsSite(tempDir.GetPath() + "/hdfs-site.xml");
+    TestUtils::TempDir tempDir;
+    TestUtils::TempFile hdfsSite(tempDir.GetPath() + "/hdfs-site.xml");
     writeSimpleConfig(hdfsSite.GetFileName(), "key2", "value2");
 
     ConfigurationLoader loader;
@@ -119,10 +122,10 @@ TEST(HdfsConfigurationTest, TestDefaultConfigs) {
 TEST(HdfsConfigurationTest, TestConfigParserAPI) {
   // Config parser API
   {
-    TempDir tempDir;
-    TempFile coreSite(tempDir.GetPath() + "/core-site.xml");
+    TestUtils::TempDir tempDir;
+    TestUtils::TempFile coreSite(tempDir.GetPath() + "/core-site.xml");
     writeSimpleConfig(coreSite.GetFileName(), "key1", "value1");
-    TempFile hdfsSite(tempDir.GetPath() + "/hdfs-site.xml");
+    TestUtils::TempFile hdfsSite(tempDir.GetPath() + "/hdfs-site.xml");
     writeSimpleConfig(hdfsSite.GetFileName(), "key2", "value2");
 
     ConfigParser parser(tempDir.GetPath());
@@ -140,10 +143,10 @@ TEST(HdfsConfigurationTest, TestConfigParserAPI) {
   }
 
   {
-    TempDir tempDir;
-    TempFile coreSite(tempDir.GetPath() + "/core-site.xml");
+    TestUtils::TempDir tempDir;
+    TestUtils::TempFile coreSite(tempDir.GetPath() + "/core-site.xml");
     writeSimpleConfig(coreSite.GetFileName(), "key1", "value1");
-    TempFile hdfsSite(tempDir.GetPath() + "/hdfs-site.xml");
+    TestUtils::TempFile hdfsSite(tempDir.GetPath() + "/hdfs-site.xml");
     writeDamagedConfig(hdfsSite.GetFileName(), "key2", "value2");
 
     ConfigParser parser(tempDir.GetPath());

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/utils/CMakeLists.txt

@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_library(test_utils OBJECT $<TARGET_OBJECTS:x_platform_obj> temp-file.cc temp-dir.cc)

+ 69 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/utils/temp-dir.cc

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <filesystem>
+#include <iostream>
+#include <string>
+#include <system_error>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "utils/temp-dir.h"
+#include "x-platform/syscall.h"
+
+namespace TestUtils {
+TempDir::TempDir() {
+  std::vector path_pattern(path_.begin(), path_.end());
+  is_path_init_ = XPlatform::Syscall::CreateTempDir(path_pattern);
+  EXPECT_TRUE(is_path_init_);
+  path_.assign(path_pattern.data());
+}
+
+TempDir::TempDir(TempDir &&other) noexcept : path_{std::move(other.path_)} {}
+
+TempDir &TempDir::operator=(const TempDir &other) {
+  if (&other != this) {
+    path_ = other.path_;
+  }
+  return *this;
+}
+
+TempDir &TempDir::operator=(TempDir &&other) noexcept {
+  if (&other != this) {
+    path_ = std::move(other.path_);
+  }
+  return *this;
+}
+
+TempDir::~TempDir() {
+  if (!is_path_init_) {
+    return;
+  }
+
+  const std::filesystem::path tmp_dir_path(path_);
+  std::error_code tmp_dir_rm_err;
+
+  const auto tmp_dir_rm_result = remove_all(tmp_dir_path, tmp_dir_rm_err);
+  EXPECT_TRUE(tmp_dir_rm_result);
+  if (!tmp_dir_rm_result) {
+    std::cerr << "Error in deleting directory " << path_ << ": "
+              << tmp_dir_rm_err.message() << std::endl;
+  }
+}
+} // namespace TestUtils

+ 53 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/utils/temp-dir.h

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NATIVE_LIBHDFSPP_TESTS_UTILS_TEMP_DIR
+#define NATIVE_LIBHDFSPP_TESTS_UTILS_TEMP_DIR
+
+#include <string>
+
+namespace TestUtils {
+/*
+ * Creates a temporary directory and deletes its contents recursively
+ * upon destruction of its instance.
+ *
+ * Creates a directory in /tmp by default.
+ */
+class TempDir {
+public:
+  TempDir();
+
+  TempDir(const TempDir &other) = default;
+
+  TempDir(TempDir &&other) noexcept;
+
+  TempDir &operator=(const TempDir &other);
+
+  TempDir &operator=(TempDir &&other) noexcept;
+
+  [[nodiscard]] const std::string &GetPath() const { return path_; }
+
+  ~TempDir();
+
+private:
+  std::string path_{"/tmp/test_dir_XXXXXXXXXX"};
+  bool is_path_init_{false};
+};
+} // namespace TestUtils
+
+#endif

+ 63 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/utils/temp-file.cc

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "utils/temp-file.h"
+#include "x-platform/syscall.h"
+
+namespace TestUtils {
+TempFile::TempFile() {
+  std::vector tmp_buf(filename_.begin(), filename_.end());
+  fd_ = XPlatform::Syscall::CreateAndOpenTempFile(tmp_buf);
+  EXPECT_NE(fd_, -1);
+  filename_.assign(tmp_buf.data());
+}
+
+TempFile::TempFile(std::string filename) : filename_(std::move(filename)) {}
+
+TempFile::TempFile(TempFile &&other) noexcept
+    : filename_{std::move(other.filename_)}, fd_{other.fd_} {}
+
+TempFile &TempFile::operator=(const TempFile &other) {
+  if (&other != this) {
+    filename_ = other.filename_;
+    fd_ = other.fd_;
+  }
+  return *this;
+}
+
+TempFile &TempFile::operator=(TempFile &&other) noexcept {
+  if (&other != this) {
+    filename_ = std::move(other.filename_);
+    fd_ = other.fd_;
+  }
+  return *this;
+}
+
+TempFile::~TempFile() {
+  if (-1 != fd_) {
+    EXPECT_NE(XPlatform::Syscall::CloseFile(fd_), -1);
+  }
+
+  unlink(filename_.c_str());
+}
+} // namespace TestUtils

+ 56 - 0
hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/utils/temp-file.h

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef NATIVE_LIBHDFSPP_TESTS_UTILS_TEMP_FILE
+#define NATIVE_LIBHDFSPP_TESTS_UTILS_TEMP_FILE
+
+#include <string>
+
+namespace TestUtils {
+/**
+ * Creates a temporary file and deletes it
+ * upon destruction of the TempFile instance.
+ *
+ * The temporary file gets created in /tmp directory
+ * by default.
+ */
+class TempFile {
+public:
+  TempFile();
+
+  TempFile(std::string filename);
+
+  TempFile(const TempFile &other) = default;
+
+  TempFile(TempFile &&other) noexcept;
+
+  TempFile &operator=(const TempFile &other);
+
+  TempFile &operator=(TempFile &&other) noexcept;
+
+  [[nodiscard]] const std::string &GetFileName() const { return filename_; }
+
+  ~TempFile();
+
+private:
+  std::string filename_{"/tmp/test_XXXXXXXXXX"};
+  int fd_{-1};
+};
+} // namespace TestUtils
+
+#endif

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -1835,6 +1835,7 @@ public class DFSUtil {
    * Throw if the given directory has any non-empty protected descendants
    * (including itself).
    *
+   * @param fsd the namespace tree.
    * @param iip directory whose descendants are to be checked.
    * @throws AccessControlException if a non-empty protected descendant
    *                                was found.

+ 14 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java

@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -405,7 +406,8 @@ public class Dispatcher {
           // Pinned block can't be moved. Add this block into failure list.
           // Later in the next iteration mover will exclude these blocks from
           // pending moves.
-          target.getDDatanode().addBlockPinningFailures(this);
+          target.getDDatanode().addBlockPinningFailures(
+              this.reportedBlock.getBlock().getBlockId(), this.getSource());
           return;
         }
 
@@ -643,7 +645,8 @@ public class Dispatcher {
     /** blocks being moved but not confirmed yet */
     private final List<PendingMove> pendings;
     private volatile boolean hasFailure = false;
-    private Map<Long, Set<DatanodeInfo>> blockPinningFailures = new HashMap<>();
+    private final Map<Long, Set<DatanodeInfo>> blockPinningFailures =
+        new ConcurrentHashMap<>();
     private volatile boolean hasSuccess = false;
     private ExecutorService moveExecutor;
 
@@ -729,16 +732,17 @@ public class Dispatcher {
       this.hasFailure = true;
     }
 
-    void addBlockPinningFailures(PendingMove pendingBlock) {
-      synchronized (blockPinningFailures) {
-        long blockId = pendingBlock.reportedBlock.getBlock().getBlockId();
-        Set<DatanodeInfo> pinnedLocations = blockPinningFailures.get(blockId);
+    private void addBlockPinningFailures(long blockId, DatanodeInfo source) {
+      blockPinningFailures.compute(blockId, (key, pinnedLocations) -> {
+        Set<DatanodeInfo> newPinnedLocations;
         if (pinnedLocations == null) {
-          pinnedLocations = new HashSet<>();
-          blockPinningFailures.put(blockId, pinnedLocations);
+          newPinnedLocations = new HashSet<>();
+        } else {
+          newPinnedLocations = pinnedLocations;
         }
-        pinnedLocations.add(pendingBlock.getSource());
-      }
+        newPinnedLocations.add(source);
+        return newPinnedLocations;
+      });
     }
 
     Map<Long, Set<DatanodeInfo>> getBlockPinningFailureList() {
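
The synchronized block is replaced by ConcurrentHashMap.compute(), which runs atomically per key, so concurrent movers can record failures for the same block without external locking. A self-contained sketch of the same accumulation pattern (types simplified, names illustrative, not the balancer code itself):

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class PinningFailureSketch {
  // Stand-in for the balancer's per-block failure map; the real code keys on
  // block IDs and stores DatanodeInfo, simplified here to String.
  private final Map<Long, Set<String>> failures = new ConcurrentHashMap<>();

  void addFailure(long blockId, String source) {
    // The whole read-modify-write happens inside one atomic compute() step.
    failures.compute(blockId, (key, locations) -> {
      Set<String> updated = (locations == null) ? new HashSet<>() : locations;
      updated.add(source);
      return updated;
    });
  }

  public static void main(String[] args) {
    PinningFailureSketch s = new PinningFailureSketch();
    s.addFailure(42L, "dn1:9866");
    s.addFailure(42L, "dn2:9866");
    System.out.println(s.failures); // {42=[dn1:9866, dn2:9866]}
  }
}

An equivalent formulation would be failures.computeIfAbsent(blockId, k -> ConcurrentHashMap.newKeySet()).add(source); the patch keeps compute() so the entire update stays within one atomic step per key.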

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java

@@ -287,7 +287,7 @@ public final class Util {
         fos.getChannel().force(true);
         fos.close();
         double writeSec = Math.max(((float)
-            (flushStartTime - Time.monotonicNow())) / 1000.0, 0.001);
+            (Time.monotonicNow() - flushStartTime)) / 1000.0, 0.001);
         xferCombined += writeSec;
         xferStats.append(String
             .format(" Synchronous (fsync) write to disk of " +

+ 11 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java

@@ -101,18 +101,22 @@ public interface DatanodeProtocol {
    * an array of "DatanodeCommand" objects in HeartbeatResponse.
    * A DatanodeCommand tells the DataNode to invalidate local block(s), 
    * or to copy them to other DataNodes, etc.
-   * @param registration datanode registration information
-   * @param reports utilization report per storage
-   * @param xmitsInProgress number of transfers from this datanode to others
-   * @param xceiverCount number of active transceiver threads
-   * @param failedVolumes number of failed volumes
-   * @param volumeFailureSummary info about volume failures
+   * @param registration datanode registration information.
+   * @param reports utilization report per storage.
+   * @param dnCacheCapacity the total cache capacity of the datanode (in bytes).
+   * @param dnCacheUsed the amount of cache used by the datanode (in bytes).
+   * @param xmitsInProgress number of transfers from this datanode to others.
+   * @param xceiverCount number of active transceiver threads.
+   * @param failedVolumes number of failed volumes.
+   * @param volumeFailureSummary info about volume failures.
    * @param requestFullBlockReportLease whether to request a full block
    *                                    report lease.
    * @param slowPeers Details of peer DataNodes that were detected as being
    *                  slow to respond to packet writes. Empty report if no
    *                  slow peers were detected by the DataNode.
-   * @throws IOException on error
+   * @param slowDisks Details of disks on DataNodes that were detected as
+   *                  being slow. Empty report if no slow disks were detected.
+   * @throws IOException on error.
    */
   @Idempotent
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,

+ 94 - 15
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java

@@ -27,7 +27,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -76,7 +79,8 @@ public class TestDecommissioningStatus {
   private FileSystem fileSys;
   private HostsFileWriter hostsFileWriter;
   private Configuration conf;
-  private Logger LOG;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDecommissioningStatus.class);
 
   final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
 
@@ -110,7 +114,6 @@ public class TestDecommissioningStatus {
     conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);
     GenericTestUtils.setLogLevel(
         LoggerFactory.getLogger(DatanodeAdminManager.class), Level.DEBUG);
-    LOG = LoggerFactory.getLogger(TestDecommissioningStatus.class);
     return conf;
   }
 
@@ -165,17 +168,30 @@ public class TestDecommissioningStatus {
 
   protected void checkDecommissionStatus(DatanodeDescriptor decommNode,
       int expectedUnderRep, int expectedDecommissionOnly,
-      int expectedUnderRepInOpenFiles) {
-      assertEquals(
-              expectedUnderRep,
-              decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks(), "Unexpected num under-replicated blocks");
-      assertEquals(
-              expectedDecommissionOnly,
-              decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas(), "Unexpected number of decom-only replicas");
-      assertEquals(
-              expectedUnderRepInOpenFiles,
-              decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles(),
-              "Unexpected number of replicas in under-replicated open files");
+      int expectedUnderRepInOpenFiles) throws TimeoutException,
+      InterruptedException {
+    String errorMsg;
+    errorMsg = "Under replicated blocks. Expected: "
+        + expectedUnderRep + " , Actual: "
+        + decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks();
+    GenericTestUtils.waitFor(
+        () -> expectedUnderRep == decommNode.getLeavingServiceStatus()
+            .getUnderReplicatedBlocks(),
+        1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
+    errorMsg = "OutOfService only replicas. Expected: "
+        + expectedDecommissionOnly + " , Actual: "
+        + decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas();
+    GenericTestUtils.waitFor(
+        () -> expectedDecommissionOnly == decommNode.getLeavingServiceStatus()
+            .getOutOfServiceOnlyReplicas(),
+        1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
+    errorMsg = "UnderReplicated in open files. Expected: "
+        + expectedUnderRepInOpenFiles + " , Actual: "
+        + decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles();
+    GenericTestUtils.waitFor(
+        () -> expectedUnderRepInOpenFiles == decommNode
+            .getLeavingServiceStatus().getUnderReplicatedInOpenFiles(),
+        1000, TimeUnit.SECONDS.toMillis(10), errorMsg);
   }
 
   protected void checkDFSAdminDecommissionStatus(
@@ -270,6 +286,7 @@ public class TestDecommissioningStatus {
 
     FSNamesystem fsn = cluster.getNamesystem();
     final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
+    verifyInitialState(fsn, dm);
     for (int iteration = 0; iteration < numDatanodes; iteration++) {
       String downnode = decommissionNode(client, iteration);
       dm.refreshNodes(conf);
@@ -278,14 +295,13 @@ public class TestDecommissioningStatus {
       // Block until the admin's monitor updates the number of tracked nodes.
       waitForDecommissionedNodes(dm.getDatanodeAdminManager(), iteration + 1);
       final List<DatanodeDescriptor> decommissioningNodes = dm.getDecommissioningNodes();
+      assertEquals(decommissioningNodes.size(), iteration + 1);
       if (iteration == 0) {
-        assertEquals(decommissioningNodes.size(), 1);
         DatanodeDescriptor decommNode = decommissioningNodes.get(0);
         checkDecommissionStatus(decommNode, 3, 0, 1);
         checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1),
             fileSys, admin);
       } else {
-        assertEquals(decommissioningNodes.size(), 2);
         DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
         DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
         // This one is still 3,3,1 since it passed over the UC block
@@ -307,6 +323,69 @@ public class TestDecommissioningStatus {
     AdminStatesBaseTest.cleanupFile(fileSys, file2);
   }
 
+  // Why do we verify the initial state of the DataNodes here?
+  // Before the actual decommission testing starts, we must ensure that all
+  // 8 blocks (the 4 original blocks of the 2 files plus their 4 replicas)
+  // are present across the two available DataNodes. If we do not wait until
+  // all 8 blocks are reported live by the BlockManager, we might start the
+  // decommissioning process while one of the replicas is not yet present on
+  // any DataNode. That would make the test flaky, because the totals
+  // (number of under-replicated blocks, number of outOfService-only
+  // replicas, number of under-replicated blocks in open files) would be
+  // incorrect.
+  protected void verifyInitialState(FSNamesystem fsn, DatanodeManager dm)
+      throws InterruptedException {
+    dm.getDatanodes().forEach(datanodeDescriptor -> {
+      try {
+        checkDecommissionStatus(datanodeDescriptor, 0, 0, 0);
+      } catch (TimeoutException | InterruptedException e) {
+        throw new AssertionError("Datanode not in good state.", e);
+      }
+    });
+    int c = 0;
+    int totalBlocks;
+    long totalReplicatedBlocks;
+    while (true) {
+      totalBlocks = fsn.getBlockManager().getTotalBlocks();
+      totalReplicatedBlocks = fsn.getBlockManager().getTotalReplicatedBlocks();
+      if (totalBlocks == 4 && totalReplicatedBlocks == 4) {
+        break;
+      } else {
+        if (c == 4) {
+          throw new AssertionError("Unexpected Total blocks " + totalBlocks
+              + " and replicated blocks " + totalReplicatedBlocks);
+        }
+        Thread.sleep(3000);
+      }
+      c++;
+    }
+    c = 0;
+    AtomicInteger total = new AtomicInteger(0);
+    AtomicInteger sufficientBlocksSuccess = new AtomicInteger(0);
+    while (true) {
+      total.set(0);
+      sufficientBlocksSuccess.set(0);
+      dm.getDatanodes().forEach(
+          datanodeDescriptor -> {
+            total.addAndGet(datanodeDescriptor.numBlocks());
+            if (datanodeDescriptor.numBlocks() == 4) {
+              sufficientBlocksSuccess.incrementAndGet();
+            }
+          });
+      if (total.get() == 8 && sufficientBlocksSuccess.get() == 2) {
+        break;
+      } else {
+        if (c == 4) {
+          throw new AssertionError("Unexpected Total blocks " + total.get()
+              + " from Datanode Storage. 4 blocks per Datanode Storage"
+              + " expected from each DataNode");
+        }
+        Thread.sleep(3000);
+      }
+      c++;
+    }
+  }
+
   /**
    * Verify a DN remains in DECOMMISSION_INPROGRESS state if it is marked
    * as dead before decommission has completed. That will allow DN to resume

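checkDecommissionStatus and verifyInitialState above replace one-shot assertions with polling, so the expected counts are allowed to converge as block reports arrive. The real helper is GenericTestUtils.waitFor; the class below is only a simplified, self-contained stand-in for that wait-until-condition pattern.

import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical mini version of a waitFor-style test helper.
public final class WaitUtil {
  private WaitUtil() {
  }

  public static void waitFor(BooleanSupplier check, long checkEveryMillis,
      long waitForMillis, String errorMsg)
      throws TimeoutException, InterruptedException {
    long deadline = System.currentTimeMillis() + waitForMillis;
    while (!check.getAsBoolean()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(errorMsg);
      }
      Thread.sleep(checkEveryMillis);
    }
  }

  public static void main(String[] args) throws Exception {
    long start = System.currentTimeMillis();
    // Example: wait up to 10 seconds for a condition checked every second.
    waitFor(() -> System.currentTimeMillis() - start > 2000, 1000, 10_000,
        "Condition not met within 10 seconds");
    System.out.println("Condition satisfied");
  }
}
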
+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatusWithBackoffMonitor.java

@@ -106,6 +106,7 @@ public class TestDecommissioningStatusWithBackoffMonitor
 
     FSNamesystem fsn = cluster.getNamesystem();
     final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
+    verifyInitialState(fsn, dm);
     for (int iteration = 0; iteration < numDatanodes; iteration++) {
       String downnode = decommissionNode(client, iteration);
       dm.refreshNodes(conf);

+ 1 - 1
hadoop-project/pom.xml

@@ -1468,7 +1468,7 @@
       <dependency>
         <groupId>com.aliyun.oss</groupId>
         <artifactId>aliyun-sdk-oss</artifactId>
-        <version>3.4.1</version>
+        <version>3.13.0</version>
         <exclusions>
           <exclusion>
             <groupId>org.apache.httpcomponents</groupId>

+ 14 - 6
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -208,16 +208,15 @@ public class AzureBlobFileSystem extends FileSystem
   }
 
   private FSDataInputStream open(final Path path,
-      final Optional<Configuration> options) throws IOException {
+      final Optional<OpenFileParameters> parameters) throws IOException {
     statIncrement(CALL_OPEN);
     Path qualifiedPath = makeQualified(path);
 
     try {
       TracingContext tracingContext = new TracingContext(clientCorrelationId,
-          fileSystemId, FSOperationType.OPEN, tracingHeaderFormat,
-          listener);
-      InputStream inputStream = abfsStore.openFileForRead(qualifiedPath,
-          options, statistics, tracingContext);
+          fileSystemId, FSOperationType.OPEN, tracingHeaderFormat, listener);
+      InputStream inputStream = abfsStore
+          .openFileForRead(qualifiedPath, parameters, statistics, tracingContext);
       return new FSDataInputStream(inputStream);
     } catch(AzureBlobFileSystemException ex) {
       checkException(path, ex);
@@ -225,6 +224,15 @@ public class AzureBlobFileSystem extends FileSystem
     }
   }
 
+  /**
+   * Takes configuration and other options through
+   * {@link org.apache.hadoop.fs.impl.OpenFileParameters}. Ensure that the
+   * FileStatus passed in is up-to-date, as it will be used to create the
+   * InputStream (with information such as contentLength and eTag).
+   * @param path the location of the file to be opened.
+   * @param parameters OpenFileParameters instance; can hold FileStatus,
+   *                   Configuration, bufferSize and mandatoryKeys.
+   */
   @Override
   protected CompletableFuture<FSDataInputStream> openFileWithOptions(
       final Path path, final OpenFileParameters parameters) throws IOException {
@@ -235,7 +243,7 @@ public class AzureBlobFileSystem extends FileSystem
         "for " + path);
     return LambdaUtils.eval(
         new CompletableFuture<>(), () ->
-            open(path, Optional.of(parameters.getOptions())));
+            open(path, Optional.of(parameters)));
   }
 
   @Override

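On the caller side, the FileStatus now travels from the openFile() builder through OpenFileParameters into openFileForRead, letting a client that already holds a listing result avoid an extra getPathStatus round trip. A hedged usage sketch, assuming a Hadoop 3.3+ FileSystem bound to an abfs:// URI (the path below is a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: reuse a FileStatus obtained from getFileStatus/listStatus when
// opening the file, so the store can skip its own getPathStatus call.
public class OpenWithStatusExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder path; any abfs:// location works the same way.
    Path path = new Path(
        "abfs://container@account.dfs.core.windows.net/data/file.txt");
    FileSystem fs = path.getFileSystem(conf);

    FileStatus status = fs.getFileStatus(path); // e.g. cached from a listing
    try (FSDataInputStream in =
             fs.openFile(path).withFileStatus(status).build().get()) {
      byte[] buf = new byte[(int) status.getLen()]; // small file assumed
      in.readFully(buf);
      System.out.println("Read " + buf.length + " bytes");
    }
  }
}

As the store-side hunk that follows shows, the fast path applies only when the provided status is a VersionedFileStatus (which carries the eTag); any other FileStatus falls back to the getPathStatus REST call.
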
+ 47 - 24
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -115,6 +115,7 @@ import org.apache.hadoop.fs.azurebfs.utils.CRC64;
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -129,6 +130,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_HYP
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_PLUS;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
@@ -669,44 +672,64 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
 
   public AbfsInputStream openFileForRead(final Path path,
       final FileSystem.Statistics statistics, TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    return openFileForRead(path, Optional.empty(), statistics, tracingContext);
+      throws IOException {
+    return openFileForRead(path, Optional.empty(), statistics,
+        tracingContext);
   }
 
-  public AbfsInputStream openFileForRead(final Path path,
-      final Optional<Configuration> options,
+  public AbfsInputStream openFileForRead(Path path,
+      final Optional<OpenFileParameters> parameters,
       final FileSystem.Statistics statistics, TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) {
+      throws IOException {
+    try (AbfsPerfInfo perfInfo = startTracking("openFileForRead",
+        "getPathStatus")) {
       LOG.debug("openFileForRead filesystem: {} path: {}",
-              client.getFileSystem(),
-              path);
+          client.getFileSystem(), path);
 
+      FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
+          .orElse(null);
       String relativePath = getRelativePath(path);
-
-      final AbfsRestOperation op = client
-          .getPathStatus(relativePath, false, tracingContext);
-      perfInfo.registerResult(op.getResult());
-
-      final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
-      final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
-      final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG);
+      String resourceType, eTag;
+      long contentLength;
+      if (fileStatus instanceof VersionedFileStatus) {
+        path = path.makeQualified(this.uri, path);
+        Preconditions.checkArgument(fileStatus.getPath().equals(path),
+            String.format(
+                "Filestatus path [%s] does not match with given path [%s]",
+                fileStatus.getPath(), path));
+        resourceType = fileStatus.isFile() ? FILE : DIRECTORY;
+        contentLength = fileStatus.getLen();
+        eTag = ((VersionedFileStatus) fileStatus).getVersion();
+      } else {
+        if (fileStatus != null) {
+          LOG.warn(
+              "Fallback to getPathStatus REST call as provided filestatus "
+                  + "is not of type VersionedFileStatus");
+        }
+        AbfsHttpOperation op = client.getPathStatus(relativePath, false,
+            tracingContext).getResult();
+        resourceType = op.getResponseHeader(
+            HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
+        contentLength = Long.parseLong(
+            op.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH));
+        eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
+      }
 
       if (parseIsDirectory(resourceType)) {
         throw new AbfsRestOperationException(
-                AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
-                AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
-                "openFileForRead must be used with files and not directories",
-                null);
+            AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
+            AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
+            "openFileForRead must be used with files and not directories",
+            null);
       }
 
       perfInfo.registerSuccess(true);
 
       // Add statistics for InputStream
-      return new AbfsInputStream(client, statistics,
-              relativePath, contentLength,
-              populateAbfsInputStreamContext(options),
-              eTag, tracingContext);
+      return new AbfsInputStream(client, statistics, relativePath,
+          contentLength, populateAbfsInputStreamContext(
+          parameters.map(OpenFileParameters::getOptions)),
+          eTag, tracingContext);
     }
   }
 

+ 5 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidAbfsRestOperationException.java

@@ -24,7 +24,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 
 /**
- * Exception to wrap invalid Azure service error responses.
+ * Exception to wrap invalid Azure service error responses and exceptions
+ * raised on network IO.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -34,7 +35,9 @@ public class InvalidAbfsRestOperationException extends AbfsRestOperationExceptio
     super(
         AzureServiceErrorCode.UNKNOWN.getStatusCode(),
         AzureServiceErrorCode.UNKNOWN.getErrorCode(),
-        "InvalidAbfsRestOperationException",
+        innerException != null
+            ? innerException.toString()
+            : "InvalidAbfsRestOperationException",
         innerException);
   }
 }

+ 3 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java

@@ -410,7 +410,9 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
           }
         }
       } catch (IOException ex) {
-        LOG.error("UnexpectedError: ", ex);
+        LOG.warn("IO/Network error: {} {}: {}",
+            method, getMaskedUrl(), ex.getMessage());
+        LOG.debug("IO Error: ", ex);
         throw ex;
       } finally {
         if (this.isTraceEnabled) {

+ 3 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java

@@ -292,7 +292,7 @@ public class AbfsRestOperation {
     } catch (UnknownHostException ex) {
       String hostname = null;
       hostname = httpOperation.getHost();
-      LOG.warn("Unknown host name: %s. Retrying to resolve the host name...",
+      LOG.warn("Unknown host name: {}. Retrying to resolve the host name...",
           hostname);
       if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
         throw new InvalidAbfsRestOperationException(ex);
@@ -300,7 +300,7 @@ public class AbfsRestOperation {
       return false;
     } catch (IOException ex) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("HttpRequestFailure: {}, {}", httpOperation.toString(), ex);
+        LOG.debug("HttpRequestFailure: {}, {}", httpOperation, ex);
       }
 
       if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
@@ -312,7 +312,7 @@ public class AbfsRestOperation {
       AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
     }
 
-    LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString());
+    LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);
 
     if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
       return false;

+ 2 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java

@@ -452,8 +452,8 @@ final class ReadBufferManager {
    */
   void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
     if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
-          buffer.getStream().getPath(),  buffer.getOffset(), bytesActuallyRead);
+      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}",
+          buffer.getStream().getPath(),  buffer.getOffset(), result, bytesActuallyRead);
     }
     synchronized (this) {
       inProgressList.remove(buffer);

+ 5 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.azurebfs.services;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 
 class ReadBufferWorker implements Runnable {
@@ -73,8 +74,11 @@ class ReadBufferWorker implements Runnable {
                   buffer.getTracingContext());
 
           bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead);  // post result back to ReadBufferManager
+        } catch (IOException ex) {
+          buffer.setErrException(ex);
+          bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
         } catch (Exception ex) {
-          buffer.setErrException(new IOException(ex));
+          buffer.setErrException(new PathIOException(buffer.getStream().getPath(), ex));
           bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
         }
       }

+ 10 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
@@ -429,6 +430,15 @@ public abstract class AbstractAbfsIntegrationTest extends
     return fs.getAbfsStore();
   }
 
+  public AbfsClient getAbfsClient(final AzureBlobFileSystemStore abfsStore) {
+    return abfsStore.getClient();
+  }
+
+  public void setAbfsClient(AzureBlobFileSystemStore abfsStore,
+      AbfsClient client) {
+    abfsStore.setClient(client);
+  }
+
   public Path makeQualified(Path path) throws java.io.IOException {
     return getFileSystem().makeQualified(path);
   }

+ 112 - 3
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java

@@ -19,31 +19,40 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
-
-import org.junit.Assert;
-import org.junit.Test;
 import java.util.Arrays;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
 
 import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -192,6 +201,106 @@ public class TestAbfsInputStream extends
     ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
   }
 
+  private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException {
+    AzureBlobFileSystem fs = getFileSystem();
+    fs.create(testFile);
+    FSDataOutputStream out = fs.append(testFile);
+    out.write(buffer);
+    out.close();
+  }
+
+  private void verifyOpenWithProvidedStatus(Path path, FileStatus fileStatus,
+      byte[] buf, AbfsRestOperationType source)
+      throws IOException, ExecutionException, InterruptedException {
+    byte[] readBuf = new byte[buf.length];
+    AzureBlobFileSystem fs = getFileSystem();
+    FutureDataInputStreamBuilder builder = fs.openFile(path);
+    builder.withFileStatus(fileStatus);
+    FSDataInputStream in = builder.build().get();
+    assertEquals(String.format(
+        "Open with fileStatus [from %s result]: Incorrect number of bytes read",
+        source), buf.length, in.read(readBuf));
+    assertArrayEquals(String
+        .format("Open with fileStatus [from %s result]: Incorrect read data",
+            source), readBuf, buf);
+  }
+
+  private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus,
+      AzureBlobFileSystemStore abfsStore, AbfsClient mockClient,
+      AbfsRestOperationType source, TracingContext tracingContext)
+      throws IOException {
+
+    // verify GetPathStatus not invoked when FileStatus is provided
+    abfsStore.openFileForRead(testFile, Optional
+        .ofNullable(new OpenFileParameters().withStatus(fileStatus)), null, tracingContext);
+    verify(mockClient, times(0).description((String.format(
+        "FileStatus [from %s result] provided, GetFileStatus should not be invoked",
+        source)))).getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));
+
+    // verify GetPathStatus invoked when FileStatus not provided
+    abfsStore.openFileForRead(testFile,
+        Optional.empty(), null,
+        tracingContext);
+    verify(mockClient, times(1).description(
+        "GetPathStatus should be invoked when FileStatus not provided"))
+        .getPathStatus(anyString(), anyBoolean(), any(TracingContext.class));
+
+    Mockito.reset(mockClient); //clears invocation count for next test case
+  }
+
+  @Test
+  public void testOpenFileWithOptions() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    String testFolder = "/testFolder";
+    Path smallTestFile = new Path(testFolder + "/testFile0");
+    Path largeTestFile = new Path(testFolder + "/testFile1");
+    fs.mkdirs(new Path(testFolder));
+    int readBufferSize = getConfiguration().getReadBufferSize();
+    byte[] smallBuffer = new byte[5];
+    byte[] largeBuffer = new byte[readBufferSize + 5];
+    new Random().nextBytes(smallBuffer);
+    new Random().nextBytes(largeBuffer);
+    writeBufferToNewFile(smallTestFile, smallBuffer);
+    writeBufferToNewFile(largeTestFile, largeBuffer);
+
+    FileStatus[] getFileStatusResults = {fs.getFileStatus(smallTestFile),
+        fs.getFileStatus(largeTestFile)};
+    FileStatus[] listStatusResults = fs.listStatus(new Path(testFolder));
+
+    // open with fileStatus from GetPathStatus
+    verifyOpenWithProvidedStatus(smallTestFile, getFileStatusResults[0],
+        smallBuffer, AbfsRestOperationType.GetPathStatus);
+    verifyOpenWithProvidedStatus(largeTestFile, getFileStatusResults[1],
+        largeBuffer, AbfsRestOperationType.GetPathStatus);
+
+    // open with fileStatus from ListStatus
+    verifyOpenWithProvidedStatus(smallTestFile, listStatusResults[0], smallBuffer,
+        AbfsRestOperationType.ListPaths);
+    verifyOpenWithProvidedStatus(largeTestFile, listStatusResults[1], largeBuffer,
+        AbfsRestOperationType.ListPaths);
+
+    // verify number of GetPathStatus invocations
+    AzureBlobFileSystemStore abfsStore = getAbfsStore(fs);
+    AbfsClient mockClient = spy(getAbfsClient(abfsStore));
+    setAbfsClient(abfsStore, mockClient);
+    TracingContext tracingContext = getTestTracingContext(fs, false);
+    checkGetPathStatusCalls(smallTestFile, getFileStatusResults[0],
+        abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
+    checkGetPathStatusCalls(largeTestFile, getFileStatusResults[1],
+        abfsStore, mockClient, AbfsRestOperationType.GetPathStatus, tracingContext);
+    checkGetPathStatusCalls(smallTestFile, listStatusResults[0],
+        abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
+    checkGetPathStatusCalls(largeTestFile, listStatusResults[1],
+        abfsStore, mockClient, AbfsRestOperationType.ListPaths, tracingContext);
+
+    // Verify with incorrect filestatus
+    getFileStatusResults[0].setPath(new Path("wrongPath"));
+    intercept(ExecutionException.class,
+        () -> verifyOpenWithProvidedStatus(smallTestFile,
+            getFileStatusResults[0], smallBuffer,
+            AbfsRestOperationType.GetPathStatus));
+  }
+
   /**
    * This test expects AbfsInputStream to throw the exception that readAhead
    * thread received on read. The readAhead thread must be initiated from the

+ 28 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.collections.keyvalue.DefaultMapEntry;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -1354,8 +1355,23 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           initialState.equals(NodeState.DECOMMISSIONING);
       if (isNodeDecommissioning) {
         List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
+        // hasScheduledAMContainers solves the following race condition:
+        // 1. An AM container is launched on a node with 0 containers.
+        // 2. The node is gracefully decommissioned.
+        // 3. The node heartbeats to the RM. In
+        //    StatusUpdateWhenHealthyTransition, rmNode.runningApplications is
+        //    still empty, because it is only updated after the call to
+        //    RMNodeImpl.deactivateNode, so the node would be deactivated and
+        //    all of its containers killed even though a container is running.
+        // To avoid this race condition, the ground truth is retrieved from
+        // the scheduler before deactivating a DECOMMISSIONING node.
+        // Only AM containers are considered, since AM container reattempts
+        // can cause application failures if max attempts is set to 1.
         if (rmNode.runningApplications.isEmpty() &&
-            (keepAliveApps == null || keepAliveApps.isEmpty())) {
+            (keepAliveApps == null || keepAliveApps.isEmpty()) &&
+            !hasScheduledAMContainers(rmNode)) {
+          LOG.info("No containers running on " + rmNode.nodeId + ". "
+              + "Attempting to deactivate decommissioning node.");
           RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
           return NodeState.DECOMMISSIONED;
         }
@@ -1401,6 +1417,17 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
       return initialState;
     }
+
+    /**
+     * Checks if the scheduler has scheduled any AMs on the given node.
+     * @return true if node has any AM scheduled on it.
+     */
+    private boolean hasScheduledAMContainers(RMNodeImpl rmNode) {
+      return rmNode.context.getScheduler()
+          .getSchedulerNode(rmNode.getNodeID())
+          .getCopiedListOfRunningContainers()
+          .stream().anyMatch(RMContainer::isAMContainer);
+    }
   }
 
   public static class StatusUpdateWhenUnHealthyTransition implements

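The RMNodeImpl hunk above consults the scheduler as the ground truth before deactivating a DECOMMISSIONING node. A framework-free sketch of that guard, with hypothetical lists standing in for the YARN node-side bookkeeping and the scheduler's container view:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the deactivation guard: a DECOMMISSIONING node is
// deactivated only when the node-side bookkeeping shows no running or
// keep-alive applications AND the scheduler has no AM container on the node.
public class DecommissionGuardDemo {

  static boolean safeToDeactivate(List<String> runningApplications,
      List<String> keepAliveApps, List<Boolean> schedulerContainersAreAM) {
    boolean hasScheduledAM =
        schedulerContainersAreAM.stream().anyMatch(Boolean::booleanValue);
    return runningApplications.isEmpty()
        && (keepAliveApps == null || keepAliveApps.isEmpty())
        && !hasScheduledAM;
  }

  public static void main(String[] args) {
    // Race window: node-side lists are still empty, but the scheduler has
    // already placed an AM container, so deactivation is (correctly) refused.
    System.out.println(safeToDeactivate(Collections.emptyList(), null,
        Arrays.asList(Boolean.TRUE)));            // false
    System.out.println(safeToDeactivate(Collections.emptyList(), null,
        Collections.emptyList()));                // true
  }
}
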
+ 14 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -68,6 +68,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdate
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -81,6 +83,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -125,7 +128,7 @@ public class TestRMNodeTransitions {
     rmContext =
         new RMContextImpl(rmDispatcher, mock(ContainerAllocationExpirer.class),
           null, null, mock(DelegationTokenRenewer.class), null, null, null,
-          null, null);
+          null, getMockResourceScheduler());
     NodesListManager nodesListManager = mock(NodesListManager.class);
     HostsFileReader reader = mock(HostsFileReader.class);
     when(nodesListManager.getHostsReader()).thenReturn(reader);
@@ -193,6 +196,16 @@ public class TestRMNodeTransitions {
     return event;
   }
 
+  private ResourceScheduler getMockResourceScheduler() {
+    ResourceScheduler resourceScheduler = mock(ResourceScheduler.class);
+    SchedulerNode schedulerNode = mock(SchedulerNode.class);
+    when(schedulerNode.getCopiedListOfRunningContainers())
+        .thenReturn(Collections.emptyList());
+    when(resourceScheduler.getSchedulerNode(ArgumentMatchers.any()))
+        .thenReturn(schedulerNode);
+    return resourceScheduler;
+  }
+
   private List<ApplicationId> getAppIdList() {
     List<ApplicationId> appIdList = new ArrayList<ApplicationId>();
     appIdList.add(BuilderUtils.newApplicationId(0, 0));

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -463,6 +463,64 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     rm.waitForState(id1, NodeState.DECOMMISSIONED);
   }
 
+  /**
+   * Test graceful decommission of node when an AM container is scheduled on a
+   * node just before it is gracefully decommissioned.
+   */
+  @Test (timeout = 60000)
+  public void testGracefulDecommissionAfterAMContainerAlloc() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
+        .getAbsolutePath());
+
+    writeToHostsFile("");
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 10240);
+    MockNM nm2 = rm.registerNode("host2:5678", 20480);
+    MockNM nm3 = rm.registerNode("host3:4433", 10240);
+
+    NodeId id1 = nm1.getNodeId();
+    NodeId id2 = nm2.getNodeId();
+    NodeId id3 = nm3.getNodeId();
+
+    rm.waitForState(id1, NodeState.RUNNING);
+    rm.waitForState(id2, NodeState.RUNNING);
+    rm.waitForState(id3, NodeState.RUNNING);
+
+    // Create an app and schedule AM on host1.
+    RMApp app = MockRMAppSubmitter.submitWithMemory(2000, rm);
+    MockAM am = MockRM.launchAM(app, rm, nm1);
+
+    // Before sending a heartbeat, gracefully decommission the node on which
+    // the AM is scheduled, to simulate the race condition.
+    writeToHostsFile("host1", "host3");
+    rm.getNodesListManager().refreshNodes(conf, true);
+    rm.waitForState(id1, NodeState.DECOMMISSIONING);
+    rm.waitForState(id3, NodeState.DECOMMISSIONING);
+
+    // Heartbeat after the node is in DECOMMISSIONING state. This will be the
+    // first heartbeat containing information about the AM container since the
+    // application was submitted.
+    ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId();
+    nm1.nodeHeartbeat(aaid, 1, ContainerState.RUNNING);
+    nm3.nodeHeartbeat(true);
+
+    // host1 should stay in DECOMMISSIONING as it has container running on it.
+    rm.waitForState(id1, NodeState.DECOMMISSIONING);
+    rm.waitForState(id3, NodeState.DECOMMISSIONED);
+
+    // Go through the normal application flow and wait for it to finish.
+    am.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+    MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
+    nm1.nodeHeartbeat(aaid, 1, ContainerState.COMPLETE);
+    rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+    rm.waitForState(id1, NodeState.DECOMMISSIONED);
+  }
+
+
   /**
   * Decommissioning using a post-configured include hosts file
   */