Browse Source

YARN-8902. [CSI] Add volume manager that manages CSI volume lifecycle. Contributed by Weiwei Yang.

Sunil G 6 năm trước cách đây
mục cha
commit
4e7284443e
40 tập tin đã thay đổi với 2594 bổ sung0 xóa
  1. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java
  2. 37 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiConstants.java
  3. 107 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeCapabilityRange.java
  4. 59 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeId.java
  5. 227 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeMetaData.java
  6. 28 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/InvalidVolumeException.java
  7. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeException.java
  8. 32 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeProvisioningException.java
  9. 27 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/package-info.java
  10. 27 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/package-info.java
  11. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
  12. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
  13. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  14. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  15. 36 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java
  16. 106 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeBuilder.java
  17. 63 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
  18. 108 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
  19. 60 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java
  20. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ControllerPublishVolumeEvent.java
  21. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ValidateVolumeEvent.java
  22. 43 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEvent.java
  23. 29 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEventType.java
  24. 27 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/package-info.java
  25. 37 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
  26. 199 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
  27. 35 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java
  28. 27 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java
  29. 27 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java
  30. 158 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java
  31. 27 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java
  32. 32 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java
  33. 87 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java
  34. 66 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java
  35. 27 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java
  36. 27 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java
  37. 67 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java
  38. 161 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java
  39. 178 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java
  40. 250 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+
+/**
+ * Protocol for the CSI adaptor.
+ */
+@Private
+@Unstable
+public interface CsiAdaptorClientProtocol {
+
+  void validateVolume() throws VolumeException;
+
+  void controllerPublishVolume() throws VolumeException;
+}

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiConstants.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi;
+
+/**
+ * CSI constants.
+ */
+public final class CsiConstants {
+
+  private CsiConstants() {
+    // Hide the constructor for this constant class.
+  }
+
+  public static final String CSI_VOLUME_NAME = "volume.name";
+  public static final String CSI_VOLUME_ID = "volume.id";
+  public static final String CSI_VOLUME_CAPABILITY = "volume.capability";
+  public static final String CSI_DRIVER_NAME = "driver.name";
+  public static final String CSI_VOLUME_MOUNT = "volume.mount";
+  public static final String CSI_VOLUME_ACCESS_MODE =  "volume.accessMode";
+
+  public static final String CSI_VOLUME_RESOURCE_TAG = "system:csi-volume";
+}

+ 107 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeCapabilityRange.java

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+
+/**
+ * Volume capability range that specified in a volume resource request,
+ * this range defines the desired min/max capacity.
+ */
+public final class VolumeCapabilityRange {
+
+  private final long minCapacity;
+  private final long maxCapacity;
+  private final String unit;
+
+  private VolumeCapabilityRange(long minCapacity,
+      long maxCapacity, String unit) {
+    this.minCapacity = minCapacity;
+    this.maxCapacity = maxCapacity;
+    this.unit = unit;
+  }
+
+  public long getMinCapacity() {
+    return minCapacity;
+  }
+
+  public long getMaxCapacity() {
+    return maxCapacity;
+  }
+
+  public String getUnit() {
+    return unit;
+  }
+
+  @Override
+  public String toString() {
+    return "MinCapability: " + minCapacity + unit
+        + ", MaxCapability: " + maxCapacity + unit;
+  }
+
+  public static VolumeCapabilityBuilder newBuilder() {
+    return new VolumeCapabilityBuilder();
+  }
+
+  /**
+   * The builder used to build a VolumeCapabilityRange instance.
+   */
+  public static class VolumeCapabilityBuilder {
+    // An invalid default value implies this value must be set
+    private long minCap = -1L;
+    private long maxCap = Long.MAX_VALUE;
+    private String unit;
+
+    public VolumeCapabilityBuilder minCapacity(long minCapacity) {
+      this.minCap = minCapacity;
+      return this;
+    }
+
+    public VolumeCapabilityBuilder maxCapacity(long maxCapacity) {
+      this.maxCap = maxCapacity;
+      return this;
+    }
+
+    public VolumeCapabilityBuilder unit(String capacityUnit) {
+      this.unit = capacityUnit;
+      return this;
+    }
+
+    public VolumeCapabilityRange build() throws InvalidVolumeException {
+      VolumeCapabilityRange
+          capability = new VolumeCapabilityRange(minCap, maxCap, unit);
+      validateCapability(capability);
+      return capability;
+    }
+
+    private void validateCapability(VolumeCapabilityRange capability)
+        throws InvalidVolumeException {
+      if (capability.getMinCapacity() < 0) {
+        throw new InvalidVolumeException("Invalid volume capability range,"
+            + " minimal capability must not be less than 0. Capability: "
+            + capability.toString());
+      }
+      if (Strings.isNullOrEmpty(capability.getUnit())) {
+        throw new InvalidVolumeException("Invalid volume capability range,"
+            + " capability unit is missing. Capability: "
+            + capability.toString());
+      }
+    }
+  }
+}

+ 59 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeId.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Unique ID for a volume. This may or may not come from a storage system,
+ * YARN depends on this ID to recognized volumes and manage their states.
+ */
+public class VolumeId {
+
+  private final String volumeId;
+
+  public VolumeId(String volumeId) {
+    this.volumeId = volumeId;
+  }
+
+  public String getId() {
+    return this.volumeId;
+  }
+
+  @Override
+  public String toString() {
+    return this.volumeId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof VolumeId)) {
+      return false;
+    }
+    return StringUtils.equalsIgnoreCase(volumeId,
+        ((VolumeId) obj).getId());
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hc = new HashCodeBuilder();
+    hc.append(volumeId);
+    return hc.toHashCode();
+  }
+}

+ 227 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeMetaData.java

@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi;
+
+import com.google.common.base.Strings;
+import com.google.gson.JsonObject;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * VolumeMetaData defines all valid info for a CSI compatible volume.
+ */
+public class VolumeMetaData {
+
+  private VolumeId volumeId;
+  private String volumeName;
+  private VolumeCapabilityRange volumeCapabilityRange;
+  private String driverName;
+  private String mountPoint;
+
+  private void setVolumeId(VolumeId volumeId) {
+    this.volumeId = volumeId;
+  }
+
+  private void setVolumeName(String volumeName) {
+    this.volumeName = volumeName;
+  }
+
+  private void setVolumeCapabilityRange(VolumeCapabilityRange capability) {
+    this.volumeCapabilityRange = capability;
+  }
+
+  private void setDriverName(String driverName) {
+    this.driverName = driverName;
+  }
+
+  private void setMountPoint(String mountPoint) {
+    this.mountPoint = mountPoint;
+  }
+
+  public boolean isProvisionedVolume() {
+    return this.volumeId != null;
+  }
+
+  public VolumeId getVolumeId() {
+    return volumeId;
+  }
+
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  public VolumeCapabilityRange getVolumeCapabilityRange() {
+    return volumeCapabilityRange;
+  }
+
+  public String getDriverName() {
+    return driverName;
+  }
+
+  public String getMountPoint() {
+    return mountPoint;
+  }
+
+  public static VolumeSpecBuilder newBuilder() {
+    return new VolumeSpecBuilder();
+  }
+
+  public static List<VolumeMetaData> fromResource(
+      ResourceInformation resourceInfo) throws InvalidVolumeException {
+    List<VolumeMetaData> volumeMetaData = new ArrayList<>();
+    if (resourceInfo != null) {
+      if (resourceInfo.getTags() != null && resourceInfo.getTags()
+          .contains(CsiConstants.CSI_VOLUME_RESOURCE_TAG)) {
+        VolumeSpecBuilder builder = VolumeMetaData.newBuilder();
+        // Volume ID
+        if (resourceInfo.getAttributes()
+            .containsKey(CsiConstants.CSI_VOLUME_ID)) {
+          String id = resourceInfo.getAttributes()
+              .get(CsiConstants.CSI_VOLUME_ID);
+          builder.volumeId(new VolumeId(id));
+        }
+        // Volume name
+        if (resourceInfo.getAttributes()
+            .containsKey(CsiConstants.CSI_VOLUME_NAME)) {
+          builder.volumeName(resourceInfo.getAttributes()
+              .get(CsiConstants.CSI_VOLUME_NAME));
+        }
+        // CSI driver name
+        if (resourceInfo.getAttributes()
+            .containsKey(CsiConstants.CSI_DRIVER_NAME)) {
+          builder.driverName(resourceInfo.getAttributes()
+              .get(CsiConstants.CSI_DRIVER_NAME));
+        }
+        // Mount path
+        if (resourceInfo.getAttributes()
+            .containsKey(CsiConstants.CSI_VOLUME_MOUNT)) {
+          builder.mountPoint(resourceInfo.getAttributes()
+              .get(CsiConstants.CSI_VOLUME_MOUNT));
+        }
+        // Volume capability
+        VolumeCapabilityRange volumeCapabilityRange =
+            VolumeCapabilityRange.newBuilder()
+                .minCapacity(resourceInfo.getValue())
+                .unit(resourceInfo.getUnits())
+                .build();
+        builder.capability(volumeCapabilityRange);
+        volumeMetaData.add(builder.build());
+      }
+    }
+    return volumeMetaData;
+  }
+
+  @Override
+  public String toString() {
+    JsonObject json = new JsonObject();
+    if (!Strings.isNullOrEmpty(volumeName)) {
+      json.addProperty(CsiConstants.CSI_VOLUME_NAME, volumeName);
+    }
+    if (volumeId != null) {
+      json.addProperty(CsiConstants.CSI_VOLUME_ID, volumeId.toString());
+    }
+    if (volumeCapabilityRange != null) {
+      json.addProperty(CsiConstants.CSI_VOLUME_CAPABILITY,
+          volumeCapabilityRange.toString());
+    }
+    if (!Strings.isNullOrEmpty(driverName)) {
+      json.addProperty(CsiConstants.CSI_DRIVER_NAME, driverName);
+    }
+    if (!Strings.isNullOrEmpty(mountPoint)) {
+      json.addProperty(CsiConstants.CSI_VOLUME_MOUNT, mountPoint);
+    }
+    return json.toString();
+  }
+
+  /**
+   * The builder used to build a VolumeMetaData instance.
+   */
+  public static class VolumeSpecBuilder {
+    // @CreateVolumeRequest
+    // The suggested name for the storage space.
+    private VolumeId volumeId;
+    private String volumeName;
+    private VolumeCapabilityRange volumeCapabilityRange;
+    private String driverName;
+    private String mountPoint;
+
+    public VolumeSpecBuilder volumeId(VolumeId volumeId) {
+      this.volumeId = volumeId;
+      return this;
+    }
+
+    public VolumeSpecBuilder volumeName(String name) {
+      this.volumeName = name;
+      return this;
+    }
+
+    public VolumeSpecBuilder driverName(String driverName) {
+      this.driverName = driverName;
+      return this;
+    }
+
+    public VolumeSpecBuilder mountPoint(String mountPoint) {
+      this.mountPoint = mountPoint;
+      return this;
+    }
+
+    public VolumeSpecBuilder capability(VolumeCapabilityRange capability) {
+      this.volumeCapabilityRange = capability;
+      return this;
+    }
+
+    public VolumeMetaData build() throws InvalidVolumeException {
+      VolumeMetaData spec = new VolumeMetaData();
+      spec.setVolumeId(volumeId);
+      spec.setVolumeName(volumeName);
+      spec.setVolumeCapabilityRange(volumeCapabilityRange);
+      spec.setDriverName(driverName);
+      spec.setMountPoint(mountPoint);
+      validate(spec);
+      return spec;
+    }
+
+    private void validate(VolumeMetaData spec) throws InvalidVolumeException {
+      // Volume name OR Volume ID must be set
+      if (Strings.isNullOrEmpty(spec.getVolumeName())
+          && spec.getVolumeId() == null) {
+        throw new InvalidVolumeException("Invalid volume, both volume name"
+            + " and ID are missing from the spec. Volume spec: "
+            + spec.toString());
+      }
+      // Volume capability must be set
+      if (spec.getVolumeCapabilityRange() == null) {
+        throw new InvalidVolumeException("Invalid volume, volume capability"
+            + " is missing. Volume spec: " + spec.toString());
+      }
+      // CSI driver name must be set
+      if (Strings.isNullOrEmpty(spec.getDriverName())) {
+        throw new InvalidVolumeException("Invalid volume, the csi-driver name"
+            + " is missing. Volume spec: " + spec.toString());
+      }
+      // Mount point must be set
+      if (Strings.isNullOrEmpty(spec.getMountPoint())) {
+        throw new InvalidVolumeException("Invalid volume, the mount point"
+            + " is missing. Volume spec: " + spec.toString());
+      }
+    }
+  }
+}

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/InvalidVolumeException.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi.exception;
+
+/**
+ * This exception is thrown when a volume is found not valid.
+ */
+public class InvalidVolumeException extends VolumeException {
+
+  public InvalidVolumeException(String message) {
+    super(message);
+  }
+}

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeException.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi.exception;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Base class for all volume related exceptions.
+ */
+public class VolumeException extends YarnException {
+
+  public VolumeException(String message) {
+    super(message);
+  }
+
+  public VolumeException(String message, Exception e) {
+    super(message, e);
+  }
+}

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeProvisioningException.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi.exception;
+
+/**
+ * Exception throws when volume provisioning is failed.
+ */
+public class VolumeProvisioningException extends VolumeException {
+
+  public VolumeProvisioningException(String message) {
+    super(message);
+  }
+
+  public VolumeProvisioningException(String message, Exception e) {
+    super(message, e);
+  }
+}

+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/package-info.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains volume related exception classes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.volume.csi.exception;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/package-info.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains common volume related classes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.volume.csi;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag
 import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
@@ -121,6 +122,7 @@ public class RMActiveServiceContext {
   private MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager;
 
   private ProxyCAManager proxyCAManager;
+  private VolumeManager volumeManager;
 
   public RMActiveServiceContext() {
     queuePlacementManager = new PlacementManager();
@@ -569,4 +571,16 @@ public class RMActiveServiceContext {
   public void setProxyCAManager(ProxyCAManager proxyCAManager) {
     this.proxyCAManager = proxyCAManager;
   }
+
+  @Private
+  @Unstable
+  public VolumeManager getVolumeManager() {
+    return this.volumeManager;
+  }
+
+  @Private
+  @Unstable
+  public void setVolumeManager(VolumeManager volumeManager) {
+    this.volumeManager = volumeManager;
+  }
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
 
 /**
  * Context of the ResourceManager.
@@ -193,4 +194,8 @@ public interface RMContext extends ApplicationMasterServiceContext {
   ProxyCAManager getProxyCAManager();
 
   void setProxyCAManager(ProxyCAManager proxyCAManager);
+
+  VolumeManager getVolumeManager();
+
+  void setVolumeManager(VolumeManager volumeManager);
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.util.Clock;
 
@@ -648,6 +649,17 @@ public class RMContextImpl implements RMContext {
   public void setProxyCAManager(ProxyCAManager proxyCAManager) {
     this.activeServiceContext.setProxyCAManager(proxyCAManager);
   }
+
+  @Override
+  public VolumeManager getVolumeManager() {
+    return activeServiceContext.getVolumeManager();
+  }
+
+  @Override
+  public void setVolumeManager(VolumeManager volumeManager) {
+    this.activeServiceContext.setVolumeManager(volumeManager);
+  }
+
   // Note: Read java doc before adding any services over here.
 
   @Override

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -109,6 +109,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -136,6 +139,7 @@ import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -845,6 +849,16 @@ public class ResourceManager extends CompositeService
         addIfService(systemServiceManager);
       }
 
+      // Add volume manager to RM context when it is necessary
+      String[] amsProcessorList = conf.getStrings(
+          YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS);
+      if (amsProcessorList != null&& Arrays.stream(amsProcessorList)
+          .anyMatch(s -> VolumeAMSProcessor.class.getName().equals(s))) {
+        VolumeManager volumeManager = new VolumeManagerImpl();
+        rmContext.setVolumeManager(volumeManager);
+        addIfService(volumeManager);
+      }
+
       super.serviceInit(conf);
     }
 

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+
+/**
+ * Client talks to CSI adaptor.
+ */
+public class CsiAdaptorClient implements CsiAdaptorClientProtocol {
+
+  @Override
+  public void validateVolume() throws VolumeException {
+    // TODO
+  }
+
+  @Override public void controllerPublishVolume() throws VolumeException {
+    // TODO
+  }
+}

+ 106 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeBuilder.java

@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeCapabilityRange;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
+import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * Helper class to build a {@link Volume}.
+ */
+public final class VolumeBuilder {
+
+  private String id;
+  private String name;
+  private Long min;
+  private Long max;
+  private String unit;
+  private String driver;
+  private String mount;
+
+  private VolumeBuilder() {
+    // hide constructor
+  }
+
+  public static VolumeBuilder newBuilder() {
+    return new VolumeBuilder();
+  }
+
+  public VolumeBuilder volumeId(String volumeId) {
+    this.id = volumeId;
+    return this;
+  }
+
+  public VolumeBuilder volumeName(String volumeName) {
+    this.name = volumeName;
+    return this;
+  }
+
+  public VolumeBuilder minCapability(long minCapability) {
+    this.min = Long.valueOf(minCapability);
+    return this;
+  }
+
+  public VolumeBuilder maxCapability(long maxCapability) {
+    this.max = Long.valueOf(maxCapability);
+    return this;
+  }
+
+  public VolumeBuilder unit(String capUnit) {
+    this.unit = capUnit;
+    return this;
+  }
+
+  public VolumeBuilder driverName(String driverName) {
+    this.driver = driverName;
+    return this;
+  }
+
+  public VolumeBuilder mountPoint(String mountPoint) {
+    this.mount = mountPoint;
+    return this;
+  }
+
+  public Volume build() throws InvalidVolumeException {
+    VolumeId vid = new VolumeId(
+        Optional.ofNullable(id)
+            .orElse(UUID.randomUUID().toString()));
+
+    VolumeCapabilityRange volumeCap = VolumeCapabilityRange.newBuilder()
+        .minCapacity(Optional.ofNullable(min).orElse(0L))
+        .maxCapacity(Optional.ofNullable(max).orElse(Long.MAX_VALUE))
+        .unit(Optional.ofNullable(unit).orElse("Gi"))
+        .build();
+
+    VolumeMetaData meta = VolumeMetaData.newBuilder()
+        .capability(volumeCap)
+        .driverName(Optional.ofNullable(driver).orElse("test-driver"))
+        .mountPoint(Optional.ofNullable(mount).orElse("/mnt/data"))
+        .volumeName(name)
+        .volumeId(vid)
+        .build();
+    return new VolumeImpl(meta);
+  }
+}

+ 63 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Main interface for volume manager that manages all volumes.
+ * Volume manager talks to a CSI controller plugin to handle the
+ * volume operations before it is available to be published on
+ * any node manager.
+ */
+@Private
+@Unstable
+public interface VolumeManager {
+
+  /**
+   * @return all known volumes and their states.
+   */
+  @VisibleForTesting
+  VolumeStates getVolumeStates();
+
+  @VisibleForTesting
+  void setClient(CsiAdaptorClientProtocol client);
+
+  /**
+   * Start to supervise on a volume.
+   * @param volume
+   * @return the volume being managed by the manager.
+   */
+  Volume addOrGetVolume(Volume volume);
+
+  /**
+   * Execute volume provisioning tasks as backend threads.
+   * @param volumeProvisioningTask
+   * @param delaySecond
+   */
+  ScheduledFuture<VolumeProvisioningResults> schedule(
+      VolumeProvisioningTask volumeProvisioningTask, int delaySecond);
+}

+ 108 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java

@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A service manages all volumes.
+ */
+public class VolumeManagerImpl extends AbstractService
+    implements VolumeManager {
+
+  private static final Log LOG = LogFactory.getLog(VolumeManagerImpl.class);
+
+  private final VolumeStates volumeStates;
+  private ScheduledExecutorService provisioningExecutor;
+  private CsiAdaptorClientProtocol adaptorClient;
+
+  private final static int PROVISIONING_TASK_THREAD_POOL_SIZE = 10;
+
+  public VolumeManagerImpl() {
+    super(VolumeManagerImpl.class.getName());
+    this.volumeStates = new VolumeStates();
+    this.provisioningExecutor = Executors
+        .newScheduledThreadPool(PROVISIONING_TASK_THREAD_POOL_SIZE);
+    this.adaptorClient = new CsiAdaptorClient();
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    provisioningExecutor.shutdown();
+    super.serviceStop();
+  }
+
+  @Override
+  public VolumeStates getVolumeStates() {
+    return this.volumeStates;
+  }
+
+  @Override
+  public Volume addOrGetVolume(Volume volume) {
+    if (volumeStates.getVolume(volume.getVolumeId()) != null) {
+      // volume already exists
+      return volumeStates.getVolume(volume.getVolumeId());
+    } else {
+      // add the volume and set the client
+      ((VolumeImpl) volume).setClient(adaptorClient);
+      this.volumeStates.addVolumeIfAbsent(volume);
+      return volume;
+    }
+  }
+
+  @VisibleForTesting
+  public void setClient(CsiAdaptorClientProtocol client) {
+    this.adaptorClient = client;
+  }
+
+  @Override
+  public ScheduledFuture<VolumeProvisioningResults> schedule(
+      VolumeProvisioningTask volumeProvisioningTask,
+      int delaySecond) {
+    LOG.info("Scheduling provision volume task (with delay "
+        + delaySecond + "s)," + " handling "
+        + volumeProvisioningTask.getVolumes().size()
+        + " volume provisioning");
+    return provisioningExecutor.schedule(volumeProvisioningTask,
+        delaySecond, TimeUnit.SECONDS);
+  }
+}

+ 60 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Volume manager states, including all managed volumes and their states.
+ */
+public class VolumeStates {
+
+  private final Map<VolumeId, Volume> volumeStates;
+
+  public VolumeStates() {
+    this.volumeStates = new ConcurrentHashMap<>();
+  }
+
+  public Volume getVolume(VolumeId volumeId) {
+    return volumeStates.get(volumeId);
+  }
+
+  /**
+   * Add volume if it is not yet added.
+   * If a new volume is added with a same {@link VolumeId}
+   * with a existing volume, existing volume will be returned.
+   * @param volume volume to add
+   * @return volume added or existing volume
+   */
+  public Volume addVolumeIfAbsent(Volume volume) {
+    if (volume.getVolumeId() != null) {
+      return volumeStates.putIfAbsent(volume.getVolumeId(), volume);
+    } else {
+      // for dynamical provisioned volumes,
+      // the volume ID might not be available at time being.
+      // we can makeup one with the combination of driver+volumeName+timestamp
+      // once the volume ID is generated, we should replace ID.
+      return volume;
+    }
+  }
+}

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ControllerPublishVolumeEvent.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event;
+
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+
+/**
+ * Trigger controller publish.
+ */
+public class ControllerPublishVolumeEvent extends VolumeEvent {
+
+  public ControllerPublishVolumeEvent(Volume volume) {
+    super(volume, VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT);
+  }
+}

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ValidateVolumeEvent.java

@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event;
+
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+
+/**
+ * Validate volume capability with the CSI driver.
+ */
+public class ValidateVolumeEvent extends VolumeEvent {
+
+  public ValidateVolumeEvent(Volume volume) {
+    super(volume, VolumeEventType.VALIDATE_VOLUME_EVENT);
+  }
+}

+ 43 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEvent.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+
+/**
+ * Base volume event class that used to trigger volume state transitions.
+ */
+public class VolumeEvent extends AbstractEvent<VolumeEventType> {
+
+  private Volume volume;
+
+  public VolumeEvent(Volume volume, VolumeEventType volumeEventType) {
+    super(volumeEventType, System.currentTimeMillis());
+    this.volume = volume;
+  }
+
+  public Volume getVolume() {
+    return this.volume;
+  }
+
+  public VolumeId getVolumeId() {
+    return this.volume.getVolumeId();
+  }
+}

+ 29 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEventType.java

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event;
+
+/**
+ * Volume events.
+ */
+public enum VolumeEventType {
+  VALIDATE_VOLUME_EVENT,
+  CREATE_VOLUME_EVENT,
+  CONTROLLER_PUBLISH_VOLUME_EVENT,
+  CONTROLLER_UNPUBLISH_VOLUME_EVENT,
+  DELETE_VOLUME
+}

+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/package-info.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains volume related events.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+
+/**
+ * Major volume interface at RM's view, it maintains the volume states and
+ * state transition according to the CSI volume lifecycle.
+ */
+@Private
+@Unstable
+public interface Volume extends EventHandler<VolumeEvent> {
+
+  VolumeState getVolumeState();
+
+  VolumeId getVolumeId();
+}

+ 199 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java

@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.CsiAdaptorClient;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEventType;
+import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+
+import java.util.EnumSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This class maintains the volume states and state transition
+ * according to the CSI volume lifecycle. Volume states are stored in
+ * {@link org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeStates}
+ * class.
+ */
+public class VolumeImpl implements Volume {
+
+  private static final Log LOG = LogFactory.getLog(VolumeImpl.class);
+
+  private final Lock readLock;
+  private final Lock writeLock;
+  private final StateMachine<VolumeState, VolumeEventType, VolumeEvent>
+      stateMachine;
+
+  private final VolumeId volumeId;
+  private final VolumeMetaData volumeMeta;
+  private CsiAdaptorClientProtocol client;
+
+  public VolumeImpl(VolumeMetaData volumeMeta) {
+    ReadWriteLock lock = new ReentrantReadWriteLock();
+    this.writeLock = lock.writeLock();
+    this.readLock = lock.readLock();
+    this.volumeId = volumeMeta.getVolumeId();
+    this.volumeMeta = volumeMeta;
+    this.stateMachine = createVolumeStateFactory().make(this);
+    this.client = new CsiAdaptorClient();
+  }
+
+  @VisibleForTesting
+  public void setClient(CsiAdaptorClientProtocol client) {
+    this.client = client;
+  }
+
+  public CsiAdaptorClientProtocol getClient() {
+    return this.client;
+  }
+
+  private StateMachineFactory<VolumeImpl, VolumeState,
+      VolumeEventType, VolumeEvent> createVolumeStateFactory() {
+    return new StateMachineFactory<
+        VolumeImpl, VolumeState, VolumeEventType, VolumeEvent>(VolumeState.NEW)
+        .addTransition(
+            VolumeState.NEW,
+            EnumSet.of(VolumeState.VALIDATED, VolumeState.UNAVAILABLE),
+            VolumeEventType.VALIDATE_VOLUME_EVENT,
+            new ValidateVolumeTransition())
+        .addTransition(VolumeState.VALIDATED, VolumeState.VALIDATED,
+            VolumeEventType.VALIDATE_VOLUME_EVENT)
+        .addTransition(
+            VolumeState.VALIDATED,
+            EnumSet.of(VolumeState.NODE_READY, VolumeState.UNAVAILABLE),
+            VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT,
+            new ControllerPublishVolumeTransition())
+        .addTransition(
+            VolumeState.UNAVAILABLE,
+            EnumSet.of(VolumeState.UNAVAILABLE, VolumeState.VALIDATED),
+            VolumeEventType.VALIDATE_VOLUME_EVENT,
+            new ValidateVolumeTransition())
+        .addTransition(
+            VolumeState.UNAVAILABLE,
+            VolumeState.UNAVAILABLE,
+            EnumSet.of(VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT))
+        .addTransition(
+            VolumeState.NODE_READY,
+            VolumeState.NODE_READY,
+            EnumSet.of(VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT,
+                VolumeEventType.VALIDATE_VOLUME_EVENT))
+        .installTopology();
+  }
+
+  @Override
+  public VolumeState getVolumeState() {
+    try {
+      readLock.lock();
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public VolumeId getVolumeId() {
+    try {
+      readLock.lock();
+      return this.volumeId;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private static class ValidateVolumeTransition
+      implements MultipleArcTransition<VolumeImpl, VolumeEvent, VolumeState> {
+    @Override
+    public VolumeState transition(VolumeImpl volume,
+        VolumeEvent volumeEvent) {
+      try {
+        // this call could cross node, we should keep the message tight
+        volume.getClient().validateVolume();
+        return VolumeState.VALIDATED;
+      } catch (VolumeException e) {
+        LOG.warn("Got exception while calling the CSI adaptor", e);
+        return VolumeState.UNAVAILABLE;
+      }
+    }
+  }
+
+  private static class ControllerPublishVolumeTransition
+      implements MultipleArcTransition<VolumeImpl, VolumeEvent, VolumeState> {
+
+    @Override
+    public VolumeState transition(VolumeImpl volume,
+        VolumeEvent volumeEvent) {
+      try {
+        // this call could cross node, we should keep the message tight
+        volume.getClient().controllerPublishVolume();
+        return VolumeState.NODE_READY;
+      } catch (VolumeException e) {
+        LOG.warn("Got exception while calling the CSI adaptor", e);
+        return volume.getVolumeState();
+      }
+    }
+  }
+
+  @Override
+  public void handle(VolumeEvent event) {
+    try {
+      this.writeLock.lock();
+      VolumeId volumeId = event.getVolumeId();
+
+      if (volumeId == null) {
+        // This should not happen, safely ignore the event
+        LOG.warn("Unexpected volume event received, event type is "
+            + event.getType().name() + ", but the volumeId is null.");
+        return;
+      }
+
+      LOG.info("Processing volume event, type=" + event.getType().name()
+          + ", volumeId=" + volumeId.toString());
+
+      VolumeState oldState = null;
+      VolumeState newState = null;
+      try {
+        oldState = stateMachine.getCurrentState();
+        newState = stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitionException e) {
+        LOG.warn("Can't handle this event at current state: Current: ["
+            + oldState + "], eventType: [" + event.getType() + "]," +
+            " volumeId: [" + volumeId + "]", e);
+      }
+
+      if (newState != null && oldState != newState) {
+        LOG.info("VolumeImpl " + volumeId + " transitioned from " + oldState
+            + " to " + newState);
+      }
+    }finally {
+      this.writeLock.unlock();
+    }
+  }
+}

+ 35 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
+
+/**
+ * Volume states
+ * Volume states are defined in the CSI spec, see more in volume lifecycle.
+ */
+public enum VolumeState {
+  // initial state
+  NEW,
+  // volume capacity validated
+  VALIDATED,
+  // volume created by the controller
+  CREATED,
+  // controller published the volume
+  NODE_READY,
+  // unavailable
+  UNAVAILABLE
+}

+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes to manage volume lifecycle.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes to manage CSI volumes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 158 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java

@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor;
+
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
+import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeProvisioningException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * AMS processor that handles volume resource requests.
+ *
+ */
+public class VolumeAMSProcessor implements ApplicationMasterServiceProcessor {
+
+  private static final Logger LOG =  LoggerFactory
+      .getLogger(VolumeAMSProcessor.class);
+
+  private ApplicationMasterServiceProcessor nextAMSProcessor;
+  private VolumeManager volumeManager;
+
+  @Override
+  public void init(ApplicationMasterServiceContext amsContext,
+      ApplicationMasterServiceProcessor nextProcessor) {
+    LOG.info("Initializing CSI volume processor");
+    this.nextAMSProcessor = nextProcessor;
+    this.volumeManager = ((RMContext) amsContext).getVolumeManager();
+  }
+
+  @Override
+  public void registerApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      RegisterApplicationMasterRequest request,
+      RegisterApplicationMasterResponse response)
+      throws IOException, YarnException {
+    this.nextAMSProcessor.registerApplicationMaster(applicationAttemptId,
+        request, response);
+  }
+
+  @Override
+  public void allocate(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse response) throws YarnException {
+    List<Volume> volumes = aggregateVolumesFrom(request);
+    if (volumes != null && volumes.size() > 0) {
+      ScheduledFuture<VolumeProvisioningResults> result =
+          this.volumeManager.schedule(new VolumeProvisioningTask(volumes), 0);
+      try {
+        VolumeProvisioningResults volumeResult =
+            result.get(3, TimeUnit.SECONDS);
+        if (!volumeResult.isSuccess()) {
+          throw new VolumeProvisioningException("Volume provisioning failed,"
+              + " result details: " + volumeResult.getBriefMessage());
+        }
+      } catch (TimeoutException | InterruptedException | ExecutionException e) {
+        LOG.warn("Volume provisioning task failed", e);
+        throw new VolumeException("Volume provisioning task failed", e);
+      }
+    }
+
+    // Go to next processor
+    this.nextAMSProcessor.allocate(appAttemptId, request, response);
+  }
+
+  // Currently only scheduling request is supported.
+  private List<Volume> aggregateVolumesFrom(AllocateRequest request)
+      throws VolumeException {
+    List<Volume> volumeList = new ArrayList<>();
+    List<SchedulingRequest> requests = request.getSchedulingRequests();
+    if (requests != null) {
+      for (SchedulingRequest req : requests) {
+        Resource totalResource = req.getResourceSizing().getResources();
+        List<ResourceInformation> resourceList =
+            totalResource.getAllResourcesListCopy();
+        for (ResourceInformation resourceInformation : resourceList) {
+          List<VolumeMetaData> volumes =
+              VolumeMetaData.fromResource(resourceInformation);
+          for (VolumeMetaData vs : volumes) {
+            if (vs.getVolumeCapabilityRange().getMinCapacity() <= 0) {
+              // capacity not specified, ignore
+              continue;
+            } else if (vs.isProvisionedVolume()) {
+              volumeList.add(checkAndGetVolume(vs));
+            } else {
+              throw new InvalidVolumeException("Only pre-provisioned volume"
+                  + " is supported now, volumeID must exist.");
+            }
+          }
+        }
+      }
+    }
+    return volumeList;
+  }
+
+  /**
+   * If given volume ID already exists in the volume manager,
+   * it returns the existing volume. Otherwise, it creates a new
+   * volume and add that to volume manager.
+   * @param metaData
+   * @return volume
+   */
+  private Volume checkAndGetVolume(VolumeMetaData metaData) {
+    Volume toAdd = new VolumeImpl(metaData);
+    return this.volumeManager.addOrGetVolume(toAdd);
+  }
+
+  @Override
+  public void finishApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      FinishApplicationMasterRequest request,
+      FinishApplicationMasterResponse response) {
+    this.nextAMSProcessor.finishApplicationMaster(applicationAttemptId,
+        request, response);
+  }
+}

+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains AMS processor class for volume handling.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java

@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A task interface to provision a volume to expected state.
+ */
+@Private
+@Unstable
+public interface VolumeProvisioner extends Callable<VolumeProvisioningResults> {
+
+}

+ 87 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java

@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner;
+
+import com.google.gson.JsonObject;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Result of volumes' provisioning.
+ */
+public class VolumeProvisioningResults {
+
+  private Map<VolumeId, VolumeProvisioningResult> resultMap;
+
+  public VolumeProvisioningResults() {
+    this.resultMap = new HashMap<>();
+  }
+
+  public boolean isSuccess() {
+    return !resultMap.isEmpty() && resultMap.values().stream()
+        .allMatch(subResult -> subResult.isSuccess());
+  }
+
+  public String getBriefMessage() {
+    JsonObject obj = new JsonObject();
+    obj.addProperty("TotalVolumes", resultMap.size());
+
+    JsonObject failed = new JsonObject();
+    for (VolumeProvisioningResult result : resultMap.values()) {
+      if (!result.isSuccess()) {
+        failed.addProperty(result.getVolumeId().toString(),
+            result.getVolumeState().name());
+      }
+    }
+    obj.add("failedVolumesStates", failed);
+    return obj.toString();
+  }
+
+  static class VolumeProvisioningResult {
+
+    private VolumeId volumeId;
+    private VolumeState volumeState;
+    private boolean success;
+
+    VolumeProvisioningResult(VolumeId volumeId, VolumeState state) {
+      this.volumeId = volumeId;
+      this.volumeState = state;
+      this.success = state == VolumeState.NODE_READY;
+    }
+
+    public boolean isSuccess() {
+      return this.success;
+    }
+
+    public VolumeId getVolumeId() {
+      return this.volumeId;
+    }
+
+    public VolumeState getVolumeState() {
+      return this.volumeState;
+    }
+  }
+
+  public void addResult(VolumeId volumeId, VolumeState state) {
+    this.resultMap.put(volumeId,
+        new VolumeProvisioningResult(volumeId, state));
+  }
+}

+ 66 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner;
+
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A provisioning task encapsulates all the logic required by a storage system
+ * to provision a volume. This class is the common implementation, it might
+ * be override if the provisioning behavior of a certain storage system
+ * is not completely align with this implementation.
+ */
+public class VolumeProvisioningTask implements VolumeProvisioner {
+
+  private static final Logger LOG =  LoggerFactory
+      .getLogger(VolumeProvisioningTask.class);
+
+  private List<Volume> volumes;
+
+  public VolumeProvisioningTask(List<Volume> volumes) {
+    this.volumes = volumes;
+  }
+
+  public List<Volume> getVolumes() {
+    return this.volumes;
+  }
+
+  @Override
+  public VolumeProvisioningResults call() throws Exception {
+    VolumeProvisioningResults vpr = new VolumeProvisioningResults();
+
+    // Wait all volumes are reaching expected state
+    for (Volume vs : volumes) {
+      LOG.info("Provisioning volume : {}", vs.getVolumeId().toString());
+      vs.handle(new ValidateVolumeEvent(vs));
+      vs.handle(new ControllerPublishVolumeEvent(vs));
+    }
+
+    // collect results
+    volumes.stream().forEach(v ->
+        vpr.addResult(v.getVolumeId(), v.getVolumeState()));
+
+    return vpr;
+  }
+}

+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the volume provisioning classes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java

@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes to manage storage volumes in YARN.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 67 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeCapabilityRange;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for volume capability.
+ */
+public class TestVolumeCapabilityRange {
+
+  @Test(expected = InvalidVolumeException.class)
+  public void testInvalidMinCapability() throws InvalidVolumeException {
+    VolumeCapabilityRange.newBuilder()
+        .minCapacity(-1L)
+        .maxCapacity(5L)
+        .unit("Gi")
+        .build();
+  }
+
+  @Test(expected = InvalidVolumeException.class)
+  public void testMissingMinCapability() throws InvalidVolumeException {
+    VolumeCapabilityRange.newBuilder()
+        .maxCapacity(5L)
+        .unit("Gi")
+        .build();
+  }
+
+  @Test(expected = InvalidVolumeException.class)
+  public void testMissingUnit() throws InvalidVolumeException {
+    VolumeCapabilityRange.newBuilder()
+        .minCapacity(0L)
+        .maxCapacity(5L)
+        .build();
+  }
+
+  @Test
+  public void testGetVolumeCapability() throws InvalidVolumeException {
+    VolumeCapabilityRange vc = VolumeCapabilityRange.newBuilder()
+        .minCapacity(0L)
+        .maxCapacity(5L)
+        .unit("Gi")
+        .build();
+
+    Assert.assertEquals(0L, vc.getMinCapacity());
+    Assert.assertEquals(5L, vc.getMaxCapacity());
+    Assert.assertEquals("Gi", vc.getUnit());
+  }
+}

+ 161 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java

@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Test cases for volume lifecycle management.
+ */
+public class TestVolumeLifecycle {
+
+  @Test
+  public void testValidation() throws InvalidVolumeException {
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder.newBuilder()
+        .volumeId("test_vol_00000001")
+        .maxCapability(5L)
+        .unit("Gi")
+        .mountPoint("/path/to/mount")
+        .driverName("test-driver-name")
+        .build();
+    Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
+
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
+  }
+
+  @Test
+  public void testValidationFailure() throws VolumeException {
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder
+        .newBuilder().volumeId("test_vol_00000001").build();
+    CsiAdaptorClientProtocol mockedClient = Mockito
+        .mock(CsiAdaptorClientProtocol.class);
+    volume.setClient(mockedClient);
+
+    // NEW -> UNAVAILABLE
+    // Simulate a failed API call to the adaptor
+    doThrow(new VolumeException("failed")).when(mockedClient).validateVolume();
+    volume.handle(new ValidateVolumeEvent(volume));
+
+    try {
+      // Verify the countdown did not happen
+      GenericTestUtils.waitFor(() ->
+          volume.getVolumeState() == VolumeState.VALIDATED, 10, 50);
+      Assert.fail("Validate state not reached,"
+          + " it should keep waiting until timeout");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof TimeoutException);
+      Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
+    }
+  }
+
+  @Test
+  public void testValidated() throws InvalidVolumeException {
+    AtomicInteger validatedTimes = new AtomicInteger(0);
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder
+        .newBuilder().volumeId("test_vol_00000001").build();
+    CsiAdaptorClientProtocol mockedClient = new CsiAdaptorClientProtocol() {
+      @Override
+      public void validateVolume() {
+        validatedTimes.incrementAndGet();
+      }
+
+      @Override
+      public void controllerPublishVolume() {
+        // do nothing
+      }
+    };
+    // The client has a count to memorize how many times being called
+    volume.setClient(mockedClient);
+
+    // NEW -> VALIDATED
+    Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
+    Assert.assertEquals(1, validatedTimes.get());
+
+    // VALIDATED -> VALIDATED
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
+    Assert.assertEquals(1, validatedTimes.get());
+  }
+
+  @Test
+  public void testUnavailableState() throws VolumeException {
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder
+        .newBuilder().volumeId("test_vol_00000001").build();
+    CsiAdaptorClientProtocol mockedClient = Mockito
+        .mock(CsiAdaptorClientProtocol.class);
+    volume.setClient(mockedClient);
+
+    // NEW -> UNAVAILABLE
+    doThrow(new VolumeException("failed")).when(mockedClient)
+        .validateVolume();
+    Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
+
+    // UNAVAILABLE -> UNAVAILABLE
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
+
+    // UNAVAILABLE -> VALIDATED
+    doNothing().when(mockedClient).validateVolume();
+    volume.setClient(mockedClient);
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
+  }
+
+  @Test
+  public void testPublishUnavailableVolume() throws VolumeException {
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder
+        .newBuilder().volumeId("test_vol_00000001").build();
+    CsiAdaptorClientProtocol mockedClient = Mockito
+        .mock(CsiAdaptorClientProtocol.class);
+    volume.setClient(mockedClient);
+
+    // NEW -> UNAVAILABLE (on validateVolume)
+    doThrow(new VolumeException("failed")).when(mockedClient)
+        .validateVolume();
+    Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
+
+    // UNAVAILABLE -> UNAVAILABLE (on publishVolume)
+    volume.handle(new ControllerPublishVolumeEvent(volume));
+    // controller publish is not called since the state is UNAVAILABLE
+    verify(mockedClient, times(0)).controllerPublishVolume();
+    // state remains to UNAVAILABLE
+    Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
+  }
+}

+ 178 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java

@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeCapabilityRange;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
+import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+/**
+ * Test cases for volume specification definition and parsing.
+ */
+public class TestVolumeMetaData {
+
+  @Test
+  public void testPreprovisionedVolume() throws InvalidVolumeException {
+    VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder()
+        .minCapacity(1L)
+        .maxCapacity(5L)
+        .unit("Gi")
+        .build();
+
+    // When volume id is given, volume name is optional
+    VolumeMetaData meta = VolumeMetaData.newBuilder()
+        .volumeId(new VolumeId("id-000001"))
+        .capability(cap)
+        .driverName("csi-demo-driver")
+        .mountPoint("/mnt/data")
+        .build();
+
+    Assert.assertEquals(new VolumeId("id-000001"), meta.getVolumeId());
+    Assert.assertEquals(1L, meta.getVolumeCapabilityRange().getMinCapacity());
+    Assert.assertEquals(5L, meta.getVolumeCapabilityRange().getMaxCapacity());
+    Assert.assertEquals("Gi", meta.getVolumeCapabilityRange().getUnit());
+    Assert.assertEquals("csi-demo-driver", meta.getDriverName());
+    Assert.assertEquals("/mnt/data", meta.getMountPoint());
+    Assert.assertNull(meta.getVolumeName());
+    Assert.assertTrue(meta.isProvisionedVolume());
+
+    // Test toString
+    JsonParser parser = new JsonParser();
+    JsonElement element = parser.parse(meta.toString());
+    JsonObject json = element.getAsJsonObject();
+    Assert.assertNotNull(json);
+    Assert.assertNull(json.get(CsiConstants.CSI_VOLUME_NAME));
+    Assert.assertEquals("id-000001",
+        json.get(CsiConstants.CSI_VOLUME_ID).getAsString());
+    Assert.assertEquals("csi-demo-driver",
+        json.get(CsiConstants.CSI_DRIVER_NAME).getAsString());
+    Assert.assertEquals("/mnt/data",
+        json.get(CsiConstants.CSI_VOLUME_MOUNT).getAsString());
+
+  }
+
+  @Test
+  public void testDynamicalProvisionedVolume() throws InvalidVolumeException {
+    VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder()
+        .minCapacity(1L)
+        .maxCapacity(5L)
+        .unit("Gi")
+        .build();
+
+    // When volume name is given, volume id is optional
+    VolumeMetaData meta = VolumeMetaData.newBuilder()
+        .volumeName("volume-name")
+        .capability(cap)
+        .driverName("csi-demo-driver")
+        .mountPoint("/mnt/data")
+        .build();
+    Assert.assertNotNull(meta);
+
+    Assert.assertEquals("volume-name", meta.getVolumeName());
+    Assert.assertEquals(1L, meta.getVolumeCapabilityRange().getMinCapacity());
+    Assert.assertEquals(5L, meta.getVolumeCapabilityRange().getMaxCapacity());
+    Assert.assertEquals("Gi", meta.getVolumeCapabilityRange().getUnit());
+    Assert.assertEquals("csi-demo-driver", meta.getDriverName());
+    Assert.assertEquals("/mnt/data", meta.getMountPoint());
+    Assert.assertFalse(meta.isProvisionedVolume());
+
+    // Test toString
+    JsonParser parser = new JsonParser();
+    JsonElement element = parser.parse(meta.toString());
+    JsonObject json = element.getAsJsonObject();
+    Assert.assertNotNull(json);
+    Assert.assertNull(json.get(CsiConstants.CSI_VOLUME_ID));
+    Assert.assertEquals("volume-name",
+        json.get(CsiConstants.CSI_VOLUME_NAME).getAsString());
+    Assert.assertEquals("csi-demo-driver",
+        json.get(CsiConstants.CSI_DRIVER_NAME).getAsString());
+    Assert.assertEquals("/mnt/data",
+        json.get(CsiConstants.CSI_VOLUME_MOUNT).getAsString());
+  }
+
+  @Test(expected = InvalidVolumeException.class)
+  public void testMissingMountpoint() throws InvalidVolumeException {
+    VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder()
+        .minCapacity(1L)
+        .maxCapacity(5L)
+        .unit("Gi")
+        .build();
+
+    VolumeMetaData.newBuilder()
+        .volumeId(new VolumeId("id-000001"))
+        .capability(cap)
+        .driverName("csi-demo-driver")
+        .build();
+  }
+
+
+  @Test(expected = InvalidVolumeException.class)
+  public void testMissingCsiDriverName() throws InvalidVolumeException {
+    VolumeCapabilityRange cap = VolumeCapabilityRange.newBuilder()
+        .minCapacity(1L)
+        .maxCapacity(5L)
+        .unit("Gi")
+        .build();
+
+    VolumeMetaData.newBuilder()
+        .volumeId(new VolumeId("id-000001"))
+        .capability(cap)
+        .mountPoint("/mnt/data")
+        .build();
+  }
+
+  @Test(expected = InvalidVolumeException.class)
+  public void testMissingVolumeCapability() throws InvalidVolumeException {
+    VolumeMetaData.newBuilder()
+        .volumeId(new VolumeId("id-000001"))
+        .driverName("csi-demo-driver")
+        .mountPoint("/mnt/data")
+        .build();
+  }
+
+  @Test
+  public void testVolumeId() {
+    VolumeId id1 = new VolumeId("test00001");
+    VolumeId id11 = new VolumeId("test00001");
+    VolumeId id2 = new VolumeId("test00002");
+
+    Assert.assertEquals(id1, id11);
+    Assert.assertEquals(id1.hashCode(), id11.hashCode());
+    Assert.assertNotEquals(id1, id2);
+
+    HashMap<VolumeId, String> map = new HashMap<>();
+    map.put(id1, "1");
+    Assert.assertEquals(1, map.size());
+    Assert.assertEquals("1", map.get(id11));
+    map.put(id11, "2");
+    Assert.assertEquals(1, map.size());
+    Assert.assertEquals("2", map.get(id11));
+    Assert.assertEquals("2", map.get(new VolumeId("test00001")));
+
+    Assert.assertNotEquals(id1, id2);
+  }
+}

+ 250 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java

@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeProvisioningException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Test cases for volume processor.
+ */
+public class TestVolumeProcessor {
+
+  private static final int GB = 1024;
+  private YarnConfiguration conf;
+  private RMNodeLabelsManager mgr;
+  private MockRM rm;
+  private MockNM[] mockNMS;
+  private RMNode[] rmNodes;
+  private static final int NUM_OF_NMS = 4;
+  // resource-types.xml will be created under target/test-classes/ dir,
+  // it must be deleted after the test is done, to avoid it from reading
+  // by other UT classes.
+  private File resourceTypesFile = null;
+
+  private static final String VOLUME_RESOURCE_NAME = "yarn.io/csi-volume";
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    resourceTypesFile = new File(conf.getClassLoader()
+        .getResource(".").getPath(), "resource-types.xml");
+    writeTmpResourceTypesFile(resourceTypesFile);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    conf.set("yarn.scheduler.capacity.resource-calculator",
+        "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
+    conf.set(CapacitySchedulerConfiguration.PREFIX
+        + CapacitySchedulerConfiguration.ROOT + ".default.ordering-policy",
+        "fair");
+    conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
+        VolumeAMSProcessor.class.getName());
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    rm = new MockRM(conf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    mockNMS = new MockNM[NUM_OF_NMS];
+    rmNodes = new RMNode[NUM_OF_NMS];
+    for (int i = 0; i < 4; i++) {
+      mockNMS[i] = rm.registerNode("192.168.0." + i + ":1234", 10 * GB);
+      rmNodes[i] = rm.getRMContext().getRMNodes().get(mockNMS[i].getNodeId());
+    }
+  }
+
+  @After
+  public void tearDown() {
+    if (resourceTypesFile != null && resourceTypesFile.exists()) {
+      resourceTypesFile.delete();
+    }
+  }
+
+  private void writeTmpResourceTypesFile(File tmpFile) throws IOException {
+    FileWriter fw = new FileWriter(tmpFile);
+    try {
+      Configuration yarnConf = new YarnConfiguration();
+      yarnConf.set(YarnConfiguration.RESOURCE_TYPES, VOLUME_RESOURCE_NAME);
+      yarnConf.set("yarn.resource-types."
+          + VOLUME_RESOURCE_NAME + ".units", "Mi");
+      yarnConf.writeXml(fw);
+    } finally {
+      fw.close();
+    }
+  }
+
+  @Test (timeout = 10000L)
+  public void testVolumeProvisioning() throws Exception {
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
+    Resource resource = Resource.newInstance(1024, 1);
+    ResourceInformation volumeResource = ResourceInformation
+        .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
+            ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
+            ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
+            ImmutableMap.of(
+                CsiConstants.CSI_VOLUME_ID, "test-vol-000001",
+                CsiConstants.CSI_DRIVER_NAME, "hostpath",
+                CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
+            )
+        );
+    resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
+    SchedulingRequest sc = SchedulingRequest
+        .newBuilder().allocationRequestId(0L)
+        .resourceSizing(ResourceSizing.newInstance(1, resource))
+        .build();
+    AllocateRequest ar = AllocateRequest.newBuilder()
+        .schedulingRequests(Arrays.asList(sc))
+        .build();
+
+    am1.allocate(ar);
+    VolumeStates volumeStates =
+        rm.getRMContext().getVolumeManager().getVolumeStates();
+    Assert.assertNotNull(volumeStates);
+    VolumeState volumeState = VolumeState.NEW;
+    while (volumeState != VolumeState.NODE_READY) {
+      Volume volume = volumeStates
+          .getVolume(new VolumeId("test-vol-000001"));
+      if (volume != null) {
+        volumeState = volume.getVolumeState();
+      }
+      am1.doHeartbeat();
+      mockNMS[0].nodeHeartbeat(true);
+      Thread.sleep(500);
+    }
+    rm.stop();
+  }
+
+  @Test (timeout = 30000L)
+  public void testInvalidRequest() throws Exception {
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
+    Resource resource = Resource.newInstance(1024, 1);
+    ResourceInformation volumeResource = ResourceInformation
+        .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
+            ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
+            ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
+            ImmutableMap.of(
+                // volume ID is missing...
+                CsiConstants.CSI_VOLUME_NAME, "test-vol-000001",
+                CsiConstants.CSI_DRIVER_NAME, "hostpath",
+                CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
+            )
+        );
+    resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
+    SchedulingRequest sc = SchedulingRequest
+        .newBuilder().allocationRequestId(0L)
+        .resourceSizing(ResourceSizing.newInstance(1, resource))
+        .build();
+    AllocateRequest ar = AllocateRequest.newBuilder()
+        .schedulingRequests(Arrays.asList(sc))
+        .build();
+
+    try {
+      am1.allocate(ar);
+      Assert.fail("allocate should fail because invalid request received");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof InvalidVolumeException);
+    }
+    rm.stop();
+  }
+
+  @Test (timeout = 30000L)
+  public void testProvisioningFailures() throws Exception {
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
+
+    CsiAdaptorClientProtocol mockedClient = Mockito
+        .mock(CsiAdaptorClientProtocol.class);
+    // inject adaptor client
+    rm.getRMContext().getVolumeManager().setClient(mockedClient);
+    Mockito.doThrow(new VolumeException("failed")).when(mockedClient)
+        .validateVolume();
+
+    Resource resource = Resource.newInstance(1024, 1);
+    ResourceInformation volumeResource = ResourceInformation
+        .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
+            ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
+            ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
+            ImmutableMap.of(
+                CsiConstants.CSI_VOLUME_ID, "test-vol-000001",
+                CsiConstants.CSI_DRIVER_NAME, "hostpath",
+                CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"
+            )
+        );
+    resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
+    SchedulingRequest sc = SchedulingRequest
+        .newBuilder().allocationRequestId(0L)
+        .resourceSizing(ResourceSizing.newInstance(1, resource))
+        .build();
+    AllocateRequest ar = AllocateRequest.newBuilder()
+        .schedulingRequests(Arrays.asList(sc))
+        .build();
+
+    try {
+      am1.allocate(ar);
+      Assert.fail("allocate should fail");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof VolumeProvisioningException);
+    }
+    rm.stop();
+  }
+}