Browse Source

HDFS-13017. Block Storage: implement simple iscsi discovery in jscsi server. Contributed by Elek, Marton.

Mukul Kumar Singh 7 years ago
parent
commit
c908b1ea2d

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockManager.java

@@ -246,6 +246,11 @@ public class CBlockManager implements CBlockServiceProtocol,
     return storageManager.isVolumeValid(userName, volumeName);
   }
 
+  @Override
+  public List<VolumeInfo> listVolumes() throws IOException {
+    return listVolume(null);
+  }
+
   @Override
   public synchronized void createVolume(String userName, String volumeName,
       long volumeSize, int blockSize) throws IOException {

+ 38 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java

@@ -20,9 +20,21 @@ package org.apache.hadoop.cblock.jscsiHelper;
 import com.google.common.primitives.Longs;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.cblock.exception.CBlockException;
+import org.apache.hadoop.cblock.meta.VolumeInfo;
 import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
 import org.apache.hadoop.cblock.proto.MountVolumeResponse;
-import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos.ContainerIDProto;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos.ListVolumesRequestProto;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos.ListVolumesResponseProto;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos.MountVolumeRequestProto;
+import org.apache.hadoop.cblock.protocol.proto
+    .CBlockClientServerProtocolProtos.MountVolumeResponseProto;
+import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos
+    .VolumeInfoProto;
 import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -69,14 +81,14 @@ public class CBlockClientProtocolClientSideTranslatorPB
   @Override
   public MountVolumeResponse mountVolume(
       String userName, String volumeName) throws IOException {
-    CBlockClientServerProtocolProtos.MountVolumeRequestProto.Builder
+    MountVolumeRequestProto.Builder
         request
-        = CBlockClientServerProtocolProtos.MountVolumeRequestProto
+        = MountVolumeRequestProto
         .newBuilder();
     request.setUserName(userName);
     request.setVolumeName(volumeName);
     try {
-      CBlockClientServerProtocolProtos.MountVolumeResponseProto resp
+      MountVolumeResponseProto resp
           = rpcProxy.mountVolume(null, request.build());
       if (!resp.getIsValid()) {
         throw new CBlockException(
@@ -87,7 +99,7 @@ public class CBlockClientProtocolClientSideTranslatorPB
       if (resp.getAllContainerIDsList().size() == 0) {
         throw new CBlockException("Mount volume request returned no container");
       }
-      for (CBlockClientServerProtocolProtos.ContainerIDProto containerID :
+      for (ContainerIDProto containerID :
           resp.getAllContainerIDsList()) {
         if (containerID.hasPipeline()) {
           // it should always have a pipeline only except for tests.
@@ -111,4 +123,25 @@ public class CBlockClientProtocolClientSideTranslatorPB
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public List<VolumeInfo> listVolumes() throws IOException {
+    try {
+      List<VolumeInfo> result = new ArrayList<>();
+      ListVolumesResponseProto
+          listVolumesResponseProto = this.rpcProxy.listVolumes(null,
+          ListVolumesRequestProto.newBuilder()
+              .build());
+      for (VolumeInfoProto volumeInfoProto :
+          listVolumesResponseProto
+          .getVolumeEntryList()) {
+        result.add(new VolumeInfo(volumeInfoProto.getUserName(),
+            volumeInfoProto.getVolumeName(), volumeInfoProto.getVolumeSize(),
+            volumeInfoProto.getBlockSize()));
+      }
+      return result;
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java

@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.cblock.jscsiHelper;
 
+import org.apache.hadoop.cblock.meta.VolumeInfo;
 import org.apache.hadoop.cblock.proto.MountVolumeResponse;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * This class is the handler of CBlockManager used by target server
@@ -41,4 +43,8 @@ public class CBlockManagerHandler {
       String userName, String volumeName) throws IOException {
     return handler.mountVolume(userName, volumeName);
   }
+
+  public List<VolumeInfo> listVolumes() throws IOException {
+    return handler.listVolumes();
+  }
 }

+ 17 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java

@@ -104,6 +104,23 @@ public final class CBlockTargetServer extends TargetServer {
     return targets.containsKey(checkTargetName);
   }
 
+  @Override
+  public String[] getTargetNames() {
+    try {
+      if (cBlockManagerHandler != null) {
+        return cBlockManagerHandler.listVolumes().
+            stream().map(
+              volumeInfo -> volumeInfo.getUserName() + ":" + volumeInfo
+                .getVolumeName()).toArray(String[]::new);
+      } else {
+        return new String[0];
+      }
+    } catch (IOException e) {
+      LOGGER.error("Can't list existing volumes", e);
+      return new String[0];
+    }
+  }
+
   @VisibleForTesting
   public HashMap<String, Target> getTargets() {
     return targets;

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/proto/CBlockClientProtocol.java

@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.cblock.proto;
 
+import org.apache.hadoop.cblock.meta.VolumeInfo;
+
 import java.io.IOException;
+import java.util.List;
 
 /**
  * The protocol that CBlock client side uses to talk to server side. CBlock
@@ -30,4 +33,6 @@ import java.io.IOException;
 public interface CBlockClientProtocol {
   MountVolumeResponse mountVolume(String userName, String volumeName)
       throws IOException;
+
+  List<VolumeInfo> listVolumes() throws IOException;
 }

+ 30 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/protocolPB/CBlockClientServerProtocolServerSideTranslatorPB.java

@@ -19,15 +19,18 @@ package org.apache.hadoop.cblock.protocolPB;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+import org.apache.hadoop.cblock.meta.VolumeInfo;
 import org.apache.hadoop.cblock.proto.CBlockClientProtocol;
 import org.apache.hadoop.cblock.proto.MountVolumeResponse;
 import org.apache.hadoop.cblock.protocol.proto.CBlockClientServerProtocolProtos;
+import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * The server side implementation of cblock client to server protocol.
@@ -83,4 +86,31 @@ public class CBlockClientServerProtocolServerSideTranslatorPB implements
     }
     return resp.build();
   }
+
+  @Override
+  public CBlockClientServerProtocolProtos.ListVolumesResponseProto listVolumes(
+      RpcController controller,
+      CBlockClientServerProtocolProtos.ListVolumesRequestProto request)
+      throws ServiceException {
+    try {
+      CBlockClientServerProtocolProtos.ListVolumesResponseProto.Builder resp =
+          CBlockClientServerProtocolProtos.ListVolumesResponseProto
+              .newBuilder();
+      List<VolumeInfo> volumeInfos = impl.listVolumes();
+      List<CBlockServiceProtocolProtos.VolumeInfoProto> convertedInfos =
+          volumeInfos.stream().map(
+              volumeInfo -> CBlockServiceProtocolProtos.VolumeInfoProto
+                  .newBuilder().setUserName(volumeInfo.getUserName())
+                  .setBlockSize(volumeInfo.getBlockSize())
+                  .setVolumeName(volumeInfo.getVolumeName())
+                  .setVolumeSize(volumeInfo.getVolumeSize())
+                  .setUsage(volumeInfo.getUsage()).build())
+              .collect(Collectors.toList());
+      resp.addAllVolumeEntry(convertedInfos);
+      return resp.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
 }

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/CBlockClientServerProtocol.proto

@@ -28,7 +28,7 @@ option java_generate_equals_and_hash = true;
 package hadoop.cblock;
 
 import "Ozone.proto";
-
+import "CBlockServiceProtocol.proto";
 /**
 * This message is sent from CBlock client side to CBlock server to
 * mount a volume specified by owner name and volume name.
@@ -72,9 +72,22 @@ message ContainerIDProto {
     optional hadoop.hdfs.ozone.Pipeline pipeline = 3;
 }
 
+
+message ListVolumesRequestProto {
+
+}
+
+message ListVolumesResponseProto {
+    repeated VolumeInfoProto volumeEntry = 1;
+}
+
+
 service CBlockClientServerProtocolService {
     /**
     * mount the volume.
     */
     rpc mountVolume(MountVolumeRequestProto) returns (MountVolumeResponseProto);
+
+    rpc listVolumes(ListVolumesRequestProto) returns(ListVolumesResponseProto);
+
 }

+ 51 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestCBlockServer.java

@@ -22,12 +22,13 @@ import org.apache.hadoop.cblock.meta.VolumeInfo;
 import org.apache.hadoop.scm.client.ScmClient;
 import org.apache.hadoop.cblock.util.MockStorageClient;
 import org.apache.hadoop.conf.OzoneConfiguration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import static org.apache.hadoop.cblock.CBlockConfigKeys
     .DFS_CBLOCK_JSCSIRPC_ADDRESS_KEY;
@@ -45,8 +46,8 @@ public class TestCBlockServer {
   private static CBlockManager cBlockManager;
   private static OzoneConfiguration conf;
 
-  @BeforeClass
-  public static void setup() throws Exception {
+  @Before
+  public void setup() throws Exception {
     ScmClient storageClient = new MockStorageClient();
     conf = new OzoneConfiguration();
     conf.set(DFS_CBLOCK_SERVICERPC_ADDRESS_KEY, "127.0.0.1:0");
@@ -55,8 +56,8 @@ public class TestCBlockServer {
     cBlockManager.start();
   }
 
-  @AfterClass
-  public static void clean() {
+  @After
+  public void clean() {
     cBlockManager.stop();
     cBlockManager.join();
     cBlockManager.clean();
@@ -152,7 +153,7 @@ public class TestCBlockServer {
     }
     List<VolumeInfo> volumes = cBlockManager.listVolume(userName);
     assertEquals(volumeNum, volumes.size());
-    HashSet<String> volumeIds = new HashSet<>();
+    Set<String> volumeIds = new HashSet<>();
     for (int i = 0; i<volumeNum; i++) {
       VolumeInfo volumeInfo = volumes.get(i);
       assertEquals(userName, volumeInfo.getUserName());
@@ -165,4 +166,47 @@ public class TestCBlockServer {
       assertTrue(volumeIds.contains(volumeName + i));
     }
   }
+
+  /**
+   * Test listing a number of volumes.
+   * @throws Exception
+   */
+  @Test
+  public void testListVolumes() throws Exception {
+    String volumeName ="volume" +  RandomStringUtils.randomNumeric(5);
+    long volumeSize = 1L*1024*1024;
+    int blockSize = 4096;
+    int volumeNum = 100;
+    int userCount = 10;
+
+    assertTrue("We need at least one volume for each user",
+        userCount < volumeNum);
+
+    for (int i = 0; i<volumeNum; i++) {
+      String userName =
+          "user-" + (i % userCount);
+      cBlockManager.createVolume(userName, volumeName + i,
+          volumeSize, blockSize);
+    }
+    List<VolumeInfo> allVolumes = cBlockManager.listVolumes();
+    //check if we have the volumes from all the users.
+
+    Set<String> volumeIds = new HashSet<>();
+    Set<String> usernames = new HashSet<>();
+    for (int i = 0; i < allVolumes.size(); i++) {
+      VolumeInfo volumeInfo = allVolumes.get(i);
+      assertFalse(volumeIds.contains(volumeName + i));
+      usernames.add(volumeInfo.getUserName());
+      volumeIds.add(volumeName + i);
+      assertEquals(volumeSize, volumeInfo.getVolumeSize());
+      assertEquals(blockSize, volumeInfo.getBlockSize());
+    }
+
+    assertEquals(volumeNum, volumeIds.size());
+    for (int i = 0; i<volumeNum; i++) {
+      assertTrue(volumeIds.contains(volumeName + i));
+    }
+
+    assertEquals(userCount, usernames.size());
+  }
 }