Przeglądaj źródła

HDDS-1382. Create customized CSI server for Ozone

Closes #693
Márton Elek 6 lat temu
rodzic
commit
1ae062c818

+ 6 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java

@@ -226,7 +226,12 @@ public final class HddsUtils {
     if ((value == null) || value.isEmpty()) {
       return Optional.empty();
     }
-    return Optional.of(HostAndPort.fromString(value).getHostText());
+    String hostname = value.replaceAll("\\:[0-9]+$", "");
+    if (hostname.length() == 0) {
+      return Optional.empty();
+    } else {
+      return Optional.of(hostname);
+    }
   }
 
   /**

+ 42 - 0
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds;
+
+import java.util.Optional;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Testing HddsUtils.
+ */
+public class TestHddsUtils {
+
+  @Test
+  public void testGetHostName() {
+    Assert.assertEquals(Optional.of("localhost"),
+        HddsUtils.getHostName("localhost:1234"));
+
+    Assert.assertEquals(Optional.of("localhost"),
+        HddsUtils.getHostName("localhost"));
+
+    Assert.assertEquals(Optional.empty(),
+        HddsUtils.getHostName(":1234"));
+  }
+
+}

+ 6 - 0
hadoop-ozone/common/src/main/bin/ozone

@@ -46,6 +46,7 @@ function hadoop_usage
   hadoop_add_subcommand "om" daemon "Ozone Manager"
   hadoop_add_subcommand "scm" daemon "run the Storage Container Manager service"
   hadoop_add_subcommand "s3g" daemon "run the S3 compatible REST gateway"
+  hadoop_add_subcommand "csi" daemon "run the standalone CSI daemon"
   hadoop_add_subcommand "recon" daemon "run the Recon service"
   hadoop_add_subcommand "scmcli" client "run the CLI of the Storage Container Manager"
   hadoop_add_subcommand "sh" client "command line interface for object store operations"
@@ -154,6 +155,11 @@ function ozonecmd_case
       HADOOP_CLASSNAME='org.apache.hadoop.ozone.s3.Gateway'
       OZONE_RUN_ARTIFACT_NAME="hadoop-ozone-s3gateway"
     ;;
+    csi)
+      HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
+      HADOOP_CLASSNAME='org.apache.hadoop.ozone.csi.CsiServer'
+      OZONE_RUN_ARTIFACT_NAME="hadoop-ozone-csi"
+    ;;
     recon)
       HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
       HADOOP_CLASSNAME='org.apache.hadoop.ozone.recon.ReconServer'

+ 22 - 0
hadoop-ozone/csi/dev-support/findbugsExcludeFile.xml

@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<FindBugsFilter>
+  <Match>
+    <Package name="csi.v1"/>
+  </Match>
+</FindBugsFilter>

+ 169 - 0
hadoop-ozone/csi/pom.xml

@@ -0,0 +1,169 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-ozone</artifactId>
+    <version>0.5.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>hadoop-ozone-csi</artifactId>
+  <version>0.5.0-SNAPSHOT</version>
+  <description>Apache Hadoop Ozone CSI service</description>
+  <name>Apache Hadoop Ozone CSI service</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <grpc.version>1.17.1</grpc.version>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+      <version>3.5.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdds-config</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>26.0-android</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>3.5.1</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-netty</artifactId>
+      <version>${grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport-native-epoll</artifactId>
+      <version>4.1.30.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-protobuf</artifactId>
+      <version>${grpc.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-stub</artifactId>
+      <version>${grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-ozone-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+
+  <build>
+    <extensions>
+      <extension>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>${os-maven-plugin.version}</version>
+      </extension>
+    </extensions>
+    <plugins>
+      <plugin>
+        <groupId>org.xolstice.maven.plugins</groupId>
+        <artifactId>protobuf-maven-plugin</artifactId>
+        <version>${protobuf-maven-plugin.version}</version>
+        <extensions>true</extensions>
+        <configuration>
+          <protocArtifact>
+            com.google.protobuf:protoc:${protobuf-compile.version}:exe:${os.detected.classifier}
+          </protocArtifact>
+          <protoSourceRoot>${basedir}/src/main/proto/</protoSourceRoot>
+          <includes>
+            <include>csi.proto</include>
+          </includes>
+          <outputDirectory>target/generated-sources/java</outputDirectory>
+          <clearOutputDirectory>false</clearOutputDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <goals>
+              <goal>compile</goal>
+              <goal>test-compile</goal>
+              <goal>compile-custom</goal>
+              <goal>test-compile-custom</goal>
+            </goals>
+            <configuration>
+              <pluginId>grpc-java</pluginId>
+              <pluginArtifact>
+                io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+              </pluginArtifact>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>depcheck</id>
+            <phase></phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml
+          </excludeFilterFile>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

+ 123 - 0
hadoop-ozone/csi/src/main/java/org/apache/hadoop/ozone/csi/ControllerService.java

@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.csi;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ozone.client.OzoneClient;
+
+import csi.v1.ControllerGrpc.ControllerImplBase;
+import csi.v1.Csi.CapacityRange;
+import csi.v1.Csi.ControllerGetCapabilitiesRequest;
+import csi.v1.Csi.ControllerGetCapabilitiesResponse;
+import csi.v1.Csi.ControllerServiceCapability;
+import csi.v1.Csi.ControllerServiceCapability.RPC;
+import csi.v1.Csi.ControllerServiceCapability.RPC.Type;
+import csi.v1.Csi.CreateVolumeRequest;
+import csi.v1.Csi.CreateVolumeResponse;
+import csi.v1.Csi.DeleteVolumeRequest;
+import csi.v1.Csi.DeleteVolumeResponse;
+import csi.v1.Csi.Volume;
+import io.grpc.stub.StreamObserver;
+
+/**
+ * CSI controller service.
+ * <p>
+ * This service usually runs only once and responsible for the creation of
+ * the volume.
+ */
+public class ControllerService extends ControllerImplBase {
+
+  private final String volumeOwner;
+
+  private long defaultVolumeSize;
+
+  private OzoneClient ozoneClient;
+
+  public ControllerService(OzoneClient ozoneClient, long volumeSize,
+      String volumeOwner) {
+    this.volumeOwner = volumeOwner;
+    this.defaultVolumeSize = volumeSize;
+    this.ozoneClient = ozoneClient;
+  }
+
+  @Override
+  public void createVolume(CreateVolumeRequest request,
+      StreamObserver<CreateVolumeResponse> responseObserver) {
+    try {
+      ozoneClient.getObjectStore()
+          .createS3Bucket(volumeOwner, request.getName());
+
+      long size = findSize(request.getCapacityRange());
+
+      CreateVolumeResponse response = CreateVolumeResponse.newBuilder()
+          .setVolume(Volume.newBuilder()
+              .setVolumeId(request.getName())
+              .setCapacityBytes(size))
+          .build();
+
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    } catch (IOException e) {
+      responseObserver.onError(e);
+    }
+  }
+
+  private long findSize(CapacityRange capacityRange) {
+    if (capacityRange.getRequiredBytes() != 0) {
+      return capacityRange.getRequiredBytes();
+    } else {
+      if (capacityRange.getLimitBytes() != 0) {
+        return Math.min(defaultVolumeSize, capacityRange.getLimitBytes());
+      } else {
+        //~1 gig
+        return defaultVolumeSize;
+      }
+    }
+  }
+
+  @Override
+  public void deleteVolume(DeleteVolumeRequest request,
+      StreamObserver<DeleteVolumeResponse> responseObserver) {
+    try {
+      ozoneClient.getObjectStore().deleteS3Bucket(request.getVolumeId());
+
+      DeleteVolumeResponse response = DeleteVolumeResponse.newBuilder()
+          .build();
+
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    } catch (IOException e) {
+      responseObserver.onError(e);
+    }
+  }
+
+  @Override
+  public void controllerGetCapabilities(
+      ControllerGetCapabilitiesRequest request,
+      StreamObserver<ControllerGetCapabilitiesResponse> responseObserver) {
+    ControllerGetCapabilitiesResponse response =
+        ControllerGetCapabilitiesResponse.newBuilder()
+            .addCapabilities(
+                ControllerServiceCapability.newBuilder().setRpc(
+                    RPC.newBuilder().setType(Type.CREATE_DELETE_VOLUME)))
+            .build();
+    responseObserver.onNext(response);
+    responseObserver.onCompleted();
+  }
+}

+ 160 - 0
hadoop-ozone/csi/src/main/java/org/apache/hadoop/ozone/csi/CsiServer.java

@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.csi;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hdds.cli.GenericCli;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.util.StringUtils;
+
+import io.grpc.Server;
+import io.grpc.netty.NettyServerBuilder;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerDomainSocketChannel;
+import io.netty.channel.unix.DomainSocketAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine.Command;
+
+/**
+ * CLI entrypoint of the CSI service daemon.
+ */
+@Command(name = "ozone csi",
+    hidden = true, description = "CSI service daemon.",
+    versionProvider = HddsVersionProvider.class,
+    mixinStandardHelpOptions = true)
+public class CsiServer extends GenericCli implements Callable<Void> {
+
+  private final static Logger LOG = LoggerFactory.getLogger(CsiServer.class);
+
+  @Override
+  public Void call() throws Exception {
+    OzoneConfiguration ozoneConfiguration = createOzoneConfiguration();
+    CsiConfig csiConfig = ozoneConfiguration.getObject(CsiConfig.class);
+
+    OzoneClient rpcClient = OzoneClientFactory.getRpcClient(ozoneConfiguration);
+
+    EpollEventLoopGroup group = new EpollEventLoopGroup();
+
+    if (csiConfig.getVolumeOwner().isEmpty()) {
+      throw new IllegalArgumentException(
+          "ozone.csi.owner is not set. You should set this configuration "
+              + "variable to define which user should own all the created "
+              + "buckets.");
+    }
+
+    Server server =
+        NettyServerBuilder
+            .forAddress(new DomainSocketAddress(csiConfig.getSocketPath()))
+            .channelType(EpollServerDomainSocketChannel.class)
+            .workerEventLoopGroup(group)
+            .bossEventLoopGroup(group)
+            .addService(new IdentitiyService())
+            .addService(new ControllerService(rpcClient,
+                csiConfig.getDefaultVolumeSize(), csiConfig.getVolumeOwner()))
+            .addService(new NodeService(csiConfig))
+            .build();
+
+    server.start();
+    server.awaitTermination();
+    rpcClient.close();
+    return null;
+  }
+
+  public static void main(String[] args) {
+
+    StringUtils.startupShutdownMessage(CsiServer.class, args, LOG);
+    new CsiServer().run(args);
+  }
+
+  /**
+   * Configuration settings specific to the CSI server.
+   */
+  @ConfigGroup(prefix = "ozone.csi")
+  public static class CsiConfig {
+    private String socketPath;
+    private long defaultVolumeSize;
+    private String s3gAddress;
+    private String volumeOwner;
+
+    public String getSocketPath() {
+      return socketPath;
+    }
+
+    public String getVolumeOwner() {
+      return volumeOwner;
+    }
+
+    @Config(key = "owner",
+        defaultValue = "",
+        description =
+            "This is the username which is used to create the requested "
+                + "storage. Used as a hadoop username and the generated ozone"
+                + " volume used to store all the buckets. WARNING: It can "
+                + "be a security hole to use CSI in a secure environments as "
+                + "ALL the users can request the mount of a specific bucket "
+                + "via the CSI interface.",
+        tags = ConfigTag.STORAGE)
+    public void setVolumeOwner(String volumeOwner) {
+      this.volumeOwner = volumeOwner;
+    }
+
+    @Config(key = "socket",
+        defaultValue = "/var/lib/csi.sock",
+        description =
+            "The socket where all the CSI services will listen (file name).",
+        tags = ConfigTag.STORAGE)
+    public void setSocketPath(String socketPath) {
+      this.socketPath = socketPath;
+    }
+
+    public long getDefaultVolumeSize() {
+      return defaultVolumeSize;
+    }
+
+    @Config(key = "default-volume-size",
+        defaultValue = "1000000000",
+        description =
+            "The default size of the create volumes (if not specified).",
+        tags = ConfigTag.STORAGE)
+    public void setDefaultVolumeSize(long defaultVolumeSize) {
+      this.defaultVolumeSize = defaultVolumeSize;
+    }
+
+    public String getS3gAddress() {
+      return s3gAddress;
+    }
+
+    @Config(key = "s3g.address",
+        defaultValue = "http://localhost:9878",
+        description =
+            "The default size of the created volumes (if not specified in the"
+                + " requests).",
+        tags = ConfigTag.STORAGE)
+    public void setS3gAddress(String s3gAddress) {
+      this.s3gAddress = s3gAddress;
+    }
+  }
+}

+ 72 - 0
hadoop-ozone/csi/src/main/java/org/apache/hadoop/ozone/csi/IdentitiyService.java

@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.csi;
+
+import org.apache.hadoop.ozone.util.OzoneVersionInfo;
+
+import com.google.protobuf.BoolValue;
+import csi.v1.Csi.GetPluginCapabilitiesResponse;
+import csi.v1.Csi.GetPluginInfoResponse;
+import csi.v1.Csi.PluginCapability;
+import csi.v1.Csi.PluginCapability.Service;
+import static csi.v1.Csi.PluginCapability.Service.Type.CONTROLLER_SERVICE;
+import csi.v1.Csi.ProbeResponse;
+import csi.v1.IdentityGrpc.IdentityImplBase;
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Implementation of the CSI identity service.
+ */
+public class IdentitiyService extends IdentityImplBase {
+
+  @Override
+  public void getPluginInfo(csi.v1.Csi.GetPluginInfoRequest request,
+      StreamObserver<csi.v1.Csi.GetPluginInfoResponse> responseObserver) {
+    GetPluginInfoResponse response = GetPluginInfoResponse.newBuilder()
+        .setName("org.apache.hadoop.ozone")
+        .setVendorVersion(OzoneVersionInfo.OZONE_VERSION_INFO.getVersion())
+        .build();
+    responseObserver.onNext(response);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void getPluginCapabilities(
+      csi.v1.Csi.GetPluginCapabilitiesRequest request,
+      StreamObserver<GetPluginCapabilitiesResponse> responseObserver) {
+    GetPluginCapabilitiesResponse response =
+        GetPluginCapabilitiesResponse.newBuilder()
+            .addCapabilities(PluginCapability.newBuilder().setService(
+                Service.newBuilder().setType(CONTROLLER_SERVICE)))
+            .build();
+    responseObserver.onNext(response);
+    responseObserver.onCompleted();
+
+  }
+
+  @Override
+  public void probe(csi.v1.Csi.ProbeRequest request,
+      StreamObserver<csi.v1.Csi.ProbeResponse> responseObserver) {
+    ProbeResponse response = ProbeResponse.newBuilder()
+        .setReady(BoolValue.of(true))
+        .build();
+    responseObserver.onNext(response);
+    responseObserver.onCompleted();
+
+  }
+}

+ 142 - 0
hadoop-ozone/csi/src/main/java/org/apache/hadoop/ozone/csi/NodeService.java

@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.csi;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.ozone.csi.CsiServer.CsiConfig;
+
+import csi.v1.Csi.NodeGetCapabilitiesRequest;
+import csi.v1.Csi.NodeGetCapabilitiesResponse;
+import csi.v1.Csi.NodeGetInfoRequest;
+import csi.v1.Csi.NodeGetInfoResponse;
+import csi.v1.Csi.NodePublishVolumeRequest;
+import csi.v1.Csi.NodePublishVolumeResponse;
+import csi.v1.Csi.NodeUnpublishVolumeRequest;
+import csi.v1.Csi.NodeUnpublishVolumeResponse;
+import csi.v1.NodeGrpc.NodeImplBase;
+import io.grpc.stub.StreamObserver;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of the CSI node service.
+ */
+public class NodeService extends NodeImplBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(NodeService.class);
+
+  private String s3Endpoint;
+
+  public NodeService(CsiConfig configuration) {
+    this.s3Endpoint = configuration.getS3gAddress();
+
+  }
+
+  @Override
+  public void nodePublishVolume(NodePublishVolumeRequest request,
+      StreamObserver<NodePublishVolumeResponse> responseObserver) {
+
+    try {
+      Files.createDirectories(Paths.get(request.getTargetPath()));
+      String mountCommand =
+          String.format("goofys --endpoint %s %s %s",
+              s3Endpoint,
+              request.getVolumeId(),
+              request.getTargetPath());
+      LOG.info("Executing {}", mountCommand);
+
+      executeCommand(mountCommand);
+
+      responseObserver.onNext(NodePublishVolumeResponse.newBuilder()
+          .build());
+      responseObserver.onCompleted();
+
+    } catch (Exception e) {
+      responseObserver.onError(e);
+    }
+
+  }
+
+  private void executeCommand(String mountCommand)
+      throws IOException, InterruptedException {
+    Process exec = Runtime.getRuntime().exec(mountCommand);
+    exec.waitFor(10, TimeUnit.SECONDS);
+
+    LOG.info("Command is executed with  stdout: {}, stderr: {}",
+        IOUtils.toString(exec.getInputStream(), "UTF-8"),
+        IOUtils.toString(exec.getErrorStream(), "UTF-8"));
+    if (exec.exitValue() != 0) {
+      throw new RuntimeException(String
+          .format("Return code of the command %s was %d", mountCommand,
+              exec.exitValue()));
+    }
+  }
+
+  @Override
+  public void nodeUnpublishVolume(NodeUnpublishVolumeRequest request,
+      StreamObserver<NodeUnpublishVolumeResponse> responseObserver) {
+    String umountCommand =
+        String.format("fusermount -u %s", request.getTargetPath());
+    LOG.info("Executing {}", umountCommand);
+
+    try {
+      executeCommand(umountCommand);
+
+      responseObserver.onNext(NodeUnpublishVolumeResponse.newBuilder()
+          .build());
+      responseObserver.onCompleted();
+
+    } catch (Exception e) {
+      responseObserver.onError(e);
+    }
+
+  }
+
+  @Override
+  public void nodeGetCapabilities(NodeGetCapabilitiesRequest request,
+      StreamObserver<NodeGetCapabilitiesResponse> responseObserver) {
+    NodeGetCapabilitiesResponse response =
+        NodeGetCapabilitiesResponse.newBuilder()
+            .build();
+    responseObserver.onNext(response);
+    responseObserver.onCompleted();
+  }
+
+  @Override
+  public void nodeGetInfo(NodeGetInfoRequest request,
+      StreamObserver<NodeGetInfoResponse> responseObserver) {
+    NodeGetInfoResponse response = null;
+    try {
+      response = NodeGetInfoResponse.newBuilder()
+          .setNodeId(InetAddress.getLocalHost().getHostName())
+          .build();
+      responseObserver.onNext(response);
+      responseObserver.onCompleted();
+    } catch (UnknownHostException e) {
+      responseObserver.onError(e);
+    }
+
+  }
+}

+ 22 - 0
hadoop-ozone/csi/src/main/java/org/apache/hadoop/ozone/csi/package-info.java

@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.csi;
+
+/**
+ * Container Storage Interface server implementation for Ozone.
+ */

+ 1323 - 0
hadoop-ozone/csi/src/main/proto/csi.proto

@@ -0,0 +1,1323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Code generated by make; DO NOT EDIT.
+syntax = "proto3";
+package csi.v1;
+
+import "google/protobuf/descriptor.proto";
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/wrappers.proto";
+
+option go_package = "csi";
+
+extend google.protobuf.FieldOptions {
+  // Indicates that a field MAY contain information that is sensitive
+  // and MUST be treated as such (e.g. not logged).
+  bool csi_secret = 1059;
+}
+service Identity {
+  rpc GetPluginInfo(GetPluginInfoRequest)
+    returns (GetPluginInfoResponse) {}
+
+  rpc GetPluginCapabilities(GetPluginCapabilitiesRequest)
+    returns (GetPluginCapabilitiesResponse) {}
+
+  rpc Probe (ProbeRequest)
+    returns (ProbeResponse) {}
+}
+
+service Controller {
+  rpc CreateVolume (CreateVolumeRequest)
+    returns (CreateVolumeResponse) {}
+
+  rpc DeleteVolume (DeleteVolumeRequest)
+    returns (DeleteVolumeResponse) {}
+
+  rpc ControllerPublishVolume (ControllerPublishVolumeRequest)
+    returns (ControllerPublishVolumeResponse) {}
+
+  rpc ControllerUnpublishVolume (ControllerUnpublishVolumeRequest)
+    returns (ControllerUnpublishVolumeResponse) {}
+
+  rpc ValidateVolumeCapabilities (ValidateVolumeCapabilitiesRequest)
+    returns (ValidateVolumeCapabilitiesResponse) {}
+
+  rpc ListVolumes (ListVolumesRequest)
+    returns (ListVolumesResponse) {}
+
+  rpc GetCapacity (GetCapacityRequest)
+    returns (GetCapacityResponse) {}
+
+  rpc ControllerGetCapabilities (ControllerGetCapabilitiesRequest)
+    returns (ControllerGetCapabilitiesResponse) {}
+
+  rpc CreateSnapshot (CreateSnapshotRequest)
+    returns (CreateSnapshotResponse) {}
+
+  rpc DeleteSnapshot (DeleteSnapshotRequest)
+    returns (DeleteSnapshotResponse) {}
+
+  rpc ListSnapshots (ListSnapshotsRequest)
+    returns (ListSnapshotsResponse) {}
+
+  rpc ControllerExpandVolume (ControllerExpandVolumeRequest)
+    returns (ControllerExpandVolumeResponse) {}
+}
+
+service Node {
+  rpc NodeStageVolume (NodeStageVolumeRequest)
+    returns (NodeStageVolumeResponse) {}
+
+  rpc NodeUnstageVolume (NodeUnstageVolumeRequest)
+    returns (NodeUnstageVolumeResponse) {}
+
+  rpc NodePublishVolume (NodePublishVolumeRequest)
+    returns (NodePublishVolumeResponse) {}
+
+  rpc NodeUnpublishVolume (NodeUnpublishVolumeRequest)
+    returns (NodeUnpublishVolumeResponse) {}
+
+  rpc NodeGetVolumeStats (NodeGetVolumeStatsRequest)
+    returns (NodeGetVolumeStatsResponse) {}
+
+
+  rpc NodeExpandVolume(NodeExpandVolumeRequest)
+    returns (NodeExpandVolumeResponse) {}
+
+
+  rpc NodeGetCapabilities (NodeGetCapabilitiesRequest)
+    returns (NodeGetCapabilitiesResponse) {}
+
+  rpc NodeGetInfo (NodeGetInfoRequest)
+    returns (NodeGetInfoResponse) {}
+}
+message GetPluginInfoRequest {
+  // Intentionally empty.
+}
+
+message GetPluginInfoResponse {
+  // The name MUST follow domain name notation format
+  // (https://tools.ietf.org/html/rfc1035#section-2.3.1). It SHOULD
+  // include the plugin's host company name and the plugin name,
+  // to minimize the possibility of collisions. It MUST be 63
+  // characters or less, beginning and ending with an alphanumeric
+  // character ([a-z0-9A-Z]) with dashes (-), dots (.), and
+  // alphanumerics between. This field is REQUIRED.
+  string name = 1;
+
+  // This field is REQUIRED. Value of this field is opaque to the CO.
+  string vendor_version = 2;
+
+  // This field is OPTIONAL. Values are opaque to the CO.
+  map<string, string> manifest = 3;
+}
+message GetPluginCapabilitiesRequest {
+  // Intentionally empty.
+}
+
+message GetPluginCapabilitiesResponse {
+  // All the capabilities that the controller service supports. This
+  // field is OPTIONAL.
+  repeated PluginCapability capabilities = 1;
+}
+
+// Specifies a capability of the plugin.
+message PluginCapability {
+  message Service {
+    enum Type {
+      UNKNOWN = 0;
+      // CONTROLLER_SERVICE indicates that the Plugin provides RPCs for
+      // the ControllerService. Plugins SHOULD provide this capability.
+      // In rare cases certain plugins MAY wish to omit the
+      // ControllerService entirely from their implementation, but such
+      // SHOULD NOT be the common case.
+      // The presence of this capability determines whether the CO will
+      // attempt to invoke the REQUIRED ControllerService RPCs, as well
+      // as specific RPCs as indicated by ControllerGetCapabilities.
+      CONTROLLER_SERVICE = 1;
+
+      // VOLUME_ACCESSIBILITY_CONSTRAINTS indicates that the volumes for
+      // this plugin MAY NOT be equally accessible by all nodes in the
+      // cluster. The CO MUST use the topology information returned by
+      // CreateVolumeRequest along with the topology information
+      // returned by NodeGetInfo to ensure that a given volume is
+      // accessible from a given node when scheduling workloads.
+      VOLUME_ACCESSIBILITY_CONSTRAINTS = 2;
+    }
+    Type type = 1;
+  }
+
+  message VolumeExpansion {
+    enum Type {
+      UNKNOWN = 0;
+
+      // ONLINE indicates that volumes may be expanded when published to
+      // a node. When a Plugin implements this capability it MUST
+      // implement either the EXPAND_VOLUME controller capability or the
+      // EXPAND_VOLUME node capability or both. When a plugin supports
+      // ONLINE volume expansion and also has the EXPAND_VOLUME
+      // controller capability then the plugin MUST support expansion of
+      // volumes currently published and available on a node. When a
+      // plugin supports ONLINE volume expansion and also has the
+      // EXPAND_VOLUME node capability then the plugin MAY support
+      // expansion of node-published volume via NodeExpandVolume.
+      //
+      // Example 1: Given a shared filesystem volume (e.g. GlusterFs),
+      //   the Plugin may set the ONLINE volume expansion capability and
+      //   implement ControllerExpandVolume but not NodeExpandVolume.
+      //
+      // Example 2: Given a block storage volume type (e.g. EBS), the
+      //   Plugin may set the ONLINE volume expansion capability and
+      //   implement both ControllerExpandVolume and NodeExpandVolume.
+      //
+      // Example 3: Given a Plugin that supports volume expansion only
+      //   upon a node, the Plugin may set the ONLINE volume
+      //   expansion capability and implement NodeExpandVolume but not
+      //   ControllerExpandVolume.
+      ONLINE = 1;
+
+      // OFFLINE indicates that volumes currently published and
+      // available on a node SHALL NOT be expanded via
+      // ControllerExpandVolume. When a plugin supports OFFLINE volume
+      // expansion it MUST implement either the EXPAND_VOLUME controller
+      // capability or both the EXPAND_VOLUME controller capability and
+      // the EXPAND_VOLUME node capability.
+      //
+      // Example 1: Given a block storage volume type (e.g. Azure Disk)
+      //   that does not support expansion of "node-attached" (i.e.
+      //   controller-published) volumes, the Plugin may indicate
+      //   OFFLINE volume expansion support and implement both
+      //   ControllerExpandVolume and NodeExpandVolume.
+      OFFLINE = 2;
+    }
+  }
+
+  oneof type {
+    // Service that the plugin supports.
+    Service service = 1;
+    VolumeExpansion volume_expansion = 2;
+  }
+}
+message ProbeRequest {
+  // Intentionally empty.
+}
+
+message ProbeResponse {
+  // Readiness allows a plugin to report its initialization status back
+  // to the CO. Initialization for some plugins MAY be time consuming
+  // and it is important for a CO to distinguish between the following
+  // cases:
+  //
+  // 1) The plugin is in an unhealthy state and MAY need restarting. In
+  //    this case a gRPC error code SHALL be returned.
+  // 2) The plugin is still initializing, but is otherwise perfectly
+  //    healthy. In this case a successful response SHALL be returned
+  //    with a readiness value of `false`. Calls to the plugin's
+  //    Controller and/or Node services MAY fail due to an incomplete
+  //    initialization state.
+  // 3) The plugin has finished initializing and is ready to service
+  //    calls to its Controller and/or Node services. A successful
+  //    response is returned with a readiness value of `true`.
+  //
+  // This field is OPTIONAL. If not present, the caller SHALL assume
+  // that the plugin is in a ready state and is accepting calls to its
+  // Controller and/or Node services (according to the plugin's reported
+  // capabilities).
+  .google.protobuf.BoolValue ready = 1;
+}
+message CreateVolumeRequest {
+  // The suggested name for the storage space. This field is REQUIRED.
+  // It serves two purposes:
+  // 1) Idempotency - This name is generated by the CO to achieve
+  //    idempotency.  The Plugin SHOULD ensure that multiple
+  //    `CreateVolume` calls for the same name do not result in more
+  //    than one piece of storage provisioned corresponding to that
+  //    name. If a Plugin is unable to enforce idempotency, the CO's
+  //    error recovery logic could result in multiple (unused) volumes
+  //    being provisioned.
+  //    In the case of error, the CO MUST handle the gRPC error codes
+  //    per the recovery behavior defined in the "CreateVolume Errors"
+  //    section below.
+  //    The CO is responsible for cleaning up volumes it provisioned
+  //    that it no longer needs. If the CO is uncertain whether a volume
+  //    was provisioned or not when a `CreateVolume` call fails, the CO
+  //    MAY call `CreateVolume` again, with the same name, to ensure the
+  //    volume exists and to retrieve the volume's `volume_id` (unless
+  //    otherwise prohibited by "CreateVolume Errors").
+  // 2) Suggested name - Some storage systems allow callers to specify
+  //    an identifier by which to refer to the newly provisioned
+  //    storage. If a storage system supports this, it can optionally
+  //    use this name as the identifier for the new volume.
+  // Any Unicode string that conforms to the length limit is allowed
+  // except those containing the following banned characters:
+  // U+0000-U+0008, U+000B, U+000C, U+000E-U+001F, U+007F-U+009F.
+  // (These are control characters other than commonly used whitespace.)
+  string name = 1;
+
+  // This field is OPTIONAL. This allows the CO to specify the capacity
+  // requirement of the volume to be provisioned. If not specified, the
+  // Plugin MAY choose an implementation-defined capacity range. If
+  // specified it MUST always be honored, even when creating volumes
+  // from a source; which MAY force some backends to internally extend
+  // the volume after creating it.
+  CapacityRange capacity_range = 2;
+
+  // The capabilities that the provisioned volume MUST have. SP MUST
+  // provision a volume that will satisfy ALL of the capabilities
+  // specified in this list. Otherwise SP MUST return the appropriate
+  // gRPC error code.
+  // The Plugin MUST assume that the CO MAY use the provisioned volume
+  // with ANY of the capabilities specified in this list.
+  // For example, a CO MAY specify two volume capabilities: one with
+  // access mode SINGLE_NODE_WRITER and another with access mode
+  // MULTI_NODE_READER_ONLY. In this case, the SP MUST verify that the
+  // provisioned volume can be used in either mode.
+  // This also enables the CO to do early validation: If ANY of the
+  // specified volume capabilities are not supported by the SP, the call
+  // MUST return the appropriate gRPC error code.
+  // This field is REQUIRED.
+  repeated VolumeCapability volume_capabilities = 3;
+
+  // Plugin specific parameters passed in as opaque key-value pairs.
+  // This field is OPTIONAL. The Plugin is responsible for parsing and
+  // validating these parameters. COs will treat these as opaque.
+  map<string, string> parameters = 4;
+
+  // Secrets required by plugin to complete volume creation request.
+  // This field is OPTIONAL. Refer to the `Secrets Requirements`
+  // section on how to use this field.
+  map<string, string> secrets = 5 [(csi_secret) = true];
+
+  // If specified, the new volume will be pre-populated with data from
+  // this source. This field is OPTIONAL.
+  VolumeContentSource volume_content_source = 6;
+
+  // Specifies where (regions, zones, racks, etc.) the provisioned
+  // volume MUST be accessible from.
+  // An SP SHALL advertise the requirements for topological
+  // accessibility information in documentation. COs SHALL only specify
+  // topological accessibility information supported by the SP.
+  // This field is OPTIONAL.
+  // This field SHALL NOT be specified unless the SP has the
+  // VOLUME_ACCESSIBILITY_CONSTRAINTS plugin capability.
+  // If this field is not specified and the SP has the
+  // VOLUME_ACCESSIBILITY_CONSTRAINTS plugin capability, the SP MAY
+  // choose where the provisioned volume is accessible from.
+  TopologyRequirement accessibility_requirements = 7;
+}
+
+// Specifies what source the volume will be created from. One of the
+// type fields MUST be specified.
+message VolumeContentSource {
+  message SnapshotSource {
+    // Contains identity information for the existing source snapshot.
+    // This field is REQUIRED. Plugin is REQUIRED to support creating
+    // volume from snapshot if it supports the capability
+    // CREATE_DELETE_SNAPSHOT.
+    string snapshot_id = 1;
+  }
+
+  message VolumeSource {
+    // Contains identity information for the existing source volume.
+    // This field is REQUIRED. Plugins reporting CLONE_VOLUME
+    // capability MUST support creating a volume from another volume.
+    string volume_id = 1;
+  }
+
+  oneof type {
+    SnapshotSource snapshot = 1;
+    VolumeSource volume = 2;
+  }
+}
+
+message CreateVolumeResponse {
+  // Contains all attributes of the newly created volume that are
+  // relevant to the CO along with information required by the Plugin
+  // to uniquely identify the volume. This field is REQUIRED.
+  Volume volume = 1;
+}
+
+// Specify a capability of a volume.
+message VolumeCapability {
+  // Indicate that the volume will be accessed via the block device API.
+  message BlockVolume {
+    // Intentionally empty, for now.
+  }
+
+  // Indicate that the volume will be accessed via the filesystem API.
+  message MountVolume {
+    // The filesystem type. This field is OPTIONAL.
+    // An empty string is equal to an unspecified field value.
+    string fs_type = 1;
+
+    // The mount options that can be used for the volume. This field is
+    // OPTIONAL. `mount_flags` MAY contain sensitive information.
+    // Therefore, the CO and the Plugin MUST NOT leak this information
+    // to untrusted entities. The total size of this repeated field
+    // SHALL NOT exceed 4 KiB.
+    repeated string mount_flags = 2;
+  }
+
+  // Specify how a volume can be accessed.
+  message AccessMode {
+    enum Mode {
+      UNKNOWN = 0;
+
+      // Can only be published once as read/write on a single node, at
+      // any given time.
+      SINGLE_NODE_WRITER = 1;
+
+      // Can only be published once as readonly on a single node, at
+      // any given time.
+      SINGLE_NODE_READER_ONLY = 2;
+
+      // Can be published as readonly at multiple nodes simultaneously.
+      MULTI_NODE_READER_ONLY = 3;
+
+      // Can be published at multiple nodes simultaneously. Only one of
+      // the node can be used as read/write. The rest will be readonly.
+      MULTI_NODE_SINGLE_WRITER = 4;
+
+      // Can be published as read/write at multiple nodes
+      // simultaneously.
+      MULTI_NODE_MULTI_WRITER = 5;
+    }
+
+    // This field is REQUIRED.
+    Mode mode = 1;
+  }
+
+  // Specifies what API the volume will be accessed using. One of the
+  // following fields MUST be specified.
+  oneof access_type {
+    BlockVolume block = 1;
+    MountVolume mount = 2;
+  }
+
+  // This is a REQUIRED field.
+  AccessMode access_mode = 3;
+}
+
+// The capacity of the storage space in bytes. To specify an exact size,
+// `required_bytes` and `limit_bytes` SHALL be set to the same value. At
+// least one of the these fields MUST be specified.
+message CapacityRange {
+  // Volume MUST be at least this big. This field is OPTIONAL.
+  // A value of 0 is equal to an unspecified field value.
+  // The value of this field MUST NOT be negative.
+  int64 required_bytes = 1;
+
+  // Volume MUST not be bigger than this. This field is OPTIONAL.
+  // A value of 0 is equal to an unspecified field value.
+  // The value of this field MUST NOT be negative.
+  int64 limit_bytes = 2;
+}
+
+// Information about a specific volume.
+message Volume {
+  // The capacity of the volume in bytes. This field is OPTIONAL. If not
+  // set (value of 0), it indicates that the capacity of the volume is
+  // unknown (e.g., NFS share).
+  // The value of this field MUST NOT be negative.
+  int64 capacity_bytes = 1;
+
+  // The identifier for this volume, generated by the plugin.
+  // This field is REQUIRED.
+  // This field MUST contain enough information to uniquely identify
+  // this specific volume vs all other volumes supported by this plugin.
+  // This field SHALL be used by the CO in subsequent calls to refer to
+  // this volume.
+  // The SP is NOT responsible for global uniqueness of volume_id across
+  // multiple SPs.
+  string volume_id = 2;
+
+  // Opaque static properties of the volume. SP MAY use this field to
+  // ensure subsequent volume validation and publishing calls have
+  // contextual information.
+  // The contents of this field SHALL be opaque to a CO.
+  // The contents of this field SHALL NOT be mutable.
+  // The contents of this field SHALL be safe for the CO to cache.
+  // The contents of this field SHOULD NOT contain sensitive
+  // information.
+  // The contents of this field SHOULD NOT be used for uniquely
+  // identifying a volume. The `volume_id` alone SHOULD be sufficient to
+  // identify the volume.
+  // A volume uniquely identified by `volume_id` SHALL always report the
+  // same volume_context.
+  // This field is OPTIONAL and when present MUST be passed to volume
+  // validation and publishing calls.
+  map<string, string> volume_context = 3;
+
+  // If specified, indicates that the volume is not empty and is
+  // pre-populated with data from the specified source.
+  // This field is OPTIONAL.
+  VolumeContentSource content_source = 4;
+
+  // Specifies where (regions, zones, racks, etc.) the provisioned
+  // volume is accessible from.
+  // A plugin that returns this field MUST also set the
+  // VOLUME_ACCESSIBILITY_CONSTRAINTS plugin capability.
+  // An SP MAY specify multiple topologies to indicate the volume is
+  // accessible from multiple locations.
+  // COs MAY use this information along with the topology information
+  // returned by NodeGetInfo to ensure that a given volume is accessible
+  // from a given node when scheduling workloads.
+  // This field is OPTIONAL. If it is not specified, the CO MAY assume
+  // the volume is equally accessible from all nodes in the cluster and
+  // MAY schedule workloads referencing the volume on any available
+  // node.
+  //
+  // Example 1:
+  //   accessible_topology = {"region": "R1", "zone": "Z2"}
+  // Indicates a volume accessible only from the "region" "R1" and the
+  // "zone" "Z2".
+  //
+  // Example 2:
+  //   accessible_topology =
+  //     {"region": "R1", "zone": "Z2"},
+  //     {"region": "R1", "zone": "Z3"}
+  // Indicates a volume accessible from both "zone" "Z2" and "zone" "Z3"
+  // in the "region" "R1".
+  repeated Topology accessible_topology = 5;
+}
+
+message TopologyRequirement {
+  // Specifies the list of topologies the provisioned volume MUST be
+  // accessible from.
+  // This field is OPTIONAL. If TopologyRequirement is specified either
+  // requisite or preferred or both MUST be specified.
+  //
+  // If requisite is specified, the provisioned volume MUST be
+  // accessible from at least one of the requisite topologies.
+  //
+  // Given
+  //   x = number of topologies provisioned volume is accessible from
+  //   n = number of requisite topologies
+  // The CO MUST ensure n >= 1. The SP MUST ensure x >= 1
+  // If x==n, then the SP MUST make the provisioned volume available to
+  // all topologies from the list of requisite topologies. If it is
+  // unable to do so, the SP MUST fail the CreateVolume call.
+  // For example, if a volume should be accessible from a single zone,
+  // and requisite =
+  //   {"region": "R1", "zone": "Z2"}
+  // then the provisioned volume MUST be accessible from the "region"
+  // "R1" and the "zone" "Z2".
+  // Similarly, if a volume should be accessible from two zones, and
+  // requisite =
+  //   {"region": "R1", "zone": "Z2"},
+  //   {"region": "R1", "zone": "Z3"}
+  // then the provisioned volume MUST be accessible from the "region"
+  // "R1" and both "zone" "Z2" and "zone" "Z3".
+  //
+  // If x<n, then the SP SHALL choose x unique topologies from the list
+  // of requisite topologies. If it is unable to do so, the SP MUST fail
+  // the CreateVolume call.
+  // For example, if a volume should be accessible from a single zone,
+  // and requisite =
+  //   {"region": "R1", "zone": "Z2"},
+  //   {"region": "R1", "zone": "Z3"}
+  // then the SP may choose to make the provisioned volume available in
+  // either the "zone" "Z2" or the "zone" "Z3" in the "region" "R1".
+  // Similarly, if a volume should be accessible from two zones, and
+  // requisite =
+  //   {"region": "R1", "zone": "Z2"},
+  //   {"region": "R1", "zone": "Z3"},
+  //   {"region": "R1", "zone": "Z4"}
+  // then the provisioned volume MUST be accessible from any combination
+  // of two unique topologies: e.g. "R1/Z2" and "R1/Z3", or "R1/Z2" and
+  //  "R1/Z4", or "R1/Z3" and "R1/Z4".
+  //
+  // If x>n, then the SP MUST make the provisioned volume available from
+  // all topologies from the list of requisite topologies and MAY choose
+  // the remaining x-n unique topologies from the list of all possible
+  // topologies. If it is unable to do so, the SP MUST fail the
+  // CreateVolume call.
+  // For example, if a volume should be accessible from two zones, and
+  // requisite =
+  //   {"region": "R1", "zone": "Z2"}
+  // then the provisioned volume MUST be accessible from the "region"
+  // "R1" and the "zone" "Z2" and the SP may select the second zone
+  // independently, e.g. "R1/Z4".
+  repeated Topology requisite = 1;
+
+  // Specifies the list of topologies the CO would prefer the volume to
+  // be provisioned in.
+  //
+  // This field is OPTIONAL. If TopologyRequirement is specified either
+  // requisite or preferred or both MUST be specified.
+  //
+  // An SP MUST attempt to make the provisioned volume available using
+  // the preferred topologies in order from first to last.
+  //
+  // If requisite is specified, all topologies in preferred list MUST
+  // also be present in the list of requisite topologies.
+  //
+  // If the SP is unable to to make the provisioned volume available
+  // from any of the preferred topologies, the SP MAY choose a topology
+  // from the list of requisite topologies.
+  // If the list of requisite topologies is not specified, then the SP
+  // MAY choose from the list of all possible topologies.
+  // If the list of requisite topologies is specified and the SP is
+  // unable to to make the provisioned volume available from any of the
+  // requisite topologies it MUST fail the CreateVolume call.
+  //
+  // Example 1:
+  // Given a volume should be accessible from a single zone, and
+  // requisite =
+  //   {"region": "R1", "zone": "Z2"},
+  //   {"region": "R1", "zone": "Z3"}
+  // preferred =
+  //   {"region": "R1", "zone": "Z3"}
+  // then the the SP SHOULD first attempt to make the provisioned volume
+  // available from "zone" "Z3" in the "region" "R1" and fall back to
+  // "zone" "Z2" in the "region" "R1" if that is not possible.
+  //
+  // Example 2:
+  // Given a volume should be accessible from a single zone, and
+  // requisite =
+  //   {"region": "R1", "zone": "Z2"},
+  //   {"region": "R1", "zone": "Z3"},
+  //   {"region": "R1", "zone": "Z4"},
+  //   {"region": "R1", "zone": "Z5"}
+  // preferred =
+  //   {"region": "R1", "zone": "Z4"},
+  //   {"region": "R1", "zone": "Z2"}
+  // then the the SP SHOULD first attempt to make the provisioned volume
+  // accessible from "zone" "Z4" in the "region" "R1" and fall back to
+  // "zone" "Z2" in the "region" "R1" if that is not possible. If that
+  // is not possible, the SP may choose between either the "zone"
+  // "Z3" or "Z5" in the "region" "R1".
+  //
+  // Example 3:
+  // Given a volume should be accessible from TWO zones (because an
+  // opaque parameter in CreateVolumeRequest, for example, specifies
+  // the volume is accessible from two zones, aka synchronously
+  // replicated), and
+  // requisite =
+  //   {"region": "R1", "zone": "Z2"},
+  //   {"region": "R1", "zone": "Z3"},
+  //   {"region": "R1", "zone": "Z4"},
+  //   {"region": "R1", "zone": "Z5"}
+  // preferred =
+  //   {"region": "R1", "zone": "Z5"},
+  //   {"region": "R1", "zone": "Z3"}
+  // then the the SP SHOULD first attempt to make the provisioned volume
+  // accessible from the combination of the two "zones" "Z5" and "Z3" in
+  // the "region" "R1". If that's not possible, it should fall back to
+  // a combination of "Z5" and other possibilities from the list of
+  // requisite. If that's not possible, it should fall back  to a
+  // combination of "Z3" and other possibilities from the list of
+  // requisite. If that's not possible, it should fall back  to a
+  // combination of other possibilities from the list of requisite.
+  repeated Topology preferred = 2;
+}
+
+// Topology is a map of topological domains to topological segments.
+// A topological domain is a sub-division of a cluster, like "region",
+// "zone", "rack", etc.
+// A topological segment is a specific instance of a topological domain,
+// like "zone3", "rack3", etc.
+// For example {"com.company/zone": "Z1", "com.company/rack": "R3"}
+// Valid keys have two segments: an OPTIONAL prefix and name, separated
+// by a slash (/), for example: "com.company.example/zone".
+// The key name segment is REQUIRED. The prefix is OPTIONAL.
+// The key name MUST be 63 characters or less, begin and end with an
+// alphanumeric character ([a-z0-9A-Z]), and contain only dashes (-),
+// underscores (_), dots (.), or alphanumerics in between, for example
+// "zone".
+// The key prefix MUST be 63 characters or less, begin and end with a
+// lower-case alphanumeric character ([a-z0-9]), contain only
+// dashes (-), dots (.), or lower-case alphanumerics in between, and
+// follow domain name notation format
+// (https://tools.ietf.org/html/rfc1035#section-2.3.1).
+// The key prefix SHOULD include the plugin's host company name and/or
+// the plugin name, to minimize the possibility of collisions with keys
+// from other plugins.
+// If a key prefix is specified, it MUST be identical across all
+// topology keys returned by the SP (across all RPCs).
+// Keys MUST be case-insensitive. Meaning the keys "Zone" and "zone"
+// MUST not both exist.
+// Each value (topological segment) MUST contain 1 or more strings.
+// Each string MUST be 63 characters or less and begin and end with an
+// alphanumeric character with '-', '_', '.', or alphanumerics in
+// between.
+message Topology {
+  map<string, string> segments = 1;
+}
+message DeleteVolumeRequest {
+  // The ID of the volume to be deprovisioned.
+  // This field is REQUIRED.
+  string volume_id = 1;
+
+  // Secrets required by plugin to complete volume deletion request.
+  // This field is OPTIONAL. Refer to the `Secrets Requirements`
+  // section on how to use this field.
+  map<string, string> secrets = 2 [(csi_secret) = true];
+}
+
+message DeleteVolumeResponse {
+  // Intentionally empty.
+}
+message ControllerPublishVolumeRequest {
+  // The ID of the volume to be used on a node.
+  // This field is REQUIRED.
+  string volume_id = 1;
+
+  // The ID of the node. This field is REQUIRED. The CO SHALL set this
+  // field to match the node ID returned by `NodeGetInfo`.
+  string node_id = 2;
+
+  // Volume capability describing how the CO intends to use this volume.
+  // SP MUST ensure the CO can use the published volume as described.
+  // Otherwise SP MUST return the appropriate gRPC error code.
+  // This is a REQUIRED field.
+  VolumeCapability volume_capability = 3;
+
+  // Indicates SP MUST publish the volume in readonly mode.
+  // CO MUST set this field to false if SP does not have the
+  // PUBLISH_READONLY controller capability.
+  // This is a REQUIRED field.
+  bool readonly = 4;
+
+  // Secrets required by plugin to complete controller publish volume
+  // request. This field is OPTIONAL. Refer to the
+  // `Secrets Requirements` section on how to use this field.
+  map<string, string> secrets = 5 [(csi_secret) = true];
+
+  // Volume context as returned by CO in CreateVolumeRequest. This field
+  // is OPTIONAL and MUST match the volume_context of the volume
+  // identified by `volume_id`.
+  map<string, string> volume_context = 6;
+}
+
+message ControllerPublishVolumeResponse {
+  // Opaque static publish properties of the volume. SP MAY use this
+  // field to ensure subsequent `NodeStageVolume` or `NodePublishVolume`
+  // calls calls have contextual information.
+  // The contents of this field SHALL be opaque to a CO.
+  // The contents of this field SHALL NOT be mutable.
+  // The contents of this field SHALL be safe for the CO to cache.
+  // The contents of this field SHOULD NOT contain sensitive
+  // information.
+  // The contents of this field SHOULD NOT be used for uniquely
+  // identifying a volume. The `volume_id` alone SHOULD be sufficient to
+  // identify the volume.
+  // This field is OPTIONAL and when present MUST be passed to
+  // subsequent `NodeStageVolume` or `NodePublishVolume` calls
+  map<string, string> publish_context = 1;
+}
+message ControllerUnpublishVolumeRequest {
+  // The ID of the volume. This field is REQUIRED.
+  string volume_id = 1;
+
+  // The ID of the node. This field is OPTIONAL. The CO SHOULD set this
+  // field to match the node ID returned by `NodeGetInfo` or leave it
+  // unset. If the value is set, the SP MUST unpublish the volume from
+  // the specified node. If the value is unset, the SP MUST unpublish
+  // the volume from all nodes it is published to.
+  string node_id = 2;
+
+  // Secrets required by plugin to complete controller unpublish volume
+  // request. This SHOULD be the same secrets passed to the
+  // ControllerPublishVolume call for the specified volume.
+  // This field is OPTIONAL. Refer to the `Secrets Requirements`
+  // section on how to use this field.
+  map<string, string> secrets = 3 [(csi_secret) = true];
+}
+
+message ControllerUnpublishVolumeResponse {
+  // Intentionally empty.
+}
+message ValidateVolumeCapabilitiesRequest {
+  // The ID of the volume to check. This field is REQUIRED.
+  string volume_id = 1;
+
+  // Volume context as returned by CO in CreateVolumeRequest. This field
+  // is OPTIONAL and MUST match the volume_context of the volume
+  // identified by `volume_id`.
+  map<string, string> volume_context = 2;
+
+  // The capabilities that the CO wants to check for the volume. This
+  // call SHALL return "confirmed" only if all the volume capabilities
+  // specified below are supported. This field is REQUIRED.
+  repeated VolumeCapability volume_capabilities = 3;
+
+  // See CreateVolumeRequest.parameters.
+  // This field is OPTIONAL.
+  map<string, string> parameters = 4;
+
+  // Secrets required by plugin to complete volume validation request.
+  // This field is OPTIONAL. Refer to the `Secrets Requirements`
+  // section on how to use this field.
+  map<string, string> secrets = 5 [(csi_secret) = true];
+}
+
+message ValidateVolumeCapabilitiesResponse {
+  message Confirmed {
+    // Volume context validated by the plugin.
+    // This field is OPTIONAL.
+    map<string, string> volume_context = 1;
+
+    // Volume capabilities supported by the plugin.
+    // This field is REQUIRED.
+    repeated VolumeCapability volume_capabilities = 2;
+
+    // The volume creation parameters validated by the plugin.
+    // This field is OPTIONAL.
+    map<string, string> parameters = 3;
+  }
+
+  // Confirmed indicates to the CO the set of capabilities that the
+  // plugin has validated. This field SHALL only be set to a non-empty
+  // value for successful validation responses.
+  // For successful validation responses, the CO SHALL compare the
+  // fields of this message to the originally requested capabilities in
+  // order to guard against an older plugin reporting "valid" for newer
+  // capability fields that it does not yet understand.
+  // This field is OPTIONAL.
+  Confirmed confirmed = 1;
+
+  // Message to the CO if `confirmed` above is empty. This field is
+  // OPTIONAL.
+  // An empty string is equal to an unspecified field value.
+  string message = 2;
+}
+message ListVolumesRequest {
+  // If specified (non-zero value), the Plugin MUST NOT return more
+  // entries than this number in the response. If the actual number of
+  // entries is more than this number, the Plugin MUST set `next_token`
+  // in the response which can be used to get the next page of entries
+  // in the subsequent `ListVolumes` call. This field is OPTIONAL. If
+  // not specified (zero value), it means there is no restriction on the
+  // number of entries that can be returned.
+  // The value of this field MUST NOT be negative.
+  int32 max_entries = 1;
+
+  // A token to specify where to start paginating. Set this field to
+  // `next_token` returned by a previous `ListVolumes` call to get the
+  // next page of entries. This field is OPTIONAL.
+  // An empty string is equal to an unspecified field value.
+  string starting_token = 2;
+}
+
+message ListVolumesResponse {
+  message Entry {
+    Volume volume = 1;
+  }
+
+  repeated Entry entries = 1;
+
+  // This token allows you to get the next page of entries for
+  // `ListVolumes` request. If the number of entries is larger than
+  // `max_entries`, use the `next_token` as a value for the
+  // `starting_token` field in the next `ListVolumes` request. This
+  // field is OPTIONAL.
+  // An empty string is equal to an unspecified field value.
+  string next_token = 2;
+}
+message GetCapacityRequest {
+  // If specified, the Plugin SHALL report the capacity of the storage
+  // that can be used to provision volumes that satisfy ALL of the
+  // specified `volume_capabilities`. These are the same
+  // `volume_capabilities` the CO will use in `CreateVolumeRequest`.
+  // This field is OPTIONAL.
+  repeated VolumeCapability volume_capabilities = 1;
+
+  // If specified, the Plugin SHALL report the capacity of the storage
+  // that can be used to provision volumes with the given Plugin
+  // specific `parameters`. These are the same `parameters` the CO will
+  // use in `CreateVolumeRequest`. This field is OPTIONAL.
+  map<string, string> parameters = 2;
+
+  // If specified, the Plugin SHALL report the capacity of the storage
+  // that can be used to provision volumes that in the specified
+  // `accessible_topology`. This is the same as the
+  // `accessible_topology` the CO returns in a `CreateVolumeResponse`.
+  // This field is OPTIONAL. This field SHALL NOT be set unless the
+  // plugin advertises the VOLUME_ACCESSIBILITY_CONSTRAINTS capability.
+  Topology accessible_topology = 3;
+}
+
+message GetCapacityResponse {
+  // The available capacity, in bytes, of the storage that can be used
+  // to provision volumes. If `volume_capabilities` or `parameters` is
+  // specified in the request, the Plugin SHALL take those into
+  // consideration when calculating the available capacity of the
+  // storage. This field is REQUIRED.
+  // The value of this field MUST NOT be negative.
+  int64 available_capacity = 1;
+}
+message ControllerGetCapabilitiesRequest {
+  // Intentionally empty.
+}
+
+message ControllerGetCapabilitiesResponse {
+  // All the capabilities that the controller service supports. This
+  // field is OPTIONAL.
+  repeated ControllerServiceCapability capabilities = 1;
+}
+
+// Specifies a capability of the controller service.
+message ControllerServiceCapability {
+  message RPC {
+    enum Type {
+      UNKNOWN = 0;
+      CREATE_DELETE_VOLUME = 1;
+      PUBLISH_UNPUBLISH_VOLUME = 2;
+      LIST_VOLUMES = 3;
+      GET_CAPACITY = 4;
+      // Currently the only way to consume a snapshot is to create
+      // a volume from it. Therefore plugins supporting
+      // CREATE_DELETE_SNAPSHOT MUST support creating volume from
+      // snapshot.
+      CREATE_DELETE_SNAPSHOT = 5;
+      LIST_SNAPSHOTS = 6;
+
+      // Plugins supporting volume cloning at the storage level MAY
+      // report this capability. The source volume MUST be managed by
+      // the same plugin. Not all volume sources and parameters
+      // combinations MAY work.
+      CLONE_VOLUME = 7;
+
+      // Indicates the SP supports ControllerPublishVolume.readonly
+      // field.
+      PUBLISH_READONLY = 8;
+
+      // See VolumeExpansion for details.
+      EXPAND_VOLUME = 9;
+    }
+
+    Type type = 1;
+  }
+
+  oneof type {
+    // RPC that the controller supports.
+    RPC rpc = 1;
+  }
+}
+message CreateSnapshotRequest {
+  // The ID of the source volume to be snapshotted.
+  // This field is REQUIRED.
+  string source_volume_id = 1;
+
+  // The suggested name for the snapshot. This field is REQUIRED for
+  // idempotency.
+  // Any Unicode string that conforms to the length limit is allowed
+  // except those containing the following banned characters:
+  // U+0000-U+0008, U+000B, U+000C, U+000E-U+001F, U+007F-U+009F.
+  // (These are control characters other than commonly used whitespace.)
+  string name = 2;
+
+  // Secrets required by plugin to complete snapshot creation request.
+  // This field is OPTIONAL. Refer to the `Secrets Requirements`
+  // section on how to use this field.
+  map<string, string> secrets = 3 [(csi_secret) = true];
+
+  // Plugin specific parameters passed in as opaque key-value pairs.
+  // This field is OPTIONAL. The Plugin is responsible for parsing and
+  // validating these parameters. COs will treat these as opaque.
+  // Use cases for opaque parameters:
+  // - Specify a policy to automatically clean up the snapshot.
+  // - Specify an expiration date for the snapshot.
+  // - Specify whether the snapshot is readonly or read/write.
+  // - Specify if the snapshot should be replicated to some place.
+  // - Specify primary or secondary for replication systems that
+  //   support snapshotting only on primary.
+  map<string, string> parameters = 4;
+}
+
+message CreateSnapshotResponse {
+  // Contains all attributes of the newly created snapshot that are
+  // relevant to the CO along with information required by the Plugin
+  // to uniquely identify the snapshot. This field is REQUIRED.
+  Snapshot snapshot = 1;
+}
+
+// Information about a specific snapshot.
+message Snapshot {
+  // This is the complete size of the snapshot in bytes. The purpose of
+  // this field is to give CO guidance on how much space is needed to
+  // create a volume from this snapshot. The size of the volume MUST NOT
+  // be less than the size of the source snapshot. This field is
+  // OPTIONAL. If this field is not set, it indicates that this size is
+  // unknown. The value of this field MUST NOT be negative and a size of
+  // zero means it is unspecified.
+  int64 size_bytes = 1;
+
+  // The identifier for this snapshot, generated by the plugin.
+  // This field is REQUIRED.
+  // This field MUST contain enough information to uniquely identify
+  // this specific snapshot vs all other snapshots supported by this
+  // plugin.
+  // This field SHALL be used by the CO in subsequent calls to refer to
+  // this snapshot.
+  // The SP is NOT responsible for global uniqueness of snapshot_id
+  // across multiple SPs.
+  string snapshot_id = 2;
+
+  // Identity information for the source volume. Note that creating a
+  // snapshot from a snapshot is not supported here so the source has to
+  // be a volume. This field is REQUIRED.
+  string source_volume_id = 3;
+
+  // Timestamp when the point-in-time snapshot is taken on the storage
+  // system. This field is REQUIRED.
+  .google.protobuf.Timestamp creation_time = 4;
+
+  // Indicates if a snapshot is ready to use as a
+  // `volume_content_source` in a `CreateVolumeRequest`. The default
+  // value is false. This field is REQUIRED.
+  bool ready_to_use = 5;
+}
+message DeleteSnapshotRequest {
+  // The ID of the snapshot to be deleted.
+  // This field is REQUIRED.
+  string snapshot_id = 1;
+
+  // Secrets required by plugin to complete snapshot deletion request.
+  // This field is OPTIONAL. Refer to the `Secrets Requirements`
+  // section on how to use this field.
+  map<string, string> secrets = 2 [(csi_secret) = true];
+}
+
+message DeleteSnapshotResponse {}
+// List all snapshots on the storage system regardless of how they were
+// created.
+message ListSnapshotsRequest {
+  // If specified (non-zero value), the Plugin MUST NOT return more
+  // entries than this number in the response. If the actual number of
+  // entries is more than this number, the Plugin MUST set `next_token`
+  // in the response which can be used to get the next page of entries
+  // in the subsequent `ListSnapshots` call. This field is OPTIONAL. If
+  // not specified (zero value), it means there is no restriction on the
+  // number of entries that can be returned.
+  // The value of this field MUST NOT be negative.
+  int32 max_entries = 1;
+
+  // A token to specify where to start paginating. Set this field to
+  // `next_token` returned by a previous `ListSnapshots` call to get the
+  // next page of entries. This field is OPTIONAL.
+  // An empty string is equal to an unspecified field value.
+  string starting_token = 2;
+
+  // Identity information for the source volume. This field is OPTIONAL.
+  // It can be used to list snapshots by volume.
+  string source_volume_id = 3;
+
+  // Identity information for a specific snapshot. This field is
+  // OPTIONAL. It can be used to list only a specific snapshot.
+  // ListSnapshots will return with current snapshot information
+  // and will not block if the snapshot is being processed after
+  // it is cut.
+  string snapshot_id = 4;
+}
+
+message ListSnapshotsResponse {
+  message Entry {
+    Snapshot snapshot = 1;
+  }
+
+  repeated Entry entries = 1;
+
+  // This token allows you to get the next page of entries for
+  // `ListSnapshots` request. If the number of entries is larger than
+  // `max_entries`, use the `next_token` as a value for the
+  // `starting_token` field in the next `ListSnapshots` request. This
+  // field is OPTIONAL.
+  // An empty string is equal to an unspecified field value.
+  string next_token = 2;
+}
+message ControllerExpandVolumeRequest {
+  // The ID of the volume to expand. This field is REQUIRED.
+  string volume_id = 1;
+
+  // This allows CO to specify the capacity requirements of the volume
+  // after expansion. This field is REQUIRED.
+  CapacityRange capacity_range = 2;
+
+  // Secrets required by the plugin for expanding the volume.
+  // This field is OPTIONAL.
+  map<string, string> secrets = 3 [(csi_secret) = true];
+}
+
+message ControllerExpandVolumeResponse {
+  // Capacity of volume after expansion. This field is REQUIRED.
+  int64 capacity_bytes = 1;
+
+  // Whether node expansion is required for the volume. When true
+  // the CO MUST make NodeExpandVolume RPC call on the node. This field
+  // is REQUIRED.
+  bool node_expansion_required = 2;
+}
+message NodeStageVolumeRequest {
+  // The ID of the volume to publish. This field is REQUIRED.
+  string volume_id = 1;
+
+  // The CO SHALL set this field to the value returned by
+  // `ControllerPublishVolume` if the corresponding Controller Plugin
+  // has `PUBLISH_UNPUBLISH_VOLUME` controller capability, and SHALL be
+  // left unset if the corresponding Controller Plugin does not have
+  // this capability. This is an OPTIONAL field.
+  map<string, string> publish_context = 2;
+
+  // The path to which the volume MAY be staged. It MUST be an
+  // absolute path in the root filesystem of the process serving this
+  // request, and MUST be a directory. The CO SHALL ensure that there
+  // is only one `staging_target_path` per volume. The CO SHALL ensure
+  // that the path is directory and that the process serving the
+  // request has `read` and `write` permission to that directory. The
+  // CO SHALL be responsible for creating the directory if it does not
+  // exist.
+  // This is a REQUIRED field.
+  string staging_target_path = 3;
+
+  // Volume capability describing how the CO intends to use this volume.
+  // SP MUST ensure the CO can use the staged volume as described.
+  // Otherwise SP MUST return the appropriate gRPC error code.
+  // This is a REQUIRED field.
+  VolumeCapability volume_capability = 4;
+
+  // Secrets required by plugin to complete node stage volume request.
+  // This field is OPTIONAL. Refer to the `Secrets Requirements`
+  // section on how to use this field.
+  map<string, string> secrets = 5 [(csi_secret) = true];
+
+  // Volume context as returned by CO in CreateVolumeRequest. This field
+  // is OPTIONAL and MUST match the volume_context of the volume
+  // identified by `volume_id`.
+  map<string, string> volume_context = 6;
+}
+
+message NodeStageVolumeResponse {
+  // Intentionally empty.
+}
+message NodeUnstageVolumeRequest {
+  // The ID of the volume. This field is REQUIRED.
+  string volume_id = 1;
+
+  // The path at which the volume was staged. It MUST be an absolute
+  // path in the root filesystem of the process serving this request.
+  // This is a REQUIRED field.
+  string staging_target_path = 2;
+}
+
+message NodeUnstageVolumeResponse {
+  // Intentionally empty.
+}
+message NodePublishVolumeRequest {
+  // The ID of the volume to publish. This field is REQUIRED.
+  string volume_id = 1;
+
+  // The CO SHALL set this field to the value returned by
+  // `ControllerPublishVolume` if the corresponding Controller Plugin
+  // has `PUBLISH_UNPUBLISH_VOLUME` controller capability, and SHALL be
+  // left unset if the corresponding Controller Plugin does not have
+  // this capability. This is an OPTIONAL field.
+  map<string, string> publish_context = 2;
+
+  // The path to which the volume was staged by `NodeStageVolume`.
+  // It MUST be an absolute path in the root filesystem of the process
+  // serving this request.
+  // It MUST be set if the Node Plugin implements the
+  // `STAGE_UNSTAGE_VOLUME` node capability.
+  // This is an OPTIONAL field.
+  string staging_target_path = 3;
+
+  // The path to which the volume will be published. It MUST be an
+  // absolute path in the root filesystem of the process serving this
+  // request. The CO SHALL ensure uniqueness of target_path per volume.
+  // The CO SHALL ensure that the parent directory of this path exists
+  // and that the process serving the request has `read` and `write`
+  // permissions to that parent directory.
+  // For volumes with an access type of block, the SP SHALL place the
+  // block device at target_path.
+  // For volumes with an access type of mount, the SP SHALL place the
+  // mounted directory at target_path.
+  // Creation of target_path is the responsibility of the SP.
+  // This is a REQUIRED field.
+  string target_path = 4;
+
+  // Volume capability describing how the CO intends to use this volume.
+  // SP MUST ensure the CO can use the published volume as described.
+  // Otherwise SP MUST return the appropriate gRPC error code.
+  // This is a REQUIRED field.
+  VolumeCapability volume_capability = 5;
+
+  // Indicates SP MUST publish the volume in readonly mode.
+  // This field is REQUIRED.
+  bool readonly = 6;
+
+  // Secrets required by plugin to complete node publish volume request.
+  // This field is OPTIONAL. Refer to the `Secrets Requirements`
+  // section on how to use this field.
+  map<string, string> secrets = 7 [(csi_secret) = true];
+
+  // Volume context as returned by CO in CreateVolumeRequest. This field
+  // is OPTIONAL and MUST match the volume_context of the volume
+  // identified by `volume_id`.
+  map<string, string> volume_context = 8;
+}
+
+message NodePublishVolumeResponse {
+  // Intentionally empty.
+}
+message NodeUnpublishVolumeRequest {
+  // The ID of the volume. This field is REQUIRED.
+  string volume_id = 1;
+
+  // The path at which the volume was published. It MUST be an absolute
+  // path in the root filesystem of the process serving this request.
+  // The SP MUST delete the file or directory it created at this path.
+  // This is a REQUIRED field.
+  string target_path = 2;
+}
+
+message NodeUnpublishVolumeResponse {
+  // Intentionally empty.
+}
+message NodeGetVolumeStatsRequest {
+  // The ID of the volume. This field is REQUIRED.
+  string volume_id = 1;
+
+  // It can be any valid path where volume was previously
+  // staged or published.
+  // It MUST be an absolute path in the root filesystem of
+  // the process serving this request.
+  // This is a REQUIRED field.
+  string volume_path = 2;
+}
+
+message NodeGetVolumeStatsResponse {
+  // This field is OPTIONAL.
+  repeated VolumeUsage usage = 1;
+}
+
+message VolumeUsage {
+  enum Unit {
+    UNKNOWN = 0;
+    BYTES = 1;
+    INODES = 2;
+  }
+  // The available capacity in specified Unit. This field is OPTIONAL.
+  // The value of this field MUST NOT be negative.
+  int64 available = 1;
+
+  // The total capacity in specified Unit. This field is REQUIRED.
+  // The value of this field MUST NOT be negative.
+  int64 total = 2;
+
+  // The used capacity in specified Unit. This field is OPTIONAL.
+  // The value of this field MUST NOT be negative.
+  int64 used = 3;
+
+  // Units by which values are measured. This field is REQUIRED.
+  Unit unit = 4;
+}
+message NodeGetCapabilitiesRequest {
+  // Intentionally empty.
+}
+
+message NodeGetCapabilitiesResponse {
+  // All the capabilities that the node service supports. This field
+  // is OPTIONAL.
+  repeated NodeServiceCapability capabilities = 1;
+}
+
+// Specifies a capability of the node service.
+message NodeServiceCapability {
+  message RPC {
+    enum Type {
+      UNKNOWN = 0;
+      STAGE_UNSTAGE_VOLUME = 1;
+      // If Plugin implements GET_VOLUME_STATS capability
+      // then it MUST implement NodeGetVolumeStats RPC
+      // call for fetching volume statistics.
+      GET_VOLUME_STATS = 2;
+      // See VolumeExpansion for details.
+      EXPAND_VOLUME = 3;
+    }
+
+    Type type = 1;
+  }
+
+  oneof type {
+    // RPC that the controller supports.
+    RPC rpc = 1;
+  }
+}
+message NodeGetInfoRequest {
+}
+
+message NodeGetInfoResponse {
+  // The identifier of the node as understood by the SP.
+  // This field is REQUIRED.
+  // This field MUST contain enough information to uniquely identify
+  // this specific node vs all other nodes supported by this plugin.
+  // This field SHALL be used by the CO in subsequent calls, including
+  // `ControllerPublishVolume`, to refer to this node.
+  // The SP is NOT responsible for global uniqueness of node_id across
+  // multiple SPs.
+  string node_id = 1;
+
+  // Maximum number of volumes that controller can publish to the node.
+  // If value is not set or zero CO SHALL decide how many volumes of
+  // this type can be published by the controller to the node. The
+  // plugin MUST NOT set negative values here.
+  // This field is OPTIONAL.
+  int64 max_volumes_per_node = 2;
+
+  // Specifies where (regions, zones, racks, etc.) the node is
+  // accessible from.
+  // A plugin that returns this field MUST also set the
+  // VOLUME_ACCESSIBILITY_CONSTRAINTS plugin capability.
+  // COs MAY use this information along with the topology information
+  // returned in CreateVolumeResponse to ensure that a given volume is
+  // accessible from a given node when scheduling workloads.
+  // This field is OPTIONAL. If it is not specified, the CO MAY assume
+  // the node is not subject to any topological constraint, and MAY
+  // schedule workloads that reference any volume V, such that there are
+  // no topological constraints declared for V.
+  //
+  // Example 1:
+  //   accessible_topology =
+  //     {"region": "R1", "zone": "R2"}
+  // Indicates the node exists within the "region" "R1" and the "zone"
+  // "Z2".
+  Topology accessible_topology = 3;
+}
+message NodeExpandVolumeRequest {
+  // The ID of the volume. This field is REQUIRED.
+  string volume_id = 1;
+
+  // The path on which volume is available. This field is REQUIRED.
+  string volume_path = 2;
+
+  // This allows CO to specify the capacity requirements of the volume
+  // after expansion. If capacity_range is omitted then a plugin MAY
+  // inspect the file system of the volume to determine the maximum
+  // capacity to which the volume can be expanded. In such cases a
+  // plugin MAY expand the volume to its maximum capacity.
+  // This field is OPTIONAL.
+  CapacityRange capacity_range = 3;
+}
+
+message NodeExpandVolumeResponse {
+  // The capacity of the volume in bytes. This field is OPTIONAL.
+  int64 capacity_bytes = 1;
+}

+ 34 - 0
hadoop-ozone/dist/pom.xml

@@ -68,6 +68,13 @@
                   <classifier>classpath</classifier>
                   <destFileName>hadoop-ozone-s3gateway.classpath</destFileName>
                 </artifactItem>
+                <artifactItem>
+                  <groupId>org.apache.hadoop</groupId>
+                  <artifactId>hadoop-ozone-csi</artifactId>
+                  <version>${ozone.version}</version>
+                  <classifier>classpath</classifier>
+                  <destFileName>hadoop-ozone-csi.classpath</destFileName>
+                </artifactItem>
                 <artifactItem>
                   <groupId>org.apache.hadoop</groupId>
                   <artifactId>hadoop-ozone-ozone-manager</artifactId>
@@ -133,6 +140,29 @@
               <includeScope>runtime</includeScope>
             </configuration>
           </execution>
+          <execution>
+            <id>copy-omitted-jars</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>target/ozone-${ozone.version}/share/ozone/lib
+              </outputDirectory>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>com.google.protobuf</groupId>
+                  <artifactId>protobuf-java</artifactId>
+                  <version>3.5.1</version>
+                </artifactItem>
+                <artifactItem>
+                  <groupId>com.google.guava</groupId>
+                  <artifactId>guava</artifactId>
+                  <version>26.0-android</version>
+                </artifactItem>
+              </artifactItems>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
       <plugin>
@@ -247,6 +277,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-ozone-s3gateway</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-ozone-csi</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-ozone-ozone-manager</artifactId>

+ 4 - 0
hadoop-ozone/dist/src/main/Dockerfile

@@ -19,3 +19,7 @@ FROM apache/hadoop-runner:jdk11
 ADD --chown=hadoop . /opt/hadoop
 
 WORKDIR /opt/hadoop
+
+RUN sudo wget https://os.anzix.net/goofys -O /usr/bin/goofys
+RUN sudo chmod 755 /usr/bin/goofys
+RUN sudo yum install -y fuse

+ 4 - 0
hadoop-ozone/integration-test/pom.xml

@@ -56,6 +56,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-ozone-s3gateway</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-ozone-csi</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-ozone-recon</artifactId>

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java

@@ -52,4 +52,4 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
     configurationPropsToSkipCompare.add(OzoneConfigKeys.
         OZONE_S3_TOKEN_MAX_LIFETIME_KEY);
   }
-}
+}

+ 11 - 0
hadoop-ozone/pom.xml

@@ -52,6 +52,7 @@
     <module>ozone-recon</module>
     <module>ozone-recon-codegen</module>
     <module>upgrade</module>
+    <module>csi</module>
   </modules>
 
   <repositories>
@@ -89,6 +90,11 @@
         <artifactId>hadoop-ozone-s3gateway</artifactId>
         <version>${ozone.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-ozone-csi</artifactId>
+        <version>${ozone.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-ozone-datanode</artifactId>
@@ -114,6 +120,11 @@
         <artifactId>hadoop-ozone-filesystem-lib-legacy</artifactId>
         <version>${ozone.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdds-config</artifactId>
+        <version>${hdds.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-ozone-integration-test</artifactId>