View source

HDFS-12740. SCM should support a RPC to share the cluster Id with KSM and DataNodes. Contributed by Shashikant Banerjee.

Nanda kumar, 7 years ago
Commit
379e85f305

+ 81 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmInfo.java

@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.scm;
+
+/**
+ * ScmInfo wraps the result returned from SCM#getScmInfo which
+ * contains clusterId and the SCM Id.
+ */
+public final class ScmInfo {
+  // Assigned only in the private constructor, so both fields are final,
+  // making ScmInfo immutable once built.
+  private final String clusterId;
+  private final String scmId;
+
+  /**
+   * Builder for ScmInfo.
+   */
+  public static class Builder {
+    private String clusterId;
+    private String scmId;
+
+    /**
+     * sets the cluster id.
+     * @param cid clusterId to be set
+     * @return Builder for ScmInfo
+     */
+    public Builder setClusterId(String cid) {
+      this.clusterId = cid;
+      return this;
+    }
+
+    /**
+     * sets the scmId.
+     * @param id scmId
+     * @return Builder for scmInfo
+     */
+    public Builder setScmId(String id) {
+      this.scmId = id;
+      return this;
+    }
+
+    /**
+     * Builds an {@link ScmInfo} from the values set on this builder.
+     * @return ScmInfo
+     */
+    public ScmInfo build() {
+      return new ScmInfo(clusterId, scmId);
+    }
+  }
+
+  private ScmInfo(String clusterId, String scmId) {
+    this.clusterId = clusterId;
+    this.scmId = scmId;
+  }
+
+  /**
+   * Gets the clusterId from the Version file.
+   * @return ClusterId
+   */
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  /**
+   * Gets the SCM Id from the Version file.
+   * @return SCM Id
+   */
+  public String getScmId() {
+    return scmId;
+  }
+}

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.scm.ScmInfo;
 
 /**
  * ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
@@ -63,4 +64,9 @@ public interface ScmBlockLocationProtocol {
    */
   List<DeleteBlockGroupResult>
       deleteKeyBlocks(List<BlockGroup> keyBlocksInfoList) throws IOException;
+
+  /**
+   * Gets the cluster Id and SCM Id from SCM.
+   * @return ScmInfo containing the cluster Id and the SCM Id
+   * @throws IOException if the request to SCM fails
+   */
+  ScmInfo getScmInfo() throws IOException;
 }

+ 24 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java

@@ -33,8 +33,11 @@ import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.Del
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.GetScmBlockLocationsRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.GetScmBlockLocationsResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.ScmLocatedBlockProto;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.GetScmInfoRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.GetScmInfoRespsonseProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.scm.ScmInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@@ -175,6 +178,27 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
     return results;
   }
 
+  /**
+   * Gets the cluster Id and Scm Id from SCM.
+   * @return ScmInfo
+   * @throws IOException if the RPC to SCM fails; a remote ServiceException
+   *         is unwrapped into the underlying remote IOException
+   */
+  @Override
+  public ScmInfo getScmInfo() throws IOException {
+    GetScmInfoRequestProto request =
+        GetScmInfoRequestProto.getDefaultInstance();
+    GetScmInfoRespsonseProto resp;
+    try {
+      resp = rpcProxy.getScmInfo(NULL_RPC_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    ScmInfo.Builder builder = new ScmInfo.Builder()
+        .setClusterId(resp.getClusterId())
+        .setScmId(resp.getScmId());
+    return builder.build();
+  }
+
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto

@@ -131,6 +131,20 @@ message AllocateScmBlockResponseProto {
   optional string errorMessage = 5;
 }
 
+/**
+ * Request for cluster Id and SCM Id from SCM.
+ */
+message GetScmInfoRequestProto {
+}
+
+/**
+ * Response from SCM for cluster Id and SCM ID.
+ * NOTE(review): "Respsonse" is a typo; renaming it requires coordinated
+ * changes in every usage of the generated Java class.
+ */
+message GetScmInfoRespsonseProto {
+  required string clusterId = 1;
+  required string scmId = 2;
+}
+
 /**
  * Protocol used from KeySpaceManager to StorageContainerManager.
  * See request and response messages for details of the RPC calls.
@@ -156,4 +170,10 @@ service ScmBlockLocationProtocolService {
    */
   rpc deleteScmKeyBlocks(DeleteScmKeyBlocksRequestProto)
       returns (DeleteScmKeyBlocksResponseProto);
+
+  /**
+   * Gets the scmInfo from SCM.
+   */
+  rpc getScmInfo(GetScmInfoRequestProto)
+      returns (GetScmInfoRespsonseProto);
 }

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java

@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.ksm;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -42,6 +43,7 @@ import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocolPB
     .KeySpaceManagerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.scm.ScmInfo;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
@@ -137,6 +139,10 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
     return scmBlockLocationClient;
   }
 
+  /**
+   * Gets the cluster Id and SCM Id by querying SCM through the block client.
+   * @param conf configuration used to create the SCM block client
+   * @return ScmInfo containing the cluster Id and SCM Id
+   * @throws IOException if the call to SCM fails
+   */
+  @VisibleForTesting
+  public ScmInfo getScmInfo(OzoneConfiguration conf) throws IOException {
+    return getScmBlockClient(conf).getScmInfo();
+  }
   /**
    * Starts an RPC server, if configured.
    *

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.scm.ScmInfo;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
@@ -43,6 +44,10 @@ import org.apache.hadoop.ozone.protocol.proto
     .ScmBlockLocationProtocolProtos.GetScmBlockLocationsResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
     .ScmBlockLocationProtocolProtos.ScmLocatedBlockProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .ScmBlockLocationProtocolProtos.GetScmInfoRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .ScmBlockLocationProtocolProtos.GetScmInfoRespsonseProto;
 
 import java.io.IOException;
 import java.util.List;
@@ -148,4 +153,19 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
     }
     return resp.build();
   }
+
+  /**
+   * Server-side handler for getScmInfo: delegates to the SCM implementation
+   * and translates the result into the protobuf response.
+   */
+  @Override
+  public GetScmInfoRespsonseProto getScmInfo(RpcController controller,
+      GetScmInfoRequestProto req) throws ServiceException {
+    ScmInfo scmInfo;
+    try {
+      scmInfo = impl.getScmInfo();
+    } catch (IOException ex) {
+      // Wrap so the RPC layer can propagate the failure to the client.
+      throw new ServiceException(ex);
+    }
+    return GetScmInfoRespsonseProto.newBuilder()
+        .setClusterId(scmInfo.getClusterId())
+        .setScmId(scmInfo.getScmId())
+        .build();
+  }
 }

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/SCMStorage.java

@@ -44,11 +44,11 @@ public class SCMStorage extends Storage {
     super(NodeType.SCM, OzoneUtils.getScmMetadirPath(conf), STORAGE_DIR);
   }
 
-  public void setScmUuid(String scmUuid) throws IOException {
+  public void setScmId(String scmId) throws IOException {
     if (getState() == StorageState.INITIALIZED) {
       throw new IOException("SCM is already initialized.");
     } else {
-      getStorageInfo().setProperty(SCM_ID, scmUuid);
+      getStorageInfo().setProperty(SCM_ID, scmId);
     }
   }
 
@@ -56,18 +56,18 @@ public class SCMStorage extends Storage {
    * Retrieves the SCM ID from the version file.
    * @return SCM_ID
    */
-  public String getscmUuid() {
+  public String getScmId() {
     return getStorageInfo().getProperty(SCM_ID);
   }
 
   @Override
   protected Properties getNodeProperties() {
-    String scmUuid = getscmUuid();
-    if (scmUuid == null) {
-      scmUuid = UUID.randomUUID().toString();
+    String scmId = getScmId();
+    if (scmId == null) {
+      scmId = UUID.randomUUID().toString();
     }
     Properties scmProperties = new Properties();
-    scmProperties.setProperty(SCM_ID, scmUuid);
+    scmProperties.setProperty(SCM_ID, scmId);
     return scmProperties;
   }
 

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java

@@ -77,6 +77,7 @@ import org.apache.hadoop.ozone.scm.container.Mapping;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.ContainerStat;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMMetrics;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
+import org.apache.hadoop.scm.ScmInfo;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
@@ -1102,7 +1103,16 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
     return scmBlockManager.allocateBlock(size, type, factor);
   }
 
-
+  /**
+   * Get the clusterId and SCM Id from the version file in SCM.
+   * @return ScmInfo built from the values stored by SCM storage
+   * @throws IOException if reading the stored values fails
+   */
+  @Override
+  public ScmInfo getScmInfo() throws IOException {
+    ScmInfo.Builder builder = new ScmInfo.Builder()
+        .setClusterId(scmStorage.getClusterID())
+        .setScmId(scmStorage.getScmId());
+    return builder.build();
+  }
   /**
    * Delete blocks for a set of object keys.
    *

+ 20 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneClassicCluster.java

@@ -335,6 +335,8 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
     private Optional<Integer> hbSeconds = Optional.empty();
     private Optional<Integer> hbProcessorInterval = Optional.empty();
     private Optional<String> scmMetadataDir = Optional.empty();
+    private Optional<String> clusterId = Optional.empty();
+    private Optional<String> scmId = Optional.empty();
     private Boolean ozoneEnabled = true;
     private Boolean waitForChillModeFinish = true;
     private Boolean randomContainerPort = true;
@@ -423,6 +425,16 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
       return this;
     }
 
+    /**
+     * Sets the cluster Id to use when initializing SCM storage.
+     * @param cId cluster Id
+     * @return this Builder
+     */
+    public Builder setClusterId(String cId) {
+      clusterId = Optional.of(cId);
+      return this;
+    }
+
+    /**
+     * Sets the SCM Id to use when initializing SCM storage.
+     * @param sId SCM Id
+     * @return this Builder
+     */
+    public Builder setScmId(String sId) {
+      scmId = Optional.of(sId);
+      return this;
+    }
+
     public String getPath() {
       return path;
     }
@@ -439,6 +451,7 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
       configureTrace();
       configureSCMheartbeat();
       configScmMetadata();
+      initializeScm();
 
       conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
       conf.set(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
@@ -456,8 +469,6 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
       conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
           randomContainerPort);
 
-      SCMStorage scmStorage = new SCMStorage(conf);
-      scmStorage.initialize();
       StorageContainerManager scm = StorageContainerManager.createSCM(
           null, conf);
       scm.start();
@@ -513,6 +524,13 @@ public final class MiniOzoneClassicCluster extends MiniDFSCluster
           scmPath.toString() + "/datanode.id");
     }
 
+    /**
+     * Initializes SCM storage before the cluster starts, using the
+     * configured clusterId/scmId or generated defaults (runID for the
+     * cluster Id, a random UUID for the SCM Id).
+     * @throws IOException if SCM storage cannot be initialized
+     */
+    private void initializeScm() throws IOException {
+      SCMStorage scmStore = new SCMStorage(conf);
+      scmStore.setClusterId(clusterId.orElse(runID.toString()));
+      scmStore.setScmId(scmId.orElse(UUID.randomUUID().toString()));
+      scmStore.initialize();
+    }
+
     private void configureHandler() {
       conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, this.ozoneEnabled);
       if (!ozoneHandlerType.isPresent()) {

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.scm.block.SCMBlockDeletingService;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.ScmInfo;
 import org.junit.Rule;
 import org.junit.Assert;
 import org.junit.Test;
@@ -396,4 +397,26 @@ public class TestStorageContainerManager {
     Assert.assertEquals(OzoneConsts.NodeType.SCM, scmStore.getNodeType());
     Assert.assertNotEquals("testClusterId", scmStore.getClusterID());
   }
+
+  @Test
+  public void testScmInfo() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    final String path =
+        GenericTestUtils.getTempPath(UUID.randomUUID().toString());
+    Path scmPath = Paths.get(path, "scm-meta");
+    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
+    conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
+    SCMStorage scmStore = new SCMStorage(conf);
+    String clusterId = UUID.randomUUID().toString();
+    String scmId = UUID.randomUUID().toString();
+    scmStore.setClusterId(clusterId);
+    scmStore.setScmId(scmId);
+    // writes the version file properties
+    scmStore.initialize();
+    StorageContainerManager scm = StorageContainerManager.createSCM(null, conf);
+    //Reads the SCM Info from SCM instance
+    ScmInfo scmInfo = scm.getScmInfo();
+    Assert.assertEquals(clusterId, scmInfo.getClusterId());
+    Assert.assertEquals(scmId, scmInfo.getScmId());
+  }
 }

+ 21 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.ozone.web.response.BucketInfo;
 import org.apache.hadoop.ozone.web.response.KeyInfo;
 import org.apache.hadoop.ozone.web.response.VolumeInfo;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.ScmInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.ozone.protocol.proto
     .KeySpaceManagerProtocolProtos.Status;
@@ -66,6 +67,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
@@ -78,6 +80,8 @@ public class TestKeySpaceManager {
   private static UserArgs userArgs;
   private static KSMMetrics ksmMetrics;
   private static OzoneConfiguration conf;
+  private static String clusterId;
+  private static String scmId;
 
   @Rule
   public ExpectedException exception = ExpectedException.none();
@@ -93,10 +97,15 @@ public class TestKeySpaceManager {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    clusterId = UUID.randomUUID().toString();
+    scmId = UUID.randomUUID().toString();
     conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
         OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
     cluster = new MiniOzoneClassicCluster.Builder(conf)
-        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
+        .setClusterId(clusterId)
+        .setScmId(scmId)
+        .build();
     storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
     userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
         null, null, null, null);
@@ -1033,4 +1042,15 @@ public class TestKeySpaceManager {
     }
     Assert.assertEquals(dataString, DFSUtil.bytes2String(data1));
   }
+
+  /**
+   * Tests the RPC call for getting scmId and clusterId from SCM via KSM,
+   * comparing against the ids the cluster was initialized with.
+   * @throws IOException if the RPC to SCM fails
+   */
+  @Test
+  public void testGetScmInfo() throws IOException {
+    ScmInfo info = cluster.getKeySpaceManager().getScmInfo(conf);
+    Assert.assertEquals(clusterId, info.getClusterId());
+    Assert.assertEquals(scmId, info.getScmId());
+  }
 }