소스 검색

YARN-9038. [CSI] Add ability to publish/unpublish volumes on node managers. Contributed by Weiwei Yang.

Sunil G 6 년 전
부모
커밋
f4906ac019
43개의 변경된 파일1597개의 추가작업 그리고 24개의 파일을 삭제
  1. 44 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java
  2. 94 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeRequest.java
  3. 31 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeResponse.java
  4. 44 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeRequest.java
  5. 31 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeResponse.java
  6. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
  7. 17 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  8. 23 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/CsiConfigUtils.java
  9. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/package-info.java
  10. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
  11. 24 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto
  12. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java
  13. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
  14. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java
  15. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json
  16. 36 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java
  17. 36 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java
  18. 201 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeRequestPBImpl.java
  19. 62 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeResponsePBImpl.java
  20. 89 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeRequestPBImpl.java
  21. 61 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeResponsePBImpl.java
  22. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  23. 18 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
  24. 72 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
  25. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
  26. 20 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
  27. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
  28. 77 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodePublishVolumeRequestProtoTranslator.java
  29. 49 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodeUnpublishVolumeRequestProtoTranslator.java
  30. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java
  31. 14 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
  32. 55 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestNodePublishVolumeRequest.java
  33. 53 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java
  34. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
  35. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  36. 32 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
  37. 64 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
  38. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
  39. 205 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java
  40. 22 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/package-info.java
  41. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerStartContext.java
  42. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
  43. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java

+ 44 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java

@@ -19,6 +19,10 @@ package org.apache.hadoop.yarn.api;
 
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -30,10 +34,50 @@ import java.io.IOException;
  */
 public interface CsiAdaptorProtocol {
 
+  /**
+   * Get plugin info from the CSI driver. The driver usually returns
+   * the name of the driver and its version.
+   * @param request get plugin info request.
+   * @return response that contains driver name and its version.
+   * @throws YarnException
+   * @throws IOException
+   */
   GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request)
       throws YarnException, IOException;
 
+  /**
+   * Validate if the volume capacity can be satisfied on the underneath
+   * storage system. This method responses if the capacity can be satisfied
+   * or not, with a detailed message.
+   * @param request validate volume capability request.
+   * @return validation response.
+   * @throws YarnException
+   * @throws IOException
+   */
   ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
       ValidateVolumeCapabilitiesRequest request) throws YarnException,
       IOException;
+
+  /**
+   * Publish the volume on a node manager, the volume will be mounted
+   * to the local file system and become visible for clients.
+   * @param request publish volume request.
+   * @return publish volume response.
+   * @throws YarnException
+   * @throws IOException
+   */
+  NodePublishVolumeResponse nodePublishVolume(
+      NodePublishVolumeRequest request) throws YarnException, IOException;
+
+  /**
+   * This is a reverse operation of
+   * {@link #nodePublishVolume(NodePublishVolumeRequest)}, it un-mounts the
+   * volume from given node.
+   * @param request un-publish volume request.
+   * @return un-publish volume response.
+   * @throws YarnException
+   * @throws IOException
+   */
+  NodeUnpublishVolumeResponse nodeUnpublishVolume(
+      NodeUnpublishVolumeRequest request) throws YarnException, IOException;
 }

+ 94 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeRequest.java

@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import com.google.gson.JsonObject;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.Map;
+
+/**
+ * The request sent by node manager to CSI driver adaptor
+ * to publish a volume on a node.
+ */
+public abstract class NodePublishVolumeRequest {
+
+  public static NodePublishVolumeRequest newInstance(String volumeId,
+      boolean readOnly, String targetPath, String stagingPath,
+      VolumeCapability capability,
+      Map<String, String> publishContext,
+      Map<String, String> secrets) {
+    NodePublishVolumeRequest request =
+        Records.newRecord(NodePublishVolumeRequest.class);
+    request.setVolumeId(volumeId);
+    request.setReadonly(readOnly);
+    request.setTargetPath(targetPath);
+    request.setStagingPath(stagingPath);
+    request.setVolumeCapability(capability);
+    request.setPublishContext(publishContext);
+    request.setSecrets(secrets);
+    return request;
+  }
+
+  public abstract void setVolumeId(String volumeId);
+
+  public abstract String getVolumeId();
+
+  public abstract void setReadonly(boolean readonly);
+
+  public abstract boolean getReadOnly();
+
+  public abstract void setTargetPath(String targetPath);
+
+  public abstract String getTargetPath();
+
+  public abstract void setStagingPath(String stagingPath);
+
+  public abstract String getStagingPath();
+
+  public abstract void setVolumeCapability(VolumeCapability capability);
+
+  public abstract VolumeCapability getVolumeCapability();
+
+  public abstract void setPublishContext(Map<String, String> publishContext);
+
+  public abstract Map<String, String> getPublishContext();
+
+  public abstract void setSecrets(Map<String, String> secrets);
+
+  public abstract Map<String, String> getSecrets();
+
+  public String toString() {
+    JsonObject jsonObject = new JsonObject();
+    jsonObject.addProperty("VolumeId", getVolumeId());
+    jsonObject.addProperty("ReadOnly", getReadOnly());
+    jsonObject.addProperty("TargetPath", getTargetPath());
+    jsonObject.addProperty("StagingPath", getStagingPath());
+    if (getVolumeCapability() != null) {
+      JsonObject jsonCap = new JsonObject();
+      jsonCap.addProperty("AccessMode",
+          getVolumeCapability().getAccessMode().name());
+      jsonCap.addProperty("VolumeType",
+          getVolumeCapability().getVolumeType().name());
+      jsonObject.addProperty("VolumeCapability",
+          jsonCap.toString());
+    }
+    return jsonObject.toString();
+  }
+}

+ 31 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeResponse.java

@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The response sent by a CSI driver adaptor to the node manager
+ * after publishing a volume on the node.
+ */
+public abstract class NodePublishVolumeResponse {
+
+  public static NodePublishVolumeResponse newInstance() {
+    return Records.newRecord(NodePublishVolumeResponse.class);
+  }
+}

+ 44 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeRequest.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The request sent by node manager to CSI driver adaptor
+ * to un-publish a volume on a node.
+ */
+public abstract class NodeUnpublishVolumeRequest {
+
+  public static NodeUnpublishVolumeRequest newInstance(String volumeId,
+      String targetPath) {
+    NodeUnpublishVolumeRequest request =
+        Records.newRecord(NodeUnpublishVolumeRequest.class);
+    request.setVolumeId(volumeId);
+    request.setTargetPath(targetPath);
+    return request;
+  }
+
+  public abstract void setVolumeId(String volumeId);
+
+  public abstract void setTargetPath(String targetPath);
+
+  public abstract String getVolumeId();
+
+  public abstract String getTargetPath();
+}

+ 31 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeResponse.java

@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * The response sent by a CSI driver adaptor to the node manager
+ * after un-publishing a volume on the node.
+ */
+public class NodeUnpublishVolumeResponse {
+
+  public static NodeUnpublishVolumeResponse newInstance() {
+    return Records.newRecord(NodeUnpublishVolumeResponse.class);
+  }
+}

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java

@@ -276,10 +276,10 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
   }
 
   public static ResourceInformation newInstance(String name, String units,
-      long value, Map<String, String> attributes) {
+      long value, Set<String> tags, Map<String, String> attributes) {
     return ResourceInformation
         .newInstance(name, units, value, ResourceTypes.COUNTABLE, 0L,
-            Long.MAX_VALUE, null, attributes);
+            Long.MAX_VALUE, tags, attributes);
   }
 
   public static ResourceInformation newInstance(String name, String units,

+ 17 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -3443,13 +3443,28 @@ public class YarnConfiguration extends Configuration {
   // CSI Volume configs
   ////////////////////////////////
   /**
-   * One or more socket addresses for csi-adaptor.
-   * Multiple addresses are delimited by ",".
+   * TERMS:
+   * csi-driver: a 3rd party CSI driver which implements the CSI protocol.
+   *   It is provided by the storage system.
+   * csi-driver-adaptor: this is an internal RPC service working
+   *   as a bridge between YARN and a csi-driver.
    */
   public static final String NM_CSI_ADAPTOR_PREFIX =
       NM_PREFIX + "csi-driver-adaptor.";
+  public static final String NM_CSI_DRIVER_PREFIX =
+      NM_PREFIX + "csi-driver.";
+  public static final String NM_CSI_DRIVER_ENDPOINT_SUFFIX =
+      ".endpoint";
+  public static final String NM_CSI_ADAPTOR_ADDRESS_SUFFIX =
+      ".address";
+  /**
+   * One or more socket addresses for csi-adaptor.
+   * Multiple addresses are delimited by ",".
+   */
   public static final String NM_CSI_ADAPTOR_ADDRESSES =
       NM_CSI_ADAPTOR_PREFIX + "addresses";
+  public static final String NM_CSI_DRIVER_NAMES =
+      NM_CSI_DRIVER_PREFIX + "names";
 
   ////////////////////////////////
   // Other Configs

+ 23 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/CsiConfigUtils.java

@@ -15,8 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.csi.utils;
+package org.apache.hadoop.yarn.util.csi;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -24,13 +25,30 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import java.net.InetSocketAddress;
 
 /**
- * Utility class to load configurations.
+ * Utility class for CSI in the API level.
  */
-public final class ConfigUtils {
+public final class CsiConfigUtils {
 
-  private ConfigUtils() {
+  private CsiConfigUtils() {
     // Hide constructor for utility class.
   }
+
+  public static String[] getCsiDriverNames(Configuration conf) {
+    return conf.getStrings(YarnConfiguration.NM_CSI_DRIVER_NAMES);
+  }
+
+  public static String getCsiDriverEndpoint(String driverName,
+      Configuration conf) throws YarnException {
+    String driverEndpointProperty = YarnConfiguration.NM_CSI_DRIVER_PREFIX
+        + driverName + YarnConfiguration.NM_CSI_DRIVER_ENDPOINT_SUFFIX;
+    String driverEndpoint = conf.get(driverEndpointProperty);
+    if (Strings.isNullOrEmpty(driverEndpoint)) {
+      throw new YarnException("CSI driver's endpoint is not specified or"
+          + " invalid, property "+ driverEndpointProperty + " is not defined");
+    }
+    return driverEndpoint;
+  }
+
   /**
    * Resolve the CSI adaptor address for a CSI driver from configuration.
    * Expected configuration property name is
@@ -43,7 +61,7 @@ public final class ConfigUtils {
   public static InetSocketAddress getCsiAdaptorAddressForDriver(
       String driverName, Configuration conf) throws YarnException {
     String configName = YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
-        + driverName + ".address";
+        + driverName + YarnConfiguration.NM_CSI_ADAPTOR_ADDRESS_SUFFIX;
     String errorMessage = "Failed to load CSI adaptor address for driver "
         + driverName + ", configuration property " + configName
         + " is not defined or invalid.";

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Package that includes some CSI utility classes.
+ */
+package org.apache.hadoop.yarn.util.csi;

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto

@@ -31,4 +31,10 @@ service CsiAdaptorProtocolService {
 
     rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest)
     returns (ValidateVolumeCapabilitiesResponse);
+
+    rpc nodePublishVolume (NodePublishVolumeRequest)
+    returns (NodePublishVolumeResponse);
+
+    rpc nodeUnpublishVolume (NodeUnpublishVolumeRequest)
+    returns (NodeUnpublishVolumeResponse);
 }

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto

@@ -66,4 +66,28 @@ message GetPluginInfoRequest {
 message GetPluginInfoResponse {
     required string name = 1;
     required string vendor_version = 2;
+}
+
+message NodePublishVolumeRequest {
+    required string volume_id = 1;
+    repeated StringStringMapProto publish_context = 2;
+    optional string staging_target_path = 3;
+    required string target_path = 4;
+    required VolumeCapability volume_capability = 5;
+    required bool readonly = 6;
+    repeated StringStringMapProto secrets = 7;
+    repeated StringStringMapProto volume_context = 8;
+}
+
+message NodePublishVolumeResponse {
+    // Intentionally empty.
+}
+
+message NodeUnpublishVolumeRequest {
+    required string volume_id = 1;
+    required string target_path = 2;
+}
+
+message NodeUnpublishVolumeResponse {
+    // Intentionally empty.
 }

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java

@@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.service.api.records;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.gson.annotations.SerializedName;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * ResourceInformation determines unit/name/value of resource types in addition to memory and vcores. It will be part of Resource object
@@ -40,11 +42,25 @@ public class ResourceInformation {
   @SerializedName("attributes")
   private Map<String, String> attributes = null;
 
+  @SerializedName("tags")
+  private Set<String> tags = null;
+
   public ResourceInformation value(Long value) {
     this.value = value;
     return this;
   }
 
+  public ResourceInformation tags(Set<String> resourceTags) {
+    this.tags = resourceTags;
+    return this;
+  }
+
+  @ApiModelProperty(value = "")
+  @JsonProperty("tags")
+  public Set<String> getTags() {
+    return tags == null ? ImmutableSet.of() : tags;
+  }
+
   @ApiModelProperty(value = "")
   @JsonProperty("attributes")
   public Map<String, String> getAttributes() {
@@ -116,6 +132,7 @@ public class ResourceInformation {
     sb.append("    unit: ").append(toIndentedString(unit)).append("\n");
     sb.append("    attributes: ").append(toIndentedString(attributes))
         .append("\n");
+    sb.append("    tags: ").append(toIndentedString(tags)).append("\n");
     sb.append("}");
     return sb.toString();
   }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java

@@ -755,6 +755,7 @@ public class Component implements EventHandler<ComponentEvent> {
                 entry.getKey(),
                 specInfo.getUnit(),
                 specInfo.getValue(),
+                specInfo.getTags(),
                 specInfo.getAttributes());
         resource.setResourceInformation(resourceName, ri);
       }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java

@@ -231,5 +231,6 @@ public class TestAppJsonResolve extends Assert {
     Assert.assertEquals("yarn.io/csi-volume", volume.getKey());
     Assert.assertEquals(100L, volume.getValue().getValue().longValue());
     Assert.assertEquals(2, volume.getValue().getAttributes().size());
+    Assert.assertEquals(1, volume.getValue().getTags().size());
   }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json

@@ -14,6 +14,7 @@
           "yarn.io/csi-volume": {
             "value": 100,
             "unit": "Gi",
+            "tags": ["sample-tag"],
             "attributes" : {
               "driver" : "hostpath",
               "mountPath" : "/mnt/data"

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java

@@ -25,10 +25,18 @@ import org.apache.hadoop.yarn.api.CsiAdaptorPB;
 import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -82,6 +90,34 @@ public class CsiAdaptorProtocolPBClientImpl
     }
   }
 
+  @Override
+  public NodePublishVolumeResponse nodePublishVolume(
+      NodePublishVolumeRequest request) throws IOException, YarnException {
+    CsiAdaptorProtos.NodePublishVolumeRequest requestProto =
+        ((NodePublishVolumeRequestPBImpl) request).getProto();
+    try {
+      return new NodePublishVolumeResponsePBImpl(
+          proxy.nodePublishVolume(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public NodeUnpublishVolumeResponse nodeUnpublishVolume(
+      NodeUnpublishVolumeRequest request) throws YarnException, IOException {
+    CsiAdaptorProtos.NodeUnpublishVolumeRequest requestProto =
+        ((NodeUnpublishVolumeRequestPBImpl) request).getProto();
+    try {
+      return new NodeUnpublishVolumeResponsePBImpl(
+          proxy.nodeUnpublishVolume(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
   @Override
   public void close() throws IOException {
     if(this.proxy != null) {

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java

@@ -23,9 +23,15 @@ import org.apache.hadoop.yarn.api.CsiAdaptorPB;
 import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeRequestPBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -72,4 +78,34 @@ public class CsiAdaptorProtocolPBServiceImpl implements CsiAdaptorPB {
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public CsiAdaptorProtos.NodePublishVolumeResponse nodePublishVolume(
+      RpcController controller,
+      CsiAdaptorProtos.NodePublishVolumeRequest request)
+      throws ServiceException {
+    try {
+      NodePublishVolumeRequestPBImpl req =
+          new NodePublishVolumeRequestPBImpl(request);
+      NodePublishVolumeResponse response = real.nodePublishVolume(req);
+      return ((NodePublishVolumeResponsePBImpl) response).getProto();
+    } catch (YarnException | IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public CsiAdaptorProtos.NodeUnpublishVolumeResponse nodeUnpublishVolume(
+      RpcController controller,
+      CsiAdaptorProtos.NodeUnpublishVolumeRequest request)
+      throws ServiceException {
+    try {
+      NodeUnpublishVolumeRequestPBImpl req =
+          new NodeUnpublishVolumeRequestPBImpl(request);
+      NodeUnpublishVolumeResponse response = real.nodeUnpublishVolume(req);
+      return ((NodeUnpublishVolumeResponsePBImpl) response).getProto();
+    } catch (YarnException | IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 201 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeRequestPBImpl.java

@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+
+import java.util.Map;
+
+/**
+ * Request to publish volume on node manager.
+ */
+public class NodePublishVolumeRequestPBImpl extends
+    NodePublishVolumeRequest {
+
+  private CsiAdaptorProtos.NodePublishVolumeRequest.Builder builder;
+
+  public NodePublishVolumeRequestPBImpl() {
+    this.builder = CsiAdaptorProtos.NodePublishVolumeRequest.newBuilder();
+  }
+
+  public NodePublishVolumeRequestPBImpl(
+      CsiAdaptorProtos.NodePublishVolumeRequest request) {
+    this.builder = request.toBuilder();
+  }
+
+  public CsiAdaptorProtos.NodePublishVolumeRequest getProto() {
+    Preconditions.checkNotNull(builder);
+    return builder.build();
+  }
+
+  @Override
+  public void setVolumeId(String volumeId) {
+    Preconditions.checkNotNull(builder);
+    builder.setVolumeId(volumeId);
+  }
+
+  @Override
+  public String getVolumeId() {
+    Preconditions.checkNotNull(builder);
+    return builder.getVolumeId();
+  }
+
+  @Override
+  public void setReadonly(boolean readonly) {
+    Preconditions.checkNotNull(builder);
+    builder.setReadonly(readonly);
+  }
+
+  @Override
+  public boolean getReadOnly() {
+    Preconditions.checkNotNull(builder);
+    return builder.getReadonly();
+  }
+
+  @Override
+  public void setSecrets(Map<String, String> secrets) {
+    if (secrets != null) {
+      Preconditions.checkNotNull(builder);
+      for(Map.Entry<String, String> entry : secrets.entrySet()) {
+        YarnProtos.StringStringMapProto mapEntry =
+            YarnProtos.StringStringMapProto.newBuilder()
+                .setKey(entry.getKey())
+                .setValue(entry.getValue())
+                .build();
+        builder.addSecrets(mapEntry);
+      }
+    }
+  }
+
+  @Override
+  public Map<String, String> getSecrets() {
+    Preconditions.checkNotNull(builder);
+    return builder.getSecretsCount() > 0 ?
+        ProtoUtils.convertStringStringMapProtoListToMap(
+            builder.getSecretsList()) : ImmutableMap.of();
+  }
+
+  @Override
+  public String getTargetPath() {
+    Preconditions.checkNotNull(builder);
+    return builder.getTargetPath();
+  }
+
+  @Override
+  public void setStagingPath(String stagingPath) {
+    Preconditions.checkNotNull(builder);
+    builder.setStagingTargetPath(stagingPath);
+  }
+
+  @Override
+  public String getStagingPath() {
+    Preconditions.checkNotNull(builder);
+    return builder.getStagingTargetPath();
+  }
+
+  @Override
+  public void setPublishContext(Map<String, String> publishContext) {
+    if (publishContext != null) {
+      Preconditions.checkNotNull(builder);
+      for(Map.Entry<String, String> entry : publishContext.entrySet()) {
+        YarnProtos.StringStringMapProto mapEntry =
+            YarnProtos.StringStringMapProto.newBuilder()
+                .setKey(entry.getKey())
+                .setValue(entry.getValue())
+                .build();
+        builder.addPublishContext(mapEntry);
+      }
+    }
+  }
+
+  @Override
+  public Map<String, String> getPublishContext() {
+    Preconditions.checkNotNull(builder);
+    return builder.getPublishContextCount() > 0 ?
+        ProtoUtils.convertStringStringMapProtoListToMap(
+            builder.getPublishContextList()) : ImmutableMap.of();
+  }
+
+  @Override
+  public void setTargetPath(String targetPath) {
+    if (targetPath != null) {
+      Preconditions.checkNotNull(builder);
+      builder.setTargetPath(targetPath);
+    }
+  }
+
+  @Override
+  public void setVolumeCapability(
+      VolumeCapability capability) {
+    if (capability != null) {
+      CsiAdaptorProtos.VolumeCapability vc =
+          CsiAdaptorProtos.VolumeCapability.newBuilder()
+              .setAccessMode(CsiAdaptorProtos.VolumeCapability
+                  .AccessMode.valueOf(
+                      capability.getAccessMode().ordinal()))
+              .setVolumeType(CsiAdaptorProtos.VolumeCapability
+                  .VolumeType.valueOf(capability.getVolumeType().ordinal()))
+              .addAllMountFlags(capability.getMountFlags())
+              .build();
+      builder.setVolumeCapability(vc);
+    }
+  }
+
+  @Override
+  public VolumeCapability getVolumeCapability() {
+    CsiAdaptorProtos.VolumeCapability cap0 = builder.getVolumeCapability();
+    if (builder.hasVolumeCapability()) {
+      return new VolumeCapability(
+          ValidateVolumeCapabilitiesRequest.AccessMode
+              .valueOf(cap0.getAccessMode().name()),
+          ValidateVolumeCapabilitiesRequest.VolumeType
+              .valueOf(cap0.getVolumeType().name()),
+          cap0.getMountFlagsList());
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+}

+ 62 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeResponsePBImpl.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * Protobuf record class for node publish response.
+ */
+public class NodePublishVolumeResponsePBImpl
+    extends NodePublishVolumeResponse {
+
+  private CsiAdaptorProtos.NodePublishVolumeResponse.Builder builder;
+
+  public NodePublishVolumeResponsePBImpl(
+      CsiAdaptorProtos.NodePublishVolumeResponse proto) {
+    this.builder = proto.toBuilder();
+  }
+
+  public NodePublishVolumeResponsePBImpl() {
+    this.builder = CsiAdaptorProtos.NodePublishVolumeResponse
+        .newBuilder();
+  }
+
+  public CsiAdaptorProtos.NodePublishVolumeResponse getProto() {
+    Preconditions.checkNotNull(builder);
+    return builder.build();
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+}

+ 89 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeRequestPBImpl.java

@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * The protobuf record class for request to un-publish volume on node manager.
+ */
+public class NodeUnpublishVolumeRequestPBImpl extends
+    NodeUnpublishVolumeRequest {
+
+  private CsiAdaptorProtos.NodeUnpublishVolumeRequest.Builder builder;
+
+  public NodeUnpublishVolumeRequestPBImpl() {
+    this.builder = CsiAdaptorProtos.NodeUnpublishVolumeRequest.newBuilder();
+  }
+
+  public NodeUnpublishVolumeRequestPBImpl(
+      CsiAdaptorProtos.NodeUnpublishVolumeRequest request) {
+    this.builder = request.toBuilder();
+  }
+
+  public CsiAdaptorProtos.NodeUnpublishVolumeRequest getProto() {
+    Preconditions.checkNotNull(builder);
+    return builder.build();
+  }
+
+  @Override
+  public void setVolumeId(String volumeId) {
+    Preconditions.checkNotNull(builder);
+    this.builder.setVolumeId(volumeId);
+  }
+
+  @Override
+  public void setTargetPath(String targetPath) {
+    Preconditions.checkNotNull(builder);
+    this.builder.setTargetPath(targetPath);
+  }
+
+  @Override
+  public String getVolumeId() {
+    return builder.getVolumeId();
+  }
+
+  @Override
+  public String getTargetPath() {
+    return builder.getTargetPath();
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+}

+ 61 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeResponsePBImpl.java

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * Response to the un-publish volume request on node manager.
+ */
+public class NodeUnpublishVolumeResponsePBImpl extends
+    NodeUnpublishVolumeResponse {
+
+  private CsiAdaptorProtos.NodeUnpublishVolumeResponse.Builder builder;
+
+  public NodeUnpublishVolumeResponsePBImpl() {
+    this.builder = CsiAdaptorProtos.NodeUnpublishVolumeResponse.newBuilder();
+  }
+
+  public NodeUnpublishVolumeResponsePBImpl(
+      CsiAdaptorProtos.NodeUnpublishVolumeResponse response) {
+    this.builder = response.toBuilder();
+  }
+
+  public CsiAdaptorProtos.NodeUnpublishVolumeResponse getProto() {
+    Preconditions.checkNotNull(builder);
+    return builder.build();
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+}

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -4083,4 +4083,19 @@
     <name>yarn.nodemanager.csi-driver-adaptor.addresses</name>
     <value></value>
   </property>
+
+  <property>
+    <description>
+      CSI driver names running on this node, multiple driver names need to
+      be delimited by comma. The driver name should be same value returned
+      by the getPluginInfo call. For each of the CSI driver name, it must
+      to define following two corresponding properties:
+        "yarn.nodemanager.csi-driver.${NAME}.endpoint"
+        "yarn.nodemanager.csi-driver-adaptor.${NAME}.address"
+      The 1st property defines where the driver's endpoint is;
+      2nd property defines where the mapping csi-driver-adaptor's address is.
+    </description>
+    <name>yarn.nodemanager.csi-driver.names</name>
+    <value></value>
+  </property>
 </configuration>

+ 18 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml

@@ -180,6 +180,24 @@
                     <excludePackageNames>csi.v0</excludePackageNames>
                 </configuration>
             </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${basedir}/target/generated-sources/protobuf/java</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 </project>

+ 72 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java

@@ -21,29 +21,36 @@ import com.google.common.annotations.VisibleForTesting;
 import csi.v0.Csi;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
 import org.apache.hadoop.yarn.csi.client.CsiClient;
 import org.apache.hadoop.yarn.csi.client.CsiClientImpl;
 import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory;
-import org.apache.hadoop.yarn.csi.utils.ConfigUtils;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 
 /**
  * This is a Hadoop RPC server, we uses the Hadoop RPC framework here
  * because we need to stick to the security model current Hadoop supports.
  */
-public class CsiAdaptorProtocolService extends AbstractService
+public class CsiAdaptorProtocolService extends AuxiliaryService
     implements CsiAdaptorProtocol {
 
   private static final Logger LOG =
@@ -54,6 +61,12 @@ public class CsiAdaptorProtocolService extends AbstractService
   private CsiClient csiClient;
   private String csiDriverName;
 
+  public CsiAdaptorProtocolService() {
+    super(CsiAdaptorProtocolService.class.getName());
+    // TODO read this from configuration
+    this.csiDriverName =  "ch.ctrox.csi.s3-driver";
+  }
+
   public CsiAdaptorProtocolService(String driverName,
       String domainSocketPath) {
     super(CsiAdaptorProtocolService.class.getName());
@@ -68,7 +81,11 @@ public class CsiAdaptorProtocolService extends AbstractService
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    adaptorServiceAddress = ConfigUtils
+
+    String driverEndpoint = CsiConfigUtils
+        .getCsiDriverEndpoint(csiDriverName, conf);
+    this.csiClient = new CsiClientImpl(driverEndpoint);
+    adaptorServiceAddress = CsiConfigUtils
         .getCsiAdaptorAddressForDriver(csiDriverName, conf);
     super.serviceInit(conf);
   }
@@ -119,4 +136,55 @@ public class CsiAdaptorProtocolService extends AbstractService
         Csi.ValidateVolumeCapabilitiesResponse.class)
         .convertFrom(response);
   }
+
+  @Override
+  public NodePublishVolumeResponse nodePublishVolume(
+      NodePublishVolumeRequest request) throws YarnException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received nodePublishVolume call, request: {}",
+          request.toString());
+    }
+    Csi.NodePublishVolumeRequest req = ProtoTranslatorFactory
+        .getTranslator(NodePublishVolumeRequest.class,
+            Csi.NodePublishVolumeRequest.class).convertTo(request);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translate to CSI proto message: {}", req.toString());
+    }
+    csiClient.nodePublishVolume(req);
+    return NodePublishVolumeResponse.newInstance();
+  }
+
+  @Override
+  public NodeUnpublishVolumeResponse nodeUnpublishVolume(
+      NodeUnpublishVolumeRequest request) throws YarnException, IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received nodeUnpublishVolume call, request: {}",
+          request.toString());
+    }
+    Csi.NodeUnpublishVolumeRequest req = ProtoTranslatorFactory
+        .getTranslator(NodeUnpublishVolumeRequest.class,
+            Csi.NodeUnpublishVolumeRequest.class).convertTo(request);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Translate to CSI proto message: {}", req.toString());
+    }
+    csiClient.nodeUnpublishVolume(req);
+    return NodeUnpublishVolumeResponse.newInstance();
+  }
+
+  @Override
+  public void initializeApplication(
+      ApplicationInitializationContext initAppContext) {
+    // do nothing
+  }
+
+  @Override
+  public void stopApplication(
+      ApplicationTerminationContext stopAppContext) {
+    // do nothing
+  }
+
+  @Override
+  public ByteBuffer getMetaData() {
+    return ByteBuffer.allocate(0);
+  }
 }

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java

@@ -40,4 +40,10 @@ public interface CsiClient {
 
   Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
       Csi.ValidateVolumeCapabilitiesRequest request) throws IOException;
+
+  Csi.NodePublishVolumeResponse nodePublishVolume(
+      Csi.NodePublishVolumeRequest request) throws IOException;
+
+  Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume(
+      Csi.NodeUnpublishVolumeRequest request) throws IOException;
 }

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java

@@ -59,4 +59,24 @@ public class CsiClientImpl implements CsiClient {
           .validateVolumeCapabilities(request);
     }
   }
+
+  @Override
+  public Csi.NodePublishVolumeResponse nodePublishVolume(
+      Csi.NodePublishVolumeRequest request) throws IOException {
+    try (CsiGrpcClient client = CsiGrpcClient.newBuilder()
+        .setDomainSocketAddress(address).build()) {
+      return client.createNodeBlockingStub()
+          .nodePublishVolume(request);
+    }
+  }
+
+  @Override
+  public Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume(
+      Csi.NodeUnpublishVolumeRequest request) throws IOException {
+    try (CsiGrpcClient client = CsiGrpcClient.newBuilder()
+        .setDomainSocketAddress(address).build()) {
+      return client.createNodeBlockingStub()
+          .nodeUnpublishVolume(request);
+    }
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java

@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
  * Protobuf message translator for GetPluginInfoResponse and
  * Csi.GetPluginInfoResponse.
  */
-public class GetPluginInfoResponseProtoTranslator implements
+public class GetPluginInfoResponseProtoTranslator<A, B> implements
     ProtoTranslator<GetPluginInfoResponse, Csi.GetPluginInfoResponse> {
 
   @Override public Csi.GetPluginInfoResponse convertTo(

+ 77 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodePublishVolumeRequestProtoTranslator.java

@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.translator;
+
+import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * This class helps to transform a YARN side NodePublishVolumeRequest
+ * to corresponding CSI protocol message.
+ * @param <A> YARN NodePublishVolumeRequest
+ * @param <B> CSI NodePublishVolumeRequest
+ */
+public class NodePublishVolumeRequestProtoTranslator<A, B> implements
+    ProtoTranslator<NodePublishVolumeRequest,
+        Csi.NodePublishVolumeRequest> {
+
+  @Override
+  public Csi.NodePublishVolumeRequest convertTo(
+      NodePublishVolumeRequest messageA) throws YarnException {
+    Csi.NodePublishVolumeRequest.Builder builder =
+        Csi.NodePublishVolumeRequest.newBuilder();
+    ValidateVolumeCapabilitiesRequest.VolumeCapability cap =
+        messageA.getVolumeCapability();
+    Csi.VolumeCapability csiVolumeCap = Csi.VolumeCapability.newBuilder()
+        .setAccessMode(Csi.VolumeCapability.AccessMode.newBuilder()
+            .setModeValue(cap.getAccessMode().ordinal())) // access mode
+        // TODO support block
+        .setMount(Csi.VolumeCapability.MountVolume.newBuilder()
+            // TODO support fsType
+            .setFsType("xfs") // fs type
+            .addAllMountFlags(cap.getMountFlags())) // mount flags
+        .build();
+    builder.setVolumeCapability(csiVolumeCap);
+    builder.setVolumeId(messageA.getVolumeId());
+    builder.setTargetPath(messageA.getTargetPath());
+    builder.setReadonly(messageA.getReadOnly());
+    builder.putAllNodePublishSecrets(messageA.getSecrets());
+    builder.putAllPublishInfo(messageA.getPublishContext());
+    builder.setStagingTargetPath(messageA.getStagingPath());
+    return builder.build();
+  }
+
+  @Override
+  public NodePublishVolumeRequest convertFrom(
+      Csi.NodePublishVolumeRequest messageB) throws YarnException {
+    Csi.VolumeCapability cap0 = messageB.getVolumeCapability();
+    ValidateVolumeCapabilitiesRequest.VolumeCapability cap =
+        new ValidateVolumeCapabilitiesRequest.VolumeCapability(
+            ValidateVolumeCapabilitiesRequest.AccessMode
+                .valueOf(cap0.getAccessMode().getMode().name()),
+            ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM,
+            cap0.getMount().getMountFlagsList());
+    return NodePublishVolumeRequest.newInstance(
+        messageB.getVolumeId(), messageB.getReadonly(),
+        messageB.getTargetPath(), messageB.getStagingTargetPath(),
+        cap, messageB.getPublishInfoMap(),
+        messageB.getNodePublishSecretsMap());
+  }
+}

+ 49 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodeUnpublishVolumeRequestProtoTranslator.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.translator;
+
+import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * This class helps to transform a YARN side NodeUnpublishVolumeRequest
+ * to corresponding CSI protocol message.
+ * @param <A> YARN NodeUnpublishVolumeRequest
+ * @param <B> CSI NodeUnpublishVolumeRequest
+ */
+public class NodeUnpublishVolumeRequestProtoTranslator<A, B> implements
+    ProtoTranslator<NodeUnpublishVolumeRequest,
+        Csi.NodeUnpublishVolumeRequest> {
+
+  @Override
+  public Csi.NodeUnpublishVolumeRequest convertTo(
+      NodeUnpublishVolumeRequest messageA) throws YarnException {
+    return Csi.NodeUnpublishVolumeRequest.newBuilder()
+        .setVolumeId(messageA.getVolumeId())
+        .setTargetPath(messageA.getTargetPath())
+        .build();
+  }
+
+  @Override
+  public NodeUnpublishVolumeRequest convertFrom(
+      Csi.NodeUnpublishVolumeRequest messageB) throws YarnException {
+    return NodeUnpublishVolumeRequest
+        .newInstance(messageB.getVolumeId(), messageB.getTargetPath());
+  }
+}

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java

@@ -18,8 +18,11 @@
 package org.apache.hadoop.yarn.csi.translator;
 
 import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
 
 /**
  * Factory class to get desired proto transformer instance.
@@ -57,6 +60,15 @@ public final class ProtoTranslatorFactory {
     } else if (yarnProto == ValidateVolumeCapabilitiesResponse.class
         && csiProto == Csi.ValidateVolumeCapabilitiesResponse.class) {
       return new ValidationVolumeCapabilitiesResponseProtoTranslator();
+    } else if (yarnProto == NodePublishVolumeRequest.class
+        && csiProto == Csi.NodePublishVolumeRequest.class) {
+      return new NodePublishVolumeRequestProtoTranslator();
+    } else if (yarnProto == GetPluginInfoResponse.class
+        && csiProto == Csi.GetPluginInfoResponse.class) {
+      return new GetPluginInfoResponseProtoTranslator();
+    } else if (yarnProto == NodeUnpublishVolumeRequest.class
+        && csiProto == Csi.NodeUnpublishVolumeRequest.class) {
+      return new NodeUnpublishVolumeRequestProtoTranslator();
     }
     throw new IllegalArgumentException("A problem is found while processing"
         + " proto message translating. Unexpected message types,"

+ 14 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResp
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
 import org.apache.hadoop.yarn.client.NMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.csi.client.CsiClient;
+import org.apache.hadoop.yarn.csi.client.ICsiClientTest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.junit.AfterClass;
@@ -81,13 +81,18 @@ public class TestCsiAdaptorService {
     conf.setSocketAddr(
         YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address",
         address);
+    conf.set(
+        YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint",
+        "unix:///tmp/test-driver.scok");
     CsiAdaptorProtocolService service =
         new CsiAdaptorProtocolService("test-driver", domainSocket);
+    service.init(conf);
+    service.start();
 
     // inject a fake CSI client
     // this client validates if the ValidateVolumeCapabilitiesRequest
     // is integrity, and then reply a fake response
-    service.setCsiClient(new CsiClient() {
+    service.setCsiClient(new ICsiClientTest() {
       @Override
       public Csi.GetPluginInfoResponse getPluginInfo() {
         return Csi.GetPluginInfoResponse.newBuilder()
@@ -103,7 +108,7 @@ public class TestCsiAdaptorService {
         Assert.assertEquals("volume-id-0000123", request.getVolumeId());
         Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
         Assert.assertEquals(Csi.VolumeCapability.AccessMode
-            .newBuilder().setModeValue(5).build(),
+                .newBuilder().setModeValue(5).build(),
             request.getVolumeCapabilities(0).getAccessMode());
         Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
         Assert.assertEquals(2, request.getVolumeCapabilities(0)
@@ -123,9 +128,6 @@ public class TestCsiAdaptorService {
       }
     });
 
-    service.init(conf);
-    service.start();
-
     try (CsiAdaptorProtocolPBClientImpl client =
         new CsiAdaptorProtocolPBClientImpl(1L, address, new Configuration())) {
       ValidateVolumeCapabilitiesRequest request =
@@ -157,13 +159,18 @@ public class TestCsiAdaptorService {
     conf.setSocketAddr(
         YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address",
         address);
+    conf.set(
+        YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint",
+        "unix:///tmp/test-driver.scok");
     CsiAdaptorProtocolService service =
         new CsiAdaptorProtocolService("test-driver", domainSocket);
+    service.init(conf);
+    service.start();
 
     // inject a fake CSI client
     // this client validates if the ValidateVolumeCapabilitiesRequest
     // is integrity, and then reply a fake response
-    service.setCsiClient(new CsiClient() {
+    service.setCsiClient(new ICsiClientTest() {
       @Override
       public Csi.GetPluginInfoResponse getPluginInfo() {
         return Csi.GetPluginInfoResponse.newBuilder()
@@ -199,9 +206,6 @@ public class TestCsiAdaptorService {
       }
     });
 
-    service.init(conf);
-    service.start();
-
     YarnRPC rpc = YarnRPC.create(conf);
     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
     CsiAdaptorProtocol adaptorClient = NMProxy

+ 55 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestNodePublishVolumeRequest.java

@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeRequestPBImpl;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.AccessMode;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.VolumeType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * UT for NodePublishVolumeRequest.
+ */
+public class TestNodePublishVolumeRequest {
+
+  @Test
+  public void testPBRecord() {
+    CsiAdaptorProtos.VolumeCapability capability =
+        CsiAdaptorProtos.VolumeCapability.newBuilder()
+            .setAccessMode(AccessMode.MULTI_NODE_READER_ONLY)
+            .setVolumeType(VolumeType.FILE_SYSTEM)
+            .build();
+    CsiAdaptorProtos.NodePublishVolumeRequest proto =
+        CsiAdaptorProtos.NodePublishVolumeRequest.newBuilder()
+            .setReadonly(false)
+            .setVolumeId("test-vol-000001")
+            .setTargetPath("/mnt/data")
+            .setStagingTargetPath("/mnt/staging")
+            .setVolumeCapability(capability)
+            .build();
+
+    NodePublishVolumeRequestPBImpl pbImpl =
+        new NodePublishVolumeRequestPBImpl(proto);
+    Assert.assertEquals("test-vol-000001", pbImpl.getVolumeId());
+    Assert.assertEquals("/mnt/data", pbImpl.getTargetPath());
+    Assert.assertEquals("/mnt/staging", pbImpl.getStagingPath());
+    Assert.assertFalse(pbImpl.getReadOnly());
+  }
+}

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.client;
+
+import csi.v0.Csi;
+
+import java.io.IOException;
+
+/**
+ * This interface is used only in testing. It gives default implementation
+ * of all methods.
+ */
+public interface ICsiClientTest extends CsiClient {
+
+  @Override
+  default Csi.GetPluginInfoResponse getPluginInfo()
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  default Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+      Csi.ValidateVolumeCapabilitiesRequest request) throws IOException {
+    return null;
+  }
+
+  @Override
+  default Csi.NodePublishVolumeResponse nodePublishVolume(
+      Csi.NodePublishVolumeRequest request) throws IOException {
+    return null;
+  }
+
+  @Override
+  default Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume(
+      Csi.NodeUnpublishVolumeRequest request) throws IOException {
+    return null;
+  }
+}

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java

@@ -75,6 +75,10 @@ public interface Container extends EventHandler<ContainerEvent> {
 
   void setWorkDir(String workDir);
 
+  String getCsiVolumesRootDir();
+
+  void setCsiVolumesRootDir(String volumesRootDir);
+
   String getLogDir();
 
   void setLogDir(String logDir);

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -172,6 +172,7 @@ public class ContainerImpl implements Container {
   private SlidingWindowRetryPolicy.RetryContext windowRetryContext;
   private SlidingWindowRetryPolicy retryPolicy;
 
+  private String csiVolumesRootDir;
   private String workDir;
   private String logDir;
   private String host;
@@ -936,6 +937,16 @@ public class ContainerImpl implements Container {
     this.workDir = workDir;
   }
 
+  @Override
+  public String getCsiVolumesRootDir() {
+    return csiVolumesRootDir;
+  }
+
+  @Override
+  public void setCsiVolumesRootDir(String volumesRootDir) {
+    this.csiVolumesRootDir = volumesRootDir;
+  }
+
   private void clearIpAndHost() {
     LOG.info("{} clearing ip and host", containerId);
     this.ips = null;

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -250,6 +250,10 @@ public class ContainerLaunch implements Callable<Integer> {
       Path containerWorkDir = deriveContainerWorkDir();
       recordContainerWorkDir(containerID, containerWorkDir.toString());
 
+      // Select a root dir for all csi volumes for the container
+      Path csiVolumesRoot = deriveCsiVolumesRootDir();
+      recordContainerCsiVolumesRootDir(containerID, csiVolumesRoot.toString());
+
       String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
       // pid file should be in nm private dir so that it is not
       // accessible by users
@@ -358,6 +362,7 @@ public class ContainerLaunch implements Callable<Integer> {
           .setUser(user)
           .setAppId(appIdStr)
           .setContainerWorkDir(containerWorkDir)
+          .setContainerCsiVolumesRootDir(csiVolumesRoot)
           .setLocalDirs(localDirs)
           .setLogDirs(logDirs)
           .setFilecacheDirs(filecacheDirs)
@@ -388,6 +393,27 @@ public class ContainerLaunch implements Callable<Integer> {
     return ret;
   }
 
+  /**
+   * Volumes mount point root:
+   *   ${YARN_LOCAL_DIR}/usercache/${user}/filecache/csiVolumes/app/container
+   * CSI volumes may creates the mount point with different permission bits.
+   * If we create the volume mount under container work dir, it may
+   * mess up the existing permission structure, which is restricted by
+   * linux container executor. So we put all volume mounts under a same
+   * root dir so it is easier cleanup.
+   **/
+  private Path deriveCsiVolumesRootDir() throws IOException {
+    final String containerVolumePath =
+        ContainerLocalizer.USERCACHE + Path.SEPARATOR
+            + container.getUser() + Path.SEPARATOR
+            + ContainerLocalizer.FILECACHE + Path.SEPARATOR
+            + ContainerLocalizer.CSI_VOLIUME_MOUNTS_ROOT + Path.SEPARATOR
+            + app.getAppId().toString() + Path.SEPARATOR
+            + container.getContainerId().toString();
+    return dirsHandler.getLocalPathForWrite(containerVolumePath,
+        LocalDirAllocator.SIZE_UNKNOWN, false);
+  }
+
   private Path deriveContainerWorkDir() throws IOException {
 
     final String containerWorkDirPath =
@@ -1752,6 +1778,12 @@ public class ContainerLaunch implements Callable<Integer> {
     }
   }
 
+  private void recordContainerCsiVolumesRootDir(ContainerId containerId,
+      String volumesRoot) throws IOException {
+    container.setCsiVolumesRootDir(volumesRoot);
+    // TODO persistent to the NM store...
+  }
+
   protected Path getContainerWorkDir() throws IOException {
     String containerWorkDir = container.getWorkDir();
     if (containerWorkDir == null

+ 64 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java

@@ -24,7 +24,10 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommand;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
@@ -35,7 +38,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.volume.csi.ContainerVolumePublisher;
 import org.apache.hadoop.yarn.util.DockerClientConfigHandler;
+import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
@@ -76,6 +82,7 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -262,6 +269,7 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
   private Configuration conf;
   private Context nmContext;
   private DockerClient dockerClient;
+  private Map<String, CsiAdaptorProtocol> csiClients = new HashMap<>();
   private PrivilegedOperationExecutor privilegedOperationExecutor;
   private String defaultImageName;
   private Set<String> allowedNetworks = new HashSet<>();
@@ -363,6 +371,9 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
       throw new ContainerExecutionException(message);
     }
 
+    // initialize csi adaptors if necessary
+    initiateCsiClients(conf);
+
     privilegedContainersAcl = new AccessControlList(conf.getTrimmed(
         YarnConfiguration.NM_DOCKER_PRIVILEGED_CONTAINERS_ACL,
         YarnConfiguration.DEFAULT_NM_DOCKER_PRIVILEGED_CONTAINERS_ACL));
@@ -398,6 +409,10 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
         YarnConfiguration.NM_DOCKER_DEFAULT_TMPFS_MOUNTS)));
   }
 
+  public Map<String, CsiAdaptorProtocol> getCsiClients() {
+    return csiClients;
+  }
+
   @Override
   public boolean isRuntimeRequested(Map<String, String> env) {
     return isDockerContainerRequested(conf, env);
@@ -942,6 +957,18 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
       }
     }
 
+    ContainerVolumePublisher publisher = new ContainerVolumePublisher(
+        container, container.getCsiVolumesRootDir(), this);
+    try {
+      Map<String, String> volumeMounts = publisher.publishVolumes();
+      volumeMounts.forEach((local, remote) ->
+          runCommand.addReadWriteMountLocation(local, remote));
+    } catch (YarnException | IOException e) {
+      throw new ContainerExecutionException(
+          "Container requests for volume resource but we are failed"
+              + " to publish volumes on this node");
+    }
+
     if (environment.containsKey(ENV_DOCKER_CONTAINER_TMPFS_MOUNTS)) {
       String[] tmpfsMounts = environment.get(ENV_DOCKER_CONTAINER_TMPFS_MOUNTS)
           .split(",");
@@ -1442,6 +1469,14 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
       ContainerExecutor.Signal signal) throws ContainerExecutionException {
     Container container = ctx.getContainer();
 
+    ContainerVolumePublisher publisher = new ContainerVolumePublisher(
+        container, container.getCsiVolumesRootDir(), this);
+    try {
+      publisher.unpublishVolumes();
+    } catch (YarnException | IOException e) {
+      throw new ContainerExecutionException(e);
+    }
+
     // Only need to check whether the container was asked to be privileged.
     // If the container had failed the permissions checks upon launch, it
     // would have never been launched and thus we wouldn't be here
@@ -1537,4 +1572,33 @@ public class DockerLinuxContainerRuntime implements LinuxContainerRuntime {
     }
   }
 
+  /**
+   * Initiate CSI clients to talk to the CSI adaptors on this node and
+   * cache the clients for easier fetch.
+   * @param config configuration
+   * @throws ContainerExecutionException
+   */
+  private void initiateCsiClients(Configuration config)
+      throws ContainerExecutionException {
+    String[] driverNames = CsiConfigUtils.getCsiDriverNames(config);
+    if (driverNames != null && driverNames.length > 0) {
+      for (String driverName : driverNames) {
+        try {
+          // find out the adaptors service address
+          InetSocketAddress adaptorServiceAddress =
+              CsiConfigUtils.getCsiAdaptorAddressForDriver(driverName, config);
+          LOG.info("Initializing a csi-adaptor-client for csi-adaptor {},"
+              + " csi-driver {}", adaptorServiceAddress.toString(), driverName);
+          CsiAdaptorProtocolPBClientImpl client =
+              new CsiAdaptorProtocolPBClientImpl(1L, adaptorServiceAddress,
+                  config);
+          csiClients.put(driverName, client);
+        } catch (IOException e1) {
+          throw new ContainerExecutionException(e1.getMessage());
+        } catch (YarnException e2) {
+          throw new ContainerExecutionException(e2.getMessage());
+        }
+      }
+    }
+  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java

@@ -101,6 +101,7 @@ public class ContainerLocalizer {
       new FsPermission((short)0710);
   private static final FsPermission USERCACHE_FOLDER_PERMS =
       new FsPermission((short) 0755);
+  public static final String CSI_VOLIUME_MOUNTS_ROOT = "csivolumes";
 
   private final String user;
   private final String appId;

+ 205 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java

@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.volume.csi;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
+import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Publish/un-publish CSI volumes on node manager.
+ */
+public class ContainerVolumePublisher {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerVolumePublisher.class);
+
+  private final Container container;
+  private final String localMountRoot;
+  private final DockerLinuxContainerRuntime runtime;
+
+  public ContainerVolumePublisher(Container container, String localMountRoot,
+      DockerLinuxContainerRuntime runtime) {
+    LOG.info("Initiate container volume publisher, containerID={},"
+            + " volume local mount rootDir={}",
+        container.getContainerId().toString(), localMountRoot);
+    this.container = container;
+    this.localMountRoot = localMountRoot;
+    this.runtime = runtime;
+  }
+
+  /**
+   * It first discovers the volume info from container resource;
+   * then negotiates with CSI driver adaptor to publish the volume on this
+   * node manager, on a specific directory under container's work dir;
+   * and then map the local mounted directory to volume target mount in
+   * the docker container.
+   *
+   * CSI volume publish is a two phase work, by reaching up here
+   * we can assume the 1st phase is done on the RM side, which means
+   * YARN is already called the controller service of csi-driver
+   * to publish the volume; here we only need to call the node service of
+   * csi-driver to publish the volume on this local node manager.
+   *
+   * @return a map where each key is the local mounted path on current node,
+   *   and value is the remote mount path on the container.
+   * @throws YarnException
+   * @throws IOException
+   */
+  public Map<String, String> publishVolumes() throws YarnException,
+      IOException {
+    LOG.info("publishing volumes");
+    Map<String, String> volumeMounts = new HashMap<>();
+    List<VolumeMetaData> volumes = getVolumes();
+    LOG.info("Found {} volumes to be published on this node", volumes.size());
+    for (VolumeMetaData volume : volumes) {
+      Map<String, String> bindings = publishVolume(volume);
+      if (bindings != null && !bindings.isEmpty()) {
+        volumeMounts.putAll(bindings);
+      }
+    }
+    return volumeMounts;
+  }
+
+  public void unpublishVolumes() throws YarnException, IOException {
+    LOG.info("Un-publishing Volumes");
+    List<VolumeMetaData> volumes = getVolumes();
+    LOG.info("Volumes to un-publish {}", volumes.size());
+    for (VolumeMetaData volume : volumes) {
+      this.unpublishVolume(volume);
+    }
+  }
+
+  private File getLocalVolumeMountPath(
+      String containerWorkDir, String volumeId) {
+    return new File(containerWorkDir, volumeId + "_mount");
+  }
+
+  private File getLocalVolumeStagingPath(
+      String containerWorkDir, String volumeId) {
+    return new File(containerWorkDir, volumeId + "_staging");
+  }
+
+  private List<VolumeMetaData> getVolumes() throws InvalidVolumeException {
+    List<VolumeMetaData> volumes = new ArrayList<>();
+    Resource containerResource = container.getResource();
+    if (containerResource != null) {
+      for (ResourceInformation resourceInformation :
+          containerResource.getAllResourcesListCopy()) {
+        if (resourceInformation.getTags().contains("system:csi-volume")) {
+          volumes.addAll(VolumeMetaData.fromResource(resourceInformation));
+        }
+      }
+    }
+    if (volumes.size() > 0) {
+      LOG.info("Total number of volumes require provisioning is {}",
+          volumes.size());
+    }
+    return volumes;
+  }
+
+  private Map<String, String> publishVolume(VolumeMetaData volume)
+      throws IOException, YarnException {
+    Map<String, String> bindVolumes = new HashMap<>();
+    // compose a local mount for CSI volume with the container ID
+    File localMount = getLocalVolumeMountPath(
+        localMountRoot, volume.getVolumeId().toString());
+    File localStaging = getLocalVolumeStagingPath(
+        localMountRoot, volume.getVolumeId().toString());
+    LOG.info("Volume {}, local mount path: {}, local staging path {}",
+        volume.getVolumeId().toString(), localMount, localStaging);
+
+    NodePublishVolumeRequest publishRequest = NodePublishVolumeRequest
+        .newInstance(volume.getVolumeId().getId(), // volume Id
+            false, // read only flag
+            localMount.getAbsolutePath(), // target path
+            localStaging.getAbsolutePath(), // staging path
+            new ValidateVolumeCapabilitiesRequest.VolumeCapability(
+                ValidateVolumeCapabilitiesRequest
+                    .AccessMode.SINGLE_NODE_WRITER,
+                ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM,
+                ImmutableList.of()), // capability
+            ImmutableMap.of(), // publish context
+            ImmutableMap.of());  // secrets
+
+    // make sure the volume is a known type
+    if (runtime.getCsiClients().get(volume.getDriverName()) == null) {
+      throw new YarnException("No csi-adaptor is found that can talk"
+          + " to csi-driver " + volume.getDriverName());
+    }
+
+    // publish volume to node
+    LOG.info("Publish volume on NM, request {}",
+        publishRequest.toString());
+    runtime.getCsiClients().get(volume.getDriverName())
+        .nodePublishVolume(publishRequest);
+    // once succeed, bind the container to this mount
+    String containerMountPath = volume.getMountPoint();
+    bindVolumes.put(localMount.getAbsolutePath(), containerMountPath);
+    return bindVolumes;
+  }
+
+  private void unpublishVolume(VolumeMetaData volume)
+      throws YarnException, IOException {
+    CsiAdaptorProtocol csiClient =
+        runtime.getCsiClients().get(volume.getDriverName());
+    if (csiClient == null) {
+      throw new YarnException(
+          "No csi-adaptor is found that can talk"
+              + " to csi-driver " + volume.getDriverName());
+    }
+
+    // When container is launched, the container work dir is memorized,
+    // and that is also the dir we mount the volume to.
+    File localMount = getLocalVolumeMountPath(container.getCsiVolumesRootDir(),
+        volume.getVolumeId().toString());
+    if (!localMount.exists()) {
+      LOG.info("Local mount {} no longer exist, skipping cleaning"
+          + " up the volume", localMount.getAbsolutePath());
+      return;
+    }
+    NodeUnpublishVolumeRequest unpublishRequest =
+        NodeUnpublishVolumeRequest.newInstance(
+            volume.getVolumeId().getId(), // volume id
+            localMount.getAbsolutePath());  // target path
+
+    // un-publish volume from node
+    LOG.info("Un-publish volume {}, request {}",
+        volume.getVolumeId().toString(), unpublishRequest.toString());
+    csiClient.nodeUnpublishVolume(unpublishRequest);
+  }
+}

+ 22 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/package-info.java

@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * CSI volumes.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.volume.csi;

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerStartContext.java

@@ -45,6 +45,7 @@ public final class ContainerStartContext {
   private final String user;
   private final String appId;
   private final Path containerWorkDir;
+  private final Path csiVolumesRootDir;
   private final List<String> localDirs;
   private final List<String> logDirs;
   private final List<String> filecacheDirs;
@@ -64,6 +65,7 @@ public final class ContainerStartContext {
     private String user;
     private String appId;
     private Path containerWorkDir;
+    private Path csiVolumesRoot;
     private List<String> localDirs;
     private List<String> logDirs;
     private List<String> filecacheDirs;
@@ -118,6 +120,11 @@ public final class ContainerStartContext {
       return this;
     }
 
+    public Builder setContainerCsiVolumesRootDir(Path csiVolumesRootDir) {
+      this.csiVolumesRoot = csiVolumesRootDir;
+      return this;
+    }
+
     public Builder setContainerWorkDir(Path containerWorkDir) {
       this.containerWorkDir = containerWorkDir;
       return this;
@@ -188,6 +195,7 @@ public final class ContainerStartContext {
     this.containerLogDirs = builder.containerLogDirs;
     this.userFilecacheDirs = builder.userFilecacheDirs;
     this.applicationLocalDirs = builder.applicationLocalDirs;
+    this.csiVolumesRootDir = builder.csiVolumesRoot;
   }
 
   public Container getContainer() {
@@ -262,4 +270,8 @@ public final class ContainerStartContext {
   public List<String> getApplicationLocalDirs() {
     return Collections.unmodifiableList(this.applicationLocalDirs);
   }
+
+  public Path getCsiVolumesRootDir() {
+    return this.csiVolumesRootDir;
+  }
 }

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java

@@ -173,6 +173,16 @@ public class MockContainer implements Container {
   public void setWorkDir(String workDir) {
   }
 
+  @Override
+  public String getCsiVolumesRootDir() {
+    return null;
+  }
+
+  @Override
+  public void setCsiVolumesRootDir(String volumesRootDir) {
+
+  }
+
   @Override
   public String getLogDir() {
     return null;

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java

@@ -146,6 +146,8 @@ public class VolumeImpl implements Volume {
     @Override
     public VolumeState transition(VolumeImpl volume,
         VolumeEvent volumeEvent) {
+      // Some of CSI driver implementation does't provide the capability
+      // to validate volumes. Skip this for now.
       try {
         // this call could cross node, we should keep the message tight
         // TODO we should parse the capability from volume resource spec