|
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl;
|
|
|
|
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.common.base.Preconditions;
|
|
import com.google.protobuf.ByteString;
|
|
import com.google.protobuf.ByteString;
|
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
|
.ContainerCommandRequestProto;
|
|
.ContainerCommandRequestProto;
|
|
@@ -33,6 +34,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
|
import org.apache.hadoop.ozone.container.common.helpers.FileUtils;
|
|
import org.apache.hadoop.ozone.container.common.helpers.FileUtils;
|
|
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
|
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
|
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
|
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
|
|
|
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
|
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|
@@ -51,52 +53,76 @@ public class Dispatcher implements ContainerDispatcher {
|
|
static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
|
|
static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
|
|
|
|
|
|
private final ContainerManager containerManager;
|
|
private final ContainerManager containerManager;
|
|
-
|
|
|
|
|
|
+ private ContainerMetrics metrics;
|
|
|
|
+ private Configuration conf;
|
|
/**
|
|
/**
|
|
* Constructs an OzoneContainer that receives calls from
|
|
* Constructs an OzoneContainer that receives calls from
|
|
* XceiverServerHandler.
|
|
* XceiverServerHandler.
|
|
*
|
|
*
|
|
* @param containerManager - A class that manages containers.
|
|
* @param containerManager - A class that manages containers.
|
|
*/
|
|
*/
|
|
- public Dispatcher(ContainerManager containerManager) {
|
|
|
|
|
|
+ public Dispatcher(ContainerManager containerManager, Configuration config) {
|
|
Preconditions.checkNotNull(containerManager);
|
|
Preconditions.checkNotNull(containerManager);
|
|
this.containerManager = containerManager;
|
|
this.containerManager = containerManager;
|
|
|
|
+ this.metrics = null;
|
|
|
|
+ this.conf = config;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void init() {
|
|
|
|
+ this.metrics = ContainerMetrics.create(conf);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void shutdown() {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public ContainerCommandResponseProto dispatch(
|
|
public ContainerCommandResponseProto dispatch(
|
|
ContainerCommandRequestProto msg) throws IOException {
|
|
ContainerCommandRequestProto msg) throws IOException {
|
|
|
|
+ long startNanos = System.nanoTime();
|
|
|
|
+ ContainerCommandResponseProto resp = null;
|
|
Preconditions.checkNotNull(msg);
|
|
Preconditions.checkNotNull(msg);
|
|
Type cmdType = msg.getCmdType();
|
|
Type cmdType = msg.getCmdType();
|
|
|
|
+ metrics.incContainerOpcMetrics(cmdType);
|
|
if ((cmdType == Type.CreateContainer) ||
|
|
if ((cmdType == Type.CreateContainer) ||
|
|
(cmdType == Type.DeleteContainer) ||
|
|
(cmdType == Type.DeleteContainer) ||
|
|
(cmdType == Type.ReadContainer) ||
|
|
(cmdType == Type.ReadContainer) ||
|
|
(cmdType == Type.ListContainer) ||
|
|
(cmdType == Type.ListContainer) ||
|
|
(cmdType == Type.UpdateContainer)) {
|
|
(cmdType == Type.UpdateContainer)) {
|
|
- return containerProcessHandler(msg);
|
|
|
|
|
|
+ resp = containerProcessHandler(msg);
|
|
}
|
|
}
|
|
|
|
|
|
if ((cmdType == Type.PutKey) ||
|
|
if ((cmdType == Type.PutKey) ||
|
|
(cmdType == Type.GetKey) ||
|
|
(cmdType == Type.GetKey) ||
|
|
(cmdType == Type.DeleteKey) ||
|
|
(cmdType == Type.DeleteKey) ||
|
|
(cmdType == Type.ListKey)) {
|
|
(cmdType == Type.ListKey)) {
|
|
- return keyProcessHandler(msg);
|
|
|
|
|
|
+ resp = keyProcessHandler(msg);
|
|
}
|
|
}
|
|
|
|
|
|
if ((cmdType == Type.WriteChunk) ||
|
|
if ((cmdType == Type.WriteChunk) ||
|
|
(cmdType == Type.ReadChunk) ||
|
|
(cmdType == Type.ReadChunk) ||
|
|
(cmdType == Type.DeleteChunk)) {
|
|
(cmdType == Type.DeleteChunk)) {
|
|
- return chunkProcessHandler(msg);
|
|
|
|
|
|
+ resp = chunkProcessHandler(msg);
|
|
}
|
|
}
|
|
|
|
|
|
if ((cmdType == Type.PutSmallFile) ||
|
|
if ((cmdType == Type.PutSmallFile) ||
|
|
(cmdType == Type.GetSmallFile)) {
|
|
(cmdType == Type.GetSmallFile)) {
|
|
- return smallFileHandler(msg);
|
|
|
|
|
|
+ resp = smallFileHandler(msg);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (resp != null) {
|
|
|
|
+ metrics.incContainerOpsLatencies(cmdType,
|
|
|
|
+ System.nanoTime() - startNanos);
|
|
|
|
+ return resp;
|
|
}
|
|
}
|
|
|
|
|
|
return ContainerUtils.unsupportedRequest(msg);
|
|
return ContainerUtils.unsupportedRequest(msg);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public ContainerMetrics getContainerMetrics() {
|
|
|
|
+ return metrics;
|
|
|
|
+ }
|
|
/**
|
|
/**
|
|
* Handles the all Container related functionality.
|
|
* Handles the all Container related functionality.
|
|
*
|
|
*
|
|
@@ -336,6 +362,7 @@ public class Dispatcher implements ContainerDispatcher {
|
|
.getChunkData());
|
|
.getChunkData());
|
|
Preconditions.checkNotNull(chunkInfo);
|
|
Preconditions.checkNotNull(chunkInfo);
|
|
byte[] data = msg.getWriteChunk().getData().toByteArray();
|
|
byte[] data = msg.getWriteChunk().getData().toByteArray();
|
|
|
|
+ metrics.incContainerBytesStats(Type.WriteChunk, data.length);
|
|
this.containerManager.getChunkManager().writeChunk(pipeline, keyName,
|
|
this.containerManager.getChunkManager().writeChunk(pipeline, keyName,
|
|
chunkInfo, data);
|
|
chunkInfo, data);
|
|
return ChunkUtils.getChunkResponse(msg);
|
|
return ChunkUtils.getChunkResponse(msg);
|
|
@@ -366,6 +393,7 @@ public class Dispatcher implements ContainerDispatcher {
|
|
Preconditions.checkNotNull(chunkInfo);
|
|
Preconditions.checkNotNull(chunkInfo);
|
|
byte[] data = this.containerManager.getChunkManager().readChunk(pipeline,
|
|
byte[] data = this.containerManager.getChunkManager().readChunk(pipeline,
|
|
keyName, chunkInfo);
|
|
keyName, chunkInfo);
|
|
|
|
+ metrics.incContainerBytesStats(Type.ReadChunk, data.length);
|
|
return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo);
|
|
return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -417,6 +445,8 @@ public class Dispatcher implements ContainerDispatcher {
|
|
KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
|
|
KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
|
|
Preconditions.checkNotNull(keyData);
|
|
Preconditions.checkNotNull(keyData);
|
|
this.containerManager.getKeyManager().putKey(pipeline, keyData);
|
|
this.containerManager.getKeyManager().putKey(pipeline, keyData);
|
|
|
|
+ long numBytes = keyData.getProtoBufMessage().toByteArray().length;
|
|
|
|
+ metrics.incContainerBytesStats(Type.PutKey, numBytes);
|
|
return KeyUtils.getKeyResponse(msg);
|
|
return KeyUtils.getKeyResponse(msg);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -438,6 +468,8 @@ public class Dispatcher implements ContainerDispatcher {
|
|
Preconditions.checkNotNull(keyData);
|
|
Preconditions.checkNotNull(keyData);
|
|
KeyData responseData =
|
|
KeyData responseData =
|
|
this.containerManager.getKeyManager().getKey(keyData);
|
|
this.containerManager.getKeyManager().getKey(keyData);
|
|
|
|
+ long numBytes = responseData.getProtoBufMessage().toByteArray().length;
|
|
|
|
+ metrics.incContainerBytesStats(Type.GetKey, numBytes);
|
|
return KeyUtils.getKeyDataResponse(msg, responseData);
|
|
return KeyUtils.getKeyDataResponse(msg, responseData);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -492,6 +524,7 @@ public class Dispatcher implements ContainerDispatcher {
|
|
.getChunkInfo());
|
|
.getChunkInfo());
|
|
byte[] data = msg.getPutSmallFile().getData().toByteArray();
|
|
byte[] data = msg.getPutSmallFile().getData().toByteArray();
|
|
|
|
|
|
|
|
+ metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
|
|
this.containerManager.getChunkManager().writeChunk(pipeline, keyData
|
|
this.containerManager.getChunkManager().writeChunk(pipeline, keyData
|
|
.getKeyName(), chunkInfo, data);
|
|
.getKeyName(), chunkInfo, data);
|
|
List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
|
|
List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
|
|
@@ -518,12 +551,14 @@ public class Dispatcher implements ContainerDispatcher {
|
|
}
|
|
}
|
|
Pipeline pipeline =
|
|
Pipeline pipeline =
|
|
Pipeline.getFromProtoBuf(msg.getGetSmallFile().getKey().getPipeline());
|
|
Pipeline.getFromProtoBuf(msg.getGetSmallFile().getKey().getPipeline());
|
|
|
|
+ long bytes = 0;
|
|
Preconditions.checkNotNull(pipeline);
|
|
Preconditions.checkNotNull(pipeline);
|
|
KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile()
|
|
KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile()
|
|
.getKey().getKeyData());
|
|
.getKey().getKeyData());
|
|
KeyData data = this.containerManager.getKeyManager().getKey(keyData);
|
|
KeyData data = this.containerManager.getKeyManager().getKey(keyData);
|
|
ContainerProtos.ChunkInfo c = null;
|
|
ContainerProtos.ChunkInfo c = null;
|
|
for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
|
|
for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
|
|
|
|
+ bytes += chunk.getSerializedSize();
|
|
ByteString current =
|
|
ByteString current =
|
|
ByteString.copyFrom(this.containerManager.getChunkManager().readChunk(
|
|
ByteString.copyFrom(this.containerManager.getChunkManager().readChunk(
|
|
pipeline, keyData.getKeyName(), ChunkInfo.getFromProtoBuf(
|
|
pipeline, keyData.getKeyName(), ChunkInfo.getFromProtoBuf(
|
|
@@ -531,7 +566,7 @@ public class Dispatcher implements ContainerDispatcher {
|
|
dataBuf = dataBuf.concat(current);
|
|
dataBuf = dataBuf.concat(current);
|
|
c = chunk;
|
|
c = chunk;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+ metrics.incContainerBytesStats(Type.GetSmallFile, bytes);
|
|
return FileUtils.getGetSmallFileResponse(msg, dataBuf.toByteArray(),
|
|
return FileUtils.getGetSmallFileResponse(msg, dataBuf.toByteArray(),
|
|
ChunkInfo.getFromProtoBuf(c));
|
|
ChunkInfo.getFromProtoBuf(c));
|
|
}
|
|
}
|