
YARN-8873. [YARN-8811] Add CSI java-based client library. Contributed by Weiwei Yang.

Sunil G · 6 years ago · commit 0efddd85f0
15 changed files with 1880 additions and 0 deletions
  1. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/dev-support/findbugs-exclude.xml (+19, -0)
  2. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml (+209, -0)
  3. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java (+39, -0)
  4. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java (+51, -0)
  5. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiGrpcClient.java (+127, -0)
  6. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/package-info.java (+21, -0)
  7. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/package-info.java (+21, -0)
  8. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/GrpcHelper.java (+52, -0)
  9. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/package-info.java (+21, -0)
  10. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/proto/csi.proto (+1114, -0)
  11. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/FakeCsiDriver.java (+65, -0)
  12. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/FakeCsiIdentityService.java (+42, -0)
  13. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/TestCsiClient.java (+77, -0)
  14. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/package-info.java (+21, -0)
  15. hadoop-yarn-project/hadoop-yarn/pom.xml (+1, -0)

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/dev-support/findbugs-exclude.xml

@@ -0,0 +1,19 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<FindBugsFilter>
+
+</FindBugsFilter>

+ 209 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml

@@ -0,0 +1,209 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hadoop-yarn</artifactId>
+        <groupId>org.apache.hadoop</groupId>
+        <version>3.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>hadoop-yarn-csi</artifactId>
+    <name>Apache Hadoop YARN CSI</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <protobuf.version>3.6.1</protobuf.version>
+        <guava.version>20.0</guava.version>
+        <grpc.version>1.15.1</grpc.version>
+        <netty-all.version>4.1.27.Final</netty-all.version>
+        <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>${netty-all.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-core</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>${os-maven-plugin.version}</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>target/</directory>
+                        </fileset>
+                    </filesets>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${protobuf-maven-plugin.version}</version>
+                <configuration>
+                    <protocArtifact>com.google.protobuf:protoc:3.6.1:exe:${os.detected.classifier}</protocArtifact>
+                    <pluginId>grpc-java</pluginId>
+                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.15.1:exe:${os.detected.classifier}</pluginArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-custom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${maven-shade-plugin.version}</version>
+                <configuration>
+                    <createDependencyReducedPom>false</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>io.grpc</pattern>
+                                    <shadedPattern>csi.io.grpc</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <!-- this includes protobuf-java and guava libraries -->
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>csi.com.google</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>io.netty</pattern>
+                                    <shadedPattern>csi.io.netty</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <!-- Rename the native library file so that the shaded classes can load it -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>unpack</id>
+                        <phase>package</phase>
+                        <configuration>
+                            <target>
+                                <echo message="Unpack hadoop-yarn-csi jar file"/>
+                                <unzip src="${project.build.directory}/${project.artifactId}-${project.version}.jar"
+                                       dest="${project.build.directory}/unpacked/"/>
+                                <echo message="Append the shaded prefix to netty's native file in META-INF"/>
+                                <move file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_epoll_x86_64.so"
+                                      tofile="${project.build.directory}/unpacked/META-INF/native/libcsi_netty_transport_native_epoll_x86_64.so" />
+                                <echo message="Re-pack the jar"/>
+                                <jar destfile="${project.build.directory}/${project.artifactId}-${project.version}.jar"
+                                     basedir="${project.build.directory}/unpacked"/>
+                            </target>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>target/generated-sources/**</exclude>
+                        <exclude>target/surefire-reports/**</exclude>
+                        <exclude>target/protoc-dependencies/**</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
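
The relocations above move gRPC, the Google libraries (protobuf-java and Guava), and Netty under a csi. prefix so the shaded jar cannot clash with other copies of those libraries on the Hadoop classpath; the antrun step then renames Netty's native epoll library so the relocated csi.io.netty classes can still find it. A minimal, hypothetical sketch of how the relocation is observable at runtime (the class names are assumptions derived from the <relocation> rules above, not anything shipped by this commit):

public class ShadePrefixCheck {
  public static void main(String[] args) {
    // With the shaded hadoop-yarn-csi jar on the classpath, gRPC classes
    // are expected to resolve only under the "csi." prefix.
    try {
      Class.forName("csi.io.grpc.ManagedChannel");
      System.out.println("relocated gRPC found under csi.io.grpc");
    } catch (ClassNotFoundException e) {
      System.out.println("shaded jar not on classpath");
    }
    // The original coordinates should be absent unless an unshaded
    // gRPC jar is also present.
    try {
      Class.forName("io.grpc.ManagedChannel");
      System.out.println("an unshaded io.grpc is also on the classpath");
    } catch (ClassNotFoundException expected) {
      System.out.println("io.grpc fully relocated, as configured");
    }
  }
}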

+ 39 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java

@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.csi.client;
+
+import csi.v0.Csi.GetPluginInfoResponse;
+
+import java.io.IOException;
+
+/**
+ * General interface for a CSI client. This interface defines all APIs
+ * that CSI spec supports, including both identity/controller/node service
+ * APIs.
+ */
+public interface CsiClient {
+
+  /**
+   * Gets some basic info about the CSI plugin, including the driver name,
+   * version and optionally some manifest info.
+   * @return {@link GetPluginInfoResponse}
+   * @throws IOException when communication with the CSI driver fails.
+   */
+  GetPluginInfoResponse getPluginInfo() throws IOException;
+}

+ 51 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java

@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.csi.client;
+
+import csi.v0.Csi.GetPluginInfoRequest;
+import csi.v0.Csi.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.csi.utils.GrpcHelper;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+/**
+ * A CSI client implementation that communicates with a CSI driver via
+ * a unix domain socket. It uses gRPC blocking stubs to make synchronous
+ * calls to the driver. The CSI spec defines its APIs as synchronous and
+ * idempotent to simplify failure recovery, so this client follows the
+ * same model.
+ */
+public class CsiClientImpl implements CsiClient {
+
+  private final SocketAddress address;
+
+  public CsiClientImpl(String address) {
+    this.address = GrpcHelper.getSocketAddress(address);
+  }
+
+  @Override
+  public GetPluginInfoResponse getPluginInfo() throws IOException {
+    try (CsiGrpcClient client = CsiGrpcClient.newBuilder()
+        .setDomainSocketAddress(address).build()) {
+      GetPluginInfoRequest request = GetPluginInfoRequest.getDefaultInstance();
+      return client.createIdentityBlockingStub().getPluginInfo(request);
+    }
+  }
+}
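
A hypothetical usage sketch of the client above; the socket address is a placeholder, since a real CSI driver listens on its own socket:

import csi.v0.Csi.GetPluginInfoResponse;
import org.apache.hadoop.yarn.csi.client.CsiClient;
import org.apache.hadoop.yarn.csi.client.CsiClientImpl;

public class CsiClientUsage {
  public static void main(String[] args) throws Exception {
    // "unix:///tmp/csi.sock" is a placeholder address.
    CsiClient client = new CsiClientImpl("unix:///tmp/csi.sock");
    GetPluginInfoResponse info = client.getPluginInfo();
    System.out.println(info.getName() + " " + info.getVendorVersion());
  }
}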

+ 127 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiGrpcClient.java

@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.csi.client;
+
+import csi.v0.ControllerGrpc;
+import csi.v0.IdentityGrpc;
+import csi.v0.NodeGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NettyChannelBuilder;
+import io.netty.channel.epoll.EpollDomainSocketChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.unix.DomainSocketAddress;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A CSI gRPC client that connects to a CSI driver via a given unix domain socket.
+ */
+public final class CsiGrpcClient implements AutoCloseable {
+
+  private static final Log LOG = LogFactory.getLog(CsiGrpcClient.class);
+
+  private final ManagedChannel channel;
+
+  private CsiGrpcClient(ManagedChannel channel) {
+    this.channel = channel;
+  }
+
+  public static GrpcClientBuilder newBuilder() {
+    return new GrpcClientBuilder();
+  }
+
+  /**
+   * The Grpc Client builder.
+   */
+  public static class GrpcClientBuilder {
+
+    private SocketAddress socket;
+
+    public GrpcClientBuilder setDomainSocketAddress(SocketAddress address) {
+      this.socket = address;
+      return this;
+    }
+
+    private ManagedChannel getChannel(SocketAddress socketAddress)
+        throws IOException {
+      DefaultThreadFactory tf = new DefaultThreadFactory(
+          "yarn-csi-client-", true);
+      EpollEventLoopGroup loopGroup = new EpollEventLoopGroup(0, tf);
+      if (socketAddress instanceof DomainSocketAddress) {
+        ManagedChannel channel = NettyChannelBuilder.forAddress(socketAddress)
+            .channelType(EpollDomainSocketChannel.class)
+            .eventLoopGroup(loopGroup)
+            .usePlaintext()
+            .build();
+        return channel;
+      } else {
+        throw new IOException("Currently only unix domain socket is supported");
+      }
+    }
+
+    public CsiGrpcClient build() throws IOException {
+      ManagedChannel socketChannel = getChannel(socket);
+      return new CsiGrpcClient(socketChannel);
+    }
+  }
+
+  /**
+   * Shuts down the communication channel gracefully, waiting up to
+   * 5 seconds for termination. An InterruptedException during the wait
+   * is logged rather than propagated.
+   */
+  @Override
+  public void close() {
+    try {
+      this.channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Failed to gracefully shutdown"
+          + " gRPC communication channel in 5 seconds", e);
+    }
+  }
+
+  /**
+   * Creates a blocking stub for CSI identity plugin on the given channel.
+   * @return the blocking stub
+   */
+  public IdentityGrpc.IdentityBlockingStub createIdentityBlockingStub() {
+    return IdentityGrpc.newBlockingStub(channel);
+  }
+
+  /**
+   * Creates a blocking stub for CSI controller plugin on the given channel.
+   * @return the blocking stub
+   */
+  public ControllerGrpc.ControllerBlockingStub createControllerBlockingStub() {
+    return ControllerGrpc.newBlockingStub(channel);
+  }
+
+  /**
+   * Creates a blocking stub for CSI node plugin on the given channel.
+   * @return the blocking stub
+   */
+  public NodeGrpc.NodeBlockingStub createNodeBlockingStub() {
+    return NodeGrpc.newBlockingStub(channel);
+  }
+}
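
A short, hypothetical sketch of the builder-and-stub flow; the socket path is a placeholder, and GetPluginInfoRequest comes from the csi.v0 classes generated from csi.proto below:

import java.net.SocketAddress;
import csi.v0.Csi.GetPluginInfoRequest;
import csi.v0.Csi.GetPluginInfoResponse;
import org.apache.hadoop.yarn.csi.client.CsiGrpcClient;
import org.apache.hadoop.yarn.csi.utils.GrpcHelper;

public class CsiGrpcClientUsage {
  public static void main(String[] args) throws Exception {
    SocketAddress addr = GrpcHelper.getSocketAddress("unix:///tmp/csi.sock");
    // try-with-resources triggers the graceful 5-second shutdown in close().
    try (CsiGrpcClient client = CsiGrpcClient.newBuilder()
        .setDomainSocketAddress(addr).build()) {
      GetPluginInfoResponse resp = client.createIdentityBlockingStub()
          .getPluginInfo(GetPluginInfoRequest.getDefaultInstance());
      System.out.println("driver: " + resp.getName());
    }
  }
}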

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains CSI client classes.
+ */
+package org.apache.hadoop.yarn.csi.client;

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains classes for CSI.
+ */
+package org.apache.hadoop.yarn.csi;

+ 52 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/GrpcHelper.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.csi.utils;
+
+import io.netty.channel.unix.DomainSocketAddress;
+
+import java.io.File;
+import java.net.SocketAddress;
+
+/**
+ * Helper class with gRPC utility functions.
+ */
+public final class GrpcHelper {
+
+  protected static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://";
+
+  private GrpcHelper() {
+    // hide constructor for utility class
+  }
+
+  public static SocketAddress getSocketAddress(String value) {
+    if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) {
+      String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length());
+      File file = new File(filePath);
+      if (!file.isAbsolute()) {
+        throw new IllegalArgumentException(
+            "Unix domain socket file path must be absolute, file: " + value);
+      }
+      // Create the SocketAddress referencing the file.
+      return new DomainSocketAddress(file);
+    } else {
+      throw new IllegalArgumentException("Given address " + value
+          + " is not a valid unix domain socket path");
+    }
+  }
+}
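
A small sketch (socket paths are placeholders) of how getSocketAddress validates its input:

import java.net.SocketAddress;
import org.apache.hadoop.yarn.csi.utils.GrpcHelper;

public class GrpcHelperUsage {
  public static void main(String[] args) {
    // Absolute path with the unix:// scheme -> DomainSocketAddress.
    SocketAddress ok = GrpcHelper.getSocketAddress("unix:///var/run/csi.sock");
    System.out.println("parsed: " + ok);

    // A relative path (or a non-unix:// address) is rejected.
    try {
      GrpcHelper.getSocketAddress("unix://csi.sock");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}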

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains utility classes for CSI.
+ */
+package org.apache.hadoop.yarn.csi.utils;

+ 1114 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/proto/csi.proto

@@ -0,0 +1,1114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// https://github.com/container-storage-interface/spec/blob/v0.3.0/csi.proto
+// ** v0.3 **
+
+// Code generated by make; DO NOT EDIT.
+syntax = "proto3";
+package csi.v0;
+
+import "google/protobuf/wrappers.proto";
+
+option go_package = "csi";
+service Identity {
+    rpc GetPluginInfo(GetPluginInfoRequest)
+    returns (GetPluginInfoResponse) {}
+
+    rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
+    returns (GetPluginCapabilitiesResponse) {}
+
+    rpc Probe (ProbeRequest)
+    returns (ProbeResponse) {}
+}
+
+service Controller {
+    rpc CreateVolume (CreateVolumeRequest)
+    returns (CreateVolumeResponse) {}
+
+    rpc DeleteVolume (DeleteVolumeRequest)
+    returns (DeleteVolumeResponse) {}
+
+    rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
+    returns (ControllerPublishVolumeResponse) {}
+
+    rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
+    returns (ControllerUnpublishVolumeResponse) {}
+
+    rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
+    returns (ValidateVolumeCapabilitiesResponse) {}
+
+    rpc ListVolumes (ListVolumesRequest)
+    returns (ListVolumesResponse) {}
+
+    rpc GetCapacity (GetCapacityRequest)
+    returns (GetCapacityResponse) {}
+
+    rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
+    returns (ControllerGetCapabilitiesResponse) {}
+
+    rpc CreateSnapshot (CreateSnapshotRequest)
+    returns (CreateSnapshotResponse) {}
+
+    rpc DeleteSnapshot (DeleteSnapshotRequest)
+    returns (DeleteSnapshotResponse) {}
+
+    rpc ListSnapshots (ListSnapshotsRequest)
+    returns (ListSnapshotsResponse) {}
+}
+
+service Node {
+    rpc NodeStageVolume (NodeStageVolumeRequest)
+    returns (NodeStageVolumeResponse) {}
+
+    rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
+    returns (NodeUnstageVolumeResponse) {}
+
+    rpc NodePublishVolume (NodePublishVolumeRequest)
+    returns (NodePublishVolumeResponse) {}
+
+    rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
+    returns (NodeUnpublishVolumeResponse) {}
+
+    // NodeGetId is being deprecated in favor of NodeGetInfo and will be
+    // removed in CSI 1.0. Existing drivers, however, may depend on this
+    // RPC call and hence this RPC call MUST be implemented by the CSI
+    // plugin prior to v1.0.
+    rpc NodeGetId (NodeGetIdRequest)
+    returns (NodeGetIdResponse) {
+        option deprecated = true;
+    }
+
+    rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
+    returns (NodeGetCapabilitiesResponse) {}
+
+    // Prior to CSI 1.0 - CSI plugins MUST implement both NodeGetId and
+    // NodeGetInfo RPC calls.
+    rpc NodeGetInfo (NodeGetInfoRequest)
+    returns (NodeGetInfoResponse) {}
+}
+message GetPluginInfoRequest {
+    // Intentionally empty.
+}
+
+message GetPluginInfoResponse {
+    // The name MUST follow reverse domain name notation format
+    // (https://en.wikipedia.org/wiki/Reverse_domain_name_notation).
+    // It SHOULD include the plugin's host company name and the plugin
+    // name, to minimize the possibility of collisions. It MUST be 63
+    // characters or less, beginning and ending with an alphanumeric
+    // character ([a-z0-9A-Z]) with dashes (-), underscores (_),
+    // dots (.), and alphanumerics between. This field is REQUIRED.
+    string name = 1;
+
+    // This field is REQUIRED. Value of this field is opaque to the CO.
+    string vendor_version = 2;
+
+    // This field is OPTIONAL. Values are opaque to the CO.
+    map<string, string> manifest = 3;
+}
+message GetPluginCapabilitiesRequest {
+    // Intentionally empty.
+}
+
+message GetPluginCapabilitiesResponse {
+    // All the capabilities that the controller service supports. This
+    // field is OPTIONAL.
+    repeated PluginCapability capabilities = 2;
+}
+
+// Specifies a capability of the plugin.
+message PluginCapability {
+    message Service {
+        enum Type {
+            UNKNOWN = 0;
+
+            // CONTROLLER_SERVICE indicates that the Plugin provides RPCs for
+            // the ControllerService. Plugins SHOULD provide this capability.
+            // In rare cases certain plugins may wish to omit the
+            // ControllerService entirely from their implementation, but such
+            // SHOULD NOT be the common case.
+            // The presence of this capability determines whether the CO will
+            // attempt to invoke the REQUIRED ControllerService RPCs, as well
+            // as specific RPCs as indicated by ControllerGetCapabilities.
+            CONTROLLER_SERVICE = 1;
+
+            // ACCESSIBILITY_CONSTRAINTS indicates that the volumes for this
+            // plugin may not be equally accessible by all nodes in the
+            // cluster. The CO MUST use the topology information returned by
+            // CreateVolumeRequest along with the topology information
+            // returned by NodeGetInfo to ensure that a given volume is
+            // accessible from a given node when scheduling workloads.
+            ACCESSIBILITY_CONSTRAINTS = 2;
+        }
+        Type type = 1;
+    }
+
+    oneof type {
+        // Service that the plugin supports.
+        Service service = 1;
+    }
+}
+message ProbeRequest {
+    // Intentionally empty.
+}
+
+message ProbeResponse {
+    // Readiness allows a plugin to report its initialization status back
+    // to the CO. Initialization for some plugins MAY be time consuming
+    // and it is important for a CO to distinguish between the following
+    // cases:
+    //
+    // 1) The plugin is in an unhealthy state and MAY need restarting. In
+    //    this case a gRPC error code SHALL be returned.
+    // 2) The plugin is still initializing, but is otherwise perfectly
+    //    healthy. In this case a successful response SHALL be returned
+    //    with a readiness value of `false`. Calls to the plugin's
+    //    Controller and/or Node services MAY fail due to an incomplete
+    //    initialization state.
+    // 3) The plugin has finished initializing and is ready to service
+    //    calls to its Controller and/or Node services. A successful
+    //    response is returned with a readiness value of `true`.
+    //
+    // This field is OPTIONAL. If not present, the caller SHALL assume
+    // that the plugin is in a ready state and is accepting calls to its
+    // Controller and/or Node services (according to the plugin's reported
+    // capabilities).
+    .google.protobuf.BoolValue ready = 1;
+}
+message CreateVolumeRequest {
+    // The suggested name for the storage space. This field is REQUIRED.
+    // It serves two purposes:
+    // 1) Idempotency - This name is generated by the CO to achieve
+    //    idempotency. If `CreateVolume` fails, the volume may or may not
+    //    be provisioned. In this case, the CO may call `CreateVolume`
+    //    again, with the same name, to ensure the volume exists. The
+    //    Plugin should ensure that multiple `CreateVolume` calls for the
+    //    same name do not result in more than one piece of storage
+    //    provisioned corresponding to that name. If a Plugin is unable to
+    //    enforce idempotency, the CO's error recovery logic could result
+    //    in multiple (unused) volumes being provisioned.
+    // 2) Suggested name - Some storage systems allow callers to specify
+    //    an identifier by which to refer to the newly provisioned
+    //    storage. If a storage system supports this, it can optionally
+    //    use this name as the identifier for the new volume.
+    string name = 1;
+
+    // This field is OPTIONAL. This allows the CO to specify the capacity
+    // requirement of the volume to be provisioned. If not specified, the
+    // Plugin MAY choose an implementation-defined capacity range. If
+    // specified it MUST always be honored, even when creating volumes
+    // from a source, which may force some backends to internally extend
+    // the volume after creating it.
+
+    CapacityRange capacity_range = 2;
+
+    // The capabilities that the provisioned volume MUST have: the Plugin
+    // MUST provision a volume that could satisfy ALL of the
+    // capabilities specified in this list. The Plugin MUST assume that
+    // the CO MAY use the provisioned volume later with ANY of the
+    // capabilities specified in this list. This also enables the CO to do
+    // early validation: if ANY of the specified volume capabilities are
+    // not supported by the Plugin, the call SHALL fail. This field is
+    // REQUIRED.
+    repeated VolumeCapability volume_capabilities = 3;
+
+    // Plugin specific parameters passed in as opaque key-value pairs.
+    // This field is OPTIONAL. The Plugin is responsible for parsing and
+    // validating these parameters. COs will treat these as opaque.
+    map<string, string> parameters = 4;
+
+    // Secrets required by plugin to complete volume creation request.
+    // This field is OPTIONAL. Refer to the `Secrets Requirements`
+    // section on how to use this field.
+    map<string, string> controller_create_secrets = 5;
+
+    // If specified, the new volume will be pre-populated with data from
+    // this source. This field is OPTIONAL.
+    VolumeContentSource volume_content_source = 6;
+
+    // Specifies where (regions, zones, racks, etc.) the provisioned
+    // volume MUST be accessible from.
+    // An SP SHALL advertise the requirements for topological
+    // accessibility information in documentation. COs SHALL only specify
+    // topological accessibility information supported by the SP.
+    // This field is OPTIONAL.
+    // This field SHALL NOT be specified unless the SP has the
+    // ACCESSIBILITY_CONSTRAINTS plugin capability.
+    // If this field is not specified and the SP has the
+    // ACCESSIBILITY_CONSTRAINTS plugin capability, the SP MAY choose
+    // where the provisioned volume is accessible from.
+    TopologyRequirement accessibility_requirements = 7;
+}
+
+// Specifies what source the volume will be created from. One of the
+// type fields MUST be specified.
+message VolumeContentSource {
+    message SnapshotSource {
+        // Contains identity information for the existing source snapshot.
+        // This field is REQUIRED. Plugin is REQUIRED to support creating
+        // volume from snapshot if it supports the capability
+        // CREATE_DELETE_SNAPSHOT.
+        string id = 1;
+    }
+
+    oneof type {
+        SnapshotSource snapshot = 1;
+    }
+}
+
+message CreateVolumeResponse {
+    // Contains all attributes of the newly created volume that are
+    // relevant to the CO along with information required by the Plugin
+    // to uniquely identify the volume. This field is REQUIRED.
+    Volume volume = 1;
+}
+
+// Specify a capability of a volume.
+message VolumeCapability {
+    // Indicate that the volume will be accessed via the block device API.
+    message BlockVolume {
+        // Intentionally empty, for now.
+    }
+
+    // Indicate that the volume will be accessed via the filesystem API.
+    message MountVolume {
+        // The filesystem type. This field is OPTIONAL.
+        // An empty string is equal to an unspecified field value.
+        string fs_type = 1;
+
+        // The mount options that can be used for the volume. This field is
+        // OPTIONAL. `mount_flags` MAY contain sensitive information.
+        // Therefore, the CO and the Plugin MUST NOT leak this information
+        // to untrusted entities. The total size of this repeated field
+        // SHALL NOT exceed 4 KiB.
+        repeated string mount_flags = 2;
+    }
+
+    // Specify how a volume can be accessed.
+    message AccessMode {
+        enum Mode {
+            UNKNOWN = 0;
+
+            // Can only be published once as read/write on a single node, at
+            // any given time.
+            SINGLE_NODE_WRITER = 1;
+
+            // Can only be published once as readonly on a single node, at
+            // any given time.
+            SINGLE_NODE_READER_ONLY = 2;
+
+            // Can be published as readonly at multiple nodes simultaneously.
+            MULTI_NODE_READER_ONLY = 3;
+
+            // Can be published at multiple nodes simultaneously. Only one of
+            // the node can be used as read/write. The rest will be readonly.
+            MULTI_NODE_SINGLE_WRITER = 4;
+
+            // Can be published as read/write at multiple nodes
+            // simultaneously.
+            MULTI_NODE_MULTI_WRITER = 5;
+        }
+
+        // This field is REQUIRED.
+        Mode mode = 1;
+    }
+
+    // Specifies what API the volume will be accessed using. One of the
+    // following fields MUST be specified.
+    oneof access_type {
+        BlockVolume block = 1;
+        MountVolume mount = 2;
+    }
+
+    // This is a REQUIRED field.
+    AccessMode access_mode = 3;
+}
+
+// The capacity of the storage space in bytes. To specify an exact size,
+// `required_bytes` and `limit_bytes` SHALL be set to the same value. At
+// least one of the these fields MUST be specified.
+message CapacityRange {
+    // Volume MUST be at least this big. This field is OPTIONAL.
+    // A value of 0 is equal to an unspecified field value.
+    // The value of this field MUST NOT be negative.
+    int64 required_bytes = 1;
+
+    // Volume MUST not be bigger than this. This field is OPTIONAL.
+    // A value of 0 is equal to an unspecified field value.
+    // The value of this field MUST NOT be negative.
+    int64 limit_bytes = 2;
+}
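+// (Illustrative note, not part of the upstream spec: to request an exact
+// size of 1 GiB, a CO sets required_bytes = limit_bytes = 1073741824.)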
+
+// The information about a provisioned volume.
+message Volume {
+    // The capacity of the volume in bytes. This field is OPTIONAL. If not
+    // set (value of 0), it indicates that the capacity of the volume is
+    // unknown (e.g., NFS share).
+    // The value of this field MUST NOT be negative.
+    int64 capacity_bytes = 1;
+
+    // Contains identity information for the created volume. This field is
+    // REQUIRED. The identity information will be used by the CO in
+    // subsequent calls to refer to the provisioned volume.
+    string id = 2;
+
+    // Attributes reflect static properties of a volume and MUST be passed
+    // to volume validation and publishing calls.
+    // Attributes SHALL be opaque to a CO. Attributes SHALL NOT be mutable
+    // and SHALL be safe for the CO to cache. Attributes SHOULD NOT
+    // contain sensitive information. Attributes MAY NOT uniquely identify
+    // a volume. A volume uniquely identified by `id` SHALL always report
+    // the same attributes. This field is OPTIONAL and when present MUST
+    // be passed to volume validation and publishing calls.
+    map<string, string> attributes = 3;
+
+    // If specified, indicates that the volume is not empty and is
+    // pre-populated with data from the specified source.
+    // This field is OPTIONAL.
+    VolumeContentSource content_source = 4;
+
+    // Specifies where (regions, zones, racks, etc.) the provisioned
+    // volume is accessible from.
+    // A plugin that returns this field MUST also set the
+    // ACCESSIBILITY_CONSTRAINTS plugin capability.
+    // An SP MAY specify multiple topologies to indicate the volume is
+    // accessible from multiple locations.
+    // COs MAY use this information along with the topology information
+    // returned by NodeGetInfo to ensure that a given volume is accessible
+    // from a given node when scheduling workloads.
+    // This field is OPTIONAL. If it is not specified, the CO MAY assume
+    // the volume is equally accessible from all nodes in the cluster and
+    // may schedule workloads referencing the volume on any available
+    // node.
+    //
+    // Example 1:
+    //   accessible_topology = {"region": "R1", "zone": "Z2"}
+    // Indicates a volume accessible only from the "region" "R1" and the
+    // "zone" "Z2".
+    //
+    // Example 2:
+    //   accessible_topology =
+    //     {"region": "R1", "zone": "Z2"},
+    //     {"region": "R1", "zone": "Z3"}
+    // Indicates a volume accessible from both "zone" "Z2" and "zone" "Z3"
+    // in the "region" "R1".
+    repeated Topology accessible_topology = 5;
+}
+
+message TopologyRequirement {
+    // Specifies the list of topologies the provisioned volume MUST be
+    // accessible from.
+    // This field is OPTIONAL. If TopologyRequirement is specified either
+    // requisite or preferred or both MUST be specified.
+    //
+    // If requisite is specified, the provisioned volume MUST be
+    // accessible from at least one of the requisite topologies.
+    //
+    // Given
+    //   x = number of topologies provisioned volume is accessible from
+    //   n = number of requisite topologies
+    // The CO MUST ensure n >= 1. The SP MUST ensure x >= 1.
+    // If x==n, then the SP MUST make the provisioned volume available to
+    // all topologies from the list of requisite topologies. If it is
+    // unable to do so, the SP MUST fail the CreateVolume call.
+    // For example, if a volume should be accessible from a single zone,
+    // and requisite =
+    //   {"region": "R1", "zone": "Z2"}
+    // then the provisioned volume MUST be accessible from the "region"
+    // "R1" and the "zone" "Z2".
+    // Similarly, if a volume should be accessible from two zones, and
+    // requisite =
+    //   {"region": "R1", "zone": "Z2"},
+    //   {"region": "R1", "zone": "Z3"}
+    // then the provisioned volume MUST be accessible from the "region"
+    // "R1" and both "zone" "Z2" and "zone" "Z3".
+    //
+    // If x<n, then the SP SHALL choose x unique topologies from the list
+    // of requisite topologies. If it is unable to do so, the SP MUST fail
+    // the CreateVolume call.
+    // For example, if a volume should be accessible from a single zone,
+    // and requisite =
+    //   {"region": "R1", "zone": "Z2"},
+    //   {"region": "R1", "zone": "Z3"}
+    // then the SP may choose to make the provisioned volume available in
+    // either the "zone" "Z2" or the "zone" "Z3" in the "region" "R1".
+    // Similarly, if a volume should be accessible from two zones, and
+    // requisite =
+    //   {"region": "R1", "zone": "Z2"},
+    //   {"region": "R1", "zone": "Z3"},
+    //   {"region": "R1", "zone": "Z4"}
+    // then the provisioned volume MUST be accessible from any combination
+    // of two unique topologies: e.g. "R1/Z2" and "R1/Z3", or "R1/Z2" and
+    //  "R1/Z4", or "R1/Z3" and "R1/Z4".
+    //
+    // If x>n, then the SP MUST make the provisioned volume available from
+    // all topologies from the list of requisite topologies and MAY choose
+    // the remaining x-n unique topologies from the list of all possible
+    // topologies. If it is unable to do so, the SP MUST fail the
+    // CreateVolume call.
+    // For example, if a volume should be accessible from two zones, and
+    // requisite =
+    //   {"region": "R1", "zone": "Z2"}
+    // then the provisioned volume MUST be accessible from the "region"
+    // "R1" and the "zone" "Z2" and the SP may select the second zone
+    // independently, e.g. "R1/Z4".
+    repeated Topology requisite = 1;
+
+    // Specifies the list of topologies the CO would prefer the volume to
+    // be provisioned in.
+    //
+    // This field is OPTIONAL. If TopologyRequirement is specified either
+    // requisite or preferred or both MUST be specified.
+    //
+    // An SP MUST attempt to make the provisioned volume available using
+    // the preferred topologies in order from first to last.
+    //
+    // If requisite is specified, all topologies in preferred list MUST
+    // also be present in the list of requisite topologies.
+    //
+    // If the SP is unable to make the provisioned volume available
+    // from any of the preferred topologies, the SP MAY choose a topology
+    // from the list of requisite topologies.
+    // If the list of requisite topologies is not specified, then the SP
+    // MAY choose from the list of all possible topologies.
+    // If the list of requisite topologies is specified and the SP is
+    // unable to make the provisioned volume available from any of the
+    // requisite topologies it MUST fail the CreateVolume call.
+    //
+    // Example 1:
+    // Given a volume should be accessible from a single zone, and
+    // requisite =
+    //   {"region": "R1", "zone": "Z2"},
+    //   {"region": "R1", "zone": "Z3"}
+    // preferred =
+    //   {"region": "R1", "zone": "Z3"}
+    // then the SP SHOULD first attempt to make the provisioned volume
+    // available from "zone" "Z3" in the "region" "R1" and fall back to
+    // "zone" "Z2" in the "region" "R1" if that is not possible.
+    //
+    // Example 2:
+    // Given a volume should be accessible from a single zone, and
+    // requisite =
+    //   {"region": "R1", "zone": "Z2"},
+    //   {"region": "R1", "zone": "Z3"},
+    //   {"region": "R1", "zone": "Z4"},
+    //   {"region": "R1", "zone": "Z5"}
+    // preferred =
+    //   {"region": "R1", "zone": "Z4"},
+    //   {"region": "R1", "zone": "Z2"}
+    // then the SP SHOULD first attempt to make the provisioned volume
+    // accessible from "zone" "Z4" in the "region" "R1" and fall back to
+    // "zone" "Z2" in the "region" "R1" if that is not possible. If that
+    // is not possible, the SP may choose between either the "zone"
+    // "Z3" or "Z5" in the "region" "R1".
+    //
+    // Example 3:
+    // Given a volume should be accessible from TWO zones (because an
+    // opaque parameter in CreateVolumeRequest, for example, specifies
+    // the volume is accessible from two zones, aka synchronously
+    // replicated), and
+    // requisite =
+    //   {"region": "R1", "zone": "Z2"},
+    //   {"region": "R1", "zone": "Z3"},
+    //   {"region": "R1", "zone": "Z4"},
+    //   {"region": "R1", "zone": "Z5"}
+    // preferred =
+    //   {"region": "R1", "zone": "Z5"},
+    //   {"region": "R1", "zone": "Z3"}
+    // then the SP SHOULD first attempt to make the provisioned volume
+    // accessible from the combination of the two "zones" "Z5" and "Z3" in
+    // the "region" "R1". If that's not possible, it should fall back to
+    // a combination of "Z5" and other possibilities from the list of
+    // requisite. If that's not possible, it should fall back to a
+    // combination of "Z3" and other possibilities from the list of
+    // requisite. If that's not possible, it should fall back to a
+    // combination of other possibilities from the list of requisite.
+    repeated Topology preferred = 2;
+}
+
+// Topology is a map of topological domains to topological segments.
+// A topological domain is a sub-division of a cluster, like "region",
+// "zone", "rack", etc.
+// A topological segment is a specific instance of a topological domain,
+// like "zone3", "rack3", etc.
+// For example {"com.company/zone": "Z1", "com.company/rack": "R3"}
+// Valid keys have two segments: an optional prefix and name, separated
+// by a slash (/), for example: "com.company.example/zone".
+// The key name segment is required. The prefix is optional.
+// Both the key name and the prefix MUST each be 63 characters or less,
+// begin and end with an alphanumeric character ([a-z0-9A-Z]) and
+// contain only dashes (-), underscores (_), dots (.), or alphanumerics
+// in between, for example "zone".
+// The key prefix MUST follow reverse domain name notation format
+// (https://en.wikipedia.org/wiki/Reverse_domain_name_notation).
+// The key prefix SHOULD include the plugin's host company name and/or
+// the plugin name, to minimize the possibility of collisions with keys
+// from other plugins.
+// If a key prefix is specified, it MUST be identical across all
+// topology keys returned by the SP (across all RPCs).
+// Keys MUST be case-insensitive, meaning the keys "Zone" and "zone"
+// MUST NOT both exist.
+// Each value (topological segment) MUST contain 1 or more strings.
+// Each string MUST be 63 characters or less and begin and end with an
+// alphanumeric character with '-', '_', '.', or alphanumerics in
+// between.
+message Topology {
+    map<string, string> segments = 1;
+}
+message DeleteVolumeRequest {
+    // The ID of the volume to be deprovisioned.
+    // This field is REQUIRED.
+    string volume_id = 1;
+
+    // Secrets required by plugin to complete volume deletion request.
+    // This field is OPTIONAL. Refer to the `Secrets Requirements`
+    // section on how to use this field.
+    map<string, string> controller_delete_secrets = 2;
+}
+
+message DeleteVolumeResponse {
+    // Intentionally empty.
+}
+message ControllerPublishVolumeRequest {
+    // The ID of the volume to be used on a node.
+    // This field is REQUIRED.
+    string volume_id = 1;
+
+    // The ID of the node. This field is REQUIRED. The CO SHALL set this
+    // field to match the node ID returned by `NodeGetInfo`.
+    string node_id = 2;
+
+    // The capability of the volume the CO expects the volume to have.
+    // This is a REQUIRED field.
+    VolumeCapability volume_capability = 3;
+
+    // Whether to publish the volume in readonly mode. This field is
+    // REQUIRED.
+    bool readonly = 4;
+
+    // Secrets required by plugin to complete controller publish volume
+    // request. This field is OPTIONAL. Refer to the
+    // `Secrets Requirements` section on how to use this field.
+    map<string, string> controller_publish_secrets = 5;
+
+    // Attributes of the volume to be used on a node. This field is
+    // OPTIONAL and MUST match the attributes of the Volume identified
+    // by `volume_id`.
+    map<string, string> volume_attributes = 6;
+}
+
+message ControllerPublishVolumeResponse {
+    // The SP specific information that will be passed to the Plugin in
+    // the subsequent `NodeStageVolume` or `NodePublishVolume` calls
+    // for the given volume.
+    // This information is opaque to the CO. This field is OPTIONAL.
+    map<string, string> publish_info = 1;
+}
+message ControllerUnpublishVolumeRequest {
+    // The ID of the volume. This field is REQUIRED.
+    string volume_id = 1;
+
+    // The ID of the node. This field is OPTIONAL. The CO SHOULD set this
+    // field to match the node ID returned by `NodeGetInfo` or leave it
+    // unset. If the value is set, the SP MUST unpublish the volume from
+    // the specified node. If the value is unset, the SP MUST unpublish
+    // the volume from all nodes it is published to.
+    string node_id = 2;
+
+    // Secrets required by plugin to complete controller unpublish volume
+    // request. This SHOULD be the same secrets passed to the
+    // ControllerPublishVolume call for the specified volume.
+    // This field is OPTIONAL. Refer to the `Secrets Requirements`
+    // section on how to use this field.
+    map<string, string> controller_unpublish_secrets = 3;
+}
+
+message ControllerUnpublishVolumeResponse {
+    // Intentionally empty.
+}
+message ValidateVolumeCapabilitiesRequest {
+    // The ID of the volume to check. This field is REQUIRED.
+    string volume_id = 1;
+
+    // The capabilities that the CO wants to check for the volume. This
+    // call SHALL return "supported" only if all the volume capabilities
+    // specified below are supported. This field is REQUIRED.
+    repeated VolumeCapability volume_capabilities = 2;
+
+    // Attributes of the volume to check. This field is OPTIONAL and MUST
+    // match the attributes of the Volume identified by `volume_id`.
+    map<string, string> volume_attributes = 3;
+
+    // Specifies where (regions, zones, racks, etc.) the caller believes
+    // the volume is accessible from.
+    // A caller MAY specify multiple topologies to indicate they believe
+    // the volume to be accessible from multiple locations.
+    // This field is OPTIONAL. This field SHALL NOT be set unless the
+    // plugin advertises the ACCESSIBILITY_CONSTRAINTS capability.
+    repeated Topology accessible_topology = 4;
+}
+
+message ValidateVolumeCapabilitiesResponse {
+    // True if the Plugin supports the specified capabilities for the
+    // given volume. This field is REQUIRED.
+    bool supported = 1;
+
+    // Message to the CO if `supported` above is false. This field is
+    // OPTIONAL.
+    // An empty string is equal to an unspecified field value.
+    string message = 2;
+}
+message ListVolumesRequest {
+    // If specified (non-zero value), the Plugin MUST NOT return more
+    // entries than this number in the response. If the actual number of
+    // entries is more than this number, the Plugin MUST set `next_token`
+    // in the response which can be used to get the next page of entries
+    // in the subsequent `ListVolumes` call. This field is OPTIONAL. If
+    // not specified (zero value), it means there is no restriction on the
+    // number of entries that can be returned.
+    // The value of this field MUST NOT be negative.
+    int32 max_entries = 1;
+
+    // A token to specify where to start paginating. Set this field to
+    // `next_token` returned by a previous `ListVolumes` call to get the
+    // next page of entries. This field is OPTIONAL.
+    // An empty string is equal to an unspecified field value.
+    string starting_token = 2;
+}
+
+message ListVolumesResponse {
+    message Entry {
+        Volume volume = 1;
+    }
+
+    repeated Entry entries = 1;
+
+    // This token allows you to get the next page of entries for
+    // `ListVolumes` request. If the number of entries is larger than
+    // `max_entries`, use the `next_token` as a value for the
+    // `starting_token` field in the next `ListVolumes` request. This
+    // field is OPTIONAL.
+    // An empty string is equal to an unspecified field value.
+    string next_token = 2;
+}
+message GetCapacityRequest {
+    // If specified, the Plugin SHALL report the capacity of the storage
+    // that can be used to provision volumes that satisfy ALL of the
+    // specified `volume_capabilities`. These are the same
+    // `volume_capabilities` the CO will use in `CreateVolumeRequest`.
+    // This field is OPTIONAL.
+    repeated VolumeCapability volume_capabilities = 1;
+
+    // If specified, the Plugin SHALL report the capacity of the storage
+    // that can be used to provision volumes with the given Plugin
+    // specific `parameters`. These are the same `parameters` the CO will
+    // use in `CreateVolumeRequest`. This field is OPTIONAL.
+    map<string, string> parameters = 2;
+
+    // If specified, the Plugin SHALL report the capacity of the storage
+    // that can be used to provision volumes that are accessible in the
+    // specified `accessible_topology`. This is the same as the
+    // `accessible_topology` the CO returns in a `CreateVolumeResponse`.
+    // This field is OPTIONAL. This field SHALL NOT be set unless the
+    // plugin advertises the ACCESSIBILITY_CONSTRAINTS capability.
+    Topology accessible_topology = 3;
+}
+
+message GetCapacityResponse {
+    // The available capacity, in bytes, of the storage that can be used
+    // to provision volumes. If `volume_capabilities` or `parameters` is
+    // specified in the request, the Plugin SHALL take those into
+    // consideration when calculating the available capacity of the
+    // storage. This field is REQUIRED.
+    // The value of this field MUST NOT be negative.
+    int64 available_capacity = 1;
+}
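
A hedged sketch of querying capacity for one class of storage. The "type"/"ssd" parameter key is a hypothetical plugin parameter, the topology SHALL only be set when the plugin advertises ACCESSIBILITY_CONSTRAINTS, and the `segments` map on `Topology` is assumed from the message defined earlier in this file:

import csi.v0.ControllerGrpc;
import csi.v0.Csi;

public final class GetCapacitySketch {
  /** Asks how many bytes remain for a hypothetical "ssd" volume class. */
  static long availableSsdBytes(ControllerGrpc.ControllerBlockingStub stub) {
    Csi.GetCapacityRequest request = Csi.GetCapacityRequest.newBuilder()
        .putParameters("type", "ssd") // hypothetical plugin parameter
        // Only set this if the plugin advertises ACCESSIBILITY_CONSTRAINTS:
        .setAccessibleTopology(Csi.Topology.newBuilder()
            .putSegments("zone", "Z1"))
        .build();
    return stub.getCapacity(request).getAvailableCapacity();
  }
}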
+message ControllerGetCapabilitiesRequest {
+    // Intentionally empty.
+}
+
+message ControllerGetCapabilitiesResponse {
+    // All the capabilities that the controller service supports. This
+    // field is OPTIONAL.
+    repeated ControllerServiceCapability capabilities = 2;
+}
+
+// Specifies a capability of the controller service.
+message ControllerServiceCapability {
+    message RPC {
+        enum Type {
+            UNKNOWN = 0;
+            CREATE_DELETE_VOLUME = 1;
+            PUBLISH_UNPUBLISH_VOLUME = 2;
+            LIST_VOLUMES = 3;
+            GET_CAPACITY = 4;
+            // Currently the only way to consume a snapshot is to create
+            // a volume from it. Therefore plugins supporting
+            // CREATE_DELETE_SNAPSHOT MUST support creating a volume
+            // from a snapshot.
+            CREATE_DELETE_SNAPSHOT = 5;
+            // LIST_SNAPSHOTS is NOT REQUIRED. For plugins that need to upload
+            // a snapshot after it has been cut, LIST_SNAPSHOTS COULD be
+            // used with the snapshot_id as the filter to query whether
+            // the upload is complete.
+            LIST_SNAPSHOTS = 6;
+        }
+
+        Type type = 1;
+    }
+
+    oneof type {
+        // RPC that the controller supports.
+        RPC rpc = 1;
+    }
+}
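
A CO is expected to gate optional RPCs such as `ListVolumes` or `CreateSnapshot` on this capability list. A small helper sketch, again assuming the `csi.v0.ControllerGrpc` stub:

import csi.v0.ControllerGrpc;
import csi.v0.Csi;

public final class ControllerCapabilitySketch {
  /** Returns true if the controller advertises the given RPC type. */
  static boolean hasCapability(ControllerGrpc.ControllerBlockingStub stub,
      Csi.ControllerServiceCapability.RPC.Type wanted) {
    Csi.ControllerGetCapabilitiesResponse response =
        stub.controllerGetCapabilities(
            Csi.ControllerGetCapabilitiesRequest.getDefaultInstance());
    for (Csi.ControllerServiceCapability cap : response.getCapabilitiesList()) {
      if (cap.getRpc().getType() == wanted) {
        return true;
      }
    }
    return false;
  }
}

For example, a caller would check hasCapability(stub, Csi.ControllerServiceCapability.RPC.Type.LIST_VOLUMES) before attempting the pagination loop sketched above.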
+message CreateSnapshotRequest {
+    // The ID of the source volume to be snapshotted.
+    // This field is REQUIRED.
+    string source_volume_id = 1;
+
+    // The suggested name for the snapshot. This field is REQUIRED for
+    // idempotency.
+    string name = 2;
+
+    // Secrets required by plugin to complete snapshot creation request.
+    // This field is OPTIONAL. Refer to the `Secrets Requirements`
+    // section on how to use this field.
+    map<string, string> create_snapshot_secrets = 3;
+
+    // Plugin specific parameters passed in as opaque key-value pairs.
+    // This field is OPTIONAL. The Plugin is responsible for parsing and
+    // validating these parameters. COs will treat these as opaque.
+    // Use cases for opaque parameters:
+    // - Specify a policy to automatically clean up the snapshot.
+    // - Specify an expiration date for the snapshot.
+    // - Specify whether the snapshot is readonly or read/write.
+    // - Specify if the snapshot should be replicated to some place.
+    // - Specify primary or secondary for replication systems that
+    //   support snapshotting only on primary.
+    map<string, string> parameters = 4;
+}
+
+message CreateSnapshotResponse {
+    // Contains all attributes of the newly created snapshot that are
+    // relevant to the CO along with information required by the Plugin
+    // to uniquely identify the snapshot. This field is REQUIRED.
+    Snapshot snapshot = 1;
+}
+
+// The information about a provisioned snapshot.
+message Snapshot {
+    // This is the complete size of the snapshot in bytes. The purpose of
+    // this field is to give the CO guidance on how much space is needed to
+    // create a volume from this snapshot. The size of the volume MUST NOT
+    // be less than the size of the source snapshot. This field is
+    // OPTIONAL. If this field is not set, it indicates that this size is
+    // unknown. The value of this field MUST NOT be negative and a size of
+    // zero means it is unspecified.
+    int64 size_bytes = 1;
+
+    // Uniquely identifies a snapshot and is generated by the plugin. It
+    // will not change over time. This field is REQUIRED. The identity
+    // information will be used by the CO in subsequent calls to refer to
+    // the provisioned snapshot.
+    string id = 2;
+
+    // Identity information for the source volume. Note that creating a
+    // snapshot from a snapshot is not supported here so the source has to
+    // be a volume. This field is REQUIRED.
+    string source_volume_id = 3;
+
+    // Timestamp when the point-in-time snapshot is taken on the storage
+    // system. The format of this field should be a Unix nanoseconds time
+    // encoded as an int64. On Unix, the command `date +%s%N` returns the
+    // current time in nanoseconds since 1970-01-01 00:00:00 UTC. This
+    // field is REQUIRED.
+    int64 created_at = 4;
+
+    // The status of a snapshot.
+    SnapshotStatus status = 5;
+}
+
+// The status of a snapshot.
+message SnapshotStatus {
+    enum Type {
+        UNKNOWN = 0;
+        // A snapshot is ready for use.
+        READY = 1;
+        // A snapshot is cut and is now being uploaded.
+        // Some cloud providers and storage systems upload the snapshot
+        // to the cloud after the snapshot is cut. During this phase,
+        // `thaw` can be done so the application can resume running if
+        // `freeze` was done before taking the snapshot.
+        UPLOADING = 2;
+        // An error occurred during the snapshot uploading process.
+        // This error status is specific to uploading because
+        // `CreateSnapshot` is a blocking call until the snapshot is
+        // cut, and therefore it SHOULD NOT come back with an error
+        // status when an upload error occurs. Instead a gRPC error code SHALL
+        // be returned by `CreateSnapshot` when an error occurs before
+        // a snapshot is cut.
+        ERROR_UPLOADING = 3;
+    }
+    // This field is REQUIRED.
+    Type type = 1;
+
+    // Additional information to describe why a snapshot ended up in the
+    // `ERROR_UPLOADING` status. This field is OPTIONAL.
+    string details = 2;
+}
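
Because `created_at` is a raw Unix-nanosecond `int64`, callers typically convert it before display. A small sketch using only `java.time` arithmetic (no CSI assumptions beyond the field encoding described above):

import java.time.Instant;

public final class SnapshotTimeSketch {
  /** Converts created_at (nanoseconds since the epoch) to an Instant. */
  static Instant createdAt(long createdAtNanos) {
    return Instant.ofEpochSecond(
        createdAtNanos / 1_000_000_000L,  // whole seconds
        createdAtNanos % 1_000_000_000L); // leftover nanoseconds
  }
}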
+message DeleteSnapshotRequest {
+    // The ID of the snapshot to be deleted.
+    // This field is REQUIRED.
+    string snapshot_id = 1;
+
+    // Secrets required by plugin to complete snapshot deletion request.
+    // This field is OPTIONAL. Refer to the `Secrets Requirements`
+    // section on how to use this field.
+    map<string, string> delete_snapshot_secrets = 2;
+}
+
+message DeleteSnapshotResponse {}
+
+// List all snapshots on the storage system regardless of how they were
+// created.
+message ListSnapshotsRequest {
+    // If specified (non-zero value), the Plugin MUST NOT return more
+    // entries than this number in the response. If the actual number of
+    // entries is more than this number, the Plugin MUST set `next_token`
+    // in the response which can be used to get the next page of entries
+    // in the subsequent `ListSnapshots` call. This field is OPTIONAL. If
+    // not specified (zero value), it means there is no restriction on the
+    // number of entries that can be returned.
+    // The value of this field MUST NOT be negative.
+    int32 max_entries = 1;
+
+    // A token to specify where to start paginating. Set this field to
+    // `next_token` returned by a previous `ListSnapshots` call to get the
+    // next page of entries. This field is OPTIONAL.
+    // An empty string is equal to an unspecified field value.
+    string starting_token = 2;
+
+    // Identity information for the source volume. This field is OPTIONAL.
+    // It can be used to list snapshots by volume.
+    string source_volume_id = 3;
+
+    // Identity information for a specific snapshot. This field is
+    // OPTIONAL. It can be used to list only a specific snapshot.
+    // ListSnapshots will return the current snapshot information
+    // and will not block if the snapshot is being uploaded.
+    string snapshot_id = 4;
+}
+
+message ListSnapshotsResponse {
+    message Entry {
+        Snapshot snapshot = 1;
+    }
+
+    repeated Entry entries = 1;
+
+    // This token allows you to get the next page of entries for
+    // `ListSnapshots` request. If the number of entries is larger than
+    // `max_entries`, use the `next_token` as a value for the
+    // `starting_token` field in the next `ListSnapshots` request. This
+    // field is OPTIONAL.
+    // An empty string is equal to an unspecified field value.
+    string next_token = 2;
+}
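
Combined with the `UPLOADING`/`READY` states above, the `snapshot_id` filter gives a natural polling loop for plugins that upload snapshots asynchronously. A sketch; the helper name and the one-second backoff are arbitrary choices:

import csi.v0.ControllerGrpc;
import csi.v0.Csi;

public final class SnapshotPollSketch {
  /** Polls one snapshot until it leaves the UPLOADING state. */
  static Csi.SnapshotStatus.Type waitForUpload(
      ControllerGrpc.ControllerBlockingStub stub, String snapshotId)
      throws InterruptedException {
    while (true) {
      Csi.ListSnapshotsResponse response = stub.listSnapshots(
          Csi.ListSnapshotsRequest.newBuilder()
              .setSnapshotId(snapshotId)
              .build());
      // One entry is expected for an id filter; a real caller should
      // treat an empty result as "snapshot not found".
      Csi.SnapshotStatus.Type type =
          response.getEntries(0).getSnapshot().getStatus().getType();
      if (type != Csi.SnapshotStatus.Type.UPLOADING) {
        return type; // READY, ERROR_UPLOADING or UNKNOWN
      }
      Thread.sleep(1000L);
    }
  }
}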
+message NodeStageVolumeRequest {
+    // The ID of the volume to stage. This field is REQUIRED.
+    string volume_id = 1;
+
+    // The CO SHALL set this field to the value returned by
+    // `ControllerPublishVolume` if the corresponding Controller Plugin
+    // has `PUBLISH_UNPUBLISH_VOLUME` controller capability, and SHALL
+    // leave it unset if the corresponding Controller Plugin does not have
+    // this capability. This is an OPTIONAL field.
+    map<string, string> publish_info = 2;
+
+    // The path to which the volume will be staged. It MUST be an
+    // absolute path in the root filesystem of the process serving this
+    // request. The CO SHALL ensure that there is only one
+    // staging_target_path per volume.
+    // This is a REQUIRED field.
+    string staging_target_path = 3;
+
+    // The capability that the CO expects the volume to have.
+    // This is a REQUIRED field.
+    VolumeCapability volume_capability = 4;
+
+    // Secrets required by plugin to complete node stage volume request.
+    // This field is OPTIONAL. Refer to the `Secrets Requirements`
+    // section on how to use this field.
+    map<string, string> node_stage_secrets = 5;
+
+    // Attributes of the volume to stage. This field is OPTIONAL and
+    // MUST match the attributes of the `Volume` identified by
+    // `volume_id`.
+    map<string, string> volume_attributes = 6;
+}
+
+message NodeStageVolumeResponse {
+    // Intentionally empty.
+}
+
+message NodeUnstageVolumeRequest {
+    // The ID of the volume. This field is REQUIRED.
+    string volume_id = 1;
+
+    // The path at which the volume was staged. It MUST be an absolute
+    // path in the root filesystem of the process serving this request.
+    // This is a REQUIRED field.
+    string staging_target_path = 2;
+}
+
+message NodeUnstageVolumeResponse {
+    // Intentionally empty.
+}
+
+message NodePublishVolumeRequest {
+    // The ID of the volume to publish. This field is REQUIRED.
+    string volume_id = 1;
+
+    // The CO SHALL set this field to the value returned by
+    // `ControllerPublishVolume` if the corresponding Controller Plugin
+    // has `PUBLISH_UNPUBLISH_VOLUME` controller capability, and SHALL
+    // leave it unset if the corresponding Controller Plugin does not have
+    // this capability. This is an OPTIONAL field.
+    map<string, string> publish_info = 2;
+
+    // The path to which the device was mounted by `NodeStageVolume`.
+    // It MUST be an absolute path in the root filesystem of the process
+    // serving this request.
+    // It MUST be set if the Node Plugin implements the
+    // `STAGE_UNSTAGE_VOLUME` node capability.
+    // This is an OPTIONAL field.
+    string staging_target_path = 3;
+
+    // The path to which the volume will be published. It MUST be an
+    // absolute path in the root filesystem of the process serving this
+    // request. The CO SHALL ensure uniqueness of target_path per volume.
+    // The CO SHALL ensure that the path exists, and that the process
+    // serving the request has `read` and `write` permissions to the path.
+    // This is a REQUIRED field.
+    string target_path = 4;
+
+    // The capability that the CO expects the volume to have.
+    // This is a REQUIRED field.
+    VolumeCapability volume_capability = 5;
+
+    // Whether to publish the volume in readonly mode. This field is
+    // REQUIRED.
+    bool readonly = 6;
+
+    // Secrets required by plugin to complete node publish volume request.
+    // This field is OPTIONAL. Refer to the `Secrets Requirements`
+    // section on how to use this field.
+    map<string, string> node_publish_secrets = 7;
+
+    // Attributes of the volume to publish. This field is OPTIONAL and
+    // MUST match the attributes of the Volume identified by
+    // `volume_id`.
+    map<string, string> volume_attributes = 8;
+}
+
+message NodePublishVolumeResponse {
+    // Intentionally empty.
+}
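
Putting the two node RPCs together: a CO that detected the `STAGE_UNSTAGE_VOLUME` capability stages a volume once per node, then publishes it per workload. A sketch assuming a generated `csi.v0.NodeGrpc` stub; both paths are hypothetical, and `publishInfo` is whatever `ControllerPublishVolume` returned:

import java.util.Map;

import csi.v0.Csi;
import csi.v0.NodeGrpc;

public final class NodeMountSketch {
  /** Stages a volume on the node, then publishes it at the workload path. */
  static void stageAndPublish(NodeGrpc.NodeBlockingStub stub, String volumeId,
      Csi.VolumeCapability capability, Map<String, String> publishInfo) {
    String stagingPath = "/var/lib/yarn/csi/staging/" + volumeId; // hypothetical
    String targetPath = "/var/lib/yarn/csi/mounts/" + volumeId;   // hypothetical
    stub.nodeStageVolume(Csi.NodeStageVolumeRequest.newBuilder()
        .setVolumeId(volumeId)
        .putAllPublishInfo(publishInfo)
        .setStagingTargetPath(stagingPath)
        .setVolumeCapability(capability)
        .build());
    stub.nodePublishVolume(Csi.NodePublishVolumeRequest.newBuilder()
        .setVolumeId(volumeId)
        .putAllPublishInfo(publishInfo)
        .setStagingTargetPath(stagingPath)
        .setTargetPath(targetPath)
        .setVolumeCapability(capability)
        .setReadonly(false)
        .build());
  }
}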
+message NodeUnpublishVolumeRequest {
+    // The ID of the volume. This field is REQUIRED.
+    string volume_id = 1;
+
+    // The path at which the volume was published. It MUST be an absolute
+    // path in the root filesystem of the process serving this request.
+    // This is a REQUIRED field.
+    string target_path = 2;
+}
+
+message NodeUnpublishVolumeResponse {
+    // Intentionally empty.
+}
+
+message NodeGetIdRequest {
+    // Intentionally empty.
+}
+
+message NodeGetIdResponse {
+    // The ID of the node as understood by the SP, which SHALL be used
+    // by the CO in subsequent calls to `ControllerPublishVolume`.
+    // This is a REQUIRED field.
+    string node_id = 1;
+}
+
+message NodeGetCapabilitiesRequest {
+    // Intentionally empty.
+}
+
+message NodeGetCapabilitiesResponse {
+    // All the capabilities that the node service supports. This field
+    // is OPTIONAL.
+    repeated NodeServiceCapability capabilities = 1;
+}
+
+// Specifies a capability of the node service.
+message NodeServiceCapability {
+    message RPC {
+        enum Type {
+            UNKNOWN = 0;
+            STAGE_UNSTAGE_VOLUME = 1;
+        }
+
+        Type type = 1;
+    }
+
+    oneof type {
+        // RPC that the node service supports.
+        RPC rpc = 1;
+    }
+}
+
+message NodeGetInfoRequest {
+}
+
+message NodeGetInfoResponse {
+    // The ID of the node as understood by the SP which SHALL be used by
+    // CO in subsequent calls to `ControllerPublishVolume`.
+    // This is a REQUIRED field.
+    string node_id = 1;
+
+    // Maximum number of volumes that the controller can publish to the
+    // node. If the value is not set or is zero, the CO SHALL decide how
+    // many volumes of this type can be published by the controller to
+    // the node. The plugin MUST NOT set negative values here.
+    // This field is OPTIONAL.
+    int64 max_volumes_per_node = 2;
+
+    // Specifies where (regions, zones, racks, etc.) the node is
+    // accessible from.
+    // A plugin that returns this field MUST also set the
+    // ACCESSIBILITY_CONSTRAINTS plugin capability.
+    // COs MAY use this information along with the topology information
+    // returned in CreateVolumeResponse to ensure that a given volume is
+    // accessible from a given node when scheduling workloads.
+    // This field is OPTIONAL. If it is not specified, the CO MAY assume
+    // the node is not subject to any topological constraint, and MAY
+    // schedule workloads that reference any volume V, such that there are
+    // no topological constraints declared for V.
+    //
+    // Example 1:
+    //   accessible_topology =
+    //     {"region": "R1", "zone": "R2"}
+    // Indicates the node exists within the "region" "R1" and the "zone"
+    // "Z2".
+    Topology accessible_topology = 3;
+}
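
To make the topology contract concrete, a sketch of reading it back on the CO side, again assuming a `csi.v0.NodeGrpc` stub is generated alongside `IdentityGrpc` and that `Topology` carries a `segments` map:

import csi.v0.Csi;
import csi.v0.NodeGrpc;

public final class NodeInfoSketch {
  /** Prints the node id and any advertised topology segments. */
  static void printNodeInfo(NodeGrpc.NodeBlockingStub stub) {
    Csi.NodeGetInfoResponse info =
        stub.nodeGetInfo(Csi.NodeGetInfoRequest.getDefaultInstance());
    System.out.println("node id: " + info.getNodeId());
    // Prints e.g. {region=R1, zone=Z2}; empty when the node declares
    // no topological constraints.
    System.out.println("topology: "
        + info.getAccessibleTopology().getSegmentsMap());
  }
}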

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/FakeCsiDriver.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.csi.client;
+
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerDomainSocketChannel;
+import org.apache.hadoop.yarn.csi.utils.GrpcHelper;
+
+import java.io.IOException;
+import java.util.logging.Logger;
+
+/**
+ * A fake implementation of CSI driver.
+ * This is for testing purpose only.
+ */
+public class FakeCsiDriver {
+
+  private static final Logger LOG = Logger
+      .getLogger(FakeCsiDriver.class.getName());
+
+  private Server server;
+  private String socketAddress;
+
+  public FakeCsiDriver(String socketAddress) {
+    this.socketAddress = socketAddress;
+  }
+
+  public void start() throws IOException {
+    EpollEventLoopGroup group = new EpollEventLoopGroup();
+    server = NettyServerBuilder
+        .forAddress(GrpcHelper.getSocketAddress(socketAddress))
+        .channelType(EpollServerDomainSocketChannel.class)
+        .workerEventLoopGroup(group)
+        .bossEventLoopGroup(group)
+        .addService(new FakeCsiIdentityService())
+        .build();
+    server.start();
+    LOG.info("Server started, listening on " + socketAddress);
+  }
+
+  public void stop() {
+    if (server != null) {
+      server.shutdown();
+      LOG.info("Server has been shutdown");
+    }
+  }
+}
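
The client half of this test dials the same Unix domain socket. A hedged sketch of how such a channel can be built with grpc-netty; this mirrors what CsiGrpcClient in this patch presumably does, but the exact builder options here are assumptions rather than a copy of that class:

import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.hadoop.yarn.csi.utils.GrpcHelper;

public final class DomainSocketChannelSketch {
  /** Dials a unix:// address using the same epoll transport as the driver. */
  static ManagedChannel connect(String socketAddress) {
    return NettyChannelBuilder
        .forAddress(GrpcHelper.getSocketAddress(socketAddress))
        .channelType(EpollDomainSocketChannel.class)
        .eventLoopGroup(new EpollEventLoopGroup())
        .usePlaintext() // no TLS over the local domain socket
        .build();
  }
}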

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/FakeCsiIdentityService.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.csi.client;
+
+import csi.v0.Csi.GetPluginInfoRequest;
+import csi.v0.Csi.GetPluginInfoResponse;
+import csi.v0.IdentityGrpc;
+import io.grpc.stub.StreamObserver;
+
+/**
+ * A fake implementation of CSI identity plugin gRPC service.
+ * This is for testing purpose only.
+ */
+public class FakeCsiIdentityService extends IdentityGrpc.IdentityImplBase {
+
+  @Override
+  public void getPluginInfo(GetPluginInfoRequest request,
+      StreamObserver<GetPluginInfoResponse> responseObserver) {
+    GetPluginInfoResponse response = GetPluginInfoResponse.newBuilder()
+        .setName("fake-csi-identity-service")
+        .setVendorVersion("0.1")
+        .build();
+    responseObserver.onNext(response);
+    responseObserver.onCompleted();
+  }
+}
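
The fake driver only answers `GetPluginInfo`. If a test needed the other CSI v0 identity RPC, `GetPluginCapabilities`, the same stub-override pattern extends naturally. A sketch; the subclass name and the advertised capability are arbitrary:

import csi.v0.Csi.GetPluginCapabilitiesRequest;
import csi.v0.Csi.GetPluginCapabilitiesResponse;
import csi.v0.Csi.PluginCapability;
import io.grpc.stub.StreamObserver;

public class FakeCsiIdentityServiceWithCaps extends FakeCsiIdentityService {

  @Override
  public void getPluginCapabilities(GetPluginCapabilitiesRequest request,
      StreamObserver<GetPluginCapabilitiesResponse> responseObserver) {
    responseObserver.onNext(GetPluginCapabilitiesResponse.newBuilder()
        .addCapabilities(PluginCapability.newBuilder()
            .setService(PluginCapability.Service.newBuilder()
                .setType(PluginCapability.Service.Type.CONTROLLER_SERVICE)))
        .build());
    responseObserver.onCompleted();
  }
}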

+ 77 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/TestCsiClient.java

@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.csi.client;
+
+import csi.v0.Csi;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Test class for CSI client.
+ */
+public class TestCsiClient {
+
+  private static File testRoot = null;
+  private static String domainSocket = null;
+  private static FakeCsiDriver driver = null;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    testRoot = GenericTestUtils.getTestDir("csi-test");
+    File socketPath = new File(testRoot, "csi.sock");
+    FileUtils.forceMkdirParent(socketPath);
+    domainSocket = "unix://" + socketPath.getAbsolutePath();
+    driver = new FakeCsiDriver(domainSocket);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (testRoot != null) {
+      FileUtils.deleteDirectory(testRoot);
+    }
+  }
+
+  @Before
+  public void beforeMethod() {
+    // Skip tests on non-Linux systems
+    String osName = System.getProperty("os.name").toLowerCase();
+    Assume.assumeTrue(osName.contains("nix") || osName.contains("nux"));
+  }
+
+  @Test
+  public void testIdentityService() throws IOException {
+    try {
+      driver.start();
+      CsiClient client = new CsiClientImpl(domainSocket);
+      Csi.GetPluginInfoResponse response = client.getPluginInfo();
+      Assert.assertEquals("fake-csi-identity-service", response.getName());
+    } finally {
+      driver.stop();
+    }
+  }
+}

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains classes for CSI client library testing.
+ */
+package org.apache.hadoop.yarn.csi.client;

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/pom.xml

@@ -244,5 +244,6 @@
     <module>hadoop-yarn-client</module>
     <module>hadoop-yarn-registry</module>
     <module>hadoop-yarn-ui</module>
+    <module>hadoop-yarn-csi</module>
   </modules>
 </project>