
HDDS-2068. Make StorageContainerDatanodeProtocolService message based

Signed-off-by: Anu Engineer <aengineer@apache.org>
Márton Elek, 5 years ago
parent commit: e8ae632d4c
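
Summary of the change: instead of one RPC method per call (getVersion, register, sendHeartbeat), the datanode-to-SCM service now exposes a single submitRequest method that takes a SCMDatanodeRequest envelope, whose cmdType field selects which optional payload is set, and returns a matching SCMDatanodeResponse. Routing every message through one entry point lets the server reuse the common OzoneProtocolMessageDispatcher for tracing and per-message-type metrics.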

+ 35 - 25
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java

@@ -24,6 +24,9 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;

+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest.Builder;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeResponse;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -38,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
@@ -45,6 +49,7 @@ import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;

 import java.io.Closeable;
 import java.io.IOException;
+import java.util.function.Consumer;

 /**
  * This class is the client-side translator to translate the requests made on
@@ -96,6 +101,25 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
     return rpcProxy;
   }

+  /**
+   * Helper method to wrap the request and send the message.
+   */
+  private SCMDatanodeResponse submitRequest(Type type,
+      Consumer<SCMDatanodeRequest.Builder> builderConsumer) throws IOException {
+    final SCMDatanodeResponse response;
+    try {
+      Builder builder = SCMDatanodeRequest.newBuilder()
+          .setCmdType(type);
+      builderConsumer.accept(builder);
+      SCMDatanodeRequest wrapper = builder.build();
+
+      response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
+    } catch (ServiceException ex) {
+      throw ProtobufHelper.getRemoteException(ex);
+    }
+    return response;
+  }
+
   /**
    * Returns SCM version.
    *
@@ -104,16 +128,11 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
    */
   @Override
   public SCMVersionResponseProto getVersion(SCMVersionRequestProto
-      unused) throws IOException {
-    SCMVersionRequestProto request =
-        SCMVersionRequestProto.newBuilder().build();
-    final SCMVersionResponseProto response;
-    try {
-      response = rpcProxy.getVersion(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException ex) {
-      throw ProtobufHelper.getRemoteException(ex);
-    }
-    return response;
+      request) throws IOException {
+    return submitRequest(Type.GetVersion,
+        (builder) -> builder
+            .setGetVersionRequest(SCMVersionRequestProto.newBuilder().build()))
+        .getGetVersionResponse();
   }

   /**
@@ -126,13 +145,9 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
   @Override
   public SCMHeartbeatResponseProto sendHeartbeat(
       SCMHeartbeatRequestProto heartbeat) throws IOException {
-    final SCMHeartbeatResponseProto resp;
-    try {
-      resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, heartbeat);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    return resp;
+    return submitRequest(Type.SendHeartbeat,
+        (builder) -> builder.setSendHeartbeatRequest(heartbeat))
+        .getSendHeartbeatResponse();
   }

   /**
@@ -155,13 +170,8 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
     req.setContainerReport(containerReportsRequestProto);
     req.setPipelineReports(pipelineReportsProto);
     req.setNodeReport(nodeReport);
-    final SCMRegisteredResponseProto response;
-    try {
-      response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
-    return response;
+    return submitRequest(Type.Register,
+        (builder) -> builder.setRegisterRequest(req))
+        .getRegisterResponse();
   }
-
 }
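
The public client methods above keep their signatures; only the wire format changes. For illustration, a hypothetical future call would be a thin wrapper over the submitRequest helper (the SomeNew* names are invented for this sketch and are not part of the commit):

  // Hypothetical sketch: a new client call under the message-based scheme.
  // Type.SomeNewCommand and the SomeNew* messages would first have to be
  // declared in StorageContainerDatanodeProtocol.proto.
  public SomeNewResponseProto someNewCommand(SomeNewRequestProto request)
      throws IOException {
    return submitRequest(Type.SomeNewCommand,
        (builder) -> builder.setSomeNewRequest(request))
        .getSomeNewResponse();
  }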

+ 67 - 48
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java

@@ -16,29 +16,24 @@
  */
 package org.apache.hadoop.ozone.protocolPB;

-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeRequest;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDatanodeResponse;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Status;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 
 
-import java.io.IOException;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * This class is the server-side translator that forwards requests received on
@@ -48,47 +43,71 @@ import java.io.IOException;
 public class StorageContainerDatanodeProtocolServerSideTranslatorPB
     implements StorageContainerDatanodeProtocolPB {

+  private static final Logger LOG = LoggerFactory
+      .getLogger(StorageContainerDatanodeProtocolServerSideTranslatorPB.class);
+
   private final StorageContainerDatanodeProtocol impl;
+  private final OzoneProtocolMessageDispatcher<SCMDatanodeRequest,
+      SCMDatanodeResponse> dispatcher;

   public StorageContainerDatanodeProtocolServerSideTranslatorPB(
-      StorageContainerDatanodeProtocol impl) {
+      StorageContainerDatanodeProtocol impl,
+      ProtocolMessageMetrics protocolMessageMetrics) {
     this.impl = impl;
+    dispatcher =
+        new OzoneProtocolMessageDispatcher<>("SCMDatanodeProtocol",
+            protocolMessageMetrics,
+            LOG);
   }

-  @Override
-  public SCMVersionResponseProto getVersion(RpcController controller,
-      SCMVersionRequestProto request)
-      throws ServiceException {
-    try {
-      return impl.getVersion(request);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
+  public SCMRegisteredResponseProto register(
+      SCMRegisterRequestProto request) throws IOException {
+    ContainerReportsProto containerRequestProto = request
+        .getContainerReport();
+    NodeReportProto dnNodeReport = request.getNodeReport();
+    PipelineReportsProto pipelineReport = request.getPipelineReports();
+    return impl.register(request.getDatanodeDetails(), dnNodeReport,
+        containerRequestProto, pipelineReport);
+
   }
   }

   @Override
-      SCMRegisterRequestProto request) throws ServiceException {
-    try {
-      ContainerReportsProto containerRequestProto = request
-          .getContainerReport();
-      NodeReportProto dnNodeReport = request.getNodeReport();
-      PipelineReportsProto pipelineReport = request.getPipelineReports();
-      return impl.register(request.getDatanodeDetails(), dnNodeReport,
-          containerRequestProto, pipelineReport);
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
+  public SCMDatanodeResponse submitRequest(RpcController controller,
+      SCMDatanodeRequest request) throws ServiceException {
+    return dispatcher.processRequest(request, this::processMessage,
+        request.getCmdType(), request.getTraceID());
   }

-  @Override
-  public SCMHeartbeatResponseProto sendHeartbeat(RpcController controller,
-      SCMHeartbeatRequestProto request) throws ServiceException {
+  public SCMDatanodeResponse processMessage(SCMDatanodeRequest request)
+      throws ServiceException {
     try {
-      return impl.sendHeartbeat(request);
+      Type cmdType = request.getCmdType();
+      switch (cmdType) {
+      case GetVersion:
+        return SCMDatanodeResponse.newBuilder()
+            .setCmdType(cmdType)
+            .setStatus(Status.OK)
+            .setGetVersionResponse(
+                impl.getVersion(request.getGetVersionRequest()))
+            .build();
+      case SendHeartbeat:
+        return SCMDatanodeResponse.newBuilder()
+            .setCmdType(cmdType)
+            .setStatus(Status.OK)
+            .setSendHeartbeatResponse(
+                impl.sendHeartbeat(request.getSendHeartbeatRequest()))
+            .build();
+      case Register:
+        return SCMDatanodeResponse.newBuilder()
+            .setCmdType(cmdType)
+            .setStatus(Status.OK)
+            .setRegisterResponse(register(request.getRegisterRequest()))
+            .build();
+      default:
+        throw new ServiceException("Unknown command type: " + cmdType);
+      }
     } catch (IOException e) {
     } catch (IOException e) {
       throw new ServiceException(e);
     }
   }
-
 }
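
On the server, the dispatcher-plus-switch in processMessage is the entire routing logic. The matching server-side piece of the hypothetical SomeNewCommand sketched above would be one more case (invented names, shown only to illustrate the extension pattern):

  // Hypothetical extra case inside processMessage's switch (not in this commit):
  case SomeNewCommand:
    return SCMDatanodeResponse.newBuilder()
        .setCmdType(cmdType)
        .setStatus(Status.OK)
        .setSomeNewResponse(impl.someNewCommand(request.getSomeNewRequest()))
        .build();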

+ 41 - 17
hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto

@@ -34,6 +34,45 @@ package hadoop.hdds;

 import "hdds.proto";

+
+message SCMDatanodeRequest {
+  required Type cmdType = 1; // Type of the command
+
+  optional string traceID = 2;
+
+  optional SCMVersionRequestProto getVersionRequest = 3;
+  optional SCMRegisterRequestProto registerRequest = 4;
+  optional SCMHeartbeatRequestProto sendHeartbeatRequest = 5;
+}
+
+message SCMDatanodeResponse {
+  required Type cmdType = 1; // Type of the command
+
+  optional string traceID = 2;
+
+  optional bool success = 3 [default = true];
+
+  optional string message = 4;
+
+  required Status status = 5;
+
+  optional SCMVersionResponseProto getVersionResponse = 6;
+  optional SCMRegisteredResponseProto registerResponse = 7;
+  optional SCMHeartbeatResponseProto sendHeartbeatResponse = 8;
+
+}
+
+enum Type {
+  GetVersion = 1;
+  Register = 2;
+  SendHeartbeat = 3;
+}
+
+enum Status {
+  OK = 1;
+  ERROR = 2;
+}
+
 /**
  * Request for version info of the software stack on the server.
  */
@@ -385,21 +424,6 @@ message ReplicateContainerCommandProto {
  */
 service StorageContainerDatanodeProtocolService {

-  /**
-  * Gets the version information from the SCM.
-  */
-  rpc getVersion (SCMVersionRequestProto) returns (SCMVersionResponseProto);
-
-  /**
-  * Registers a data node with SCM.
-  */
-  rpc register (SCMRegisterRequestProto) returns (SCMRegisteredResponseProto);
-
-  /**
-   * Send heartbeat from datanode to SCM. HB's under SCM looks more
-   * like life line protocol than HB's under HDFS. In other words, it is
-   * extremely light weight and contains no data payload.
-   */
-  rpc sendHeartbeat (SCMHeartbeatRequestProto) returns (SCMHeartbeatResponseProto);
-
+  // Single entry point: every Datanode-to-SCM message (version, register,
+  // heartbeat) is wrapped in a SCMDatanodeRequest and dispatched by cmdType.
+  rpc submitRequest (SCMDatanodeRequest) returns (SCMDatanodeResponse);
 }
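
To make the new wire format concrete, here is a minimal Java sketch of the envelope for a GetVersion call, using only the generated builders that appear elsewhere in this diff:

  // One envelope per call; cmdType says which optional payload field is set.
  SCMDatanodeRequest wrapper = SCMDatanodeRequest.newBuilder()
      .setCmdType(Type.GetVersion)
      .setTraceID("example-trace")  // optional; the dispatcher reads it for tracing
      .setGetVersionRequest(SCMVersionRequestProto.newBuilder().build())
      .build();
  // The response mirrors the request: callers check status, then read the
  // field matching the cmdType (here getGetVersionResponse()).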

+ 3 - 1
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java

@@ -29,12 +29,14 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.test.GenericTestUtils;

 import com.google.protobuf.BlockingService;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import org.mockito.Mockito;

 /**
  * Test Endpoint class.
@@ -91,7 +93,7 @@ public final class SCMTestUtils {
         StorageContainerDatanodeProtocolService.
             newReflectiveBlockingService(
                 new StorageContainerDatanodeProtocolServerSideTranslatorPB(
-                    server));
+                    server, Mockito.mock(ProtocolMessageMetrics.class)));

     RPC.Server scmServer = startRpcServer(configuration, rpcServerAddresss,
         StorageContainerDatanodeProtocolPB.class, scmDatanodeService,

+ 41 - 61
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java

@@ -21,61 +21,32 @@
  */
 package org.apache.hadoop.hdds.scm.server;

-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.protobuf.BlockingService;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-        .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
-    .Type.closeContainerCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
-    .Type.deleteBlocksCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
-    .Type.deleteContainerCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
-    .replicateContainerCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
-    .Type.reregisterCommand;
-
-
-
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-    .ReportFromDatanode;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-        .PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -95,27 +66,28 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerDatanodeProtocolServerSideTranslatorPB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;

+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.protobuf.BlockingService;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.closeContainerCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteBlocksCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
 import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Protocol Handler for Datanode Protocol.
@@ -138,6 +110,7 @@ public class SCMDatanodeProtocolServer implements
   private final InetSocketAddress datanodeRpcAddress;
   private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
   private final EventPublisher eventPublisher;
+  private final ProtocolMessageMetrics protocolMessageMetrics;

   public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
       StorageContainerManager scm, EventPublisher eventPublisher)
@@ -157,12 +130,17 @@

     RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
         ProtobufRpcEngine.class);
+
+    protocolMessageMetrics = ProtocolMessageMetrics
+        .create("SCMDatanodeProtocol", "SCM Datanode protocol",
+            StorageContainerDatanodeProtocolProtos.Type.values());
+
     BlockingService dnProtoPbService =
         StorageContainerDatanodeProtocolProtos
             .StorageContainerDatanodeProtocolService
             .newReflectiveBlockingService(
                 new StorageContainerDatanodeProtocolServerSideTranslatorPB(
-                    this));
+                    this, protocolMessageMetrics));

     InetSocketAddress datanodeRpcAddr =
         HddsServerUtil.getScmDataNodeBindAddress(conf);
@@ -191,6 +169,7 @@ public class SCMDatanodeProtocolServer implements
     LOG.info(
         StorageContainerManager.buildRpcServerStartMessage(
             "RPC server for DataNodes", datanodeRpcAddress));
+    protocolMessageMetrics.register();
     datanodeRpcServer.start();
   }

@@ -370,6 +349,7 @@ public class SCMDatanodeProtocolServer implements
       LOG.error(" datanodeRpcServer stop failed.", ex);
     }
     IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
+    protocolMessageMetrics.unregister();
   }

   @Override

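A note on the metrics lifecycle wired up above: the server creates one ProtocolMessageMetrics instance covering every message Type, publishes it just before the RPC server starts, and removes it on stop. Condensed from the calls visible in this diff:

  ProtocolMessageMetrics metrics = ProtocolMessageMetrics.create(
      "SCMDatanodeProtocol", "SCM Datanode protocol",
      StorageContainerDatanodeProtocolProtos.Type.values());
  metrics.register();    // in start(), right before datanodeRpcServer.start()
  // ... the dispatcher updates one counter per cmdType while serving ...
  metrics.unregister();  // in stop(), after the RPC server is shut down
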
+ 3 - 1
hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.insight.scm.EventQueueInsight;
 import org.apache.hadoop.ozone.insight.scm.NodeManagerInsight;
 import org.apache.hadoop.ozone.insight.scm.ReplicaManagerInsight;
 import org.apache.hadoop.ozone.insight.scm.ScmProtocolBlockLocationInsight;
+import org.apache.hadoop.ozone.insight.scm.ScmProtocolDatanodeInsight;
 import org.apache.hadoop.ozone.om.OMConfigKeys;

 import picocli.CommandLine;
@@ -88,7 +89,8 @@ public class BaseInsightSubCommand {
     insights.put("scm.event-queue", new EventQueueInsight());
     insights.put("scm.event-queue", new EventQueueInsight());
     insights.put("scm.protocol.block-location",
     insights.put("scm.protocol.block-location",
         new ScmProtocolBlockLocationInsight());
         new ScmProtocolBlockLocationInsight());
-
+    insights.put("scm.protocol.datanode",
+        new ScmProtocolDatanodeInsight());
     insights.put("om.key-manager", new KeyManagerInsight());
     insights.put("om.key-manager", new KeyManagerInsight());
     insights.put("om.protocol.client", new OmProtocolInsight());
     insights.put("om.protocol.client", new OmProtocolInsight());
 
 

+ 3 - 3
hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java

@@ -23,12 +23,12 @@ import java.util.List;
 import java.util.Map;

 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer;
 import org.apache.hadoop.ozone.insight.BaseInsightPoint;
 import org.apache.hadoop.ozone.insight.Component.Type;
 import org.apache.hadoop.ozone.insight.LoggerSource;
 import org.apache.hadoop.ozone.insight.MetricGroupDisplay;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;

 /**
  * Insight metric to check the SCM block location protocol behaviour.
@@ -42,9 +42,9 @@ public class ScmProtocolBlockLocationInsight extends BaseInsightPoint {
         new LoggerSource(Type.SCM,
             ScmBlockLocationProtocolServerSideTranslatorPB.class,
             defaultLevel(verbose)));
-    new LoggerSource(Type.SCM,
+    loggers.add(new LoggerSource(Type.SCM,
         SCMBlockProtocolServer.class,
         SCMBlockProtocolServer.class,
-        defaultLevel(verbose));
+        defaultLevel(verbose)));
     return loggers;
   }


+ 72 - 0
hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolDatanodeInsight.java

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.insight.scm;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer;
+import org.apache.hadoop.ozone.insight.BaseInsightPoint;
+import org.apache.hadoop.ozone.insight.Component.Type;
+import org.apache.hadoop.ozone.insight.LoggerSource;
+import org.apache.hadoop.ozone.insight.MetricGroupDisplay;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
+
+/**
+ * Insight metric to check the SCM datanode protocol behaviour.
+ */
+public class ScmProtocolDatanodeInsight extends BaseInsightPoint {
+
+  @Override
+  public List<LoggerSource> getRelatedLoggers(boolean verbose) {
+    List<LoggerSource> loggers = new ArrayList<>();
+    loggers.add(
+        new LoggerSource(Type.SCM,
+            SCMDatanodeProtocolServer.class,
+            defaultLevel(verbose)));
+    loggers.add(
+        new LoggerSource(Type.SCM,
+            StorageContainerDatanodeProtocolServerSideTranslatorPB.class,
+            defaultLevel(verbose)));
+    return loggers;
+  }
+
+  @Override
+  public List<MetricGroupDisplay> getMetrics() {
+    List<MetricGroupDisplay> metrics = new ArrayList<>();
+
+    Map<String, String> filter = new HashMap<>();
+    filter.put("servername", "StorageContainerDatanodeProtocolService");
+
+    addRpcMetrics(metrics, Type.SCM, filter);
+
+    addProtocolMessageMetrics(metrics, "scm_datanode_protocol",
+        Type.SCM, StorageContainerDatanodeProtocolProtos.Type.values());
+
+    return metrics;
+  }
+
+  @Override
+  public String getDescription() {
+    return "SCM Datanode protocol endpoint";
+  }
+
+}
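
With the insight point registered in BaseInsightSubCommand, the new endpoint becomes visible to the ozone insight CLI (assuming the standard insight subcommands; exact flags may vary by version):

  ozone insight list                           # should list scm.protocol.datanode
  ozone insight metrics scm.protocol.datanode  # per-Type protocol message counters
  ozone insight log scm.protocol.datanode      # streams the related logger output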