|
@@ -30,14 +30,15 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
|
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
|
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
|
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
|
|
|
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
|
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
|
import org.apache.hadoop.ozone.container.common.helpers.FileUtils;
|
|
import org.apache.hadoop.ozone.container.common.helpers.FileUtils;
|
|
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
|
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
|
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
|
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
|
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
|
|
|
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|
|
|
+import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
@@ -45,6 +46,11 @@ import java.io.IOException;
|
|
import java.util.LinkedList;
|
|
import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
|
|
|
+ .Result.PUT_SMALL_FILE_ERROR;
|
|
|
|
+import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
|
|
|
+ .Result.GET_SMALL_FILE_ERROR;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Ozone Container dispatcher takes a call from the netty server and routes it
|
|
* Ozone Container dispatcher takes a call from the netty server and routes it
|
|
* to the right handler function.
|
|
* to the right handler function.
|
|
@@ -55,6 +61,7 @@ public class Dispatcher implements ContainerDispatcher {
|
|
private final ContainerManager containerManager;
|
|
private final ContainerManager containerManager;
|
|
private ContainerMetrics metrics;
|
|
private ContainerMetrics metrics;
|
|
private Configuration conf;
|
|
private Configuration conf;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Constructs an OzoneContainer that receives calls from
|
|
* Constructs an OzoneContainer that receives calls from
|
|
* XceiverServerHandler.
|
|
* XceiverServerHandler.
|
|
@@ -79,59 +86,67 @@ public class Dispatcher implements ContainerDispatcher {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public ContainerCommandResponseProto dispatch(
|
|
public ContainerCommandResponseProto dispatch(
|
|
- ContainerCommandRequestProto msg) throws IOException {
|
|
|
|
|
|
+ ContainerCommandRequestProto msg) {
|
|
long startNanos = System.nanoTime();
|
|
long startNanos = System.nanoTime();
|
|
ContainerCommandResponseProto resp = null;
|
|
ContainerCommandResponseProto resp = null;
|
|
- Preconditions.checkNotNull(msg);
|
|
|
|
- Type cmdType = msg.getCmdType();
|
|
|
|
- metrics.incContainerOpcMetrics(cmdType);
|
|
|
|
- if ((cmdType == Type.CreateContainer) ||
|
|
|
|
- (cmdType == Type.DeleteContainer) ||
|
|
|
|
- (cmdType == Type.ReadContainer) ||
|
|
|
|
- (cmdType == Type.ListContainer) ||
|
|
|
|
- (cmdType == Type.UpdateContainer)) {
|
|
|
|
- resp = containerProcessHandler(msg);
|
|
|
|
- }
|
|
|
|
|
|
+ try {
|
|
|
|
+ Preconditions.checkNotNull(msg);
|
|
|
|
+ Type cmdType = msg.getCmdType();
|
|
|
|
+ metrics.incContainerOpcMetrics(cmdType);
|
|
|
|
+ if ((cmdType == Type.CreateContainer) ||
|
|
|
|
+ (cmdType == Type.DeleteContainer) ||
|
|
|
|
+ (cmdType == Type.ReadContainer) ||
|
|
|
|
+ (cmdType == Type.ListContainer) ||
|
|
|
|
+ (cmdType == Type.UpdateContainer)) {
|
|
|
|
+ resp = containerProcessHandler(msg);
|
|
|
|
+ }
|
|
|
|
|
|
- if ((cmdType == Type.PutKey) ||
|
|
|
|
- (cmdType == Type.GetKey) ||
|
|
|
|
- (cmdType == Type.DeleteKey) ||
|
|
|
|
- (cmdType == Type.ListKey)) {
|
|
|
|
- resp = keyProcessHandler(msg);
|
|
|
|
- }
|
|
|
|
|
|
+ if ((cmdType == Type.PutKey) ||
|
|
|
|
+ (cmdType == Type.GetKey) ||
|
|
|
|
+ (cmdType == Type.DeleteKey) ||
|
|
|
|
+ (cmdType == Type.ListKey)) {
|
|
|
|
+ resp = keyProcessHandler(msg);
|
|
|
|
+ }
|
|
|
|
|
|
- if ((cmdType == Type.WriteChunk) ||
|
|
|
|
- (cmdType == Type.ReadChunk) ||
|
|
|
|
- (cmdType == Type.DeleteChunk)) {
|
|
|
|
- resp = chunkProcessHandler(msg);
|
|
|
|
- }
|
|
|
|
|
|
+ if ((cmdType == Type.WriteChunk) ||
|
|
|
|
+ (cmdType == Type.ReadChunk) ||
|
|
|
|
+ (cmdType == Type.DeleteChunk)) {
|
|
|
|
+ resp = chunkProcessHandler(msg);
|
|
|
|
+ }
|
|
|
|
|
|
- if ((cmdType == Type.PutSmallFile) ||
|
|
|
|
- (cmdType == Type.GetSmallFile)) {
|
|
|
|
- resp = smallFileHandler(msg);
|
|
|
|
- }
|
|
|
|
|
|
+ if ((cmdType == Type.PutSmallFile) ||
|
|
|
|
+ (cmdType == Type.GetSmallFile)) {
|
|
|
|
+ resp = smallFileHandler(msg);
|
|
|
|
+ }
|
|
|
|
|
|
- if (resp != null) {
|
|
|
|
- metrics.incContainerOpsLatencies(cmdType,
|
|
|
|
- System.nanoTime() - startNanos);
|
|
|
|
- return resp;
|
|
|
|
- }
|
|
|
|
|
|
+ if (resp != null) {
|
|
|
|
+ metrics.incContainerOpsLatencies(cmdType,
|
|
|
|
+ System.nanoTime() - startNanos);
|
|
|
|
+ return resp;
|
|
|
|
+ }
|
|
|
|
|
|
- return ContainerUtils.unsupportedRequest(msg);
|
|
|
|
|
|
+ return ContainerUtils.unsupportedRequest(msg);
|
|
|
|
+ } catch (StorageContainerException e) {
|
|
|
|
+ // This useful since the trace ID will allow us to correlate failures.
|
|
|
|
+ return ContainerUtils.logAndReturnError(LOG, e, msg);
|
|
|
|
+ } catch (IllegalStateException | NullPointerException e) {
|
|
|
|
+ return ContainerUtils.logAndReturnError(LOG, e, msg);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
public ContainerMetrics getContainerMetrics() {
|
|
public ContainerMetrics getContainerMetrics() {
|
|
return metrics;
|
|
return metrics;
|
|
}
|
|
}
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Handles the all Container related functionality.
|
|
* Handles the all Container related functionality.
|
|
*
|
|
*
|
|
* @param msg - command
|
|
* @param msg - command
|
|
* @return - response
|
|
* @return - response
|
|
- * @throws IOException
|
|
|
|
|
|
+ * @throws StorageContainerException
|
|
*/
|
|
*/
|
|
private ContainerCommandResponseProto containerProcessHandler(
|
|
private ContainerCommandResponseProto containerProcessHandler(
|
|
- ContainerCommandRequestProto msg) throws IOException {
|
|
|
|
|
|
+ ContainerCommandRequestProto msg) throws StorageContainerException {
|
|
try {
|
|
try {
|
|
|
|
|
|
switch (msg.getCmdType()) {
|
|
switch (msg.getCmdType()) {
|
|
@@ -175,10 +190,10 @@ public class Dispatcher implements ContainerDispatcher {
|
|
*
|
|
*
|
|
* @param msg - command
|
|
* @param msg - command
|
|
* @return - response
|
|
* @return - response
|
|
- * @throws IOException
|
|
|
|
|
|
+ * @throws StorageContainerException
|
|
*/
|
|
*/
|
|
private ContainerCommandResponseProto keyProcessHandler(
|
|
private ContainerCommandResponseProto keyProcessHandler(
|
|
- ContainerCommandRequestProto msg) throws IOException {
|
|
|
|
|
|
+ ContainerCommandRequestProto msg) throws StorageContainerException {
|
|
try {
|
|
try {
|
|
switch (msg.getCmdType()) {
|
|
switch (msg.getCmdType()) {
|
|
case PutKey:
|
|
case PutKey:
|
|
@@ -217,10 +232,10 @@ public class Dispatcher implements ContainerDispatcher {
|
|
*
|
|
*
|
|
* @param msg - command
|
|
* @param msg - command
|
|
* @return - response
|
|
* @return - response
|
|
- * @throws IOException
|
|
|
|
|
|
+ * @throws StorageContainerException
|
|
*/
|
|
*/
|
|
private ContainerCommandResponseProto chunkProcessHandler(
|
|
private ContainerCommandResponseProto chunkProcessHandler(
|
|
- ContainerCommandRequestProto msg) throws IOException {
|
|
|
|
|
|
+ ContainerCommandRequestProto msg) throws StorageContainerException {
|
|
try {
|
|
try {
|
|
switch (msg.getCmdType()) {
|
|
switch (msg.getCmdType()) {
|
|
case WriteChunk:
|
|
case WriteChunk:
|
|
@@ -254,7 +269,7 @@ public class Dispatcher implements ContainerDispatcher {
|
|
}
|
|
}
|
|
|
|
|
|
private ContainerCommandResponseProto smallFileHandler(
|
|
private ContainerCommandResponseProto smallFileHandler(
|
|
- ContainerCommandRequestProto msg) throws IOException {
|
|
|
|
|
|
+ ContainerCommandRequestProto msg) throws StorageContainerException {
|
|
switch (msg.getCmdType()) {
|
|
switch (msg.getCmdType()) {
|
|
case PutSmallFile:
|
|
case PutSmallFile:
|
|
return handlePutSmallFile(msg);
|
|
return handlePutSmallFile(msg);
|
|
@@ -494,7 +509,6 @@ public class Dispatcher implements ContainerDispatcher {
|
|
String keyName = msg.getDeleteKey().getName();
|
|
String keyName = msg.getDeleteKey().getName();
|
|
Preconditions.checkNotNull(keyName);
|
|
Preconditions.checkNotNull(keyName);
|
|
Preconditions.checkState(!keyName.isEmpty());
|
|
Preconditions.checkState(!keyName.isEmpty());
|
|
-
|
|
|
|
this.containerManager.getKeyManager().deleteKey(pipeline, keyName);
|
|
this.containerManager.getKeyManager().deleteKey(pipeline, keyName);
|
|
return KeyUtils.getKeyResponse(msg);
|
|
return KeyUtils.getKeyResponse(msg);
|
|
}
|
|
}
|
|
@@ -504,34 +518,41 @@ public class Dispatcher implements ContainerDispatcher {
|
|
*
|
|
*
|
|
* @param msg - Message.
|
|
* @param msg - Message.
|
|
* @return ContainerCommandResponseProto
|
|
* @return ContainerCommandResponseProto
|
|
- * @throws IOException
|
|
|
|
|
|
+ * @throws StorageContainerException
|
|
*/
|
|
*/
|
|
private ContainerCommandResponseProto handlePutSmallFile(
|
|
private ContainerCommandResponseProto handlePutSmallFile(
|
|
- ContainerCommandRequestProto msg) throws IOException {
|
|
|
|
|
|
+ ContainerCommandRequestProto msg) throws StorageContainerException {
|
|
|
|
|
|
if (!msg.hasPutSmallFile()) {
|
|
if (!msg.hasPutSmallFile()) {
|
|
LOG.debug("Malformed put small file request. trace ID: {}",
|
|
LOG.debug("Malformed put small file request. trace ID: {}",
|
|
msg.getTraceID());
|
|
msg.getTraceID());
|
|
return ContainerUtils.malformedRequest(msg);
|
|
return ContainerUtils.malformedRequest(msg);
|
|
}
|
|
}
|
|
|
|
+ try {
|
|
|
|
|
|
- Pipeline pipeline =
|
|
|
|
- Pipeline.getFromProtoBuf(msg.getPutSmallFile().getKey().getPipeline());
|
|
|
|
- Preconditions.checkNotNull(pipeline);
|
|
|
|
- KeyData keyData = KeyData.getFromProtoBuf(msg.getPutSmallFile().getKey()
|
|
|
|
- .getKeyData());
|
|
|
|
- ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getPutSmallFile()
|
|
|
|
- .getChunkInfo());
|
|
|
|
- byte[] data = msg.getPutSmallFile().getData().toByteArray();
|
|
|
|
-
|
|
|
|
- metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
|
|
|
|
- this.containerManager.getChunkManager().writeChunk(pipeline, keyData
|
|
|
|
- .getKeyName(), chunkInfo, data);
|
|
|
|
- List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
|
|
|
|
- chunks.add(chunkInfo.getProtoBufMessage());
|
|
|
|
- keyData.setChunks(chunks);
|
|
|
|
- this.containerManager.getKeyManager().putKey(pipeline, keyData);
|
|
|
|
- return FileUtils.getPutFileResponse(msg);
|
|
|
|
|
|
+ Pipeline pipeline =
|
|
|
|
+ Pipeline.getFromProtoBuf(msg.getPutSmallFile()
|
|
|
|
+ .getKey().getPipeline());
|
|
|
|
+
|
|
|
|
+ Preconditions.checkNotNull(pipeline);
|
|
|
|
+ KeyData keyData = KeyData.getFromProtoBuf(msg.getPutSmallFile().getKey()
|
|
|
|
+ .getKeyData());
|
|
|
|
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getPutSmallFile()
|
|
|
|
+ .getChunkInfo());
|
|
|
|
+ byte[] data = msg.getPutSmallFile().getData().toByteArray();
|
|
|
|
+
|
|
|
|
+ metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
|
|
|
|
+ this.containerManager.getChunkManager().writeChunk(pipeline, keyData
|
|
|
|
+ .getKeyName(), chunkInfo, data);
|
|
|
|
+ List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
|
|
|
|
+ chunks.add(chunkInfo.getProtoBufMessage());
|
|
|
|
+ keyData.setChunks(chunks);
|
|
|
|
+ this.containerManager.getKeyManager().putKey(pipeline, keyData);
|
|
|
|
+ return FileUtils.getPutFileResponse(msg);
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ throw new StorageContainerException("Put Small File Failed.", e,
|
|
|
|
+ PUT_SMALL_FILE_ERROR);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -540,35 +561,44 @@ public class Dispatcher implements ContainerDispatcher {
|
|
*
|
|
*
|
|
* @param msg - ContainerCommandRequestProto
|
|
* @param msg - ContainerCommandRequestProto
|
|
* @return ContainerCommandResponseProto
|
|
* @return ContainerCommandResponseProto
|
|
|
|
+ * @throws StorageContainerException
|
|
*/
|
|
*/
|
|
private ContainerCommandResponseProto handleGetSmallFile(
|
|
private ContainerCommandResponseProto handleGetSmallFile(
|
|
- ContainerCommandRequestProto msg) throws IOException {
|
|
|
|
|
|
+ ContainerCommandRequestProto msg) throws StorageContainerException {
|
|
ByteString dataBuf = ByteString.EMPTY;
|
|
ByteString dataBuf = ByteString.EMPTY;
|
|
if (!msg.hasGetSmallFile()) {
|
|
if (!msg.hasGetSmallFile()) {
|
|
LOG.debug("Malformed get small file request. trace ID: {}",
|
|
LOG.debug("Malformed get small file request. trace ID: {}",
|
|
msg.getTraceID());
|
|
msg.getTraceID());
|
|
return ContainerUtils.malformedRequest(msg);
|
|
return ContainerUtils.malformedRequest(msg);
|
|
}
|
|
}
|
|
- Pipeline pipeline =
|
|
|
|
- Pipeline.getFromProtoBuf(msg.getGetSmallFile().getKey().getPipeline());
|
|
|
|
- long bytes = 0;
|
|
|
|
- Preconditions.checkNotNull(pipeline);
|
|
|
|
- KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile()
|
|
|
|
- .getKey().getKeyData());
|
|
|
|
- KeyData data = this.containerManager.getKeyManager().getKey(keyData);
|
|
|
|
- ContainerProtos.ChunkInfo c = null;
|
|
|
|
- for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
|
|
|
|
- bytes += chunk.getSerializedSize();
|
|
|
|
- ByteString current =
|
|
|
|
- ByteString.copyFrom(this.containerManager.getChunkManager().readChunk(
|
|
|
|
- pipeline, keyData.getKeyName(), ChunkInfo.getFromProtoBuf(
|
|
|
|
- chunk)));
|
|
|
|
- dataBuf = dataBuf.concat(current);
|
|
|
|
- c = chunk;
|
|
|
|
|
|
+ try {
|
|
|
|
+ Pipeline pipeline =
|
|
|
|
+ Pipeline.getFromProtoBuf(msg.getGetSmallFile()
|
|
|
|
+ .getKey().getPipeline());
|
|
|
|
+
|
|
|
|
+ long bytes = 0;
|
|
|
|
+ Preconditions.checkNotNull(pipeline);
|
|
|
|
+ KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile()
|
|
|
|
+ .getKey().getKeyData());
|
|
|
|
+ KeyData data = this.containerManager.getKeyManager().getKey(keyData);
|
|
|
|
+ ContainerProtos.ChunkInfo c = null;
|
|
|
|
+ for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
|
|
|
|
+ bytes += chunk.getSerializedSize();
|
|
|
|
+ ByteString current =
|
|
|
|
+ ByteString.copyFrom(this.containerManager.getChunkManager()
|
|
|
|
+ .readChunk(
|
|
|
|
+ pipeline, keyData.getKeyName(), ChunkInfo.getFromProtoBuf(
|
|
|
|
+ chunk)));
|
|
|
|
+ dataBuf = dataBuf.concat(current);
|
|
|
|
+ c = chunk;
|
|
|
|
+ }
|
|
|
|
+ metrics.incContainerBytesStats(Type.GetSmallFile, bytes);
|
|
|
|
+ return FileUtils.getGetSmallFileResponse(msg, dataBuf.toByteArray(),
|
|
|
|
+ ChunkInfo.getFromProtoBuf(c));
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ throw new StorageContainerException("Unable to decode protobuf", e,
|
|
|
|
+ GET_SMALL_FILE_ERROR);
|
|
|
|
+
|
|
}
|
|
}
|
|
- metrics.incContainerBytesStats(Type.GetSmallFile, bytes);
|
|
|
|
- return FileUtils.getGetSmallFileResponse(msg, dataBuf.toByteArray(),
|
|
|
|
- ChunkInfo.getFromProtoBuf(c));
|
|
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|