|
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
import org.apache.hadoop.hdds.scm.container.common.helpers
|
|
|
.StorageContainerException;
|
|
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
|
|
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
|
|
|
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
|
|
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
|
|
|
import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
|
|
@@ -71,7 +72,20 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
|
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
- .Result.*;
|
|
|
+ .Result.CONTAINER_INTERNAL_ERROR;
|
|
|
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
+ .Result.CLOSED_CONTAINER_IO;
|
|
|
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
+ .Result.DELETE_ON_OPEN_CONTAINER;
|
|
|
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
+ .Result.IO_EXCEPTION;
|
|
|
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
+ .Result.INVALID_CONTAINER_STATE;
|
|
|
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
+ .Result.GET_SMALL_FILE_ERROR;
|
|
|
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
+ .Result.PUT_SMALL_FILE_ERROR;
|
|
|
+
|
|
|
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
|
.Stage;
|
|
|
|
|
@@ -94,16 +108,18 @@ public class KeyValueHandler extends Handler {
|
|
|
// TODO : Add metrics and populate it.
|
|
|
|
|
|
public static KeyValueHandler getInstance(Configuration config,
|
|
|
- ContainerSet contSet, VolumeSet volSet) {
|
|
|
+ ContainerSet contSet,
|
|
|
+ VolumeSet volSet,
|
|
|
+ ContainerMetrics metrics) {
|
|
|
if (INSTANCE == null) {
|
|
|
- INSTANCE = new KeyValueHandler(config, contSet, volSet);
|
|
|
+ INSTANCE = new KeyValueHandler(config, contSet, volSet, metrics);
|
|
|
}
|
|
|
return INSTANCE;
|
|
|
}
|
|
|
|
|
|
private KeyValueHandler(Configuration config, ContainerSet contSet,
|
|
|
- VolumeSet volSet) {
|
|
|
- super(config, contSet, volSet);
|
|
|
+ VolumeSet volSet, ContainerMetrics metrics) {
|
|
|
+ super(config, contSet, volSet, metrics);
|
|
|
containerType = ContainerType.KeyValueContainer;
|
|
|
keyManager = new KeyManagerImpl(config);
|
|
|
chunkManager = new ChunkManagerImpl();
|
|
@@ -342,6 +358,8 @@ public class KeyValueHandler extends Handler {
|
|
|
Preconditions.checkNotNull(keyData);
|
|
|
|
|
|
keyManager.putKey(kvContainer, keyData);
|
|
|
+ long numBytes = keyData.getProtoBufMessage().toByteArray().length;
|
|
|
+ metrics.incContainerBytesStats(Type.PutKey, numBytes);
|
|
|
} catch (StorageContainerException ex) {
|
|
|
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
|
|
} catch (IOException ex) {
|
|
@@ -370,6 +388,8 @@ public class KeyValueHandler extends Handler {
|
|
|
BlockID blockID = BlockID.getFromProtobuf(
|
|
|
request.getGetKey().getBlockID());
|
|
|
responseData = keyManager.getKey(kvContainer, blockID);
|
|
|
+ long numBytes = responseData.getProtoBufMessage().toByteArray().length;
|
|
|
+ metrics.incContainerBytesStats(Type.GetKey, numBytes);
|
|
|
|
|
|
} catch (StorageContainerException ex) {
|
|
|
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
|
@@ -434,6 +454,7 @@ public class KeyValueHandler extends Handler {
|
|
|
Preconditions.checkNotNull(chunkInfo);
|
|
|
|
|
|
data = chunkManager.readChunk(kvContainer, blockID, chunkInfo);
|
|
|
+ metrics.incContainerBytesStats(Type.ReadChunk, data.length);
|
|
|
} catch (StorageContainerException ex) {
|
|
|
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
|
|
} catch (IOException ex) {
|
|
@@ -507,6 +528,13 @@ public class KeyValueHandler extends Handler {
|
|
|
|
|
|
chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
|
|
|
request.getWriteChunk().getStage());
|
|
|
+
|
|
|
+ // We should increment stats after writeChunk
|
|
|
+ if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
|
|
|
+ request.getWriteChunk().getStage() == Stage.COMBINED) {
|
|
|
+ metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk()
|
|
|
+ .getChunkData().getLen());
|
|
|
+ }
|
|
|
} catch (StorageContainerException ex) {
|
|
|
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
|
|
} catch (IOException ex) {
|
|
@@ -555,6 +583,7 @@ public class KeyValueHandler extends Handler {
|
|
|
chunks.add(chunkInfo.getProtoBufMessage());
|
|
|
keyData.setChunks(chunks);
|
|
|
keyManager.putKey(kvContainer, keyData);
|
|
|
+ metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
|
|
|
|
|
|
} catch (StorageContainerException ex) {
|
|
|
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
|
@@ -597,7 +626,7 @@ public class KeyValueHandler extends Handler {
|
|
|
dataBuf = dataBuf.concat(current);
|
|
|
chunkInfo = chunk;
|
|
|
}
|
|
|
-
|
|
|
+ metrics.incContainerBytesStats(Type.GetSmallFile, dataBuf.size());
|
|
|
return SmallFileUtils.getGetSmallFileResponseSuccess(request, dataBuf
|
|
|
.toByteArray(), ChunkInfo.getFromProtoBuf(chunkInfo));
|
|
|
} catch (StorageContainerException e) {
|