瀏覽代碼

Revert due to an error "HDFS-10994. Support an XOR policy XOR-2-1-64k in HDFS. Contributed by Sammi Chen"

This reverts commit 5614f847b2ef2a5b70bd9a06edc4eba06174c6.
Kai Zheng 8 年之前
父節點
當前提交
cfd8076f81
共有 12 個文件被更改,包括 471 次插入240 次删除
  1. 0 3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java
  2. 0 1
      hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
  3. 3 20
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java
  4. 2 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
  5. 4 24
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
  6. 16 34
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
  7. 7 20
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
  8. 9 28
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
  9. 0 33
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedInputStream.java
  10. 0 35
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStream.java
  11. 0 36
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStreamWithFailure.java
  12. 430 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java

+ 0 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java

@@ -38,7 +38,4 @@ public final class ErasureCodeConstants {
 
   public static final ECSchema RS_6_3_LEGACY_SCHEMA = new ECSchema(
       RS_LEGACY_CODEC_NAME, 6, 3);
-
-  public static final ECSchema XOR_2_1_SCHEMA = new ECSchema(
-      XOR_CODEC_NAME, 2, 1);
 }

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java

@@ -147,7 +147,6 @@ public final class HdfsConstants {
   public static final byte RS_6_3_POLICY_ID = 0;
   public static final byte RS_3_2_POLICY_ID = 1;
   public static final byte RS_6_3_LEGACY_POLICY_ID = 2;
-  public static final byte XOR_2_1_POLICY_ID = 3;
 
   /* Hidden constructor */
   protected HdfsConstants() {

+ 3 - 20
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java

@@ -36,7 +36,7 @@ import java.util.TreeMap;
 public final class ErasureCodingPolicyManager {
 
   /**
-   * TODO: HDFS-8095.
+   * TODO: HDFS-8095
    */
   private static final int DEFAULT_CELLSIZE = 64 * 1024;
   private static final ErasureCodingPolicy SYS_POLICY1 =
@@ -48,14 +48,10 @@ public final class ErasureCodingPolicyManager {
   private static final ErasureCodingPolicy SYS_POLICY3 =
       new ErasureCodingPolicy(ErasureCodeConstants.RS_6_3_LEGACY_SCHEMA,
           DEFAULT_CELLSIZE, HdfsConstants.RS_6_3_LEGACY_POLICY_ID);
-  private static final ErasureCodingPolicy SYS_POLICY4 =
-      new ErasureCodingPolicy(ErasureCodeConstants.XOR_2_1_SCHEMA,
-          DEFAULT_CELLSIZE, HdfsConstants.XOR_2_1_POLICY_ID);
 
   //We may add more later.
   private static final ErasureCodingPolicy[] SYS_POLICIES =
-      new ErasureCodingPolicy[]{SYS_POLICY1, SYS_POLICY2, SYS_POLICY3,
-          SYS_POLICY4};
+      new ErasureCodingPolicy[]{SYS_POLICY1, SYS_POLICY2, SYS_POLICY3};
 
   // Supported storage policies for striped EC files
   private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE = new byte[] {
@@ -100,19 +96,6 @@ public final class ErasureCodingPolicyManager {
     return SYS_POLICY1;
   }
 
-  /**
-   * Get system-wide policy by policy ID.
-   * @return ecPolicy
-   */
-  public static ErasureCodingPolicy getPolicyByPolicyID(byte id) {
-    for (ErasureCodingPolicy policy : SYS_POLICIES) {
-      if (policy.getId() == id) {
-        return policy;
-      }
-    }
-    return null;
-  }
-
   /**
    * Get all policies that's available to use.
    * @return all policies
@@ -158,7 +141,7 @@ public final class ErasureCodingPolicyManager {
   }
 
   /**
-   * Clear and clean up.
+   * Clear and clean up
    */
   public void clear() {
     activePoliciesByName.clear();

+ 2 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -455,13 +455,9 @@ public class INodeFile extends INodeWithAdditionalFields
     if(!isStriped()){
       return max;
     }
-
+    // TODO support more policies based on policyId
     ErasureCodingPolicy ecPolicy =
-        ErasureCodingPolicyManager.getPolicyByPolicyID(
-            getErasureCodingPolicyID());
-    if (ecPolicy == null){
-      ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy();
-    }
+        ErasureCodingPolicyManager.getSystemDefaultPolicy();
     return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
   }
 

+ 4 - 24
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -1888,41 +1888,21 @@ public class DFSTestUtil {
    * Creates the metadata of a file in striped layout. This method only
    * manipulates the NameNode state without injecting data to DataNode.
    * You should disable periodical heartbeat before use this.
-   * @param file Path of the file to create
+   *  @param file Path of the file to create
    * @param dir Parent path of the file
    * @param numBlocks Number of striped block groups to add to the file
    * @param numStripesPerBlk Number of striped cells in each block
    * @param toMkdir
    */
-  public static void createStripedFile(MiniDFSCluster cluster, Path file,
-      Path dir, int numBlocks, int numStripesPerBlk, boolean toMkdir)
-      throws Exception {
-    createStripedFile(cluster, file, dir, numBlocks, numStripesPerBlk,
-        toMkdir, null);
-  }
-
-  /**
-   * Creates the metadata of a file in striped layout. This method only
-   * manipulates the NameNode state without injecting data to DataNode.
-   * You should disable periodical heartbeat before use this.
-   * @param file Path of the file to create
-   * @param dir Parent path of the file
-   * @param numBlocks Number of striped block groups to add to the file
-   * @param numStripesPerBlk Number of striped cells in each block
-   * @param toMkdir
-   * @param ecPolicy erasure coding policy apply to created file. A null value
-   *                 means using default erasure coding policy.
-   */
-  public static void createStripedFile(MiniDFSCluster cluster, Path file,
-      Path dir, int numBlocks, int numStripesPerBlk, boolean toMkdir,
-      ErasureCodingPolicy ecPolicy) throws Exception {
+  public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir,
+      int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception {
     DistributedFileSystem dfs = cluster.getFileSystem();
     // If outer test already set EC policy, dir should be left as null
     if (toMkdir) {
       assert dir != null;
       dfs.mkdirs(dir);
       try {
-        dfs.getClient().setErasureCodingPolicy(dir.toString(), ecPolicy);
+        dfs.getClient().setErasureCodingPolicy(dir.toString(), null);
       } catch (IOException e) {
         if (!e.getMessage().contains("non-empty directory")) {
           throw e;

+ 16 - 34
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java

@@ -64,34 +64,20 @@ public class TestDFSStripedInputStream {
   private DistributedFileSystem fs;
   private final Path dirPath = new Path("/striped");
   private Path filePath = new Path(dirPath, "file");
-  private ErasureCodingPolicy ecPolicy;
-  private short dataBlocks;
-  private short parityBlocks;
-  private int cellSize;
+  private final ErasureCodingPolicy ecPolicy =
+      ErasureCodingPolicyManager.getSystemDefaultPolicy();
+  private final short dataBlocks = (short) ecPolicy.getNumDataUnits();
+  private final short parityBlocks = (short) ecPolicy.getNumParityUnits();
+  private final int cellSize = ecPolicy.getCellSize();
   private final int stripesPerBlock = 2;
-  private int blockSize;
-  private int blockGroupSize;
+  private final int blockSize = stripesPerBlock * cellSize;
+  private final int blockGroupSize =  dataBlocks * blockSize;
 
   @Rule
   public Timeout globalTimeout = new Timeout(300000);
 
-  public ErasureCodingPolicy getEcPolicy() {
-    return ErasureCodingPolicyManager.getSystemDefaultPolicy();
-  }
-
   @Before
   public void setup() throws IOException {
-    /*
-     * Initialize erasure coding policy.
-     */
-    ecPolicy = getEcPolicy();
-    dataBlocks = (short) ecPolicy.getNumDataUnits();
-    parityBlocks = (short) ecPolicy.getNumParityUnits();
-    cellSize = ecPolicy.getCellSize();
-    blockSize = stripesPerBlock * cellSize;
-    blockGroupSize =  dataBlocks * blockSize;
-    System.out.println("EC policy = " + ecPolicy);
-
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     if (ErasureCodeNative.isNativeCodeLoaded()) {
@@ -108,7 +94,7 @@ public class TestDFSStripedInputStream {
     }
     fs = cluster.getFileSystem();
     fs.mkdirs(dirPath);
-    fs.getClient().setErasureCodingPolicy(dirPath.toString(), ecPolicy);
+    fs.getClient().setErasureCodingPolicy(dirPath.toString(), null);
   }
 
   @After
@@ -120,13 +106,13 @@ public class TestDFSStripedInputStream {
   }
 
   /**
-   * Test {@link DFSStripedInputStream#getBlockAt(long)}.
+   * Test {@link DFSStripedInputStream#getBlockAt(long)}
    */
   @Test
   public void testRefreshBlock() throws Exception {
     final int numBlocks = 4;
     DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
-        stripesPerBlock, false, ecPolicy);
+        stripesPerBlock, false);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
         filePath.toString(), 0, blockGroupSize * numBlocks);
     final DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
@@ -150,7 +136,7 @@ public class TestDFSStripedInputStream {
   public void testPread() throws Exception {
     final int numBlocks = 2;
     DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
-        stripesPerBlock, false, ecPolicy);
+        stripesPerBlock, false);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
         filePath.toString(), 0, blockGroupSize * numBlocks);
     int fileLen = blockGroupSize * numBlocks;
@@ -168,9 +154,7 @@ public class TestDFSStripedInputStream {
             bg.getBlock().getBlockPoolId());
       }
 
-      /**
-       * A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks
-       */
+      /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
       for (int i = 0; i < stripesPerBlock; i++) {
         for (int j = 0; j < dataBlocks; j++) {
           for (int k = 0; k < cellSize; k++) {
@@ -210,7 +194,7 @@ public class TestDFSStripedInputStream {
     final int numBlocks = 4;
     final int failedDNIdx = dataBlocks - 1;
     DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
-        stripesPerBlock, false, ecPolicy);
+        stripesPerBlock, false);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
         filePath.toString(), 0, blockGroupSize);
 
@@ -321,7 +305,7 @@ public class TestDFSStripedInputStream {
       setup();
     }
     DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
-        stripesPerBlock, false, ecPolicy);
+        stripesPerBlock, false);
     LocatedBlocks lbs = fs.getClient().namenode.
         getBlockLocations(filePath.toString(), 0, fileSize);
 
@@ -346,9 +330,7 @@ public class TestDFSStripedInputStream {
     byte[] expected = new byte[fileSize];
 
     for (LocatedBlock bg : lbs.getLocatedBlocks()) {
-      /**
-       * A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks
-       */
+      /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
       for (int i = 0; i < stripesPerBlock; i++) {
         for (int j = 0; j < dataBlocks; j++) {
           for (int k = 0; k < cellSize; k++) {
@@ -389,7 +371,7 @@ public class TestDFSStripedInputStream {
     final int numBlocks = 4;
     final int failedDNIdx = dataBlocks - 1;
     DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
-        stripesPerBlock, false, ecPolicy);
+        stripesPerBlock, false);
     LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
         filePath.toString(), 0, blockGroupSize);
 

+ 7 - 20
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java

@@ -47,36 +47,23 @@ public class TestDFSStripedOutputStream {
     GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
   }
 
-  private ErasureCodingPolicy ecPolicy;
-  private int dataBlocks;
-  private int parityBlocks;
+  private final ErasureCodingPolicy ecPolicy =
+      ErasureCodingPolicyManager.getSystemDefaultPolicy();
+  private final int dataBlocks = ecPolicy.getNumDataUnits();
+  private final int parityBlocks = ecPolicy.getNumParityUnits();
 
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
   private Configuration conf;
-  private int cellSize;
+  private final int cellSize = ecPolicy.getCellSize();
   private final int stripesPerBlock = 4;
-  private int blockSize;
+  private final int blockSize = cellSize * stripesPerBlock;
 
   @Rule
   public Timeout globalTimeout = new Timeout(300000);
 
-  public ErasureCodingPolicy getEcPolicy() {
-    return ErasureCodingPolicyManager.getSystemDefaultPolicy();
-  }
-
   @Before
   public void setup() throws IOException {
-    /*
-     * Initialize erasure coding policy.
-     */
-    ecPolicy = getEcPolicy();
-    dataBlocks = (short) ecPolicy.getNumDataUnits();
-    parityBlocks = (short) ecPolicy.getNumParityUnits();
-    cellSize = ecPolicy.getCellSize();
-    blockSize = stripesPerBlock * cellSize;
-    System.out.println("EC policy = " + ecPolicy);
-
     int numDNs = dataBlocks + parityBlocks + 2;
     conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
@@ -89,7 +76,7 @@ public class TestDFSStripedOutputStream {
           NativeRSRawErasureCoderFactory.class.getCanonicalName());
     }
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
-    cluster.getFileSystem().getClient().setErasureCodingPolicy("/", ecPolicy);
+    cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
     fs = cluster.getFileSystem();
   }
 

+ 9 - 28
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java

@@ -47,7 +47,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Assume;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -77,36 +76,18 @@ public class TestDFSStripedOutputStreamWithFailure {
         .getLogger().setLevel(Level.ALL);
   }
 
-  private ErasureCodingPolicy ecPolicy;
-  private int dataBlocks;
-  private int parityBlocks;
-  private int cellSize;
+  private final ErasureCodingPolicy ecPolicy =
+      ErasureCodingPolicyManager.getSystemDefaultPolicy();
+  private final int dataBlocks = ecPolicy.getNumDataUnits();
+  private final int parityBlocks = ecPolicy.getNumParityUnits();
+  private final int cellSize = ecPolicy.getCellSize();
   private final int stripesPerBlock = 4;
-  private int blockSize;
-  private int blockGroupSize;
+  private final int blockSize = cellSize * stripesPerBlock;
+  private final int blockGroupSize = blockSize * dataBlocks;
 
   private static final int FLUSH_POS =
       9 * DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
 
-  public ErasureCodingPolicy getEcPolicy() {
-    return ErasureCodingPolicyManager.getSystemDefaultPolicy();
-  }
-
-  /*
-   * Initialize erasure coding policy.
-   */
-  @Before
-  public void init(){
-    ecPolicy = getEcPolicy();
-    dataBlocks = ecPolicy.getNumDataUnits();
-    parityBlocks = ecPolicy.getNumParityUnits();
-    cellSize = ecPolicy.getCellSize();
-    blockSize = cellSize * stripesPerBlock;
-    blockGroupSize = blockSize * dataBlocks;
-    dnIndexSuite = getDnIndexSuite();
-    lengths = newLengths();
-  }
-
   List<Integer> newLengths() {
     final List<Integer> lens = new ArrayList<>();
     lens.add(FLUSH_POS + 2);
@@ -123,7 +104,7 @@ public class TestDFSStripedOutputStreamWithFailure {
     return lens;
   }
 
-  private int[][] dnIndexSuite;
+  private final int[][] dnIndexSuite = getDnIndexSuite();
 
   private int[][] getDnIndexSuite() {
     final int maxNumLevel = 2;
@@ -186,7 +167,7 @@ public class TestDFSStripedOutputStreamWithFailure {
     return positions;
   }
 
-  private List<Integer> lengths;
+  private final List<Integer> lengths = newLengths();
 
   Integer getLength(int i) {
     return i >= 0 && i < lengths.size()? lengths.get(i): null;

+ 0 - 33
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedInputStream.java

@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
-
-/**
- * This tests read operation of DFS striped file with XOR-2-1-64k erasure code
- * policy.
- */
-public class TestDFSXORStripedInputStream extends TestDFSStripedInputStream{
-
-  public ErasureCodingPolicy getEcPolicy() {
-    return ErasureCodingPolicyManager.getPolicyByPolicyID(
-        HdfsConstants.XOR_2_1_POLICY_ID);
-  }
-}

+ 0 - 35
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStream.java

@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
-
-/**
- * This tests write operation of DFS striped file with XOR-2-1-64k erasure code
- * policy.
- */
-public class TestDFSXORStripedOutputStream extends TestDFSStripedOutputStream{
-
-  @Override
-  public ErasureCodingPolicy getEcPolicy() {
-    return ErasureCodingPolicyManager.getPolicyByPolicyID(
-        HdfsConstants.XOR_2_1_POLICY_ID);
-  }
-}

+ 0 - 36
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStreamWithFailure.java

@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
-
-/**
- * This tests write operation of DFS striped file with XOR-2-1-64k erasure code
- * policy when there is data node failure.
- */
-public class TestDFSXORStripedOutputStreamWithFailure
-    extends TestDFSStripedOutputStreamWithFailure{
-
-  @Override
-  public ErasureCodingPolicy getEcPolicy() {
-    return ErasureCodingPolicyManager.getPolicyByPolicyID(
-        HdfsConstants.XOR_2_1_POLICY_ID);
-  }
-}

+ 430 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java

@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestProportionalCapacityPreemptionPolicyForReservedContainers
+    extends ProportionalCapacityPreemptionPolicyMockFramework {
+  @Before
+  public void setup() {
+    super.setup();
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
+        true);
+    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+  }
+
+  @Test
+  public void testPreemptionForSimpleReservedContainer() throws IOException {
+    /**
+     * The simplest test of reserved container, Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     * Guaranteed resource of a/b are 50:50
+     * Total cluster resource = 100
+     * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each
+     * container is 1.
+     * - B has am container at n1, and reserves 1 container with size = 9 at n1,
+     * so B needs to preempt 9 containers from A at n1 instead of randomly
+     * preempt from n1 and n2.
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+        "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 9 9]);" + //root
+            "-a(=[50 100 90 0]);" + // a
+            "-b(=[50 100 10 9 9])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n1,,45,false)" // 45 in n1
+            + "(1,1,n2,,45,false);" + // 45 in n2
+        "b\t" // app2 in b
+            + "(1,1,n1,,1,false)" // AM container in n1
+            + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Total 5 preempted from app1 at n1, don't preempt container from other
+    // app/node
+    verify(mDisp, times(5)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(5)).handle(
+        argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testUseReservedAndFifoSelectorTogether() throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     * Guaranteed resource of a/b are 30:70
+     * Total cluster resource = 100
+     * - A has 45 containers on two node, n1 has 10, n2 has 35, size of each
+     * container is 1.
+     * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1,
+     *   B also has 20 pending resources.
+     * so B needs to preempt:
+     * - 10 containers from n1 (for reserved)
+     * - 5 containers from n2 for pending resources
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+        "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 70 10]);" + //root
+            "-a(=[30 100 45 0]);" + // a
+            "-b(=[70 100 55 70 50])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n2,,35,false)" // 35 in n2
+            + "(1,1,n1,,10,false);" + // 10 in n1
+            "b\t" // app2 in b
+            + "(1,1,n2,,5,false)" // 5 in n2
+            + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(10)).handle(
+        argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(5)).handle(
+        argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n2", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testReservedSelectorSkipsAMContainer() throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     * Guaranteed resource of a/b are 30:70
+     * Total cluster resource = 100
+     * - A has 45 containers on two node, n1 has 10, n2 has 35, size of each
+     * container is 1.
+     * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1,
+     *   B also has 20 pending resources.
+     *
+     * Ideally B needs to preempt:
+     * - 10 containers from n1 (for reserved)
+     * - 5 containers from n2 for pending resources
+     *
+     * However, since one AM container is located at n1 (from queueA), we cannot
+     * preempt 10 containers from n1 for reserved container. Instead, we will
+     * preempt 15 containers from n2, since containers from queueA launched in n2
+     * are later than containers from queueA launched in n1 (FIFO order of containers)
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+            "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 70 10]);" + //root
+            "-a(=[30 100 45 0]);" + // a
+            "-b(=[70 100 55 70 50])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n1,,10,false)" // 10 in n1
+            + "(1,1,n2,,35,false);" +// 35 in n2
+            "b\t" // app2 in b
+            + "(1,1,n2,,5,false)" // 5 in n2
+            + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(0)).handle(
+        argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(15)).handle(
+        argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n2", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testPreemptionForReservedContainerRespectGuaranteedResource()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     * Guaranteed resource of a/b are 85:15
+     * Total cluster resource = 100
+     * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each
+     * container is 1.
+     * - B has am container at n1, and reserves 1 container with size = 9 at n1,
+     *
+     * If we preempt 9 containers from queue-A, queue-A will be below its
+     * guaranteed resource = 90 - 9 = 81 < 85.
+     *
+     * So no preemption will take place
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+            "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 9 9]);" + //root
+            "-a(=[85 100 90 0]);" + // a
+            "-b(=[15 100 10 9 9])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n1,,45,false)" // 45 in n1
+            + "(1,1,n2,,45,false);" + // 45 in n2
+            "b\t" // app2 in b
+            + "(1,1,n1,,1,false)" // AM container in n1
+            + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testPreemptionForReservedContainerWhichHasAvailableResource()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Guaranteed resource of a/b are 50:50
+     * Total cluster resource = 100
+     * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each
+     * container is 1.
+     * - B has am container at n1, and reserves 1 container with size = 9 at n1,
+     *
+     * So we can get 4 containers preempted after preemption.
+     * (reserved 5 + preempted 4) = 9
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+            "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 99 9 9]);" + //root
+            "-a(=[50 100 90 0]);" + // a
+            "-b(=[50 100 9 9 9])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n1,,45,false)" // 45 in n1
+            + "(1,1,n2,,45,false);" + // 45 in n2
+            "b\t" // app2 in b
+            + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Total 4 preempted from app1 at n1, don't preempt container from other
+    // app/node
+    verify(mDisp, times(4)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n2", 1))));
+  }
+
+  @Test
+  public void testPreemptionForReservedContainerWhichHasNondivisibleAvailableResource()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Guaranteed resource of a/b are 50:50
+     * Total cluster resource = 100
+     * - A has 45 containers on two node, size of each container is 2,
+     *   n1 has 23, n2 has 22
+     * - B reserves 1 container with size = 9 at n1,
+     *
+     * So we can get 4 containers (total-resource = 8) preempted after
+     * preemption. Actual required is 3.5, but we need to preempt integer
+     * number of containers
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig = // n1 / n2 has no label
+        "n1= res=50;" +
+            "n2= res=50";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 99 9 9]);" + //root
+            "-a(=[50 100 90 0]);" + // a
+            "-b(=[50 100 9 9 9])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,2,n1,,24,false)" // 48 in n1
+            + "(1,2,n2,,23,false);" + // 46 in n2
+            "b\t" // app2 in b
+            + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Total 4 preempted from app1 at n1, don't preempt container from other
+    // app/node
+    verify(mDisp, times(4)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n2", 1))));
+  }
+
+  @Test
+  public void testPreemptionForReservedContainerRespectAvailableResources()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Guaranteed resource of a/b are 50:50
+     * Total cluster resource = 100, 4 nodes, 25 on each node
+     * - A has 10 containers on every node, size of container is 2
+     * - B reserves 1 container with size = 9 at n1,
+     *
+     * So even if we cannot allocate container for B now, no preemption should
+     * happen since there're plenty of available resources.
+     */
+    String labelsConfig =
+        "=100,true;";
+    String nodesConfig =
+        "n1= res=25;" +
+            "n2= res=25;" +
+            "n3= res=25;" +
+            "n4= res=25;";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 89 9 9]);" + //root
+            "-a(=[50 100 80 0]);" + // a
+            "-b(=[50 100 9 9 9])"; // b
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,2,n1,,10,false)" // 10 in n1
+            + "(1,2,n2,,10,false)" // 10 in n2
+            + "(1,2,n3,,10,false)" // 10 in n3
+            + "(1,2,n4,,10,false);" + // 10 in n4
+            "b\t" // app2 in b
+            + "(1,9,n1,,1,true)"; // 1 container with size=5 reserved at n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // No preemption should happen
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n1", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n2", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n3", 1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+            NodeId.newInstance("n4", 1))));
+  }
+}