Browse Source

YARN-9037. [CSI] Ignore volume resource in resource calculators based on tags. Contributed by Sunil Govindan.

Weiwei Yang 6 years ago
parent
commit
0921b706f7
15 changed files with 236 additions and 36 deletions
  1. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java
  2. 16 16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
  3. 8 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
  4. 53 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java
  5. 58 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/resource-types-6.xml
  6. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java
  7. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
  8. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
  9. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
  10. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
  11. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
  12. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java
  13. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
  14. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
  15. 61 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java

@@ -60,6 +60,7 @@ public class ResourceUtils {
   public static final String TAGS = ".tags";
   public static final String MINIMUM_ALLOCATION = ".minimum-allocation";
   public static final String MAXIMUM_ALLOCATION = ".maximum-allocation";
+  public static final String EXTERNAL_VOLUME_RESOURCE_TAG = "system:csi-volume";
 
   private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
   private static final String VCORES = ResourceInformation.VCORES.getName();
@@ -74,10 +75,12 @@ public class ResourceUtils {
   private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX =
       new ConcurrentHashMap<String, Integer>();
   private static volatile Map<String, ResourceInformation> resourceTypes;
+  private static volatile Map<String, ResourceInformation> nonCountableResourceTypes;
   private static volatile ResourceInformation[] resourceTypesArray;
   private static volatile boolean initializedNodeResources = false;
   private static volatile Map<String, ResourceInformation> readOnlyNodeResources;
   private static volatile int numKnownResourceTypes = -1;
+  private static volatile int numNonCountableResourceTypes = -1;
 
   static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class);
 
@@ -290,15 +293,18 @@ public class ResourceUtils {
   public static void initializeResourcesFromResourceInformationMap(
       Map<String, ResourceInformation> resourceInformationMap) {
     resourceTypes = Collections.unmodifiableMap(resourceInformationMap);
+    nonCountableResourceTypes = new HashMap<>();
     updateKnownResources();
     updateResourceTypeIndex();
     initializedResources = true;
     numKnownResourceTypes = resourceTypes.size();
+    numNonCountableResourceTypes = nonCountableResourceTypes.size();
   }
 
   private static void updateKnownResources() {
     // Update resource names.
     resourceTypesArray = new ResourceInformation[resourceTypes.size()];
+    List<ResourceInformation> nonCountableResources = new ArrayList<>();
 
     int index = 2;
     for (ResourceInformation resInfo : resourceTypes.values()) {
@@ -309,10 +315,22 @@ public class ResourceUtils {
         resourceTypesArray[1] = ResourceInformation
             .newInstance(resourceTypes.get(VCORES));
       } else {
+        if (resInfo.getTags() != null && resInfo.getTags()
+            .contains(EXTERNAL_VOLUME_RESOURCE_TAG)) {
+          nonCountableResources.add(resInfo);
+          continue;
+        }
         resourceTypesArray[index] = ResourceInformation.newInstance(resInfo);
         index++;
       }
     }
+
+    // Add all non-countable resource types to the end of the resource array.
+    for(ResourceInformation resInfo: nonCountableResources) {
+      resourceTypesArray[index] = ResourceInformation.newInstance(resInfo);
+      nonCountableResourceTypes.put(resInfo.getName(), resInfo);
+      index++;
+    }
   }
 
   private static void updateResourceTypeIndex() {
@@ -355,6 +373,13 @@ public class ResourceUtils {
     return numKnownResourceTypes;
   }
 
+  public static int getNumberOfCountableResourceTypes() {
+    if (numKnownResourceTypes < 0) {
+      initializeResourceTypesIfNeeded();
+    }
+    return numKnownResourceTypes - numNonCountableResourceTypes;
+  }
+
   private static Map<String, ResourceInformation> getResourceTypes(
       Configuration conf) {
     return getResourceTypes(conf,
@@ -383,6 +408,7 @@ public class ResourceUtils {
       }
     }
     numKnownResourceTypes = resourceTypes.size();
+    numNonCountableResourceTypes = nonCountableResourceTypes.size();
   }
 
   private static Map<String, ResourceInformation> getResourceTypes(

+ 16 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java

@@ -72,7 +72,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
     boolean rhsGreater = false;
     int ret = 0;
 
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation lhsResourceInformation = lhs
           .getResourceInformation(i);
@@ -110,7 +110,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
     // resources and then look for which resource has the biggest
     // share overall.
     ResourceInformation[] clusterRes = clusterResource.getResources();
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
 
     // If array creation shows up as a time sink, these arrays could be cached
     // because they're always the same length.
@@ -183,7 +183,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
     ResourceInformation[] firstRes = first.getResources();
     ResourceInformation[] secondRes = second.getResources();
 
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       firstShares[i] = calculateShare(clusterRes[i], firstRes[i]);
       secondShares[i] = calculateShare(clusterRes[i], secondRes[i]);
@@ -274,7 +274,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
     max[0] = 0.0;
     max[1] = 0.0;
 
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       firstShares[i] = calculateShare(clusterRes[i], firstRes[i]);
       secondShares[i] = calculateShare(clusterRes[i], secondRes[i]);
@@ -330,7 +330,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
   public long computeAvailableContainers(Resource available,
       Resource required) {
     long min = Long.MAX_VALUE;
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation availableResource = available
           .getResourceInformation(i);
@@ -346,7 +346,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
   @Override
   public float divide(Resource clusterResource,
       Resource numerator, Resource denominator) {
-    int nKnownResourceTypes = ResourceUtils.getNumberOfKnownResourceTypes();
+    int nKnownResourceTypes = ResourceUtils.getNumberOfCountableResourceTypes();
     ResourceInformation[] clusterRes = clusterResource.getResources();
     // We have to provide the calculateShares() method with somewhere to store
     // the shares. We don't actually need these shares afterwards.
@@ -375,7 +375,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
   @Override
   public float ratio(Resource a, Resource b) {
     float ratio = 0.0f;
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation aResourceInformation = a.getResourceInformation(i);
       ResourceInformation bResourceInformation = b.getResourceInformation(i);
@@ -393,7 +393,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
 
   public Resource divideAndCeil(Resource numerator, long denominator) {
     Resource ret = Resource.newInstance(numerator);
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation resourceInformation = ret.getResourceInformation(i);
       resourceInformation
@@ -414,7 +414,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
   public Resource normalize(Resource r, Resource minimumResource,
       Resource maximumResource, Resource stepFactor) {
     Resource ret = Resource.newInstance(r);
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation rResourceInformation = r.getResourceInformation(i);
       ResourceInformation minimumResourceInformation = minimumResource
@@ -448,7 +448,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
 
   private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) {
     Resource ret = Resource.newInstance(r);
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation rResourceInformation = r.getResourceInformation(i);
       ResourceInformation stepFactorResourceInformation = stepFactor
@@ -473,7 +473,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
   public Resource multiplyAndNormalizeUp(Resource r, double[] by,
       Resource stepFactor) {
     Resource ret = Resource.newInstance(r);
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation rResourceInformation = r.getResourceInformation(i);
       ResourceInformation stepFactorResourceInformation = stepFactor
@@ -502,7 +502,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
   private Resource multiplyAndNormalize(Resource r, double by,
       Resource stepFactor, boolean roundUp) {
     Resource ret = Resource.newInstance(r);
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation rResourceInformation = r.getResourceInformation(i);
       ResourceInformation stepFactorResourceInformation = stepFactor
@@ -528,7 +528,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
 
   @Override
   public boolean fitsIn(Resource smaller, Resource bigger) {
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation sResourceInformation = smaller
           .getResourceInformation(i);
@@ -544,7 +544,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
   @Override
   public Resource normalizeDown(Resource r, Resource stepFactor) {
     Resource ret = Resource.newInstance(r);
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation rResourceInformation = r.getResourceInformation(i);
       ResourceInformation stepFactorResourceInformation = stepFactor
@@ -564,7 +564,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
 
   @Override
   public boolean isAnyMajorResourceZeroOrNegative(Resource resource) {
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation resourceInformation = resource.getResourceInformation(
           i);
@@ -577,7 +577,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
 
   @Override
   public boolean isAnyMajorResourceAboveZero(Resource resource) {
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation resourceInformation = resource.getResourceInformation(
           i);

+ 8 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java

@@ -251,7 +251,7 @@ public class Resources {
   }
 
   public static Resource addTo(Resource lhs, Resource rhs) {
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       try {
         ResourceInformation rhsValue = rhs.getResourceInformation(i);
@@ -270,7 +270,7 @@ public class Resources {
   }
 
   public static Resource subtractFrom(Resource lhs, Resource rhs) {
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       try {
         ResourceInformation rhsValue = rhs.getResourceInformation(i);
@@ -325,7 +325,7 @@ public class Resources {
   }
 
   public static Resource multiplyTo(Resource lhs, double by) {
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       try {
         ResourceInformation lhsValue = lhs.getResourceInformation(i);
@@ -348,7 +348,7 @@ public class Resources {
    */
   public static Resource multiplyAndAddTo(
       Resource lhs, Resource rhs, double by) {
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       try {
         ResourceInformation rhsValue = rhs.getResourceInformation(i);
@@ -381,7 +381,7 @@ public class Resources {
   
   public static Resource multiplyAndRoundDown(Resource lhs, double by) {
     Resource out = clone(lhs);
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       try {
         ResourceInformation lhsValue = lhs.getResourceInformation(i);
@@ -490,7 +490,7 @@ public class Resources {
   }
   
   public static boolean fitsIn(Resource smaller, Resource bigger) {
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       try {
         ResourceInformation rhsValue = bigger.getResourceInformation(i);
@@ -513,7 +513,7 @@ public class Resources {
   
   public static Resource componentwiseMin(Resource lhs, Resource rhs) {
     Resource ret = createResource(0);
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       try {
         ResourceInformation rhsValue = rhs.getResourceInformation(i);
@@ -532,7 +532,7 @@ public class Resources {
   
   public static Resource componentwiseMax(Resource lhs, Resource rhs) {
     Resource ret = createResource(0);
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       try {
         ResourceInformation rhsValue = rhs.getResourceInformation(i);

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java

@@ -400,4 +400,57 @@ public class TestResourceUtils {
     ResourceUtils.getResourceTypes();
     return dest.getAbsolutePath();
   }
+
+  @Test
+  public void testMultipleOpsForResourcesWithTags() throws Exception {
+
+    Configuration conf = new YarnConfiguration();
+    setupResourceTypes(conf, "resource-types-6.xml");
+    Resource resourceA = Resource.newInstance(2, 4);
+    Resource resourceB = Resource.newInstance(3, 6);
+
+    resourceA.setResourceInformation("resource1",
+        ResourceInformation.newInstance("resource1", "T", 5L));
+
+    resourceA.setResourceInformation("resource2",
+        ResourceInformation.newInstance("resource2", "M", 2L));
+    resourceA.setResourceInformation("yarn.io/gpu",
+        ResourceInformation.newInstance("yarn.io/gpu", "", 1));
+    resourceA.setResourceInformation("yarn.io/test-volume",
+        ResourceInformation.newInstance("yarn.io/test-volume", "", 2));
+
+    resourceB.setResourceInformation("resource1",
+        ResourceInformation.newInstance("resource1", "T", 3L));
+
+    resourceB.setResourceInformation("resource2",
+        ResourceInformation.newInstance("resource2", "M", 4L));
+    resourceB.setResourceInformation("yarn.io/gpu",
+        ResourceInformation.newInstance("yarn.io/gpu", "", 2));
+    resourceB.setResourceInformation("yarn.io/test-volume",
+        ResourceInformation.newInstance("yarn.io/test-volume", "", 3));
+
+    Resource addedResource = Resources.add(resourceA, resourceB);
+    Assert.assertEquals(addedResource.getMemorySize(), 5);
+    Assert.assertEquals(addedResource.getVirtualCores(), 10);
+    Assert.assertEquals(
+        addedResource.getResourceInformation("resource1").getValue(), 8);
+
+    // Verify that value of resourceA and resourceB is not added up for
+    // "yarn.io/test-volume".
+    Assert.assertEquals(
+        addedResource.getResourceInformation("yarn.io/test-volume").getValue(),
+        2);
+
+    Resource mulResource = Resources.multiplyAndRoundDown(resourceA, 3);
+    Assert.assertEquals(mulResource.getMemorySize(), 6);
+    Assert.assertEquals(mulResource.getVirtualCores(), 12);
+    Assert.assertEquals(
+        mulResource.getResourceInformation("resource1").getValue(), 15);
+
+    // Verify that value of resourceA is not multiplied up for
+    // "yarn.io/test-volume".
+    Assert.assertEquals(
+        mulResource.getResourceInformation("yarn.io/test-volume").getValue(),
+        2);
+  }
 }

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/resource-types-6.xml

@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. See accompanying LICENSE file.
+-->
+
+<configuration>
+
+    <property>
+        <name>yarn.resource-types</name>
+        <value>resource1,resource2,resource3,yarn.io/gpu,yarn.io/test-volume</value>
+    </property>
+
+    <property>
+        <name>yarn.resource-types.resource1.units</name>
+        <value>G</value>
+    </property>
+
+    <property>
+        <name>yarn.resource-types.resource2.units</name>
+        <value>m</value>
+    </property>
+
+    <property>
+        <name>yarn.resource-types.resource3.units</name>
+        <value>G</value>
+    </property>
+
+    <property>
+        <name>yarn.resource-types.resource3.tags</name>
+        <value>resource3_tag_1,resource3_tag_2</value>
+    </property>
+
+    <property>
+        <name>yarn.resource-types.yarn.io/gpu.units</name>
+        <value></value>
+    </property>
+
+    <property>
+        <name>yarn.resource-types.yarn.io/test-volume.units</name>
+        <value>G</value>
+    </property>
+
+    <property>
+        <name>yarn.resource-types.yarn.io/test-volume.tags</name>
+        <value>system:csi-volume</value>
+    </property>
+</configuration>

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
+import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
 import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
 import org.slf4j.Logger;
@@ -120,7 +121,8 @@ public class ContainerVolumePublisher {
     if (containerResource != null) {
       for (ResourceInformation resourceInformation :
           containerResource.getAllResourcesListCopy()) {
-        if (resourceInformation.getTags().contains("system:csi-volume")) {
+        if (resourceInformation.getTags()
+            .contains(CsiConstants.CSI_VOLUME_RESOURCE_TAG)) {
           volumes.addAll(VolumeMetaData.fromResource(resourceInformation));
         }
       }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java

@@ -262,7 +262,7 @@ public class AbstractPreemptableResourceCalculator {
   private void resetCapacity(Resource clusterResource,
       Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
     Resource activeCap = Resource.newInstance(0, 0);
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
 
     if (ignoreGuar) {
       for (TempQueuePerPartition q : queues) {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java

@@ -98,7 +98,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     }
 
     this.normalizedGuarantee = new double[ResourceUtils
-        .getNumberOfKnownResourceTypes()];
+        .getNumberOfCountableResourceTypes()];
     this.children = new ArrayList<>();
     this.apps = new ArrayList<>();
     this.untouchableExtra = Resource.newInstance(0, 0);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java

@@ -73,7 +73,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   private boolean reportedMaxAllocation = false;
 
   public ClusterNodeTracker() {
-    maxAllocation = new long[ResourceUtils.getNumberOfKnownResourceTypes()];
+    maxAllocation = new long[ResourceUtils.getNumberOfCountableResourceTypes()];
     Arrays.fill(maxAllocation, -1);
   }
 

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java

@@ -355,7 +355,7 @@ public class SchedulerUtils {
   private static Map<String, ResourceInformation> getZeroResources(
       Resource resource) {
     Map<String, ResourceInformation> resourceInformations = Maps.newHashMap();
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
 
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation resourceInformation =
@@ -372,7 +372,7 @@ public class SchedulerUtils {
   @VisibleForTesting
   static void checkResourceRequestAgainstAvailableResource(Resource reqResource,
       Resource availableResource) throws InvalidResourceRequestException {
-    for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
+    for (int i = 0; i < ResourceUtils.getNumberOfCountableResourceTypes(); i++) {
       final ResourceInformation requestedRI =
           reqResource.getResourceInformation(i);
       final String reqResourceName = requestedRI.getName();
@@ -404,7 +404,7 @@ public class SchedulerUtils {
     }
 
     List<ResourceInformation> invalidResources = Lists.newArrayList();
-    for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
+    for (int i = 0; i < ResourceUtils.getNumberOfCountableResourceTypes(); i++) {
       final ResourceInformation requestedRI =
           reqResource.getResourceInformation(i);
       final String reqResourceName = requestedRI.getName();

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -1031,7 +1031,7 @@ public class ParentQueue extends AbstractCSQueue {
   private Resource getMinResourceNormalized(String name, Map<String, Float> effectiveMinRatio,
       Resource minResource) {
     Resource ret = Resource.newInstance(minResource);
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
     for (int i = 0; i < maxLength; i++) {
       ResourceInformation nResourceInformation = minResource
           .getResourceInformation(i);
@@ -1055,7 +1055,7 @@ public class ParentQueue extends AbstractCSQueue {
       Resource configuredMinResources, Resource numeratorForMinRatio) {
     Map<String, Float> effectiveMinRatioPerResource = new HashMap<>();
     if (numeratorForMinRatio != null) {
-      int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+      int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
       for (int i = 0; i < maxLength; i++) {
         ResourceInformation nResourceInformation = numeratorForMinRatio
             .getResourceInformation(i);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java

@@ -63,7 +63,7 @@ public class ConfigurableResource {
 
   private static double[] getOneHundredPercentArray() {
     double[] resourcePercentages =
-        new double[ResourceUtils.getNumberOfKnownResourceTypes()];
+        new double[ResourceUtils.getNumberOfCountableResourceTypes()];
     Arrays.fill(resourcePercentages, 1.0);
 
     return resourcePercentages;

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java

@@ -599,7 +599,7 @@ public class FairSchedulerConfiguration extends Configuration {
   private static double[] getResourcePercentage(
       String val) throws AllocationConfigurationException {
     int numberOfKnownResourceTypes = ResourceUtils
-        .getNumberOfKnownResourceTypes();
+        .getNumberOfCountableResourceTypes();
     double[] resourcePercentage = new double[numberOfKnownResourceTypes];
     String[] strings = val.split(",");
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java

@@ -48,7 +48,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
   public static final String NAME = "DRF";
 
   private static final int NUM_RESOURCES =
-      ResourceUtils.getNumberOfKnownResourceTypes();
+      ResourceUtils.getNumberOfCountableResourceTypes();
   private static final DominantResourceFairnessComparator COMPARATORN =
       new DominantResourceFairnessComparatorN();
   private static final DominantResourceFairnessComparator COMPARATOR2 =

+ 61 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java

@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
@@ -57,7 +60,9 @@ import org.mockito.Mockito;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doReturn;
@@ -132,6 +137,9 @@ public class TestVolumeProcessor {
       yarnConf.set(YarnConfiguration.RESOURCE_TYPES, VOLUME_RESOURCE_NAME);
       yarnConf.set("yarn.resource-types."
           + VOLUME_RESOURCE_NAME + ".units", "Mi");
+      yarnConf.set("yarn.resource-types."
+          + VOLUME_RESOURCE_NAME + ".tags",
+          CsiConstants.CSI_VOLUME_RESOURCE_TAG);
       yarnConf.writeXml(fw);
     } finally {
       fw.close();
@@ -267,4 +275,57 @@ public class TestVolumeProcessor {
     }
     rm.stop();
   }
+
+  @Test (timeout = 10000L)
+  public void testVolumeResourceAllocate() throws Exception {
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
+    Resource resource = Resource.newInstance(1024, 1);
+    ResourceInformation volumeResource = ResourceInformation
+        .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
+            ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
+            ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
+            ImmutableMap.of(
+                CsiConstants.CSI_VOLUME_ID, "test-vol-000001",
+                CsiConstants.CSI_DRIVER_NAME, "hostpath",
+                CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
+            )
+        );
+    resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
+    SchedulingRequest sc = SchedulingRequest
+        .newBuilder().allocationRequestId(0L)
+        .resourceSizing(ResourceSizing.newInstance(1, resource))
+        .build();
+
+    // inject adaptor client for testing
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
+    rm.getRMContext().getVolumeManager()
+        .registerCsiDriverAdaptor("hostpath", mockedClient);
+
+    // simulate validation succeed
+    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+
+    am1.addSchedulingRequest(ImmutableList.of(sc));
+    List<Container> allocated = new ArrayList<>();
+    while (allocated.size() != 1) {
+      AllocateResponse response = am1.schedule();
+      mockNMS[0].nodeHeartbeat(true);
+      allocated.addAll(response.getAllocatedContainers());
+      Thread.sleep(500);
+    }
+
+    Assert.assertEquals(1, allocated.size());
+    Container alloc = allocated.get(0);
+    Assert.assertEquals(alloc.getResource().getMemorySize(), 1024);
+    Assert.assertEquals(alloc.getResource().getVirtualCores(), 1);
+    ResourceInformation allocatedVolume =
+        alloc.getResource().getResourceInformation(VOLUME_RESOURCE_NAME);
+    Assert.assertNotNull(allocatedVolume);
+    Assert.assertEquals(allocatedVolume.getValue(), 1024);
+    Assert.assertEquals(allocatedVolume.getUnits(), "Mi");
+    rm.stop();
+  }
 }