Browse Source

YARN-8953. [CSI] CSI driver adaptor module support in NodeManager. Contributed by Weiwei Yang.

Sunil G 6 years ago
parent
commit
5fb14e0635
41 changed files with 2228 additions and 104 deletions
  1. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml
  2. 9 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorPB.java
  3. 39 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java
  4. 7 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetPluginInfoRequest.java
  5. 43 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetPluginInfoResponse.java
  6. 117 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ValidateVolumeCapabilitiesRequest.java
  7. 46 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ValidateVolumeCapabilitiesResponse.java
  8. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  9. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
  10. 69 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto
  11. 91 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java
  12. 75 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java
  13. 60 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetPluginInfoRequestPBImpl.java
  14. 84 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetPluginInfoResponsePBImpl.java
  15. 121 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesRequestPBImpl.java
  16. 87 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesResponsePBImpl.java
  17. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  18. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
  19. 122 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
  20. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java
  21. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
  22. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
  23. 44 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
  24. 49 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslator.java
  25. 66 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java
  26. 93 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ValidateVolumeCapabilitiesRequestProtoTranslator.java
  27. 48 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ValidationVolumeCapabilitiesResponseProtoTranslator.java
  28. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/package-info.java
  29. 61 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java
  30. 256 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
  31. 66 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java
  32. 113 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java
  33. 61 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java
  34. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java
  35. 17 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
  36. 80 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
  37. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
  38. 38 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
  39. 16 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java
  40. 65 35
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java
  41. 26 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml

@@ -124,6 +124,8 @@
                   <include>server/application_history_server.proto</include>
                   <include>server/application_history_server.proto</include>
                   <include>client_SCM_protocol.proto</include>
                   <include>client_SCM_protocol.proto</include>
                   <include>server/SCM_Admin_protocol.proto</include>
                   <include>server/SCM_Admin_protocol.proto</include>
+                  <include>yarn_csi_adaptor.proto</include>
+                  <include>YarnCsiAdaptor.proto</include>
                 </includes>
                 </includes>
               </source>
               </source>
             </configuration>
             </configuration>

+ 9 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorPB.java

@@ -15,22 +15,17 @@
  * See the License for the specific language governing permissions and
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * limitations under the License.
  */
  */
-package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+package org.apache.hadoop.yarn.api;
 
 
-import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtocol;
 
 
 /**
 /**
- * Client talks to CSI adaptor.
+ * Interface for the CSI adaptor protocol.
  */
  */
-public class CsiAdaptorClient implements CsiAdaptorClientProtocol {
-
-  @Override
-  public void validateVolume() throws VolumeException {
-    // TODO
-  }
-
-  @Override public void controllerPublishVolume() throws VolumeException {
-    // TODO
-  }
+@ProtocolInfo(
+    protocolName = "CsiAdaptorPB",
+    protocolVersion = 1)
+public interface CsiAdaptorPB extends
+    CsiAdaptorProtocol.CsiAdaptorProtocolService.BlockingInterface {
 }
 }

+ 39 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.io.IOException;
+
+/**
+ * CSI adaptor delegates all the calls from YARN to a CSI driver.
+ */
+public interface CsiAdaptorProtocol {
+
+  GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request)
+      throws YarnException, IOException;
+
+  ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+      ValidateVolumeCapabilitiesRequest request) throws YarnException,
+      IOException;
+}

+ 7 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetPluginInfoRequest.java

@@ -15,20 +15,16 @@
  * See the License for the specific language governing permissions and
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * limitations under the License.
  */
  */
-package org.apache.hadoop.yarn.server.volume.csi;
+package org.apache.hadoop.yarn.api.protocolrecords;
 
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+import org.apache.hadoop.yarn.util.Records;
 
 
 /**
 /**
- * Protocol for the CSI adaptor.
+ * Get plugin info request.
  */
  */
-@Private
-@Unstable
-public interface CsiAdaptorClientProtocol {
+public abstract class GetPluginInfoRequest {
 
 
-  void validateVolume() throws VolumeException;
-
-  void controllerPublishVolume() throws VolumeException;
+  public static GetPluginInfoRequest newInstance() {
+    return Records.newRecord(GetPluginInfoRequest.class);
+  }
 }
 }

+ 43 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/GetPluginInfoResponse.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Get plugin info response.
+ */
+public abstract class GetPluginInfoResponse {
+
+  public static GetPluginInfoResponse newInstance(
+      String driverName, String version) {
+    GetPluginInfoResponse response =
+        Records.newRecord(GetPluginInfoResponse.class);
+    response.setDriverName(driverName);
+    response.setVersion(version);
+    return response;
+  }
+
+  public abstract void setDriverName(String driverName);
+
+  public abstract String getDriverName();
+
+  public abstract void setVersion(String version);
+
+  public abstract String getVersion();
+}

+ 117 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ValidateVolumeCapabilitiesRequest.java

@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * YARN internal message used to validate volume capabilities
+ * with a CSI driver controller plugin.
+ */
+public abstract class ValidateVolumeCapabilitiesRequest {
+
+  /**
+   * Volume access mode.
+   */
+  public enum AccessMode {
+    UNKNOWN,
+    SINGLE_NODE_WRITER,
+    SINGLE_NODE_READER_ONLY,
+    MULTI_NODE_READER_ONLY,
+    MULTI_NODE_SINGLE_WRITER,
+    MULTI_NODE_MULTI_WRITER,
+  }
+
+  /**
+   * Volume type.
+   */
+  public enum VolumeType {
+    BLOCK,
+    FILE_SYSTEM
+  }
+
+  /**
+   * Volume capability.
+   */
+  public static class VolumeCapability {
+
+    private AccessMode mode;
+    private VolumeType type;
+    private List<String> flags;
+
+    public VolumeCapability(AccessMode accessMode, VolumeType volumeType,
+        List<String> mountFlags) {
+      this.mode = accessMode;
+      this.type = volumeType;
+      this.flags = mountFlags;
+    }
+
+    public AccessMode getAccessMode() {
+      return mode;
+    }
+
+    public VolumeType getVolumeType() {
+      return type;
+    }
+
+    public List<String> getMountFlags() {
+      return flags;
+    }
+  }
+
+  public static ValidateVolumeCapabilitiesRequest newInstance(
+      String volumeId, List<VolumeCapability> volumeCapabilities,
+      Map<String, String> volumeAttributes) {
+    ValidateVolumeCapabilitiesRequest
+        request =
+        Records.newRecord(
+            ValidateVolumeCapabilitiesRequest.class);
+    request.setVolumeId(volumeId);
+    request.setVolumeAttributes(volumeAttributes);
+    for (VolumeCapability capability : volumeCapabilities) {
+      request.addVolumeCapability(capability);
+    }
+    return request;
+  }
+
+  public static ValidateVolumeCapabilitiesRequest newInstance(
+      String volumeId, Map<String, String> volumeAttributes) {
+    ValidateVolumeCapabilitiesRequest
+        request =
+        Records.newRecord(
+            ValidateVolumeCapabilitiesRequest.class);
+    request.setVolumeId(volumeId);
+    request.setVolumeAttributes(volumeAttributes);
+    return request;
+  }
+
+  public abstract void setVolumeId(String volumeId);
+
+  public abstract String getVolumeId();
+
+  public abstract void setVolumeAttributes(Map<String, String> attributes);
+
+  public abstract Map<String, String> getVolumeAttributes();
+
+  public abstract void addVolumeCapability(VolumeCapability volumeCapability);
+
+  public abstract List<VolumeCapability> getVolumeCapabilities();
+}

+ 46 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ValidateVolumeCapabilitiesResponse.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * YARN internal message used to represent the response of
+ * volume capabilities validation with a CSI driver controller plugin.
+ */
+public abstract class ValidateVolumeCapabilitiesResponse {
+
+  public static ValidateVolumeCapabilitiesResponse newInstance(
+      boolean supported, String responseMessage) {
+    ValidateVolumeCapabilitiesResponse
+        record =
+        Records.newRecord(
+            ValidateVolumeCapabilitiesResponse.class);
+    record.setResponseMessage(responseMessage);
+    record.setSupported(supported);
+    return record;
+  }
+
+  public abstract void setSupported(boolean supported);
+
+  public abstract boolean isSupported();
+
+  public abstract void setResponseMessage(String responseMessage);
+
+  public abstract String getResponseMessage();
+}

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -3427,6 +3427,18 @@ public class YarnConfiguration extends Configuration {
   public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
   public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
       false;
       false;
 
 
+  ////////////////////////////////
+  // CSI Volume configs
+  ////////////////////////////////
+  /**
+   * One or more socket addresses for csi-adaptor.
+   * Multiple addresses are delimited by ",".
+   */
+  public static final String NM_CSI_ADAPTOR_PREFIX =
+      NM_PREFIX + "csi-driver-adaptor.";
+  public static final String NM_CSI_ADAPTOR_ADDRESSES =
+      NM_CSI_ADAPTOR_PREFIX + "addresses";
+
   ////////////////////////////////
   ////////////////////////////////
   // Other Configs
   // Other Configs
   ////////////////////////////////
   ////////////////////////////////

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "CsiAdaptorProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_csi_adaptor.proto";
+
+service CsiAdaptorProtocolService {
+
+    rpc getPluginInfo (GetPluginInfoRequest)
+    returns (GetPluginInfoResponse);
+
+    rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest)
+    returns (ValidateVolumeCapabilitiesResponse);
+}

+ 69 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "CsiAdaptorProtos";
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_protos.proto";
+
+message ValidateVolumeCapabilitiesRequest {
+    required string volume_id = 1;
+    repeated VolumeCapability volume_capabilities = 2;
+    repeated StringStringMapProto volume_attributes = 3;
+}
+
+message ValidateVolumeCapabilitiesResponse {
+    // True if the Plugin supports the specified capabilities for the
+    // given volume. This field is REQUIRED.
+    required bool supported = 1;
+
+    // Message to the CO if `supported` above is false. This field is
+    // OPTIONAL.
+    // An empty string is equal to an unspecified field value.
+    optional string message = 2;
+}
+
+message VolumeCapability {
+    enum VolumeType {
+        BLOCK = 0;
+        FILE_SYSTEM = 1;
+    }
+
+    enum AccessMode {
+        UNKNOWN = 0;
+        SINGLE_NODE_WRITER = 1;
+        SINGLE_NODE_READER_ONLY = 2;
+        MULTI_NODE_READER_ONLY = 3;
+        MULTI_NODE_SINGLE_WRITER = 4;
+        MULTI_NODE_MULTI_WRITER = 5;
+    }
+
+    required VolumeType volume_type = 1;
+    required AccessMode access_mode = 2;
+    repeated string mount_flags = 3;
+}
+
+message GetPluginInfoRequest {
+    // Intentionally empty.
+}
+
+message GetPluginInfoResponse {
+    required string name = 1;
+    required string vendor_version = 2;
+}

+ 91 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java

@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.impl.pb.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.CsiAdaptorPB;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * CSI adaptor client implementation.
+ */
+public class CsiAdaptorProtocolPBClientImpl
+    implements CsiAdaptorProtocol, Closeable {
+
+  private final CsiAdaptorPB proxy;
+
+  public CsiAdaptorProtocolPBClientImpl(long clientVersion,
+      InetSocketAddress addr, Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, CsiAdaptorPB.class, ProtobufRpcEngine.class);
+    this.proxy = RPC.getProxy(CsiAdaptorPB.class, clientVersion, addr, conf);
+  }
+
+  @Override
+  public GetPluginInfoResponse getPluginInfo(
+      GetPluginInfoRequest request) throws YarnException, IOException {
+    CsiAdaptorProtos.GetPluginInfoRequest requestProto =
+        ((GetPluginInfoRequestPBImpl) request).getProto();
+    try {
+      return new GetPluginInfoResponsePBImpl(
+          proxy.getPluginInfo(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+      ValidateVolumeCapabilitiesRequest request)
+      throws YarnException, IOException {
+    CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest requestProto =
+        ((ValidateVolumeCapabilitiesRequestPBImpl) request).getProto();
+    try {
+      return new ValidateVolumeCapabilitiesResponsePBImpl(
+          proxy.validateVolumeCapacity(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if(this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+  }
+}

+ 75 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.impl.pb.service;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.yarn.api.CsiAdaptorPB;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+import java.io.IOException;
+
+/**
+ * CSI adaptor server side implementation, this is hosted on a node manager.
+ */
+public class CsiAdaptorProtocolPBServiceImpl implements CsiAdaptorPB {
+
+  private final CsiAdaptorProtocol real;
+  public CsiAdaptorProtocolPBServiceImpl(CsiAdaptorProtocol impl) {
+    this.real = impl;
+  }
+
+  @Override
+  public CsiAdaptorProtos.GetPluginInfoResponse getPluginInfo(
+      RpcController controller, CsiAdaptorProtos.GetPluginInfoRequest request)
+      throws ServiceException {
+    try {
+      GetPluginInfoRequest req =
+          new GetPluginInfoRequestPBImpl(request);
+      GetPluginInfoResponse response = real.getPluginInfo(req);
+      return ((GetPluginInfoResponsePBImpl) response).getProto();
+    } catch (YarnException | IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse
+      validateVolumeCapacity(RpcController controller,
+      CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest request)
+      throws ServiceException {
+    try {
+      ValidateVolumeCapabilitiesRequestPBImpl req =
+          new ValidateVolumeCapabilitiesRequestPBImpl(request);
+      ValidateVolumeCapabilitiesResponse response =
+          real.validateVolumeCapacity(req);
+      return ((ValidateVolumeCapabilitiesResponsePBImpl) response).getProto();
+    } catch (YarnException | IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

+ 60 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetPluginInfoRequestPBImpl.java

@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * Get plugin info request protobuf impl.
+ */
+public class GetPluginInfoRequestPBImpl extends GetPluginInfoRequest {
+
+  private CsiAdaptorProtos.GetPluginInfoRequest.Builder builder;
+
+  public GetPluginInfoRequestPBImpl(
+      CsiAdaptorProtos.GetPluginInfoRequest requestProto) {
+    this.builder = requestProto.toBuilder();
+  }
+
+  public GetPluginInfoRequestPBImpl() {
+    this.builder = CsiAdaptorProtos.GetPluginInfoRequest.newBuilder();
+  }
+
+  public CsiAdaptorProtos.GetPluginInfoRequest getProto() {
+    Preconditions.checkNotNull(builder);
+    return builder.build();
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+}

+ 84 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/GetPluginInfoResponsePBImpl.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * Get plugin info response protobuf impl.
+ */
+public class GetPluginInfoResponsePBImpl extends GetPluginInfoResponse {
+
+  private CsiAdaptorProtos.GetPluginInfoResponse.Builder builder;
+
+  public GetPluginInfoResponsePBImpl(
+      CsiAdaptorProtos.GetPluginInfoResponse responseProto) {
+    this.builder = responseProto.toBuilder();
+  }
+
+  public GetPluginInfoResponsePBImpl() {
+    this.builder = CsiAdaptorProtos.GetPluginInfoResponse.newBuilder();
+  }
+
+  @Override
+  public void setDriverName(String driverName) {
+    Preconditions.checkNotNull(builder);
+    builder.setName(driverName);
+  }
+
+  @Override
+  public String getDriverName() {
+    Preconditions.checkNotNull(builder);
+    return builder.getName();
+  }
+
+  @Override
+  public void setVersion(String version) {
+    Preconditions.checkNotNull(builder);
+    builder.setVendorVersion(version);
+  }
+
+  @Override
+  public String getVersion() {
+    Preconditions.checkNotNull(builder);
+    return builder.getVendorVersion();
+  }
+
+  public CsiAdaptorProtos.GetPluginInfoResponse getProto() {
+    Preconditions.checkNotNull(builder);
+    return builder.build();
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+}

+ 121 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesRequestPBImpl.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * PB wrapper for CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest.
+ */
+public class ValidateVolumeCapabilitiesRequestPBImpl extends
+    ValidateVolumeCapabilitiesRequest {
+
+  private CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest.Builder builder;
+
+  public ValidateVolumeCapabilitiesRequestPBImpl(
+      CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest proto) {
+    this.builder = proto.toBuilder();
+  }
+
+  public ValidateVolumeCapabilitiesRequestPBImpl() {
+    this.builder = CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest
+       .newBuilder();
+  }
+
+  @Override
+  public String getVolumeId() {
+    Preconditions.checkNotNull(builder);
+    return builder.getVolumeId();
+  }
+
+  @Override
+  public void setVolumeAttributes(Map<String, String> attributes) {
+    Preconditions.checkNotNull(builder);
+    builder.addAllVolumeAttributes(ProtoUtils.convertToProtoFormat(attributes));
+  }
+
+  @Override
+  public void setVolumeId(String volumeId) {
+    Preconditions.checkNotNull(builder);
+    builder.setVolumeId(volumeId);
+  }
+
+  @Override
+  public void addVolumeCapability(VolumeCapability volumeCapability) {
+    Preconditions.checkNotNull(builder);
+    CsiAdaptorProtos.VolumeCapability vc =
+        CsiAdaptorProtos.VolumeCapability.newBuilder()
+            .setAccessMode(CsiAdaptorProtos.VolumeCapability.AccessMode
+                .valueOf(volumeCapability.getAccessMode().ordinal()))
+            .setVolumeType(CsiAdaptorProtos.VolumeCapability.VolumeType
+                .valueOf(volumeCapability.getVolumeType().ordinal()))
+            .addAllMountFlags(volumeCapability.getMountFlags())
+            .build();
+    builder.addVolumeCapabilities(vc);
+  }
+
+  @Override
+  public List<VolumeCapability> getVolumeCapabilities() {
+    Preconditions.checkNotNull(builder);
+    List<VolumeCapability> caps = new ArrayList<>(
+        builder.getVolumeCapabilitiesCount());
+    builder.getVolumeCapabilitiesList().forEach(capability -> {
+      VolumeCapability vc = new VolumeCapability(
+          AccessMode.valueOf(capability.getAccessMode().name()),
+          VolumeType.valueOf(capability.getVolumeType().name()),
+          capability.getMountFlagsList());
+      caps.add(vc);
+    });
+    return caps;
+  }
+
+  @Override
+  public Map<String, String> getVolumeAttributes() {
+    Preconditions.checkNotNull(builder);
+    return ProtoUtils.convertStringStringMapProtoListToMap(
+        builder.getVolumeAttributesList());
+  }
+
+  public CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest getProto() {
+    Preconditions.checkNotNull(builder);
+    return builder.build();
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+}

+ 87 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/ValidateVolumeCapabilitiesResponsePBImpl.java

@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * PB wrapper for CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse.
+ */
+public class ValidateVolumeCapabilitiesResponsePBImpl
+    extends ValidateVolumeCapabilitiesResponse {
+
+  private CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse.Builder builder;
+
+  public ValidateVolumeCapabilitiesResponsePBImpl() {
+    this.builder = CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse
+        .newBuilder();
+  }
+
+  public ValidateVolumeCapabilitiesResponsePBImpl(
+      CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse response) {
+    this.builder = response.toBuilder();
+  }
+
+  @Override
+  public void setSupported(boolean supported) {
+    Preconditions.checkNotNull(builder);
+    this.builder.setSupported(supported);
+  }
+
+  @Override
+  public boolean isSupported() {
+    Preconditions.checkNotNull(builder);
+    return builder.getSupported();
+  }
+
+  @Override
+  public void setResponseMessage(String message) {
+    Preconditions.checkNotNull(builder);
+    this.builder.setMessage(message);
+  }
+
+  @Override
+  public String getResponseMessage() {
+    Preconditions.checkNotNull(builder);
+    return this.builder.getMessage();
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  public CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse getProto() {
+    Preconditions.checkNotNull(builder);
+    return builder.build();
+  }
+
+}

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -4037,4 +4037,17 @@
     <name>yarn.node-attribute.fs-store.impl.class</name>
     <name>yarn.node-attribute.fs-store.impl.class</name>
     <value>org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore</value>
     <value>org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore</value>
   </property>
   </property>
+
+  <!-- CSI configuration -->
+  <property>
+    <description>
+      CSI driver adaptor addresses on a node manager.
+      This configuration will be loaded by the resource manager to initiate
+      a client for each adaptor in order to communicate with CSI drivers.
+      Note, these addresses should be mapped to the adaptor addresses which
+      runs the controller plugin.
+    </description>
+    <name>yarn.nodemanager.csi-driver-adaptor.addresses</name>
+    <value></value>
+  </property>
 </configuration>
 </configuration>

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml

@@ -83,6 +83,18 @@
             <type>test-jar</type>
             <type>test-jar</type>
             <scope>test</scope>
             <scope>test</scope>
         </dependency>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+        </dependency>
         <dependency>
         <dependency>
             <groupId>javax.annotation</groupId>
             <groupId>javax.annotation</groupId>
             <artifactId>javax.annotation-api</artifactId>
             <artifactId>javax.annotation-api</artifactId>

+ 122 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java

@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import com.google.common.annotations.VisibleForTesting;
+import csi.v0.Csi;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.csi.client.CsiClient;
+import org.apache.hadoop.yarn.csi.client.CsiClientImpl;
+import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory;
+import org.apache.hadoop.yarn.csi.utils.ConfigUtils;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * This is a Hadoop RPC server, we uses the Hadoop RPC framework here
+ * because we need to stick to the security model current Hadoop supports.
+ */
+public class CsiAdaptorProtocolService extends AbstractService
+    implements CsiAdaptorProtocol {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CsiAdaptorProtocolService.class);
+
+  private Server server;
+  private InetSocketAddress adaptorServiceAddress;
+  private CsiClient csiClient;
+  private String csiDriverName;
+
+  public CsiAdaptorProtocolService(String driverName,
+      String domainSocketPath) {
+    super(CsiAdaptorProtocolService.class.getName());
+    this.csiClient = new CsiClientImpl(domainSocketPath);
+    this.csiDriverName = driverName;
+  }
+
+  @VisibleForTesting
+  public void setCsiClient(CsiClient client) {
+    this.csiClient = client;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    adaptorServiceAddress = ConfigUtils
+        .getCsiAdaptorAddressForDriver(csiDriverName, conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+    YarnRPC rpc = YarnRPC.create(conf);
+    this.server = rpc.getServer(
+        CsiAdaptorProtocol.class,
+        this, adaptorServiceAddress, conf, null, 1);
+    this.server.start();
+    LOG.info("{} started, listening on address: {}",
+        CsiAdaptorProtocolService.class.getName(),
+        adaptorServiceAddress.toString());
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (this.server != null) {
+      this.server.stop();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public GetPluginInfoResponse getPluginInfo(
+      GetPluginInfoRequest request) throws YarnException, IOException {
+    Csi.GetPluginInfoResponse response = csiClient.getPluginInfo();
+    return ProtoTranslatorFactory.getTranslator(
+        GetPluginInfoResponse.class, Csi.GetPluginInfoResponse.class)
+        .convertFrom(response);
+  }
+
+  @Override
+  public ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
+      ValidateVolumeCapabilitiesRequest request) throws YarnException,
+      IOException {
+    Csi.ValidateVolumeCapabilitiesRequest req = ProtoTranslatorFactory
+        .getTranslator(ValidateVolumeCapabilitiesRequest.class,
+            Csi.ValidateVolumeCapabilitiesRequest.class)
+        .convertTo(request);
+    Csi.ValidateVolumeCapabilitiesResponse response =
+        csiClient.validateVolumeCapabilities(req);
+    return ProtoTranslatorFactory.getTranslator(
+        ValidateVolumeCapabilitiesResponse.class,
+        Csi.ValidateVolumeCapabilitiesResponse.class)
+        .convertFrom(response);
+  }
+}

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains CSI adaptor classes.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java

@@ -18,6 +18,7 @@
 
 
 package org.apache.hadoop.yarn.csi.client;
 package org.apache.hadoop.yarn.csi.client;
 
 
+import csi.v0.Csi;
 import csi.v0.Csi.GetPluginInfoResponse;
 import csi.v0.Csi.GetPluginInfoResponse;
 
 
 import java.io.IOException;
 import java.io.IOException;
@@ -36,4 +37,7 @@ public interface CsiClient {
    * @throws IOException when unable to get plugin info from the driver.
    * @throws IOException when unable to get plugin info from the driver.
    */
    */
   GetPluginInfoResponse getPluginInfo() throws IOException;
   GetPluginInfoResponse getPluginInfo() throws IOException;
+
+  Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+      Csi.ValidateVolumeCapabilitiesRequest request) throws IOException;
 }
 }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java

@@ -18,6 +18,7 @@
 
 
 package org.apache.hadoop.yarn.csi.client;
 package org.apache.hadoop.yarn.csi.client;
 
 
+import csi.v0.Csi;
 import csi.v0.Csi.GetPluginInfoRequest;
 import csi.v0.Csi.GetPluginInfoRequest;
 import csi.v0.Csi.GetPluginInfoResponse;
 import csi.v0.Csi.GetPluginInfoResponse;
 import org.apache.hadoop.yarn.csi.utils.GrpcHelper;
 import org.apache.hadoop.yarn.csi.utils.GrpcHelper;
@@ -48,4 +49,14 @@ public class CsiClientImpl implements CsiClient {
       return client.createIdentityBlockingStub().getPluginInfo(request);
       return client.createIdentityBlockingStub().getPluginInfo(request);
     }
     }
   }
   }
+
+  @Override
+  public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+      Csi.ValidateVolumeCapabilitiesRequest request) throws IOException {
+    try (CsiGrpcClient client = CsiGrpcClient.newBuilder()
+        .setDomainSocketAddress(address).build()) {
+      return client.createControllerBlockingStub()
+          .validateVolumeCapabilities(request);
+    }
+  }
 }
 }

+ 44 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.translator;
+
+import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Protobuf message translator for GetPluginInfoResponse and
+ * Csi.GetPluginInfoResponse.
+ */
+public class GetPluginInfoResponseProtoTranslator implements
+    ProtoTranslator<GetPluginInfoResponse, Csi.GetPluginInfoResponse> {
+
+  @Override public Csi.GetPluginInfoResponse convertTo(
+      GetPluginInfoResponse messageA) throws YarnException {
+    return Csi.GetPluginInfoResponse.newBuilder()
+        .setName(messageA.getDriverName())
+        .setVendorVersion(messageA.getVersion())
+        .build();
+  }
+
+  @Override public GetPluginInfoResponse convertFrom(
+      Csi.GetPluginInfoResponse messageB) throws YarnException {
+    return GetPluginInfoResponse.newInstance(messageB.getName(),
+        messageB.getVendorVersion());
+  }
+}

+ 49 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslator.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.translator;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * ProtoTranslator converts a YARN side message to CSI proto message
+ * and vice versa. Each CSI proto message should have a corresponding
+ * YARN side message implementation, and a transformer to convert them
+ * one to the other. This layer helps we to hide CSI spec messages
+ * from YARN components.
+ *
+ * @param <A> YARN side internal messages
+ * @param <B> CSI proto messages
+ */
+public interface ProtoTranslator<A, B> {
+
+  /**
+   * Convert message from type A to type B.
+   * @param messageA
+   * @return messageB
+   * @throws YarnException
+   */
+  B convertTo(A messageA) throws YarnException;
+
+  /**
+   * Convert message from type B to type A.
+   * @param messageB
+   * @return messageA
+   * @throws YarnException
+   */
+  A convertFrom(B messageB) throws YarnException;
+}

+ 66 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.translator;
+
+import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+
+/**
+ * Factory class to get desired proto transformer instance.
+ */
+public final class ProtoTranslatorFactory {
+
+  private ProtoTranslatorFactory() {
+    // hide constructor for the factory class
+  }
+
+  /**
+   * Get a {@link ProtoTranslator} based on the given input message
+   * types. If the type is not supported, a IllegalArgumentException
+   * will be thrown. When adding more transformers to this factory class,
+   * note each transformer works exactly for one message to another
+   * (and vice versa). For each type of the message, make sure there is
+   * a corresponding unit test added, such as
+   * TestValidateVolumeCapabilitiesRequest.
+   *
+   * @param yarnProto yarn proto message
+   * @param csiProto CSI proto message
+   * @param <A> yarn proto message
+   * @param <B> CSI proto message
+   * @throws IllegalArgumentException
+   *   when given types are not supported
+   * @return
+   *   a proto message transformer that transforms
+   *   YARN internal proto message to CSI
+   */
+  public static <A, B> ProtoTranslator<A, B> getTranslator(
+      Class<A> yarnProto, Class<B> csiProto) {
+    if (yarnProto == ValidateVolumeCapabilitiesRequest.class
+        && csiProto == Csi.ValidateVolumeCapabilitiesRequest.class) {
+      return new ValidateVolumeCapabilitiesRequestProtoTranslator();
+    } else if (yarnProto == ValidateVolumeCapabilitiesResponse.class
+        && csiProto == Csi.ValidateVolumeCapabilitiesResponse.class) {
+      return new ValidationVolumeCapabilitiesResponseProtoTranslator();
+    }
+    throw new IllegalArgumentException("A problem is found while processing"
+        + " proto message translating. Unexpected message types,"
+        + " no transformer is found can handle the transformation from type "
+        + yarnProto.getName() + " <-> " + csiProto.getName());
+  }
+}

+ 93 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ValidateVolumeCapabilitiesRequestProtoTranslator.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.csi.translator;
+
+import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Proto message translator for ValidateVolumeCapabilitiesRequest.
+ * @param <A> ValidateVolumeCapabilitiesRequest
+ * @param <B> Csi.ValidateVolumeCapabilitiesRequest
+ */
+public class ValidateVolumeCapabilitiesRequestProtoTranslator<A, B>
+    implements ProtoTranslator<ValidateVolumeCapabilitiesRequest,
+            Csi.ValidateVolumeCapabilitiesRequest> {
+
+  @Override
+  public Csi.ValidateVolumeCapabilitiesRequest convertTo(
+      ValidateVolumeCapabilitiesRequest request) throws YarnException {
+    Csi.ValidateVolumeCapabilitiesRequest.Builder buidler =
+        Csi.ValidateVolumeCapabilitiesRequest.newBuilder();
+    buidler.setVolumeId(request.getVolumeId());
+    if (request.getVolumeCapabilities() != null
+        && request.getVolumeCapabilities().size() > 0) {
+      buidler.putAllVolumeAttributes(request.getVolumeAttributes());
+    }
+    for (VolumeCapability cap :
+        request.getVolumeCapabilities()) {
+      Csi.VolumeCapability.AccessMode accessMode =
+          Csi.VolumeCapability.AccessMode.newBuilder()
+              .setModeValue(cap.getAccessMode().ordinal())
+              .build();
+      Csi.VolumeCapability.MountVolume mountVolume =
+          Csi.VolumeCapability.MountVolume.newBuilder()
+              .addAllMountFlags(cap.getMountFlags())
+              .build();
+      Csi.VolumeCapability capability =
+          Csi.VolumeCapability.newBuilder()
+              .setAccessMode(accessMode)
+              .setMount(mountVolume)
+              .build();
+      buidler.addVolumeCapabilities(capability);
+    }
+    return buidler.build();
+  }
+
+  @Override
+  public ValidateVolumeCapabilitiesRequest convertFrom(
+      Csi.ValidateVolumeCapabilitiesRequest request) throws YarnException {
+    ValidateVolumeCapabilitiesRequest result = ValidateVolumeCapabilitiesRequest
+        .newInstance(request.getVolumeId(), request.getVolumeAttributesMap());
+    for (Csi.VolumeCapability csiCap :
+        request.getVolumeCapabilitiesList()) {
+      ValidateVolumeCapabilitiesRequest.AccessMode mode =
+          ValidateVolumeCapabilitiesRequest.AccessMode
+              .valueOf(csiCap.getAccessMode().getMode().name());
+      if (!csiCap.hasMount()) {
+        throw new YarnException("Invalid request,"
+            + " mount is not found in the request.");
+      }
+      List<String> mountFlags = new ArrayList<>();
+      for (int i=0; i<csiCap.getMount().getMountFlagsCount(); i++) {
+        mountFlags.add(csiCap.getMount().getMountFlags(i));
+      }
+      VolumeCapability capability = new VolumeCapability(mode,
+          VolumeType.FILE_SYSTEM, mountFlags);
+      result.addVolumeCapability(capability);
+    }
+    return result;
+  }
+}

+ 48 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ValidationVolumeCapabilitiesResponseProtoTranslator.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.translator;
+
+import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Proto message translator for ValidateVolumeCapabilitiesResponse.
+ * @param <A> ValidateVolumeCapabilitiesResponse
+ * @param <B> Csi.ValidateVolumeCapabilitiesResponse
+ */
+public class ValidationVolumeCapabilitiesResponseProtoTranslator<A, B>
+    implements ProtoTranslator<ValidateVolumeCapabilitiesResponse,
+            Csi.ValidateVolumeCapabilitiesResponse> {
+
+  @Override
+  public Csi.ValidateVolumeCapabilitiesResponse convertTo(
+      ValidateVolumeCapabilitiesResponse response) throws YarnException {
+    return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
+        .setSupported(response.isSupported())
+        .setMessage(response.getResponseMessage())
+        .build();
+  }
+
+  @Override
+  public ValidateVolumeCapabilitiesResponse convertFrom(
+      Csi.ValidateVolumeCapabilitiesResponse response) throws YarnException {
+    return ValidateVolumeCapabilitiesResponse.newInstance(
+        response.getSupported(), response.getMessage());
+  }
+}

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains classes for protocol translation between YARN and CSI.
+ */
+package org.apache.hadoop.yarn.csi.translator;

+ 61 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Utility class to load configurations.
+ */
+public final class ConfigUtils {
+
+  private ConfigUtils() {
+    // Hide constructor for utility class.
+  }
+  /**
+   * Resolve the CSI adaptor address for a CSI driver from configuration.
+   * Expected configuration property name is
+   * yarn.nodemanager.csi-driver-adaptor.${driverName}.address.
+   * @param driverName
+   * @param conf
+   * @return adaptor service address
+   * @throws YarnException
+   */
+  public static InetSocketAddress getCsiAdaptorAddressForDriver(
+      String driverName, Configuration conf) throws YarnException {
+    String configName = YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+        + driverName + ".address";
+    String errorMessage = "Failed to load CSI adaptor address for driver "
+        + driverName + ", configuration property " + configName
+        + " is not defined or invalid.";
+    try {
+      InetSocketAddress address = conf
+          .getSocketAddr(configName, null, -1);
+      if (address == null) {
+        throw new YarnException(errorMessage);
+      }
+      return address;
+    } catch (IllegalArgumentException e) {
+      throw new YarnException(errorMessage);
+    }
+  }
+}

+ 256 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java

@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import csi.v0.Csi;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
+import org.apache.hadoop.yarn.client.NMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.csi.client.CsiClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.MULTI_NODE_MULTI_WRITER;
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM;
+
+/**
+ * UT for {@link CsiAdaptorProtocolService}.
+ */
+public class TestCsiAdaptorService {
+
+  private static File testRoot = null;
+  private static String domainSocket = null;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    testRoot = GenericTestUtils.getTestDir("csi-test");
+    File socketPath = new File(testRoot, "csi.sock");
+    FileUtils.forceMkdirParent(socketPath);
+    domainSocket = "unix://" + socketPath.getAbsolutePath();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (testRoot != null) {
+      FileUtils.deleteDirectory(testRoot);
+    }
+  }
+
+  @Test
+  public void testValidateVolume() throws IOException, YarnException {
+    ServerSocket ss = new ServerSocket(0);
+    ss.close();
+    InetSocketAddress address = new InetSocketAddress(ss.getLocalPort());
+    Configuration conf = new Configuration();
+    conf.setSocketAddr(
+        YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address",
+        address);
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService("test-driver", domainSocket);
+
+    // inject a fake CSI client
+    // this client validates if the ValidateVolumeCapabilitiesRequest
+    // is integrity, and then reply a fake response
+    service.setCsiClient(new CsiClient() {
+      @Override
+      public Csi.GetPluginInfoResponse getPluginInfo() {
+        return Csi.GetPluginInfoResponse.newBuilder()
+            .setName("test-plugin")
+            .setVendorVersion("0.1")
+            .build();
+      }
+
+      @Override
+      public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+          Csi.ValidateVolumeCapabilitiesRequest request) {
+        // validate we get all info from the request
+        Assert.assertEquals("volume-id-0000123", request.getVolumeId());
+        Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
+        Assert.assertEquals(Csi.VolumeCapability.AccessMode
+            .newBuilder().setModeValue(5).build(),
+            request.getVolumeCapabilities(0).getAccessMode());
+        Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
+        Assert.assertEquals(2, request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsCount());
+        Assert.assertTrue(request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsList().contains("mountFlag1"));
+        Assert.assertTrue(request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsList().contains("mountFlag2"));
+        Assert.assertEquals(2, request.getVolumeAttributesCount());
+        Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
+        Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
+        // return a fake result
+        return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
+            .setSupported(false)
+            .setMessage("this is a test")
+            .build();
+      }
+    });
+
+    service.init(conf);
+    service.start();
+
+    try (CsiAdaptorProtocolPBClientImpl client =
+        new CsiAdaptorProtocolPBClientImpl(1L, address, new Configuration())) {
+      ValidateVolumeCapabilitiesRequest request =
+          ValidateVolumeCapabilitiesRequestPBImpl
+              .newInstance("volume-id-0000123",
+                  ImmutableList.of(
+                      new ValidateVolumeCapabilitiesRequest
+                          .VolumeCapability(
+                              MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+                          ImmutableList.of("mountFlag1", "mountFlag2"))),
+              ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+      ValidateVolumeCapabilitiesResponse response = client
+          .validateVolumeCapacity(request);
+
+      Assert.assertEquals(false, response.isSupported());
+      Assert.assertEquals("this is a test", response.getResponseMessage());
+    } finally {
+      service.stop();
+    }
+  }
+
+  @Test
+  public void testValidateVolumeWithNMProxy() throws Exception {
+    ServerSocket ss = new ServerSocket(0);
+    ss.close();
+    InetSocketAddress address = new InetSocketAddress(ss.getLocalPort());
+    Configuration conf = new Configuration();
+    conf.setSocketAddr(
+        YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address",
+        address);
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService("test-driver", domainSocket);
+
+    // inject a fake CSI client
+    // this client validates if the ValidateVolumeCapabilitiesRequest
+    // is integrity, and then reply a fake response
+    service.setCsiClient(new CsiClient() {
+      @Override
+      public Csi.GetPluginInfoResponse getPluginInfo() {
+        return Csi.GetPluginInfoResponse.newBuilder()
+            .setName("test-plugin")
+            .setVendorVersion("0.1")
+            .build();
+      }
+
+      @Override
+      public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+          Csi.ValidateVolumeCapabilitiesRequest request) {
+        // validate we get all info from the request
+        Assert.assertEquals("volume-id-0000123", request.getVolumeId());
+        Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
+        Assert.assertEquals(Csi.VolumeCapability.AccessMode
+                .newBuilder().setModeValue(5).build(),
+            request.getVolumeCapabilities(0).getAccessMode());
+        Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
+        Assert.assertEquals(2, request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsCount());
+        Assert.assertTrue(request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsList().contains("mountFlag1"));
+        Assert.assertTrue(request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsList().contains("mountFlag2"));
+        Assert.assertEquals(2, request.getVolumeAttributesCount());
+        Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
+        Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
+        // return a fake result
+        return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
+            .setSupported(false)
+            .setMessage("this is a test")
+            .build();
+      }
+    });
+
+    service.init(conf);
+    service.start();
+
+    YarnRPC rpc = YarnRPC.create(conf);
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    CsiAdaptorProtocol adaptorClient = NMProxy
+        .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+            NetUtils.createSocketAddrForHost("localhost", ss.getLocalPort()));
+    ValidateVolumeCapabilitiesRequest request =
+        ValidateVolumeCapabilitiesRequestPBImpl
+            .newInstance("volume-id-0000123",
+                ImmutableList.of(new ValidateVolumeCapabilitiesRequest
+                    .VolumeCapability(
+                        MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+                    ImmutableList.of("mountFlag1", "mountFlag2"))),
+                ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+    ValidateVolumeCapabilitiesResponse response = adaptorClient
+        .validateVolumeCapacity(request);
+    Assert.assertEquals(false, response.isSupported());
+    Assert.assertEquals("this is a test", response.getResponseMessage());
+
+    service.stop();
+  }
+
+  @Test (expected = ServiceStateException.class)
+  public void testMissingConfiguration() {
+    Configuration conf = new Configuration();
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService("test-driver", domainSocket);
+    service.init(conf);
+  }
+
+  @Test (expected = ServiceStateException.class)
+  public void testInvalidServicePort() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+        + "test-driver-0001.address",
+        "0.0.0.0:-100"); // this is an invalid address
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
+    service.init(conf);
+  }
+
+  @Test (expected = ServiceStateException.class)
+  public void testInvalidHost() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+            + "test-driver-0001.address",
+        "192.0.1:8999"); // this is an invalid ip address
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
+    service.init(conf);
+  }
+}

+ 66 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify the integrity of GetPluginInfoRequest and GetPluginInfoResponse.
+ */
+public class TestGetPluginInfoRequestResponse {
+
+  @Test
+  public void testGetPluginInfoRequestPBRecord() {
+    CsiAdaptorProtos.GetPluginInfoRequest requestProto =
+        CsiAdaptorProtos.GetPluginInfoRequest.newBuilder().build();
+    GetPluginInfoRequestPBImpl pbImpl =
+        new GetPluginInfoRequestPBImpl(requestProto);
+    Assert.assertNotNull(pbImpl);
+    Assert.assertEquals(requestProto, pbImpl.getProto());
+  }
+
+  @Test
+  public void testGetPluginInfoResponsePBRecord() {
+    CsiAdaptorProtos.GetPluginInfoResponse responseProto =
+        CsiAdaptorProtos.GetPluginInfoResponse.newBuilder()
+        .setName("test-driver")
+        .setVendorVersion("1.0.1")
+        .build();
+
+    GetPluginInfoResponsePBImpl pbImpl =
+        new GetPluginInfoResponsePBImpl(responseProto);
+    Assert.assertEquals("test-driver", pbImpl.getDriverName());
+    Assert.assertEquals("1.0.1", pbImpl.getVersion());
+    Assert.assertEquals(responseProto, pbImpl.getProto());
+
+    GetPluginInfoResponse pbImpl2 = GetPluginInfoResponsePBImpl
+        .newInstance("test-driver", "1.0.1");
+    Assert.assertEquals("test-driver", pbImpl2.getDriverName());
+    Assert.assertEquals("1.0.1", pbImpl2.getVersion());
+
+    CsiAdaptorProtos.GetPluginInfoResponse proto =
+        ((GetPluginInfoResponsePBImpl) pbImpl2).getProto();
+    Assert.assertEquals("test-driver", proto.getName());
+    Assert.assertEquals("1.0.1", proto.getVendorVersion());
+  }
+}

+ 113 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java

@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.AccessMode;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.VolumeType;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.MULTI_NODE_MULTI_WRITER;
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM;
+
+/**
+ * UT for message exchanges.
+ */
+public class TestValidateVolumeCapabilityRequest {
+
+  @Test
+  public void testPBRecord() {
+    CsiAdaptorProtos.VolumeCapability vcProto =
+        CsiAdaptorProtos.VolumeCapability.newBuilder()
+            .setAccessMode(AccessMode.MULTI_NODE_MULTI_WRITER)
+            .setVolumeType(VolumeType.FILE_SYSTEM)
+            .addMountFlags("flag0")
+            .addMountFlags("flag1")
+            .build();
+
+    CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest requestProto =
+        CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest.newBuilder()
+            .setVolumeId("volume-id-0000001")
+            .addVolumeCapabilities(vcProto)
+            .addVolumeAttributes(YarnProtos.StringStringMapProto
+                .newBuilder().setKey("attr0")
+                .setValue("value0")
+                .build())
+            .addVolumeAttributes(YarnProtos.StringStringMapProto
+                .newBuilder().setKey("attr1")
+                .setValue("value1")
+                .build())
+            .build();
+
+    ValidateVolumeCapabilitiesRequestPBImpl request =
+        new ValidateVolumeCapabilitiesRequestPBImpl(requestProto);
+
+    Assert.assertEquals("volume-id-0000001", request.getVolumeId());
+    Assert.assertEquals(2, request.getVolumeAttributes().size());
+    Assert.assertEquals("value0", request.getVolumeAttributes().get("attr0"));
+    Assert.assertEquals("value1", request.getVolumeAttributes().get("attr1"));
+    Assert.assertEquals(1, request.getVolumeCapabilities().size());
+    VolumeCapability vc =
+        request.getVolumeCapabilities().get(0);
+    Assert.assertEquals(MULTI_NODE_MULTI_WRITER, vc.getAccessMode());
+    Assert.assertEquals(FILE_SYSTEM, vc.getVolumeType());
+    Assert.assertEquals(2, vc.getMountFlags().size());
+
+    Assert.assertEquals(requestProto, request.getProto());
+  }
+
+  @Test
+  public void testNewInstance() {
+    ValidateVolumeCapabilitiesRequest pbImpl =
+        ValidateVolumeCapabilitiesRequestPBImpl
+            .newInstance("volume-id-0000123",
+                ImmutableList.of(
+                    new VolumeCapability(
+                        MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+                        ImmutableList.of("mountFlag1", "mountFlag2"))),
+                ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+    Assert.assertEquals("volume-id-0000123", pbImpl.getVolumeId());
+    Assert.assertEquals(1, pbImpl.getVolumeCapabilities().size());
+    Assert.assertEquals(FILE_SYSTEM,
+        pbImpl.getVolumeCapabilities().get(0).getVolumeType());
+    Assert.assertEquals(MULTI_NODE_MULTI_WRITER,
+        pbImpl.getVolumeCapabilities().get(0).getAccessMode());
+    Assert.assertEquals(2, pbImpl.getVolumeAttributes().size());
+
+    CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest proto =
+        ((ValidateVolumeCapabilitiesRequestPBImpl) pbImpl).getProto();
+    Assert.assertEquals("volume-id-0000123", proto.getVolumeId());
+    Assert.assertEquals(1, proto.getVolumeCapabilitiesCount());
+    Assert.assertEquals(AccessMode.MULTI_NODE_MULTI_WRITER,
+        proto.getVolumeCapabilities(0).getAccessMode());
+    Assert.assertEquals(VolumeType.FILE_SYSTEM,
+        proto.getVolumeCapabilities(0).getVolumeType());
+    Assert.assertEquals(2, proto.getVolumeCapabilities(0)
+        .getMountFlagsCount());
+    Assert.assertEquals(2, proto.getVolumeCapabilities(0)
+        .getMountFlagsList().size());
+  }
+}

+ 61 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * UT for message exchanges.
+ */
+public class TestValidateVolumeCapabilityResponse {
+
+  @Test
+  public void testPBRecord() {
+    CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse proto =
+        CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse.newBuilder()
+        .setSupported(true)
+        .setMessage("capability is supported")
+        .build();
+
+    ValidateVolumeCapabilitiesResponsePBImpl pbImpl =
+        new ValidateVolumeCapabilitiesResponsePBImpl(proto);
+
+    Assert.assertEquals(true, pbImpl.isSupported());
+    Assert.assertEquals("capability is supported", pbImpl.getResponseMessage());
+    Assert.assertEquals(proto, pbImpl.getProto());
+  }
+
+  @Test
+  public void testNewInstance() {
+    ValidateVolumeCapabilitiesResponse pbImpl =
+        ValidateVolumeCapabilitiesResponsePBImpl
+            .newInstance(false, "capability not supported");
+    Assert.assertEquals(false, pbImpl.isSupported());
+    Assert.assertEquals("capability not supported",
+        pbImpl.getResponseMessage());
+
+    CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse proto =
+        ((ValidateVolumeCapabilitiesResponsePBImpl) pbImpl).getProto();
+    Assert.assertEquals(false, proto.getSupported());
+    Assert.assertEquals("capability not supported", proto.getMessage());
+  }
+}

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains UT classes for CSI adaptor.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;

+ 17 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java

@@ -17,13 +17,12 @@
  */
  */
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
 
 
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledFuture;
 
 
@@ -40,12 +39,8 @@ public interface VolumeManager {
   /**
   /**
    * @return all known volumes and their states.
    * @return all known volumes and their states.
    */
    */
-  @VisibleForTesting
   VolumeStates getVolumeStates();
   VolumeStates getVolumeStates();
 
 
-  @VisibleForTesting
-  void setClient(CsiAdaptorClientProtocol client);
-
   /**
   /**
    * Start to supervise on a volume.
    * Start to supervise on a volume.
    * @param volume
    * @param volume
@@ -60,4 +55,20 @@ public interface VolumeManager {
    */
    */
   ScheduledFuture<VolumeProvisioningResults> schedule(
   ScheduledFuture<VolumeProvisioningResults> schedule(
       VolumeProvisioningTask volumeProvisioningTask, int delaySecond);
       VolumeProvisioningTask volumeProvisioningTask, int delaySecond);
+
+  /**
+   * Register a csi-driver-adaptor to the volume manager.
+   * @param driverName
+   * @param client
+   */
+  void registerCsiDriverAdaptor(String driverName, CsiAdaptorProtocol client);
+
+  /**
+   * Returns the csi-driver-adaptor client from cache by the given driver name.
+   * If the client is not found, null is returned.
+   * @param driverName
+   * @return a csi-driver-adaptor client working for given driver or null
+   * if the adaptor could not be found.
+   */
+  CsiAdaptorProtocol getAdaptorByDriverName(String driverName);
 }
 }

+ 80 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java

@@ -18,16 +18,28 @@
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.client.NMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
-import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
 
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledFuture;
@@ -43,20 +55,84 @@ public class VolumeManagerImpl extends AbstractService
 
 
   private final VolumeStates volumeStates;
   private final VolumeStates volumeStates;
   private ScheduledExecutorService provisioningExecutor;
   private ScheduledExecutorService provisioningExecutor;
-  private CsiAdaptorClientProtocol adaptorClient;
+  private Map<String, CsiAdaptorProtocol> csiAdaptorMap;
 
 
   private final static int PROVISIONING_TASK_THREAD_POOL_SIZE = 10;
   private final static int PROVISIONING_TASK_THREAD_POOL_SIZE = 10;
 
 
   public VolumeManagerImpl() {
   public VolumeManagerImpl() {
     super(VolumeManagerImpl.class.getName());
     super(VolumeManagerImpl.class.getName());
     this.volumeStates = new VolumeStates();
     this.volumeStates = new VolumeStates();
+    this.csiAdaptorMap = new ConcurrentHashMap<>();
     this.provisioningExecutor = Executors
     this.provisioningExecutor = Executors
         .newScheduledThreadPool(PROVISIONING_TASK_THREAD_POOL_SIZE);
         .newScheduledThreadPool(PROVISIONING_TASK_THREAD_POOL_SIZE);
-    this.adaptorClient = new CsiAdaptorClient();
+  }
+
+  // Init the CSI adaptor cache according to the configuration.
+  // user only needs to configure a list of adaptor addresses,
+  // this method extracts each address and init an adaptor client,
+  // then proceed with a hand-shake by calling adaptor's getPluginInfo
+  // method to retrieve the driver info. If the driver can be resolved,
+  // it is then added to the cache. Note, we don't allow two drivers
+  // specified with same driver-name even version is different.
+  private void initCsiAdaptorCache(
+      final Map<String, CsiAdaptorProtocol> adaptorMap, Configuration conf)
+      throws IOException, YarnException {
+    LOG.info("Initializing cache for csi-driver-adaptors");
+    String[] addresses =
+        conf.getStrings(YarnConfiguration.NM_CSI_ADAPTOR_ADDRESSES);
+    if (addresses != null && addresses.length > 0) {
+      for (String addr : addresses) {
+        LOG.info("Found csi-driver-adaptor socket address: " + addr);
+        InetSocketAddress address = NetUtils.createSocketAddr(addr);
+        YarnRPC rpc = YarnRPC.create(conf);
+        UserGroupInformation currentUser =
+            UserGroupInformation.getCurrentUser();
+        CsiAdaptorProtocol adaptorClient = NMProxy
+            .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+                address);
+        // Attempt to resolve the driver by contacting to
+        // the diver's identity service on the given address.
+        // If the call failed, the initialization is also failed
+        // in order running into inconsistent state.
+        LOG.info("Retrieving info from csi-driver-adaptor on address " + addr);
+        GetPluginInfoResponse response =
+            adaptorClient.getPluginInfo(GetPluginInfoRequest.newInstance());
+        if (!Strings.isNullOrEmpty(response.getDriverName())) {
+          String driverName = response.getDriverName();
+          if (adaptorMap.containsKey(driverName)) {
+            throw new YarnException(
+                "Duplicate driver adaptor found," + " driver name: "
+                    + driverName);
+          }
+          adaptorMap.put(driverName, adaptorClient);
+          LOG.info("CSI Adaptor added to the cache, adaptor name: " + driverName
+              + ", driver version: " + response.getVersion());
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns a CsiAdaptorProtocol client by the given driver name,
+   * returns null if no adaptor is found for the driver, that means
+   * the driver has not registered to the volume manager yet enhance not valid.
+   * @param driverName the name of the driver
+   * @return CsiAdaptorProtocol client or null if driver not registered
+   */
+  public CsiAdaptorProtocol getAdaptorByDriverName(String driverName) {
+    return csiAdaptorMap.get(driverName);
+  }
+
+  @VisibleForTesting
+  @Override
+  public void registerCsiDriverAdaptor(String driverName,
+      CsiAdaptorProtocol client) {
+    this.csiAdaptorMap.put(driverName, client);
   }
   }
 
 
   @Override
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
   protected void serviceInit(Configuration conf) throws Exception {
+    initCsiAdaptorCache(csiAdaptorMap, conf);
     super.serviceInit(conf);
     super.serviceInit(conf);
   }
   }
 
 
@@ -82,18 +158,11 @@ public class VolumeManagerImpl extends AbstractService
       // volume already exists
       // volume already exists
       return volumeStates.getVolume(volume.getVolumeId());
       return volumeStates.getVolume(volume.getVolumeId());
     } else {
     } else {
-      // add the volume and set the client
-      ((VolumeImpl) volume).setClient(adaptorClient);
       this.volumeStates.addVolumeIfAbsent(volume);
       this.volumeStates.addVolumeIfAbsent(volume);
       return volume;
       return volume;
     }
     }
   }
   }
 
 
-  @VisibleForTesting
-  public void setClient(CsiAdaptorClientProtocol client) {
-    this.adaptorClient = client;
-  }
-
   @Override
   @Override
   public ScheduledFuture<VolumeProvisioningResults> schedule(
   public ScheduledFuture<VolumeProvisioningResults> schedule(
       VolumeProvisioningTask volumeProvisioningTask,
       VolumeProvisioningTask volumeProvisioningTask,

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java

@@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
 
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
 
 
 /**
 /**
  * Major volume interface at RM's view, it maintains the volume states and
  * Major volume interface at RM's view, it maintains the volume states and
@@ -34,4 +36,10 @@ public interface Volume extends EventHandler<VolumeEvent> {
   VolumeState getVolumeState();
   VolumeState getVolumeState();
 
 
   VolumeId getVolumeId();
   VolumeId getVolumeId();
+
+  VolumeMetaData getVolumeMeta();
+
+  CsiAdaptorProtocol getClient();
+
+  void setClient(CsiAdaptorProtocol client);
 }
 }

+ 38 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java

@@ -18,10 +18,15 @@
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.CsiAdaptorClient;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEventType;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@@ -30,13 +35,16 @@ import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
-import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
 
 
+import java.io.IOException;
 import java.util.EnumSet;
 import java.util.EnumSet;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.SINGLE_NODE_READER_ONLY;
+import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM;
+
 /**
 /**
  * This class maintains the volume states and state transition
  * This class maintains the volume states and state transition
  * according to the CSI volume lifecycle. Volume states are stored in
  * according to the CSI volume lifecycle. Volume states are stored in
@@ -54,7 +62,7 @@ public class VolumeImpl implements Volume {
 
 
   private final VolumeId volumeId;
   private final VolumeId volumeId;
   private final VolumeMetaData volumeMeta;
   private final VolumeMetaData volumeMeta;
-  private CsiAdaptorClientProtocol client;
+  private CsiAdaptorProtocol adaptorClient;
 
 
   public VolumeImpl(VolumeMetaData volumeMeta) {
   public VolumeImpl(VolumeMetaData volumeMeta) {
     ReadWriteLock lock = new ReentrantReadWriteLock();
     ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -63,16 +71,21 @@ public class VolumeImpl implements Volume {
     this.volumeId = volumeMeta.getVolumeId();
     this.volumeId = volumeMeta.getVolumeId();
     this.volumeMeta = volumeMeta;
     this.volumeMeta = volumeMeta;
     this.stateMachine = createVolumeStateFactory().make(this);
     this.stateMachine = createVolumeStateFactory().make(this);
-    this.client = new CsiAdaptorClient();
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting
-  public void setClient(CsiAdaptorClientProtocol client) {
-    this.client = client;
+  public void setClient(CsiAdaptorProtocol csiAdaptorClient) {
+    this.adaptorClient = csiAdaptorClient;
   }
   }
 
 
-  public CsiAdaptorClientProtocol getClient() {
-    return this.client;
+  @Override
+  public CsiAdaptorProtocol getClient() {
+    return this.adaptorClient;
+  }
+
+  @Override
+  public VolumeMetaData getVolumeMeta() {
+    return this.volumeMeta;
   }
   }
 
 
   private StateMachineFactory<VolumeImpl, VolumeState,
   private StateMachineFactory<VolumeImpl, VolumeState,
@@ -135,9 +148,20 @@ public class VolumeImpl implements Volume {
         VolumeEvent volumeEvent) {
         VolumeEvent volumeEvent) {
       try {
       try {
         // this call could cross node, we should keep the message tight
         // this call could cross node, we should keep the message tight
-        volume.getClient().validateVolume();
-        return VolumeState.VALIDATED;
-      } catch (VolumeException e) {
+        // TODO we should parse the capability from volume resource spec
+        VolumeCapability capability = new VolumeCapability(
+            SINGLE_NODE_READER_ONLY, FILE_SYSTEM,
+            ImmutableList.of());
+        ValidateVolumeCapabilitiesRequest request =
+            ValidateVolumeCapabilitiesRequest
+                .newInstance(volume.getVolumeId().getId(),
+                    ImmutableList.of(capability),
+                    ImmutableMap.of());
+        ValidateVolumeCapabilitiesResponse response = volume.getClient()
+            .validateVolumeCapacity(request);
+        return response.isSupported() ? VolumeState.VALIDATED
+            : VolumeState.UNAVAILABLE;
+      } catch (YarnException | IOException e) {
         LOG.warn("Got exception while calling the CSI adaptor", e);
         LOG.warn("Got exception while calling the CSI adaptor", e);
         return VolumeState.UNAVAILABLE;
         return VolumeState.UNAVAILABLE;
       }
       }
@@ -150,14 +174,8 @@ public class VolumeImpl implements Volume {
     @Override
     @Override
     public VolumeState transition(VolumeImpl volume,
     public VolumeState transition(VolumeImpl volume,
         VolumeEvent volumeEvent) {
         VolumeEvent volumeEvent) {
-      try {
-        // this call could cross node, we should keep the message tight
-        volume.getClient().controllerPublishVolume();
-        return VolumeState.NODE_READY;
-      } catch (VolumeException e) {
-        LOG.warn("Got exception while calling the CSI adaptor", e);
-        return volume.getVolumeState();
-      }
+      // this call could cross node, we should keep the message tight
+      return VolumeState.NODE_READY;
     }
     }
   }
   }
 
 

+ 16 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor;
 
 
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -29,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
@@ -142,8 +144,21 @@ public class VolumeAMSProcessor implements ApplicationMasterServiceProcessor {
    * @param metaData
    * @param metaData
    * @return volume
    * @return volume
    */
    */
-  private Volume checkAndGetVolume(VolumeMetaData metaData) {
+  private Volume checkAndGetVolume(VolumeMetaData metaData)
+      throws InvalidVolumeException {
     Volume toAdd = new VolumeImpl(metaData);
     Volume toAdd = new VolumeImpl(metaData);
+    CsiAdaptorProtocol adaptor = volumeManager
+        .getAdaptorByDriverName(metaData.getDriverName());
+    if (adaptor == null) {
+      throw new InvalidVolumeException("It seems for the driver name"
+          + " specified in the volume " + metaData.getDriverName()
+          + " ,there is no matched driver-adaptor can be found. "
+          + "Is the driver probably registered? Please check if"
+          + " adaptors service addresses defined in "
+          + YarnConfiguration.NM_CSI_ADAPTOR_ADDRESSES
+          + " are correct and services are started.");
+    }
+    toAdd.setClient(adaptor);
     return this.volumeManager.addOrGetVolume(toAdd);
     return this.volumeManager.addOrGetVolume(toAdd);
   }
   }
 
 

+ 65 - 35
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java

@@ -18,9 +18,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 
 
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
-import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
@@ -29,8 +31,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.Mockito;
 
 
+import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 
 import static org.mockito.Mockito.*;
 import static org.mockito.Mockito.*;
 
 
@@ -40,7 +42,13 @@ import static org.mockito.Mockito.*;
 public class TestVolumeLifecycle {
 public class TestVolumeLifecycle {
 
 
   @Test
   @Test
-  public void testValidation() throws InvalidVolumeException {
+  public void testValidation() throws YarnException, IOException {
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
+    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+        .when(mockedClient)
+       .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+
     VolumeImpl volume = (VolumeImpl) VolumeBuilder.newBuilder()
     VolumeImpl volume = (VolumeImpl) VolumeBuilder.newBuilder()
         .volumeId("test_vol_00000001")
         .volumeId("test_vol_00000001")
         .maxCapability(5L)
         .maxCapability(5L)
@@ -48,6 +56,7 @@ public class TestVolumeLifecycle {
         .mountPoint("/path/to/mount")
         .mountPoint("/path/to/mount")
         .driverName("test-driver-name")
         .driverName("test-driver-name")
         .build();
         .build();
+    volume.setClient(mockedClient);
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
 
 
     volume.handle(new ValidateVolumeEvent(volume));
     volume.handle(new ValidateVolumeEvent(volume));
@@ -55,16 +64,19 @@ public class TestVolumeLifecycle {
   }
   }
 
 
   @Test
   @Test
-  public void testValidationFailure() throws VolumeException {
+  public void testVolumeCapacityNotSupported() throws Exception {
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
+
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
         .newBuilder().volumeId("test_vol_00000001").build();
         .newBuilder().volumeId("test_vol_00000001").build();
-    CsiAdaptorClientProtocol mockedClient = Mockito
-        .mock(CsiAdaptorClientProtocol.class);
     volume.setClient(mockedClient);
     volume.setClient(mockedClient);
 
 
     // NEW -> UNAVAILABLE
     // NEW -> UNAVAILABLE
     // Simulate a failed API call to the adaptor
     // Simulate a failed API call to the adaptor
-    doThrow(new VolumeException("failed")).when(mockedClient).validateVolume();
+    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(false, ""))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
     volume.handle(new ValidateVolumeEvent(volume));
     volume.handle(new ValidateVolumeEvent(volume));
 
 
     try {
     try {
@@ -80,47 +92,62 @@ public class TestVolumeLifecycle {
   }
   }
 
 
   @Test
   @Test
-  public void testValidated() throws InvalidVolumeException {
-    AtomicInteger validatedTimes = new AtomicInteger(0);
+  public void testValidationFailure() throws YarnException, IOException {
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
+    doThrow(new VolumeException("fail"))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder
+        .newBuilder().volumeId("test_vol_00000001").build();
+    volume.setClient(mockedClient);
+
+    // NEW -> UNAVAILABLE
+    // Simulate a failed API call to the adaptor
+    doThrow(new VolumeException("failed"))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+    volume.handle(new ValidateVolumeEvent(volume));
+  }
+
+  @Test
+  public void testValidated() throws YarnException, IOException {
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
         .newBuilder().volumeId("test_vol_00000001").build();
         .newBuilder().volumeId("test_vol_00000001").build();
-    CsiAdaptorClientProtocol mockedClient = new CsiAdaptorClientProtocol() {
-      @Override
-      public void validateVolume() {
-        validatedTimes.incrementAndGet();
-      }
-
-      @Override
-      public void controllerPublishVolume() {
-        // do nothing
-      }
-    };
+    CsiAdaptorProtocol mockedClient = Mockito.mock(CsiAdaptorProtocol.class);
     // The client has a count to memorize how many times being called
     // The client has a count to memorize how many times being called
     volume.setClient(mockedClient);
     volume.setClient(mockedClient);
 
 
     // NEW -> VALIDATED
     // NEW -> VALIDATED
+    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
     volume.handle(new ValidateVolumeEvent(volume));
     volume.handle(new ValidateVolumeEvent(volume));
     Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
     Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
-    Assert.assertEquals(1, validatedTimes.get());
+    verify(mockedClient, times(1))
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
 
 
     // VALIDATED -> VALIDATED
     // VALIDATED -> VALIDATED
     volume.handle(new ValidateVolumeEvent(volume));
     volume.handle(new ValidateVolumeEvent(volume));
     Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
     Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
-    Assert.assertEquals(1, validatedTimes.get());
+    verify(mockedClient, times(1))
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
   }
   }
 
 
   @Test
   @Test
-  public void testUnavailableState() throws VolumeException {
+  public void testUnavailableState() throws YarnException, IOException {
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
         .newBuilder().volumeId("test_vol_00000001").build();
         .newBuilder().volumeId("test_vol_00000001").build();
-    CsiAdaptorClientProtocol mockedClient = Mockito
-        .mock(CsiAdaptorClientProtocol.class);
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
     volume.setClient(mockedClient);
     volume.setClient(mockedClient);
 
 
     // NEW -> UNAVAILABLE
     // NEW -> UNAVAILABLE
-    doThrow(new VolumeException("failed")).when(mockedClient)
-        .validateVolume();
+    doThrow(new VolumeException("failed"))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
     volume.handle(new ValidateVolumeEvent(volume));
     volume.handle(new ValidateVolumeEvent(volume));
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
@@ -130,23 +157,26 @@ public class TestVolumeLifecycle {
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
 
 
     // UNAVAILABLE -> VALIDATED
     // UNAVAILABLE -> VALIDATED
-    doNothing().when(mockedClient).validateVolume();
+    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
     volume.setClient(mockedClient);
     volume.setClient(mockedClient);
     volume.handle(new ValidateVolumeEvent(volume));
     volume.handle(new ValidateVolumeEvent(volume));
     Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
     Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
   }
   }
 
 
   @Test
   @Test
-  public void testPublishUnavailableVolume() throws VolumeException {
+  public void testPublishUnavailableVolume() throws YarnException, IOException {
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
         .newBuilder().volumeId("test_vol_00000001").build();
         .newBuilder().volumeId("test_vol_00000001").build();
-    CsiAdaptorClientProtocol mockedClient = Mockito
-        .mock(CsiAdaptorClientProtocol.class);
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
     volume.setClient(mockedClient);
     volume.setClient(mockedClient);
 
 
     // NEW -> UNAVAILABLE (on validateVolume)
     // NEW -> UNAVAILABLE (on validateVolume)
-    doThrow(new VolumeException("failed")).when(mockedClient)
-        .validateVolume();
+    doThrow(new VolumeException("failed"))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
     volume.handle(new ValidateVolumeEvent(volume));
     volume.handle(new ValidateVolumeEvent(volume));
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
@@ -154,7 +184,7 @@ public class TestVolumeLifecycle {
     // UNAVAILABLE -> UNAVAILABLE (on publishVolume)
     // UNAVAILABLE -> UNAVAILABLE (on publishVolume)
     volume.handle(new ControllerPublishVolumeEvent(volume));
     volume.handle(new ControllerPublishVolumeEvent(volume));
     // controller publish is not called since the state is UNAVAILABLE
     // controller publish is not called since the state is UNAVAILABLE
-    verify(mockedClient, times(0)).controllerPublishVolume();
+    // verify(mockedClient, times(0)).controllerPublishVolume();
     // state remains to UNAVAILABLE
     // state remains to UNAVAILABLE
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
   }
   }

+ 26 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java

@@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
@@ -40,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
 import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
 import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
 import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
 import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
@@ -57,6 +59,10 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Arrays;
 
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
 /**
 /**
  * Test cases for volume processor.
  * Test cases for volume processor.
  */
  */
@@ -91,6 +97,7 @@ public class TestVolumeProcessor {
     conf.set(CapacitySchedulerConfiguration.PREFIX
     conf.set(CapacitySchedulerConfiguration.PREFIX
         + CapacitySchedulerConfiguration.ROOT + ".default.ordering-policy",
         + CapacitySchedulerConfiguration.ROOT + ".default.ordering-policy",
         "fair");
         "fair");
+    // this is required to enable volume processor
     conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
     conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
         VolumeAMSProcessor.class.getName());
         VolumeAMSProcessor.class.getName());
     mgr = new NullRMNodeLabelsManager();
     mgr = new NullRMNodeLabelsManager();
@@ -155,6 +162,17 @@ public class TestVolumeProcessor {
         .schedulingRequests(Arrays.asList(sc))
         .schedulingRequests(Arrays.asList(sc))
         .build();
         .build();
 
 
+    // inject adaptor client for testing
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
+    rm.getRMContext().getVolumeManager()
+        .registerCsiDriverAdaptor("hostpath", mockedClient);
+
+    // simulate validation succeed
+    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+
     am1.allocate(ar);
     am1.allocate(ar);
     VolumeStates volumeStates =
     VolumeStates volumeStates =
         rm.getRMContext().getVolumeManager().getVolumeStates();
         rm.getRMContext().getVolumeManager().getVolumeStates();
@@ -212,12 +230,14 @@ public class TestVolumeProcessor {
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
 
 
-    CsiAdaptorClientProtocol mockedClient = Mockito
-        .mock(CsiAdaptorClientProtocol.class);
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
     // inject adaptor client
     // inject adaptor client
-    rm.getRMContext().getVolumeManager().setClient(mockedClient);
-    Mockito.doThrow(new VolumeException("failed")).when(mockedClient)
-        .validateVolume();
+    rm.getRMContext().getVolumeManager()
+        .registerCsiDriverAdaptor("hostpath", mockedClient);
+    doThrow(new VolumeException("failed"))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
 
 
     Resource resource = Resource.newInstance(1024, 1);
     Resource resource = Resource.newInstance(1024, 1);
     ResourceInformation volumeResource = ResourceInformation
     ResourceInformation volumeResource = ResourceInformation