|
@@ -17,15 +17,25 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.ozone.protocolPB;
|
|
|
|
|
|
-import com.google.protobuf.RpcController;
|
|
|
-import com.google.protobuf.ServiceException;
|
|
|
-import io.opentracing.Scope;
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.List;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
|
|
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
|
|
- .AllocateBlockResponse;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesRequestProto;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesResponseProto;
|
|
|
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status;
|
|
|
import org.apache.hadoop.hdds.scm.ScmInfo;
|
|
|
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
|
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
|
|
@@ -34,34 +44,15 @@ import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
|
|
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
|
|
|
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
|
|
|
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
|
|
- .AllocateScmBlockRequestProto;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
|
|
- .AllocateScmBlockResponseProto;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
|
|
- .DeleteKeyBlocksResultProto;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
|
|
- .DeleteScmKeyBlocksRequestProto;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
|
|
- .DeleteScmKeyBlocksResponseProto;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
|
|
- .SCMBlockLocationResponse;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
|
|
- .SCMBlockLocationRequest;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
|
|
- .Status;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
|
|
- .SortDatanodesRequestProto;
|
|
|
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
|
|
|
- .SortDatanodesResponseProto;
|
|
|
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
|
|
import org.apache.hadoop.ozone.common.BlockGroup;
|
|
|
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
-import java.util.List;
|
|
|
-import java.util.stream.Collectors;
|
|
|
+import com.google.protobuf.RpcController;
|
|
|
+import com.google.protobuf.ServiceException;
|
|
|
+import io.opentracing.Scope;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
/**
|
|
|
* This class is the server-side translator that forwards requests received on
|
|
@@ -74,14 +65,22 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
|
|
|
|
|
|
private final ScmBlockLocationProtocol impl;
|
|
|
|
|
|
+ private static final Logger LOG = LoggerFactory
|
|
|
+ .getLogger(ScmBlockLocationProtocolServerSideTranslatorPB.class);
|
|
|
+
|
|
|
+ private final ProtocolMessageMetrics
|
|
|
+ protocolMessageMetrics;
|
|
|
+
|
|
|
/**
|
|
|
* Creates a new ScmBlockLocationProtocolServerSideTranslatorPB.
|
|
|
*
|
|
|
* @param impl {@link ScmBlockLocationProtocol} server implementation
|
|
|
*/
|
|
|
public ScmBlockLocationProtocolServerSideTranslatorPB(
|
|
|
- ScmBlockLocationProtocol impl) throws IOException {
|
|
|
+ ScmBlockLocationProtocol impl,
|
|
|
+ ProtocolMessageMetrics metrics) throws IOException {
|
|
|
this.impl = impl;
|
|
|
+ this.protocolMessageMetrics = metrics;
|
|
|
}
|
|
|
|
|
|
private SCMBlockLocationResponse.Builder createSCMBlockResponse(
|
|
@@ -97,15 +96,45 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
|
|
|
SCMBlockLocationRequest request) throws ServiceException {
|
|
|
String traceId = request.getTraceID();
|
|
|
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace("BlockLocationProtocol {} request is received: <json>{}</json>",
|
|
|
+ request.getCmdType().toString(),
|
|
|
+ request.toString().replaceAll("\n", "\\\\n"));
|
|
|
+
|
|
|
+ } else if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("BlockLocationProtocol {} request is received",
|
|
|
+ request.getCmdType().toString());
|
|
|
+ }
|
|
|
+
|
|
|
+ protocolMessageMetrics.increment(request.getCmdType());
|
|
|
+
|
|
|
+ try (Scope scope = TracingUtil
|
|
|
+ .importAndCreateScope(
|
|
|
+ "ScmBlockLocationProtocol." + request.getCmdType(),
|
|
|
+ request.getTraceID())) {
|
|
|
+ SCMBlockLocationResponse response =
|
|
|
+ processMessage(request, traceId);
|
|
|
+
|
|
|
+ if (LOG.isTraceEnabled()) {
|
|
|
+ LOG.trace(
|
|
|
+ "BlockLocationProtocol {} request is processed. Response: "
|
|
|
+ + "<json>{}</json>",
|
|
|
+ request.getCmdType().toString(),
|
|
|
+ response.toString().replaceAll("\n", "\\\\n"));
|
|
|
+ }
|
|
|
+ return response;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private SCMBlockLocationResponse processMessage(
|
|
|
+ SCMBlockLocationRequest request, String traceId) throws ServiceException {
|
|
|
SCMBlockLocationResponse.Builder response = createSCMBlockResponse(
|
|
|
request.getCmdType(),
|
|
|
traceId);
|
|
|
response.setSuccess(true);
|
|
|
response.setStatus(Status.OK);
|
|
|
|
|
|
- try(Scope scope = TracingUtil
|
|
|
- .importAndCreateScope("ScmBlockLocationProtocol."+request.getCmdType(),
|
|
|
- request.getTraceID())) {
|
|
|
+ try {
|
|
|
switch (request.getCmdType()) {
|
|
|
case AllocateScmBlock:
|
|
|
response.setAllocateScmBlockResponse(
|
|
@@ -125,7 +154,7 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
|
|
|
break;
|
|
|
default:
|
|
|
// Should never happen
|
|
|
- throw new IOException("Unknown Operation "+request.getCmdType()+
|
|
|
+ throw new IOException("Unknown Operation " + request.getCmdType() +
|
|
|
" in ScmBlockLocationProtocol");
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
@@ -135,6 +164,7 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
|
|
|
response.setMessage(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
return response.build();
|
|
|
}
|
|
|
|
|
@@ -182,12 +212,12 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
|
|
|
.map(BlockGroup::getFromProto).collect(Collectors.toList());
|
|
|
final List<DeleteBlockGroupResult> results =
|
|
|
impl.deleteKeyBlocks(infoList);
|
|
|
- for (DeleteBlockGroupResult result: results) {
|
|
|
+ for (DeleteBlockGroupResult result : results) {
|
|
|
DeleteKeyBlocksResultProto.Builder deleteResult =
|
|
|
DeleteKeyBlocksResultProto
|
|
|
- .newBuilder()
|
|
|
- .setObjectKey(result.getObjectKey())
|
|
|
- .addAllBlockResults(result.getBlockResultProtoList());
|
|
|
+ .newBuilder()
|
|
|
+ .setObjectKey(result.getObjectKey())
|
|
|
+ .addAllBlockResults(result.getBlockResultProtoList());
|
|
|
resp.addResults(deleteResult.build());
|
|
|
}
|
|
|
return resp.build();
|