Browse Source

HDDS-173. Refactor Dispatcher and implement Handler for new ContainerIO design.

Hanisha Koneru 6 years ago
parent
commit
13579f9296
39 changed files with 1678 additions and 354 deletions
  1. 9 15
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
  2. 12 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  3. 8 12
      hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
  4. 33 0
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  5. 2 3
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
  6. 88 81
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
  7. 2 3
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
  8. 5 2
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
  9. 23 26
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
  10. 180 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
  11. 15 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
  12. 71 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
  13. 2 3
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
  14. 7 4
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
  15. 19 10
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
  16. 28 19
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
  17. 643 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
  18. 6 9
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java
  19. 48 2
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
  20. 33 2
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
  21. 1 1
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
  22. 30 5
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
  23. 19 17
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
  24. 2 2
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
  25. 12 8
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java
  26. 2 2
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
  27. 1 35
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
  28. 10 2
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java
  29. 2 0
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java
  30. 91 0
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java
  31. 1 2
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
  32. 3 4
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
  33. 6 4
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
  34. 246 0
      hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
  35. 3 3
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
  36. 0 50
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java
  37. 7 18
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
  38. 4 2
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
  39. 4 8
      hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java

+ 9 - 15
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java

@@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .DatanodeBlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .GetKeyRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -73,16 +75,16 @@ public final class ContainerProtocolCalls  {
    * Calls the container protocol to get a container key.
    *
    * @param xceiverClient client to perform call
-   * @param containerKeyData key data to identify container
+   * @param datanodeBlockID blockID to identify container
    * @param traceID container protocol call args
    * @return container protocol get key response
    * @throws IOException if there is an I/O error while performing the call
    */
   public static GetKeyResponseProto getKey(XceiverClientSpi xceiverClient,
-      KeyData containerKeyData, String traceID) throws IOException {
+      DatanodeBlockID datanodeBlockID, String traceID) throws IOException {
     GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
         .newBuilder()
-        .setKeyData(containerKeyData);
+        .setBlockID(datanodeBlockID);
     String id = xceiverClient.getPipeline().getLeader().getUuidString();
     ContainerCommandRequestProto request = ContainerCommandRequestProto
         .newBuilder()
@@ -240,18 +242,15 @@ public final class ContainerProtocolCalls  {
     ContainerProtos.CreateContainerRequestProto.Builder createRequest =
         ContainerProtos.CreateContainerRequestProto
             .newBuilder();
-    ContainerProtos.ContainerData.Builder containerData = ContainerProtos
-        .ContainerData.newBuilder();
-    containerData.setContainerID(containerID);
-    containerData.setContainerType(ContainerProtos.ContainerType
+    createRequest.setContainerID(containerID);
+    createRequest.setContainerType(ContainerProtos.ContainerType
         .KeyValueContainer);
-    createRequest.setContainerData(containerData.build());
 
     String id = client.getPipeline().getLeader().getUuidString();
     ContainerCommandRequestProto.Builder request =
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.CreateContainer);
-    request.setCreateContainer(createRequest);
+    request.setCreateContainer(createRequest.build());
     request.setDatanodeUuid(id);
     request.setTraceID(traceID);
     ContainerCommandResponseProto response = client.sendCommand(
@@ -348,14 +347,9 @@ public final class ContainerProtocolCalls  {
    */
   public static GetSmallFileResponseProto readSmallFile(XceiverClientSpi client,
       BlockID blockID, String traceID) throws IOException {
-    KeyData containerKeyData = KeyData
-        .newBuilder()
-        .setBlockID(blockID.getDatanodeBlockIDProtobuf())
-        .build();
-
     GetKeyRequestProto.Builder getKey = GetKeyRequestProto
         .newBuilder()
-        .setKeyData(containerKeyData);
+        .setBlockID(blockID.getDatanodeBlockIDProtobuf());
     ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
         GetSmallFileRequestProto
             .newBuilder().setKey(getKey)

+ 12 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -256,6 +256,18 @@ public final class OzoneConfigKeys {
       "hdds.datanode.storage.utilization.critical.threshold";
   public static final double
       HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT = 0.75;
+
+  public static final String
+      HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY =
+      "hdds.write.lock.reporting.threshold.ms";
+  public static final long
+      HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
+  public static final String
+      HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_KEY =
+      "hdds.lock.suppress.warning.interval.ms";
+  public static final long
+      HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEAFULT = 10000L;
+
   /**
    * There is no need to instantiate this class.
    */

+ 8 - 12
hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto

@@ -192,6 +192,7 @@ message ContainerCommandRequestProto {
   optional   PutSmallFileRequestProto putSmallFile = 16;
   optional   GetSmallFileRequestProto getSmallFile = 17;
   optional   CloseContainerRequestProto closeContainer = 18;
+
   required   string datanodeUuid = 19;
 }
 
@@ -237,14 +238,6 @@ message ContainerData {
   optional string containerDBType = 11;
 }
 
-// This is used for create Container Request.
-message CreateContainerData {
-  required int64 containerId = 1;
-  repeated KeyValue metadata = 2;
-  optional ContainerType containerType = 3 [default = KeyValueContainer];
-}
-
-
 enum ContainerType {
   KeyValueContainer = 1;
 }
@@ -252,7 +245,9 @@ enum ContainerType {
 
 // Container Messages.
 message  CreateContainerRequestProto {
-  required ContainerData containerData = 1;
+  required int64 containerID = 1;
+  repeated KeyValue metadata = 2;
+  optional ContainerType containerType = 3 [default = KeyValueContainer];
 }
 
 message  CreateContainerResponseProto {
@@ -267,8 +262,9 @@ message  ReadContainerResponseProto {
 }
 
 message  UpdateContainerRequestProto {
-  required ContainerData containerData = 1;
-  optional bool forceUpdate = 2 [default = false];
+  required int64 containerID = 1;
+  repeated KeyValue metadata = 2;
+  optional bool forceUpdate = 3 [default = false];
 }
 
 message  UpdateContainerResponseProto {
@@ -316,7 +312,7 @@ message  PutKeyResponseProto {
 }
 
 message  GetKeyRequestProto  {
-  required KeyData keyData = 1;
+  required DatanodeBlockID blockID = 1;
 }
 
 message  GetKeyResponseProto  {

+ 33 - 0
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -62,6 +62,18 @@
       this not set. Ideally, this should be mapped to a fast disk like an SSD.
     </description>
   </property>
+  <property>
+    <name>hdds.datanode.dir</name>
+    <value/>
+    <tag>OZONE, CONTAINER, STORAGE, MANAGEMENT</tag>
+    <description>Determines where on the local filesystem HDDS data will be
+      stored. Defaults to dfs.datanode.data.dir if not specified.
+      The directories should be tagged with corresponding storage types
+      ([SSD]/[DISK]/[ARCHIVE]/[RAM_DISK]) for storage policies. The default
+      storage type will be DISK if the directory does not have a storage type
+      tagged explicitly.
+    </description>
+  </property>
   <property>
     <name>dfs.container.ratis.enabled</name>
     <value>false</value>
@@ -1086,4 +1098,25 @@
     </description>
   </property>
 
+  <property>
+    <name>hdds.write.lock.reporting.threshold.ms</name>
+    <value>5000</value>
+    <tag>OZONE, DATANODE, MANAGEMENT</tag>
+    <description>
+      When a write lock is held for a long time, this will be logged as the
+      lock is released. This sets how long the lock must be held for logging
+      to occur.
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.lock.suppress.warning.interval.ms</name>
+    <value>10000</value>
+    <tag>OZONE, DATANODE, MANAGEMENT</tag>
+    <description>
+      Instrumentation reporting long critical sections will suppress
+      consecutive warnings within this interval.
+    </description>
+  </property>
+
 </configuration>

+ 2 - 3
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java

@@ -313,7 +313,7 @@ public final class ChunkUtils {
    */
   public static ContainerProtos.ContainerCommandResponseProto
       getChunkResponse(ContainerProtos.ContainerCommandRequestProto msg) {
-    return ContainerUtils.getContainerResponse(msg);
+    return ContainerUtils.getSuccessResponse(msg);
   }
 
   /**
@@ -336,8 +336,7 @@ public final class ChunkUtils {
     response.setBlockID(msg.getReadChunk().getBlockID());
 
     ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
-            .SUCCESS, "");
+        ContainerUtils.getSuccessResponseBuilder(msg);
     builder.setReadChunk(response);
     return builder.build();
   }

+ 88 - 81
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java

@@ -22,13 +22,20 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
@@ -42,12 +49,14 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 
 import static org.apache.commons.io.FilenameUtils.removeExtension;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result
-    .INVALID_ARGUMENT;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result
-    .UNABLE_TO_FIND_DATA_DIR;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
-
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.CLOSED_CONTAINER_IO;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.INVALID_CONTAINER_STATE;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.SUCCESS;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.UNABLE_TO_FIND_DATA_DIR;
 
 /**
  * A set of helper functions to create proper responses.
@@ -59,28 +68,61 @@ public final class ContainerUtils {
   }
 
   /**
-   * Returns a CreateContainer Response. This call is used by create and delete
-   * containers which have null success responses.
-   *
-   * @param msg Request
-   * @return Response.
+   * Returns a Container Command Response Builder with the specified result
+   * and message.
+   * @param request requestProto message.
+   * @param result result of the command.
+   * @param message response message.
+   * @return ContainerCommand Response Builder.
    */
-  public static ContainerProtos.ContainerCommandResponseProto
-      getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg) {
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
+  public static ContainerCommandResponseProto.Builder
+  getContainerCommandResponse(
+      ContainerCommandRequestProto request, Result result, String message) {
+    return
+        ContainerCommandResponseProto.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setTraceID(request.getTraceID())
+            .setResult(result)
+            .setMessage(message);
+  }
+
+  /**
+   * Returns a Container Command Response Builder. This call is used to build
+   * success responses. Calling function can add other fields to the response
+   * as required.
+   * @param request requestProto message.
+   * @return ContainerCommand Response Builder with result as SUCCESS.
+   */
+  public static ContainerCommandResponseProto.Builder getSuccessResponseBuilder(
+      ContainerCommandRequestProto request) {
+    return
+        ContainerCommandResponseProto.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setTraceID(request.getTraceID())
+            .setResult(Result.SUCCESS);
+  }
+
+  /**
+   * Returns a Container Command Response. This call is used for creating null
+   * success responses.
+   * @param request requestProto message.
+   * @return ContainerCommand Response with result as SUCCESS.
+   */
+  public static ContainerCommandResponseProto getSuccessResponse(
+      ContainerCommandRequestProto request) {
+    ContainerCommandResponseProto.Builder builder =
+        getContainerCommandResponse(request, Result.SUCCESS, "");
     return builder.build();
   }
 
   /**
    * Returns a ReadContainer Response.
-   *
-   * @param msg Request
-   * @param containerData - data
-   * @return Response.
+   * @param msg requestProto message.
+   * @param containerData container data to be returned.
+   * @return ReadContainer Response
    */
   public static ContainerProtos.ContainerCommandResponseProto
-      getReadContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
+    getReadContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
       ContainerData containerData) {
     Preconditions.checkNotNull(containerData);
 
@@ -89,7 +131,7 @@ public final class ContainerUtils {
     response.setContainerData(containerData.getProtoBufMessage());
 
     ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
+        getSuccessResponseBuilder(msg);
     builder.setReadContainer(response);
     return builder.build();
   }
@@ -98,37 +140,25 @@ public final class ContainerUtils {
    * We found a command type but no associated payload for the command. Hence
    * return malformed Command as response.
    *
-   * @param msg - Protobuf message.
-   * @param result - result
-   * @param message - Error message.
+   * @param request - Protobuf message.
    * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
    */
-  public static ContainerProtos.ContainerCommandResponseProto.Builder
-      getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
-      ContainerProtos.Result result, String message) {
-    return
-        ContainerProtos.ContainerCommandResponseProto.newBuilder()
-            .setCmdType(msg.getCmdType())
-            .setTraceID(msg.getTraceID())
-            .setResult(result)
-            .setMessage(message);
+  public static ContainerCommandResponseProto malformedRequest(
+      ContainerCommandRequestProto request) {
+    return getContainerCommandResponse(request, Result.MALFORMED_REQUEST,
+        "Cmd type does not match the payload.").build();
   }
 
   /**
-   * Logs the error and returns a response to the caller.
+   * We found a command type that is not supported yet.
    *
-   * @param log - Logger
-   * @param ex - Exception
-   * @param msg - Request Object
-   * @return Response
+   * @param request - Protobuf message.
+   * @return ContainerCommandResponseProto - UNSUPPORTED_REQUEST.
    */
-  public static ContainerProtos.ContainerCommandResponseProto logAndReturnError(
-      Logger log, StorageContainerException ex,
-      ContainerProtos.ContainerCommandRequestProto msg) {
-    log.info("Operation: {} : Trace ID: {} : Message: {} : Result: {}",
-        msg.getCmdType().name(), msg.getTraceID(),
-        ex.getMessage(), ex.getResult().getValueDescriptor().getName());
-    return getContainerResponse(msg, ex.getResult(), ex.getMessage()).build();
+  public static ContainerCommandResponseProto unsupportedRequest(
+      ContainerCommandRequestProto request) {
+    return getContainerCommandResponse(request, Result.UNSUPPORTED_REQUEST,
+        "Server does not support this command yet.").build();
   }
 
   /**
@@ -136,40 +166,17 @@ public final class ContainerUtils {
    *
    * @param log - Logger
    * @param ex - Exception
-   * @param msg - Request Object
+   * @param request - Request Object
    * @return Response
    */
-  public static ContainerProtos.ContainerCommandResponseProto logAndReturnError(
-      Logger log, RuntimeException ex,
-      ContainerProtos.ContainerCommandRequestProto msg) {
-    log.info("Operation: {} : Trace ID: {} : Message: {} ",
-        msg.getCmdType().name(), msg.getTraceID(), ex.getMessage());
-    return getContainerResponse(msg, INVALID_ARGUMENT, ex.getMessage()).build();
-  }
-
-  /**
-   * We found a command type but no associated payload for the command. Hence
-   * return malformed Command as response.
-   *
-   * @param msg - Protobuf message.
-   * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
-   */
-  public static ContainerProtos.ContainerCommandResponseProto
-      malformedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
-    return getContainerResponse(msg, ContainerProtos.Result.MALFORMED_REQUEST,
-        "Cmd type does not match the payload.").build();
-  }
-
-  /**
-   * We found a command type that is not supported yet.
-   *
-   * @param msg - Protobuf message.
-   * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
-   */
-  public static ContainerProtos.ContainerCommandResponseProto
-      unsupportedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
-    return getContainerResponse(msg, ContainerProtos.Result.UNSUPPORTED_REQUEST,
-        "Server does not support this command yet.").build();
+  public static ContainerCommandResponseProto logAndReturnError(
+      Logger log, StorageContainerException ex,
+      ContainerCommandRequestProto request) {
+    log.info("Operation: {} : Trace ID: {} : Message: {} : Result: {}",
+        request.getCmdType().name(), request.getTraceID(),
+        ex.getMessage(), ex.getResult().getValueDescriptor().getName());
+    return getContainerCommandResponse(request, ex.getResult(), ex.getMessage())
+        .build();
   }
 
   /**
@@ -191,7 +198,7 @@ public final class ContainerUtils {
   }
 
   /**
-   * Verifies that this in indeed a new container.
+   * Verifies that this is indeed a new container.
    *
    * @param containerFile - Container File to verify
    * @throws IOException
@@ -343,7 +350,7 @@ public final class ContainerUtils {
     if(!forceDelete && !db.isEmpty()) {
       throw new StorageContainerException(
           "Container cannot be deleted because it is not empty.",
-          ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
+          Result.ERROR_CONTAINER_NOT_EMPTY);
     }
     // Close the DB connection and remove the DB handler from cache
     KeyUtils.removeDB(containerData, conf);

+ 2 - 3
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java

@@ -103,7 +103,7 @@ public final class KeyUtils {
    */
   public static ContainerProtos.ContainerCommandResponseProto
       getKeyResponse(ContainerProtos.ContainerCommandRequestProto msg) {
-    return ContainerUtils.getContainerResponse(msg);
+    return ContainerUtils.getSuccessResponse(msg);
   }
 
 
@@ -114,8 +114,7 @@ public final class KeyUtils {
         .GetKeyResponseProto.newBuilder();
     getKey.setKeyData(data.getProtoBufMessage());
     ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
-            .SUCCESS, "");
+        ContainerUtils.getSuccessResponseBuilder(msg);
     builder.setGetKey(getKey);
     return  builder.build();
   }

+ 5 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerInfo;
@@ -68,8 +69,10 @@ public class ContainerSet {
           containerId);
       return true;
     } else {
-      LOG.debug("Container already exists with container Id {}", containerId);
-      return false;
+      LOG.warn("Container already exists with container Id {}", containerId);
+      throw new StorageContainerException("Container already exists with " +
+          "container Id " + containerId,
+          ContainerProtos.Result.CONTAINER_EXISTS);
     }
   }
 

+ 23 - 26
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java

@@ -35,7 +35,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.helpers.FileUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -138,8 +138,6 @@ public class Dispatcher implements ContainerDispatcher {
     } catch (StorageContainerException e) {
       // This useful since the trace ID will allow us to correlate failures.
       return ContainerUtils.logAndReturnError(LOG, e, msg);
-    } catch (IllegalStateException | NullPointerException e) {
-      return ContainerUtils.logAndReturnError(LOG, e, msg);
     }
   }
 
@@ -186,13 +184,13 @@ public class Dispatcher implements ContainerDispatcher {
     } catch (IOException ex) {
       LOG.warn("Container operation failed. " +
               "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerData().getContainerID(),
+          msg.getCreateContainer().getContainerID(),
           msg.getCmdType().name(),
           msg.getTraceID(),
           ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
-      return ContainerUtils.getContainerResponse(msg,
+      return ContainerUtils.getContainerCommandResponse(msg,
           ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
           ex.toString()).build();
     }
@@ -230,13 +228,13 @@ public class Dispatcher implements ContainerDispatcher {
     } catch (IOException ex) {
       LOG.warn("Container operation failed. " +
               "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerData().getContainerID(),
+          msg.getCreateContainer().getContainerID(),
           msg.getCmdType().name(),
           msg.getTraceID(),
           ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
-      return ContainerUtils.getContainerResponse(msg,
+      return ContainerUtils.getContainerCommandResponse(msg,
           ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
           ex.toString()).build();
     }
@@ -273,13 +271,13 @@ public class Dispatcher implements ContainerDispatcher {
     } catch (IOException ex) {
       LOG.warn("Container operation failed. " +
               "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerData().getContainerID(),
+          msg.getCreateContainer().getContainerID(),
           msg.getCmdType().name(),
           msg.getTraceID(),
           ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
-      return ContainerUtils.getContainerResponse(msg,
+      return ContainerUtils.getContainerCommandResponse(msg,
           ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
           ex.toString()).build();
     }
@@ -318,15 +316,14 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-    long containerID = msg.getUpdateContainer()
-        .getContainerData().getContainerID();
+    long containerID = msg.getUpdateContainer().getContainerID();
 
-    ContainerData data = ContainerData.getFromProtBuf(
-        msg.getUpdateContainer().getContainerData(), conf);
+    ContainerData data = new ContainerData(msg.getUpdateContainer()
+        .getContainerID(), conf);
     boolean forceUpdate = msg.getUpdateContainer().getForceUpdate();
     this.containerManager.updateContainer(containerID,
         data, forceUpdate);
-    return ContainerUtils.getContainerResponse(msg);
+    return ContainerUtils.getSuccessResponse(msg);
   }
 
   /**
@@ -371,7 +368,7 @@ public class Dispatcher implements ContainerDispatcher {
     long containerID = msg.getDeleteContainer().getContainerID();
     boolean forceDelete = msg.getDeleteContainer().getForceDelete();
     this.containerManager.deleteContainer(containerID, forceDelete);
-    return ContainerUtils.getContainerResponse(msg);
+    return ContainerUtils.getSuccessResponse(msg);
   }
 
   /**
@@ -388,12 +385,11 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-    ContainerData cData = ContainerData.getFromProtBuf(
-        msg.getCreateContainer().getContainerData(), conf);
-    Preconditions.checkNotNull(cData, "Container data is null");
+    ContainerData cData = new ContainerData(
+        msg.getCreateContainer().getContainerID(), conf);
 
     this.containerManager.createContainer(cData);
-    return ContainerUtils.getContainerResponse(msg);
+    return ContainerUtils.getSuccessResponse(msg);
   }
 
   /**
@@ -417,7 +413,7 @@ public class Dispatcher implements ContainerDispatcher {
             "container.", CLOSED_CONTAINER_IO);
       }
       this.containerManager.closeContainer(containerID);
-      return ContainerUtils.getContainerResponse(msg);
+      return ContainerUtils.getSuccessResponse(msg);
     } catch (NoSuchAlgorithmException e) {
       throw new StorageContainerException("No such Algorithm", e,
           NO_SUCH_ALGORITHM);
@@ -561,7 +557,8 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getTraceID());
       return ContainerUtils.malformedRequest(msg);
     }
-    KeyData keyData = KeyData.getFromProtoBuf(msg.getGetKey().getKeyData());
+    KeyData keyData = new KeyData(
+        BlockID.getFromProtobuf(msg.getGetKey().getBlockID()));
     Preconditions.checkNotNull(keyData);
     KeyData responseData =
         this.containerManager.getKeyManager().getKey(keyData);
@@ -634,7 +631,7 @@ public class Dispatcher implements ContainerDispatcher {
       chunks.add(chunkInfo.getProtoBufMessage());
       keyData.setChunks(chunks);
       this.containerManager.getKeyManager().putKey(keyData);
-      return FileUtils.getPutFileResponse(msg);
+      return SmallFileUtils.getPutFileResponseSuccess(msg);
     } catch (StorageContainerException e) {
       return ContainerUtils.logAndReturnError(LOG, e, msg);
     } catch (IOException e) {
@@ -661,8 +658,8 @@ public class Dispatcher implements ContainerDispatcher {
     }
     try {
       long bytes = 0;
-      KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile()
-          .getKey().getKeyData());
+      KeyData keyData = new KeyData(BlockID.getFromProtobuf(
+          msg.getGetSmallFile().getKey().getBlockID()));
       KeyData data = this.containerManager.getKeyManager().getKey(keyData);
       ContainerProtos.ChunkInfo c = null;
       for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
@@ -675,8 +672,8 @@ public class Dispatcher implements ContainerDispatcher {
         c = chunk;
       }
       metrics.incContainerBytesStats(Type.GetSmallFile, bytes);
-      return FileUtils.getGetSmallFileResponse(msg, dataBuf.toByteArray(),
-          ChunkInfo.getFromProtoBuf(c));
+      return SmallFileUtils.getGetSmallFileResponseSuccess(
+          msg, dataBuf.toByteArray(), ChunkInfo.getFromProtoBuf(c));
     } catch (StorageContainerException e) {
       return ContainerUtils.logAndReturnError(LOG, e, msg);
     } catch (IOException e) {

+ 180 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java

@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerType;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Ozone Container dispatcher takes a call from the netty server and routes it
+ * to the right handler function.
+ */
+public class HddsDispatcher implements ContainerDispatcher {
+
+  static final Logger LOG = LoggerFactory.getLogger(HddsDispatcher.class);
+
+  private final Map<ContainerType, Handler> handlers;
+  private final Configuration conf;
+  private final ContainerSet containerSet;
+  private final VolumeSet volumeSet;
+  private final String scmID;
+
+  /**
+   * Constructs an OzoneContainer that receives calls from
+   * XceiverServerHandler.
+   */
+  public HddsDispatcher(Configuration config, ContainerSet contSet,
+      VolumeSet volumes, String scmId) {
+    // TODO: Pass ContainerSet, VolumeSet and scmID, intialize metrics
+    this.conf = config;
+    this.containerSet = contSet;
+    this.volumeSet = volumes;
+    this.scmID = scmId;
+    this.handlers = Maps.newHashMap();
+    for (ContainerType containerType : ContainerType.values()) {
+      handlers.put(containerType,
+          Handler.getHandlerForContainerType(
+              containerType, conf, containerSet, volumeSet, scmID));
+    }
+  }
+
+  @Override
+  public void init() {
+  }
+
+  @Override
+  public void shutdown() {
+  }
+
+  @Override
+  public ContainerCommandResponseProto dispatch(
+      ContainerCommandRequestProto msg) {
+    LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
+        msg.getTraceID());
+    Preconditions.checkNotNull(msg);
+
+    Container container = null;
+    ContainerType containerType = null;
+    try {
+      long containerID = getContainerID(msg);
+
+      if (msg.getCmdType() != ContainerProtos.Type.CreateContainer) {
+        container = getContainer(containerID);
+        containerType = getContainerType(container);
+      } else {
+        containerType = msg.getCreateContainer().getContainerType();
+      }
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, msg);
+    }
+
+    Handler handler = getHandlerForContainerType(containerType);
+    if (handler == null) {
+      StorageContainerException ex = new StorageContainerException("Invalid " +
+          "ContainerType " + containerType,
+          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+      return ContainerUtils.logAndReturnError(LOG, ex, msg);
+    }
+    return handler.handle(msg, container);
+  }
+
+  @VisibleForTesting
+  public Handler getHandlerForContainerType(ContainerType type) {
+    return handlers.get(type);
+  }
+
+  private long getContainerID(ContainerCommandRequestProto request)
+      throws StorageContainerException {
+    ContainerProtos.Type cmdType = request.getCmdType();
+
+    switch(cmdType) {
+    case CreateContainer:
+      return request.getCreateContainer().getContainerID();
+    case ReadContainer:
+      return request.getReadContainer().getContainerID();
+    case UpdateContainer:
+      return request.getUpdateContainer().getContainerID();
+    case DeleteContainer:
+      return request.getDeleteContainer().getContainerID();
+    case ListContainer:
+      return request.getListContainer().getStartContainerID();
+    case CloseContainer:
+      return request.getCloseContainer().getContainerID();
+    case PutKey:
+      return request.getPutKey().getKeyData().getBlockID().getContainerID();
+    case GetKey:
+      return request.getGetKey().getBlockID().getContainerID();
+    case DeleteKey:
+      return request.getDeleteKey().getBlockID().getContainerID();
+    case ListKey:
+      return request.getListKey().getContainerID();
+    case ReadChunk:
+      return request.getReadChunk().getBlockID().getContainerID();
+    case DeleteChunk:
+      return request.getDeleteChunk().getBlockID().getContainerID();
+    case WriteChunk:
+      return request.getWriteChunk().getBlockID().getContainerID();
+    case ListChunk:
+      return request.getListChunk().getBlockID().getContainerID();
+    case PutSmallFile:
+      return request.getPutSmallFile().getKey().getKeyData().getBlockID()
+          .getContainerID();
+    case GetSmallFile:
+      return request.getGetSmallFile().getKey().getBlockID().getContainerID();
+    }
+
+    throw new StorageContainerException(
+        ContainerProtos.Result.UNSUPPORTED_REQUEST);
+  }
+
+  @VisibleForTesting
+  public Container getContainer(long containerID)
+      throws StorageContainerException {
+    Container container = containerSet.getContainer(containerID);
+    if (container == null) {
+      throw new StorageContainerException(
+          "ContainerID " + containerID + " does not exist",
+          ContainerProtos.Result.CONTAINER_NOT_FOUND);
+    }
+    return container;
+  }
+
+  private ContainerType getContainerType(Container container) {
+    return container.getContainerType();
+  }
+}

+ 15 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java

@@ -19,6 +19,9 @@
 package org.apache.hadoop.ozone.container.common.interfaces;
 
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerLifeCycleState;
 import org.apache.hadoop.hdds.scm.container.common.helpers.
     StorageContainerException;
 
@@ -68,6 +71,14 @@ public interface Container extends RwLock {
    */
   ContainerData getContainerData() throws StorageContainerException;
 
+  /**
+   * Get the Container Lifecycle state.
+   *
+   * @return ContainerLifeCycleState - Container State.
+   * @throws StorageContainerException
+   */
+  ContainerLifeCycleState getContainerState();
+
   /**
    * Closes a open container, if it is already closed or does not exist a
    * StorageContainerException is thrown.
@@ -76,5 +87,9 @@ public interface Container extends RwLock {
    */
   void close() throws StorageContainerException;
 
+  /**
+   * Return the ContainerType for the container.
+   */
+  ContainerProtos.ContainerType getContainerType();
 
 }

+ 71 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerType;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+
+import java.io.IOException;
+
+/**
+ * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
+ * should have an implementation for Handler.
+ */
+public class Handler {
+
+  protected final Configuration conf;
+  protected final ContainerSet containerSet;
+  protected final VolumeSet volumeSet;
+  protected final String scmID;
+
+  protected Handler(Configuration config, ContainerSet contSet,
+      VolumeSet volumeSet, String scmID) {
+    conf = config;
+    containerSet = contSet;
+    this.volumeSet = volumeSet;
+    this.scmID = scmID;
+  }
+
+  public static Handler getHandlerForContainerType(ContainerType containerType,
+      Configuration config, ContainerSet contSet, VolumeSet volumeSet,
+      String scmID) {
+    switch (containerType) {
+    case KeyValueContainer:
+      return KeyValueHandler.getInstance(config, contSet, volumeSet, scmID);
+    default:
+      throw new IllegalArgumentException("Handler for ContainerType: " +
+        containerType + "doesn't exist.");
+    }
+  }
+
+  public ContainerCommandResponseProto handle(
+      ContainerCommandRequestProto msg, Container container) {
+    return null;
+  }
+}

+ 2 - 3
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java

@@ -208,7 +208,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   private CompletableFuture<Message> handleCreateContainer(
       ContainerCommandRequestProto requestProto) {
     long containerID =
-        requestProto.getCreateContainer().getContainerData().getContainerID();
+        requestProto.getCreateContainer().getContainerID();
     createContainerFutureMap.
         computeIfAbsent(containerID, k -> new CompletableFuture<>());
     return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
@@ -265,8 +265,7 @@ public class ContainerStateMachine extends BaseStateMachine {
         Message message = runCommand(requestProto);
         if (cmdType == ContainerProtos.Type.CreateContainer) {
           long containerID =
-              requestProto.getCreateContainer()
-                  .getContainerData().getContainerID();
+              requestProto.getCreateContainer().getContainerID();
           createContainerFutureMap.remove(containerID).complete(message);
         }
         return CompletableFuture.completedFuture(message);

+ 7 - 4
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java

@@ -25,9 +25,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
@@ -101,10 +101,13 @@ public class VolumeSet {
         new InstrumentedLock(getClass().getName(), LOG,
             new ReentrantLock(true),
             conf.getTimeDuration(
-                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
-                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+                OzoneConfigKeys.HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_KEY,
+                OzoneConfigKeys.HDDS_WRITE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
                 TimeUnit.MILLISECONDS),
-            300));
+            conf.getTimeDuration(
+                OzoneConfigKeys.HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_KEY,
+                OzoneConfigKeys.HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEAFULT,
+                TimeUnit.MILLISECONDS)));
 
     initializeVolumeSet();
   }

+ 19 - 10
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java

@@ -18,29 +18,29 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-
 import com.google.common.base.Preconditions;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerLifeCycleState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-
-
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueYaml;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers
+    .KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.utils.MetadataStore;
 import org.slf4j.Logger;
@@ -116,9 +116,9 @@ public class KeyValueContainer implements Container {
     Preconditions.checkNotNull(scmId, "scmId cannot be null");
 
     File containerMetaDataPath = null;
+    //acquiring volumeset lock and container lock
+    volumeSet.acquireLock();
     try {
-      //acquiring volumeset lock and container lock
-      volumeSet.acquireLock();
       HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
           .getVolumesList(), containerMaxSize);
       String containerBasePath = containerVolume.getHddsRootDir().toString();
@@ -404,10 +404,19 @@ public class KeyValueContainer implements Container {
   }
 
   @Override
-  public ContainerData getContainerData()  {
+  public KeyValueContainerData getContainerData()  {
     return containerData;
   }
 
+  @Override
+  public ContainerLifeCycleState getContainerState() {
+    return containerData.getState();
+  }
+
+  @Override
+  public ContainerProtos.ContainerType getContainerType() {
+    return ContainerProtos.ContainerType.KeyValueContainer;
+  }
 
   @Override
   public void update(Map<String, String> metadata, boolean forceUpdate)

+ 28 - 19
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyValueContainerData.java → hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java

@@ -16,13 +16,15 @@
  *  limitations under the License.
  */
 
-package org.apache.hadoop.ozone.container.common.impl;
+package org.apache.hadoop.ozone.container.keyvalue;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * This class represents the KeyValueContainer metadata, which is the
@@ -161,31 +163,38 @@ public class KeyValueContainerData extends ContainerData {
     this.numPendingDeletionBlocks -= numBlocks;
   }
 
-
   /**
-   * Constructs a KeyValueContainerData object from ProtoBuf classes.
+   * Returns a ProtoBuf Message from ContainerData.
    *
-   * @param protoData - ProtoBuf Message
-   * @throws IOException
+   * @return Protocol Buffer Message
    */
-  public static KeyValueContainerData getFromProtoBuf(
-      ContainerProtos.CreateContainerData protoData) throws IOException {
-
-    long containerID;
-    ContainerProtos.ContainerType containerType;
+  public ContainerProtos.ContainerData getProtoBufMessage() {
+    ContainerProtos.ContainerData.Builder builder = ContainerProtos
+        .ContainerData.newBuilder();
+    builder.setContainerID(this.getContainerId());
+    builder.setDbPath(this.getDbFile().getPath());
+    builder.setContainerPath(this.getMetadataPath());
+    builder.setState(this.getState());
+
+    for (Map.Entry<String, String> entry : getMetadata().entrySet()) {
+      ContainerProtos.KeyValue.Builder keyValBuilder =
+          ContainerProtos.KeyValue.newBuilder();
+      builder.addMetadata(keyValBuilder.setKey(entry.getKey())
+          .setValue(entry.getValue()).build());
+    }
 
-    containerID = protoData.getContainerId();
-    containerType = protoData.getContainerType();
+    if (this.getBytesUsed() >= 0) {
+      builder.setBytesUsed(this.getBytesUsed());
+    }
 
-    KeyValueContainerData keyValueContainerData = new KeyValueContainerData(
-        containerType, containerID);
+    if(this.getContainerType() != null) {
+      builder.setContainerType(ContainerProtos.ContainerType.KeyValueContainer);
+    }
 
-    for (int x = 0; x < protoData.getMetadataCount(); x++) {
-      keyValueContainerData.addMetadata(protoData.getMetadata(x).getKey(),
-          protoData.getMetadata(x).getValue());
+    if(this.getContainerDBType() != null) {
+      builder.setContainerDBType(containerDBType);
     }
 
-    return keyValueContainerData;
+    return builder.build();
   }
-
 }

+ 643 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java

@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .CreateContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .GetSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .KeyValue;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .PutSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Type;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume
+    .RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
+import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.*;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Stage;
+
+/**
+ * Handler for KeyValue Container type.
+ */
+@Singleton
+public class KeyValueHandler extends Handler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      KeyValueHandler.class);
+
+  private static volatile KeyValueHandler INSTANCE = null; // Singleton class
+
+  private final ContainerType containerType;
+  private final KeyManager keyManager;
+  private final ChunkManager chunkManager;
+  private VolumeChoosingPolicy volumeChoosingPolicy;
+
+  // TODO : Add metrics and populate it.
+
+  public static KeyValueHandler getInstance(Configuration config,
+      ContainerSet contSet, VolumeSet volSet, String scmID) {
+    if (INSTANCE == null) {
+      INSTANCE = new KeyValueHandler(config, contSet, volSet, scmID);
+    }
+    return INSTANCE;
+  }
+
+  private KeyValueHandler(Configuration config, ContainerSet contSet,
+      VolumeSet volSet, String scmID) {
+    super(config, contSet, volSet, scmID);
+    containerType = ContainerType.KeyValueContainer;
+    keyManager = new KeyManagerImpl(config);
+    chunkManager = new ChunkManagerImpl();
+    // TODO: Add supoort for different volumeChoosingPolicies.
+    volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
+  }
+
+  @Override
+  public ContainerCommandResponseProto handle(
+      ContainerCommandRequestProto request, Container container) {
+
+    Type cmdType = request.getCmdType();
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    switch(cmdType) {
+    case CreateContainer:
+      return handleCreateContainer(request, kvContainer);
+    case ReadContainer:
+      return handleReadContainer(request, kvContainer);
+    case UpdateContainer:
+      return handleUpdateContainer(request, kvContainer);
+    case DeleteContainer:
+      return handleDeleteContainer(request, kvContainer);
+    case ListContainer:
+      return handleUnsupportedOp(request);
+    case CloseContainer:
+      return handleCloseContainer(request, kvContainer);
+    case PutKey:
+      return handlePutKey(request, kvContainer);
+    case GetKey:
+      return handleGetKey(request, kvContainer);
+    case DeleteKey:
+      return handleDeleteKey(request, kvContainer);
+    case ListKey:
+      return handleUnsupportedOp(request);
+    case ReadChunk:
+      return handleReadChunk(request, kvContainer);
+    case DeleteChunk:
+      return handleDeleteChunk(request, kvContainer);
+    case WriteChunk:
+      return handleWriteChunk(request, kvContainer);
+    case ListChunk:
+      return handleUnsupportedOp(request);
+    case CompactChunk:
+      return handleUnsupportedOp(request);
+    case PutSmallFile:
+      return handlePutSmallFile(request, kvContainer);
+    case GetSmallFile:
+      return handleGetSmallFile(request, kvContainer);
+    }
+
+    return null;
+  }
+
+  /**
+   * Handles Create Container Request. If successful, adds the container to
+   * ContainerSet.
+   */
+  ContainerCommandResponseProto handleCreateContainer(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+    if (!request.hasCreateContainer()) {
+      LOG.debug("Malformed Create Container request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+    // Create Container request should be passed a null container as the
+    // container would be created here.
+    Preconditions.checkArgument(kvContainer == null);
+
+    CreateContainerRequestProto createContainerReq =
+        request.getCreateContainer();
+    long containerID = createContainerReq.getContainerID();
+    if (createContainerReq.hasContainerType()) {
+      Preconditions.checkArgument(createContainerReq.getContainerType()
+          .equals(ContainerType.KeyValueContainer));
+    }
+
+    KeyValueContainerData newContainerData = new KeyValueContainerData(
+        containerType, containerID);
+    // TODO: Add support to add metadataList to ContainerData. Add metadata
+    // to container during creation.
+    KeyValueContainer newContainer = new KeyValueContainer(
+        newContainerData, conf);
+
+    try {
+      newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
+      containerSet.addContainer(newContainer);
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    }
+
+    return ContainerUtils.getSuccessResponse(request);
+  }
+
+  /**
+   * Handles Read Container Request. Returns the ContainerData as response.
+   */
+  ContainerCommandResponseProto handleReadContainer(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+    if (!request.hasReadContainer()) {
+      LOG.debug("Malformed Read Container request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    KeyValueContainerData containerData = kvContainer.getContainerData();
+    return KeyValueContainerUtil.getReadContainerResponse(
+        request, containerData);
+  }
+
+
+  /**
+   * Handles Update Container Request. If successful, the container metadata
+   * is updated.
+   */
+  ContainerCommandResponseProto handleUpdateContainer(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasUpdateContainer()) {
+      LOG.debug("Malformed Update Container request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    boolean forceUpdate = request.getUpdateContainer().getForceUpdate();
+    List<KeyValue> keyValueList =
+        request.getUpdateContainer().getMetadataList();
+    Map<String, String> metadata = new HashMap<>();
+    for (KeyValue keyValue : keyValueList) {
+      metadata.put(keyValue.getKey(), keyValue.getValue());
+    }
+
+    try {
+      if (!metadata.isEmpty()) {
+        kvContainer.update(metadata, forceUpdate);
+      }
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    }
+    return ContainerUtils.getSuccessResponse(request);
+  }
+
+  /**
+   * Handles Delete Container Request.
+   * Open containers cannot be deleted.
+   * Holds writeLock on ContainerSet till the container is removed from
+   * containerMap. On disk deletion of container files will happen
+   * asynchornously without the lock.
+   */
+  ContainerCommandResponseProto handleDeleteContainer(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasDeleteContainer()) {
+      LOG.debug("Malformed Delete container request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    boolean forceDelete = request.getDeleteContainer().getForceDelete();
+    kvContainer.writeLock();
+
+    try {
+      // Check if container is open
+      if (kvContainer.getContainerData().isOpen()) {
+        kvContainer.writeUnlock();
+        throw new StorageContainerException(
+            "Deletion of Open Container is not allowed.",
+            DELETE_ON_OPEN_CONTAINER);
+      } else {
+        containerSet.removeContainer(
+            kvContainer.getContainerData().getContainerId());
+        // Release the lock first.
+        // Avoid holding write locks for disk operations
+        kvContainer.writeUnlock();
+
+        kvContainer.delete(forceDelete);
+      }
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } finally {
+      if (kvContainer.hasWriteLock()) {
+        kvContainer.writeUnlock();
+      }
+    }
+    return ContainerUtils.getSuccessResponse(request);
+  }
+
+  /**
+   * Handles Close Container Request. An open container is closed.
+   */
+  ContainerCommandResponseProto handleCloseContainer(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasCloseContainer()) {
+      LOG.debug("Malformed Update Container request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      kvContainer.close();
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    }
+
+    return ContainerUtils.getSuccessResponse(request);
+  }
+
+  /**
+   * Handle Put Key operation. Calls KeyManager to process the request.
+   */
+  ContainerCommandResponseProto handlePutKey(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasPutKey()) {
+      LOG.debug("Malformed Put Key request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      KeyData keyData = KeyData.getFromProtoBuf(
+          request.getPutKey().getKeyData());
+      Preconditions.checkNotNull(keyData);
+
+      keyManager.putKey(kvContainer, keyData);
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Put Key failed", ex, IO_EXCEPTION),
+          request);
+    }
+
+    return KeyUtils.getKeyResponseSuccess(request);
+  }
+
+  /**
+   * Handle Get Key operation. Calls KeyManager to process the request.
+   */
+  ContainerCommandResponseProto handleGetKey(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasGetKey()) {
+      LOG.debug("Malformed Get Key request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    KeyData responseData;
+    try {
+      BlockID blockID = BlockID.getFromProtobuf(
+          request.getGetKey().getBlockID());
+      responseData = keyManager.getKey(kvContainer, blockID);
+
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Get Key failed", ex, IO_EXCEPTION),
+          request);
+    }
+
+    return KeyUtils.getKeyDataResponse(request, responseData);
+  }
+
+  /**
+   * Handle Delete Key operation. Calls KeyManager to process the request.
+   */
+  ContainerCommandResponseProto handleDeleteKey(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasDeleteKey()) {
+      LOG.debug("Malformed Delete Key request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      BlockID blockID = BlockID.getFromProtobuf(
+          request.getDeleteKey().getBlockID());
+
+      keyManager.deleteKey(kvContainer, blockID);
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Delete Key failed", ex, IO_EXCEPTION),
+          request);
+    }
+
+    return KeyUtils.getKeyResponseSuccess(request);
+  }
+
+  /**
+   * Handle Read Chunk operation. Calls ChunkManager to process the request.
+   */
+  ContainerCommandResponseProto handleReadChunk(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasReadChunk()) {
+      LOG.debug("Malformed Read Chunk request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    ChunkInfo chunkInfo;
+    byte[] data;
+    try {
+      BlockID blockID = BlockID.getFromProtobuf(
+          request.getReadChunk().getBlockID());
+      chunkInfo = ChunkInfo.getFromProtoBuf(request.getReadChunk()
+          .getChunkData());
+      Preconditions.checkNotNull(chunkInfo);
+
+      data = chunkManager.readChunk(kvContainer, blockID, chunkInfo);
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Read Chunk failed", ex, IO_EXCEPTION),
+          request);
+    }
+
+    return ChunkUtils.getReadChunkResponse(request, data, chunkInfo);
+  }
+
+  /**
+   * Handle Delete Chunk operation. Calls ChunkManager to process the request.
+   */
+  ContainerCommandResponseProto handleDeleteChunk(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasDeleteChunk()) {
+      LOG.debug("Malformed Delete Chunk request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      BlockID blockID = BlockID.getFromProtobuf(
+          request.getDeleteChunk().getBlockID());
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getDeleteChunk()
+          .getChunkData());
+      Preconditions.checkNotNull(chunkInfo);
+
+      chunkManager.deleteChunk(kvContainer, blockID, chunkInfo);
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Delete Chunk failed", ex,
+              IO_EXCEPTION), request);
+    }
+
+    return ChunkUtils.getChunkResponseSuccess(request);
+  }
+
+  /**
+   * Handle Write Chunk operation. Calls ChunkManager to process the request.
+   */
+  ContainerCommandResponseProto handleWriteChunk(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasWriteChunk()) {
+      LOG.debug("Malformed Write Chunk request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      BlockID blockID = BlockID.getFromProtobuf(
+          request.getWriteChunk().getBlockID());
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getWriteChunk()
+          .getChunkData());
+      Preconditions.checkNotNull(chunkInfo);
+
+      byte[] data = null;
+      if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
+          request.getWriteChunk().getStage() == Stage.COMBINED) {
+        data = request.getWriteChunk().getData().toByteArray();
+      }
+
+      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
+          request.getWriteChunk().getStage());
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION),
+          request);
+    }
+
+    return ChunkUtils.getChunkResponseSuccess(request);
+  }
+
+  /**
+   * Handle Put Small File operation. Writes the chunk and associated key
+   * using a single RPC. Calls KeyManager and ChunkManager to process the
+   * request.
+   */
+  ContainerCommandResponseProto handlePutSmallFile(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasPutSmallFile()) {
+      LOG.debug("Malformed Put Small File request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+    PutSmallFileRequestProto putSmallFileReq =
+        request.getPutSmallFile();
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      BlockID blockID = BlockID.getFromProtobuf(
+        putSmallFileReq.getKey().getKeyData().getBlockID());
+      KeyData keyData = KeyData.getFromProtoBuf(
+          putSmallFileReq.getKey().getKeyData());
+      Preconditions.checkNotNull(keyData);
+
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
+          putSmallFileReq.getChunkInfo());
+      Preconditions.checkNotNull(chunkInfo);
+
+      byte[] data = putSmallFileReq.getData().toByteArray();
+      chunkManager.writeChunk(
+          kvContainer, blockID, chunkInfo, data, Stage.COMBINED);
+
+      List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
+      chunks.add(chunkInfo.getProtoBufMessage());
+      keyData.setChunks(chunks);
+      keyManager.putKey(kvContainer, keyData);
+
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Read Chunk failed", ex,
+              PUT_SMALL_FILE_ERROR), request);
+    }
+
+    return SmallFileUtils.getPutFileResponseSuccess(request);
+  }
+
+  /**
+   * Handle Get Small File operation. Gets a data stream using a key. This
+   * helps in reducing the RPC overhead for small files. Calls KeyManager and
+   * ChunkManager to process the request.
+   */
+  ContainerCommandResponseProto handleGetSmallFile(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasGetSmallFile()) {
+      LOG.debug("Malformed Get Small File request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile();
+
+    try {
+      BlockID blockID = BlockID.getFromProtobuf(
+        getSmallFileReq.getKey().getBlockID());
+      KeyData responseData = keyManager.getKey(kvContainer, blockID);
+
+      ContainerProtos.ChunkInfo chunkInfo = null;
+      ByteString dataBuf = ByteString.EMPTY;
+      for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
+        byte[] data = chunkManager.readChunk(kvContainer, blockID,
+            ChunkInfo.getFromProtoBuf(chunk));
+        ByteString current = ByteString.copyFrom(data);
+        dataBuf = dataBuf.concat(current);
+        chunkInfo = chunk;
+      }
+
+      return SmallFileUtils.getGetSmallFileResponseSuccess(request, dataBuf
+          .toByteArray(), ChunkInfo.getFromProtoBuf(chunkInfo));
+    } catch (StorageContainerException e) {
+      return ContainerUtils.logAndReturnError(LOG, e, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Write Chunk failed", ex,
+              GET_SMALL_FILE_ERROR), request);
+    }
+  }
+
+  /**
+   * Handle unsupported operation.
+   */
+  ContainerCommandResponseProto handleUnsupportedOp(
+      ContainerCommandRequestProto request) {
+    // TODO : remove all unsupported operations or handle them.
+    return ContainerUtils.unsupportedRequest(request);
+  }
+
+  /**
+   * Check if container is open. Throw exception otherwise.
+   * @param kvContainer
+   * @throws StorageContainerException
+   */
+  private void checkContainerOpen(KeyValueContainer kvContainer)
+      throws StorageContainerException {
+
+    ContainerProtos.ContainerLifeCycleState containerState =
+        kvContainer.getContainerState();
+
+    if (containerState == ContainerProtos.ContainerLifeCycleState.OPEN) {
+      return;
+    } else {
+      String msg = "Requested operation not allowed as ContainerState is " +
+          containerState;
+      ContainerProtos.Result result = null;
+      switch (containerState) {
+      case CLOSING:
+      case CLOSED:
+        result = CLOSED_CONTAINER_IO;
+        break;
+      case INVALID:
+        result = INVALID_CONTAINER_STATE;
+        break;
+      default:
+        result = CONTAINER_INTERNAL_ERROR;
+      }
+
+      throw new StorageContainerException(msg, result);
+    }
+  }
+}

+ 6 - 9
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyValueYaml.java → hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java

@@ -16,13 +16,13 @@
  *  limitations under the License.
  */
 
-package org.apache.hadoop.ozone.container.common.impl;
+package org.apache.hadoop.ozone.container.keyvalue;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.yaml.snakeyaml.Yaml;
 
-
 import java.beans.IntrospectionException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,10 +30,7 @@ import java.io.Writer;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
-
 import java.io.File;
-
-
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.Map;
@@ -77,8 +74,8 @@ public final class KeyValueYaml {
 
     Representer representer = new KeyValueContainerDataRepresenter();
     representer.setPropertyUtils(propertyUtils);
-    representer.addClassTag(org.apache.hadoop.ozone.container.common.impl
-        .KeyValueContainerData.class, new Tag("KeyValueContainerData"));
+    representer.addClassTag(
+        KeyValueContainerData.class, new Tag("KeyValueContainerData"));
 
     Constructor keyValueDataConstructor = new KeyValueDataConstructor();
 
@@ -109,8 +106,8 @@ public final class KeyValueYaml {
 
       Representer representer = new KeyValueContainerDataRepresenter();
       representer.setPropertyUtils(propertyUtils);
-      representer.addClassTag(org.apache.hadoop.ozone.container.common.impl
-          .KeyValueContainerData.class, new Tag("KeyValueContainerData"));
+      representer.addClassTag(
+          KeyValueContainerData.class, new Tag("KeyValueContainerData"));
 
       Constructor keyValueDataConstructor = new KeyValueDataConstructor();
 

+ 48 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java

@@ -21,12 +21,21 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
 import com.google.common.base.Preconditions;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ReadChunkResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -292,4 +301,41 @@ public final class ChunkUtils {
         (Boolean.valueOf(overWrite));
   }
 
+  /**
+   * Returns a CreateContainer Response. This call is used by create and delete
+   * containers which have null success responses.
+   *
+   * @param msg Request
+   * @return Response.
+   */
+  public static ContainerCommandResponseProto getChunkResponseSuccess(
+      ContainerCommandRequestProto msg) {
+    return ContainerUtils.getSuccessResponse(msg);
+  }
+
+  /**
+   * Gets a response to the read chunk calls.
+   *
+   * @param msg - Msg
+   * @param data - Data
+   * @param info - Info
+   * @return Response.
+   */
+  public static ContainerCommandResponseProto getReadChunkResponse(
+      ContainerCommandRequestProto msg, byte[] data, ChunkInfo info) {
+    Preconditions.checkNotNull(msg);
+    Preconditions.checkNotNull("Chunk data is null", data);
+    Preconditions.checkNotNull("Chunk Info is null", info);
+
+    ReadChunkResponseProto.Builder response =
+        ReadChunkResponseProto.newBuilder();
+    response.setChunkData(info.getProtoBufMessage());
+    response.setData(ByteString.copyFrom(data));
+    response.setBlockID(msg.getReadChunk().getBlockID());
+
+    ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getSuccessResponseBuilder(msg);
+    builder.setReadChunk(response);
+    return builder.build();
+  }
 }

+ 33 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java

@@ -21,9 +21,17 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .GetKeyResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
 import org.apache.hadoop.utils.MetadataStore;
 
@@ -112,4 +120,27 @@ public final class KeyUtils {
           " bytes array.", NO_SUCH_KEY);
     }
   }
+
+  /**
+   * Returns successful keyResponse.
+   * @param msg - Request.
+   * @return Response.
+   */
+  public static ContainerCommandResponseProto getKeyResponseSuccess(
+      ContainerCommandRequestProto msg) {
+    return ContainerUtils.getSuccessResponse(msg);
+  }
+
+
+  public static ContainerCommandResponseProto getKeyDataResponse(
+      ContainerCommandRequestProto msg, KeyData data) {
+    GetKeyResponseProto.Builder getKey = ContainerProtos
+        .GetKeyResponseProto
+        .newBuilder();
+    getKey.setKeyData(data.getProtoBufMessage());
+    ContainerProtos.ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getSuccessResponseBuilder(msg);
+    builder.setGetKey(getKey);
+    return  builder.build();
+  }
 }

+ 1 - 1
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerLocationUtil.java → hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java

@@ -15,7 +15,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.hadoop.ozone.container.keyvalue;
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.OzoneConsts;

+ 30 - 5
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerUtil.java → hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java

@@ -15,19 +15,23 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.hadoop.ozone.container.keyvalue;
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -143,6 +147,27 @@ public final class KeyValueContainerUtil {
 
     //Delete Container directory
     FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
+  }
+
+  /**
+   * Returns a ReadContainer Response.
+   *
+   * @param request Request
+   * @param containerData - data
+   * @return Response.
+   */
+  public static ContainerCommandResponseProto getReadContainerResponse(
+      ContainerCommandRequestProto request,
+      KeyValueContainerData containerData) {
+    Preconditions.checkNotNull(containerData);
+
+    ContainerProtos.ReadContainerResponseProto.Builder response =
+        ContainerProtos.ReadContainerResponseProto.newBuilder();
+    response.setContainerData(containerData.getProtoBufMessage());
 
+    ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getSuccessResponseBuilder(request);
+    builder.setReadContainer(response);
+    return builder.build();
   }
 }

+ 19 - 17
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/FileUtils.java → hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java

@@ -15,21 +15,27 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.hadoop.ozone.container.common.helpers;
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
 import com.google.common.base.Preconditions;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
 
 /**
  * File Utils are helper routines used by putSmallFile and getSmallFile
  * RPCs.
  */
-public final class FileUtils {
+public final class SmallFileUtils {
   /**
    * Never Constructed.
    */
-  private FileUtils() {
+  private SmallFileUtils() {
   }
 
   /**
@@ -37,13 +43,12 @@ public final class FileUtils {
    * @param msg - ContainerCommandRequestProto
    * @return - ContainerCommandResponseProto
    */
-  public static ContainerProtos.ContainerCommandResponseProto
-      getPutFileResponse(ContainerProtos.ContainerCommandRequestProto msg) {
+  public static ContainerCommandResponseProto getPutFileResponseSuccess(
+      ContainerCommandRequestProto msg) {
     ContainerProtos.PutSmallFileResponseProto.Builder getResponse =
         ContainerProtos.PutSmallFileResponseProto.newBuilder();
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
-            .SUCCESS, "");
+    ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getSuccessResponseBuilder(msg);
     builder.setCmdType(ContainerProtos.Type.PutSmallFile);
     builder.setPutSmallFile(getResponse);
     return  builder.build();
@@ -56,24 +61,21 @@ public final class FileUtils {
    * @param info  - Info
    * @return    Response.
    */
-  public static ContainerProtos.ContainerCommandResponseProto
-      getGetSmallFileResponse(ContainerProtos.ContainerCommandRequestProto msg,
-      byte[] data, ChunkInfo info) {
+  public static ContainerCommandResponseProto getGetSmallFileResponseSuccess(
+      ContainerCommandRequestProto msg, byte[] data, ChunkInfo info) {
     Preconditions.checkNotNull(msg);
 
     ContainerProtos.ReadChunkResponseProto.Builder readChunkresponse =
         ContainerProtos.ReadChunkResponseProto.newBuilder();
     readChunkresponse.setChunkData(info.getProtoBufMessage());
     readChunkresponse.setData(ByteString.copyFrom(data));
-    readChunkresponse.setBlockID(msg.getGetSmallFile().getKey().
-        getKeyData().getBlockID());
+    readChunkresponse.setBlockID(msg.getGetSmallFile().getKey().getBlockID());
 
     ContainerProtos.GetSmallFileResponseProto.Builder getSmallFile =
         ContainerProtos.GetSmallFileResponseProto.newBuilder();
     getSmallFile.setData(readChunkresponse.build());
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
-            .SUCCESS, "");
+    ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getSuccessResponseBuilder(msg);
     builder.setCmdType(ContainerProtos.Type.GetSmallFile);
     builder.setGetSmallFile(getSmallFile);
     return builder.build();

+ 2 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java → hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java

@@ -16,7 +16,7 @@
  *  limitations under the License.
  */
 
-package org.apache.hadoop.ozone.container.keyvalue;
+package org.apache.hadoop.ozone.container.keyvalue.impl;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileUtil;
@@ -25,9 +25,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.slf4j.Logger;

+ 12 - 8
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java → hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyManagerImpl.java

@@ -16,7 +16,7 @@
  *  limitations under the License.
  */
 
-package org.apache.hadoop.ozone.container.keyvalue;
+package org.apache.hadoop.ozone.container.keyvalue.impl;
 
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
@@ -24,9 +24,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
@@ -89,21 +90,24 @@ public class KeyManagerImpl implements KeyManager {
    * Gets an existing key.
    *
    * @param container - Container from which key need to be get.
-   * @param data - Key Data.
+   * @param blockID - BlockID of the key.
    * @return Key Data.
    * @throws IOException
    */
-  public KeyData getKey(Container container, KeyData data) throws IOException {
-    Preconditions.checkNotNull(data, "Key data cannot be null");
-    Preconditions.checkNotNull(data.getContainerID(), "Container name cannot" +
-        " be null");
+  public KeyData getKey(Container container, BlockID blockID)
+      throws IOException {
+    Preconditions.checkNotNull(blockID,
+        "BlockID cannot be null in GetKet request");
+    Preconditions.checkNotNull(blockID.getContainerID(),
+        "Container name cannot be null");
+
     KeyValueContainerData containerData = (KeyValueContainerData) container
         .getContainerData();
     MetadataStore db = KeyUtils.getDB(containerData, config);
     // This is a post condition that acts as a hint to the user.
     // Should never fail.
     Preconditions.checkNotNull(db, "DB cannot be null here");
-    byte[] kData = db.get(Longs.toByteArray(data.getLocalID()));
+    byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
     if (kData == null) {
       throw new StorageContainerException("Unable to find the key.",
           NO_SUCH_KEY);

+ 2 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java

@@ -43,11 +43,11 @@ public interface KeyManager {
    * Gets an existing key.
    *
    * @param container - Container from which key need to be get.
-   * @param data - Key Data.
+   * @param blockID - BlockID of the Key.
    * @return Key Data.
    * @throws IOException
    */
-  KeyData getKey(Container container, KeyData data) throws IOException;
+  KeyData getKey(Container container, BlockID blockID) throws IOException;
 
   /**
    * Deletes an existing Key.

+ 1 - 35
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java

@@ -19,12 +19,11 @@
 package org.apache.hadoop.ozone.container.common;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -32,39 +31,6 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class TestKeyValueContainerData {
 
-  @Test
-  public void testGetFromProtoBuf() throws IOException {
-
-    long containerId = 1L;
-    ContainerProtos.ContainerType containerType = ContainerProtos
-        .ContainerType.KeyValueContainer;
-    int layOutVersion = 1;
-    ContainerProtos.ContainerLifeCycleState state = ContainerProtos
-        .ContainerLifeCycleState.OPEN;
-
-    ContainerProtos.KeyValue.Builder keyValBuilder =
-        ContainerProtos.KeyValue.newBuilder();
-    ContainerProtos.CreateContainerData containerData = ContainerProtos
-        .CreateContainerData.newBuilder()
-        .setContainerType(containerType)
-        .setContainerId(containerId)
-        .addMetadata(0, keyValBuilder.setKey("VOLUME").setValue("ozone")
-            .build())
-        .addMetadata(1, keyValBuilder.setKey("OWNER").setValue("hdfs")
-            .build()).build();
-
-    KeyValueContainerData kvData = KeyValueContainerData.getFromProtoBuf(
-        containerData);
-    assertEquals(containerType, kvData.getContainerType());
-    assertEquals(containerId, kvData.getContainerId());
-    assertEquals(layOutVersion, kvData.getLayOutVersion());
-    assertEquals(state, kvData.getState());
-    assertEquals(2, kvData.getMetadata().size());
-    assertEquals("ozone", kvData.getMetadata().get("VOLUME"));
-    assertEquals("hdfs", kvData.getMetadata().get("OWNER"));
-
-  }
-
   @Test
   public void testKeyValueData() {
     long containerId = 1L;

+ 10 - 2
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerSet.java

@@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -37,6 +39,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Class used to test ContainerSet operations.
@@ -59,8 +62,13 @@ public class TestContainerSet {
     //addContainer
     boolean result = containerSet.addContainer(keyValueContainer);
     assertTrue(result);
-    result = containerSet.addContainer(keyValueContainer);
-    assertFalse(result);
+    try {
+      result = containerSet.addContainer(keyValueContainer);
+      fail("Adding same container ID twice should fail.");
+    } catch (StorageContainerException ex) {
+      GenericTestUtils.assertExceptionContains("Container already exists with" +
+          " container Id " + containerId, ex);
+    }
 
     //getContainer
     KeyValueContainer container = (KeyValueContainer) containerSet

+ 2 - 0
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestKeyValueYaml.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.ozone.container.common.impl;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueYaml;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 

+ 91 - 0
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/interfaces/TestHandler.java

@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+
+import java.util.UUID;
+
+/**
+ * Tests Handler interface.
+ */
+public class TestHandler {
+  @Rule
+  public TestRule timeout = new Timeout(300000);
+
+  private Configuration conf;
+  private HddsDispatcher dispatcher;
+  private ContainerSet containerSet;
+  private VolumeSet volumeSet;
+  private Handler handler;
+
+  private final static String SCM_ID = UUID.randomUUID().toString();
+  private final static String DATANODE_UUID = UUID.randomUUID().toString();
+
+  @Before
+  public void setup() throws Exception {
+    this.conf = new Configuration();
+    this.containerSet = Mockito.mock(ContainerSet.class);
+    this.volumeSet = Mockito.mock(VolumeSet.class);
+
+    this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, SCM_ID);
+  }
+
+  @Test
+  public void testGetKeyValueHandler() throws Exception {
+    Handler kvHandler = dispatcher.getHandlerForContainerType(
+        ContainerProtos.ContainerType.KeyValueContainer);
+
+    Assert.assertTrue("getHandlerForContainerType returned incorrect handler",
+        (kvHandler instanceof KeyValueHandler));
+  }
+
+  @Test
+  public void testGetHandlerForInvalidContainerType() {
+    // When new ContainerProtos.ContainerType are added, increment the code
+    // for invalid enum.
+    ContainerProtos.ContainerType invalidContainerType =
+        ContainerProtos.ContainerType.forNumber(2);
+
+    Assert.assertEquals("New ContainerType detected. Not an invalid " +
+        "containerType", invalidContainerType, null);
+
+    Handler handler = dispatcher.getHandlerForContainerType(
+        invalidContainerType);
+    Assert.assertEquals("Get Handler for Invalid ContainerType should " +
+        "return null.", handler, null);
+  }
+}

+ 1 - 2
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java

@@ -18,17 +18,16 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Rule;

+ 3 - 4
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java

@@ -26,9 +26,9 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.common.volume
     .RoundRobinVolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Rule;
@@ -110,7 +110,7 @@ public class TestKeyManagerImpl {
 
     //Get Key
     KeyData fromGetKeyData = keyValueContainerManager.getKey(keyValueContainer,
-        keyData);
+        keyData.getBlockID());
 
     assertEquals(keyData.getContainerID(), fromGetKeyData.getContainerID());
     assertEquals(keyData.getLocalID(), fromGetKeyData.getLocalID());
@@ -168,8 +168,7 @@ public class TestKeyManagerImpl {
   @Test
   public void testGetNoSuchKey() throws Exception {
     try {
-      keyData = new KeyData(new BlockID(1L, 2L));
-      keyValueContainerManager.getKey(keyValueContainer, keyData);
+      keyValueContainerManager.getKey(keyValueContainer, new BlockID(1L, 2L));
       fail("testGetNoSuchKey failed");
     } catch (StorageContainerException ex) {
       GenericTestUtils.assertExceptionContains("Unable to find the key.", ex);

+ 6 - 4
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java

@@ -22,13 +22,15 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
 
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueYaml;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume
+    .RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 
+import org.apache.hadoop.ozone.container.keyvalue.helpers
+    .KeyValueContainerLocationUtil;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
 import org.junit.Before;

+ 246 - 0
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java

@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.times;
+
+import java.util.UUID;
+
+/**
+ * Unit tests for {@link KeyValueHandler}.
+ */
+public class TestKeyValueHandler {
+
+  @Rule
+  public TestRule timeout = new Timeout(300000);
+
+  private Configuration conf;
+  private HddsDispatcher dispatcher;
+  private ContainerSet containerSet;
+  private VolumeSet volumeSet;
+  private KeyValueHandler handler;
+
+  private final static String SCM_ID = UUID.randomUUID().toString();
+  private final static String DATANODE_UUID = UUID.randomUUID().toString();
+  private int containerID;
+
+  private final String baseDir = MiniDFSCluster.getBaseDirectory();
+  private final String volume = baseDir + "disk1";
+
+  private void setup() throws Exception {
+    this.conf = new Configuration();
+    conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY, volume);
+
+    this.containerSet = new ContainerSet();
+    DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
+        .setUuid(DATANODE_UUID)
+        .setHostName("localhost")
+        .setIpAddress("127.0.0.1")
+        .build();
+    this.volumeSet = new VolumeSet(datanodeDetails, conf);
+
+    this.dispatcher = new HddsDispatcher(conf, containerSet, volumeSet, SCM_ID);
+    this.handler = (KeyValueHandler) dispatcher.getHandlerForContainerType(
+        ContainerProtos.ContainerType.KeyValueContainer);
+  }
+
+  @Test
+  /**
+   * Test that Handler handles different command types correctly.
+   */
+  public void testHandlerCommandHandling() throws Exception{
+    // Create mock HddsDispatcher and KeyValueHandler.
+    this.handler = Mockito.mock(KeyValueHandler.class);
+    this.dispatcher = Mockito.mock(HddsDispatcher.class);
+    Mockito.when(dispatcher.getHandlerForContainerType(any())).thenReturn
+        (handler);
+    Mockito.when(dispatcher.dispatch(any())).thenCallRealMethod();
+    Mockito.when(dispatcher.getContainer(anyLong())).thenReturn(
+        Mockito.mock(KeyValueContainer.class));
+    Mockito.when(handler.handle(any(), any())).thenCallRealMethod();
+
+    // Test Create Container Request handling
+    ContainerCommandRequestProto createContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.CreateContainer);
+    dispatcher.dispatch(createContainerRequest);
+    Mockito.verify(handler, times(1)).handleCreateContainer(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Read Container Request handling
+    ContainerCommandRequestProto readContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.ReadContainer);
+    dispatcher.dispatch(readContainerRequest);
+    Mockito.verify(handler, times(1)).handleReadContainer(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Update Container Request handling
+    ContainerCommandRequestProto updateContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.UpdateContainer);
+    dispatcher.dispatch(updateContainerRequest);
+    Mockito.verify(handler, times(1)).handleUpdateContainer(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Delete Container Request handling
+    ContainerCommandRequestProto deleteContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.DeleteContainer);
+    dispatcher.dispatch(deleteContainerRequest);
+    Mockito.verify(handler, times(1)).handleDeleteContainer(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test List Container Request handling
+    ContainerCommandRequestProto listContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.ListContainer);
+    dispatcher.dispatch(listContainerRequest);
+    Mockito.verify(handler, times(1)).handleUnsupportedOp(
+        any(ContainerCommandRequestProto.class));
+
+    // Test Close Container Request handling
+    ContainerCommandRequestProto closeContainerRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.CloseContainer);
+    dispatcher.dispatch(closeContainerRequest);
+    Mockito.verify(handler, times(1)).handleCloseContainer(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Put Key Request handling
+    ContainerCommandRequestProto putKeyRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.PutKey);
+    dispatcher.dispatch(putKeyRequest);
+    Mockito.verify(handler, times(1)).handlePutKey(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Get Key Request handling
+    ContainerCommandRequestProto getKeyRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.GetKey);
+    dispatcher.dispatch(getKeyRequest);
+    Mockito.verify(handler, times(1)).handleGetKey(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Delete Key Request handling
+    ContainerCommandRequestProto deleteKeyRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.DeleteKey);
+    dispatcher.dispatch(deleteKeyRequest);
+    Mockito.verify(handler, times(1)).handleDeleteKey(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test List Key Request handling
+    ContainerCommandRequestProto listKeyRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.ListKey);
+    dispatcher.dispatch(listKeyRequest);
+    Mockito.verify(handler, times(2)).handleUnsupportedOp(
+        any(ContainerCommandRequestProto.class));
+
+    // Test Read Chunk Request handling
+    ContainerCommandRequestProto readChunkRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.ReadChunk);
+    dispatcher.dispatch(readChunkRequest);
+    Mockito.verify(handler, times(1)).handleReadChunk(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Delete Chunk Request handling
+    ContainerCommandRequestProto deleteChunkRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.DeleteChunk);
+    dispatcher.dispatch(deleteChunkRequest);
+    Mockito.verify(handler, times(1)).handleDeleteChunk(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Write Chunk Request handling
+    ContainerCommandRequestProto writeChunkRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.WriteChunk);
+    dispatcher.dispatch(writeChunkRequest);
+    Mockito.verify(handler, times(1)).handleWriteChunk(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test List Chunk Request handling
+    ContainerCommandRequestProto listChunkRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.ListChunk);
+    dispatcher.dispatch(listChunkRequest);
+    Mockito.verify(handler, times(3)).handleUnsupportedOp(
+        any(ContainerCommandRequestProto.class));
+
+    // Test Put Small File Request handling
+    ContainerCommandRequestProto putSmallFileRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.PutSmallFile);
+    dispatcher.dispatch(putSmallFileRequest);
+    Mockito.verify(handler, times(1)).handlePutSmallFile(
+        any(ContainerCommandRequestProto.class), any());
+
+    // Test Get Small File Request handling
+    ContainerCommandRequestProto getSmallFileRequest =
+        getDummyCommandRequestProto(ContainerProtos.Type.GetSmallFile);
+    dispatcher.dispatch(getSmallFileRequest);
+    Mockito.verify(handler, times(1)).handleGetSmallFile(
+        any(ContainerCommandRequestProto.class), any());
+  }
+
+  private ContainerCommandRequestProto getDummyCommandRequestProto
+      (ContainerProtos.Type cmdType) {
+    ContainerCommandRequestProto request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(cmdType)
+            .setDatanodeUuid(DATANODE_UUID)
+            .build();
+
+    return request;
+  }
+
+  @Test
+  public void testCreateContainer() throws Exception {
+    setup();
+
+    long contId = ++containerID;
+    ContainerProtos.CreateContainerRequestProto createReq =
+        ContainerProtos.CreateContainerRequestProto.newBuilder()
+            .setContainerID(contId)
+            .build();
+
+    ContainerCommandRequestProto request =
+        ContainerProtos.ContainerCommandRequestProto.newBuilder()
+            .setCmdType(ContainerProtos.Type.CreateContainer)
+            .setDatanodeUuid(DATANODE_UUID)
+            .setCreateContainer(createReq)
+            .build();
+
+    dispatcher.dispatch(request);
+
+    // Verify that new container is added to containerSet.
+    Container container = containerSet.getContainer(contId);
+    Assert.assertEquals(contId, container.getContainerData().getContainerId());
+    Assert.assertEquals(ContainerProtos.ContainerLifeCycleState.OPEN,
+        container.getContainerState());
+  }
+}

+ 3 - 3
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java

@@ -281,10 +281,10 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
         LOG.debug("get key accessing {} {}",
             containerID, containerKey);
         groupInputStream.streamOffset[i] = length;
-          ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
-            .containerKeyDataForRead(blockID);
+        ContainerProtos.DatanodeBlockID datanodeBlockID = blockID
+            .getDatanodeBlockIDProtobuf();
         ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls
-            .getKey(xceiverClient, containerKeyData, requestId);
+            .getKey(xceiverClient, datanodeBlockID, requestId);
         List<ContainerProtos.ChunkInfo> chunks =
             response.getKeyData().getChunksList();
         for (ContainerProtos.ChunkInfo chunk : chunks) {

+ 0 - 50
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneContainerTranslation.java

@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client.io;
-
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyData;
-import org.apache.hadoop.hdds.client.BlockID;
-
-
-/**
- * This class contains methods that define the translation between the Ozone
- * domain model and the storage container domain model.
- */
-final class OzoneContainerTranslation {
-
-  /**
-   * Creates key data intended for reading a container key.
-   *
-   * @param blockID - ID of the block.
-   * @return KeyData intended for reading the container key
-   */
-  public static KeyData containerKeyDataForRead(BlockID blockID) {
-    return KeyData
-        .newBuilder()
-        .setBlockID(blockID.getDatanodeBlockIDProtobuf())
-        .build();
-  }
-
-  /**
-   * There is no need to instantiate this class.
-   */
-  private OzoneContainerTranslation() {
-  }
-}

+ 7 - 18
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -362,10 +362,7 @@ public final class ContainerTestHelper {
     ContainerProtos.CreateContainerRequestProto.Builder createRequest =
         ContainerProtos.CreateContainerRequestProto
             .newBuilder();
-    ContainerProtos.ContainerData.Builder containerData = ContainerProtos
-        .ContainerData.newBuilder();
-    containerData.setContainerID(containerID);
-    createRequest.setContainerData(containerData.build());
+    createRequest.setContainerID(containerID);
 
     ContainerCommandRequestProto.Builder request =
         ContainerCommandRequestProto.newBuilder();
@@ -391,19 +388,16 @@ public final class ContainerTestHelper {
       long containerID, Map<String, String> metaData) throws IOException {
     ContainerProtos.UpdateContainerRequestProto.Builder updateRequestBuilder =
         ContainerProtos.UpdateContainerRequestProto.newBuilder();
-    ContainerProtos.ContainerData.Builder containerData = ContainerProtos
-        .ContainerData.newBuilder();
-    containerData.setContainerID(containerID);
+    updateRequestBuilder.setContainerID(containerID);
     String[] keys = metaData.keySet().toArray(new String[]{});
     for(int i=0; i<keys.length; i++) {
       KeyValue.Builder kvBuilder = KeyValue.newBuilder();
       kvBuilder.setKey(keys[i]);
       kvBuilder.setValue(metaData.get(keys[i]));
-      containerData.addMetadata(i, kvBuilder.build());
+      updateRequestBuilder.addMetadata(kvBuilder.build());
     }
     Pipeline pipeline =
         ContainerTestHelper.createSingleNodePipeline();
-    updateRequestBuilder.setContainerData(containerData.build());
 
     ContainerCommandRequestProto.Builder request =
         ContainerCommandRequestProto.newBuilder();
@@ -478,10 +472,7 @@ public final class ContainerTestHelper {
 
     ContainerProtos.GetKeyRequestProto.Builder getRequest =
         ContainerProtos.GetKeyRequestProto.newBuilder();
-    ContainerProtos.KeyData.Builder keyData = ContainerProtos.KeyData
-        .newBuilder();
-    keyData.setBlockID(blockID);
-    getRequest.setKeyData(keyData);
+    getRequest.setBlockID(blockID);
 
     ContainerCommandRequestProto.Builder request =
         ContainerCommandRequestProto.newBuilder();
@@ -499,13 +490,11 @@ public final class ContainerTestHelper {
    * @param response - Response
    */
   public static void verifyGetKey(ContainerCommandRequestProto request,
-      ContainerCommandResponseProto response) {
+      ContainerCommandResponseProto response, int expectedChunksCount) {
     Assert.assertEquals(request.getTraceID(), response.getTraceID());
     Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
-    ContainerProtos.PutKeyRequestProto putKey = request.getPutKey();
-    ContainerProtos.GetKeyRequestProto getKey = request.getGetKey();
-    Assert.assertEquals(putKey.getKeyData().getChunksCount(),
-        getKey.getKeyData().getChunksCount());
+    Assert.assertEquals(expectedChunksCount,
+        response.getGetKey().getKeyData().getChunksCount());
   }
 
   /**

+ 4 - 2
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java

@@ -158,7 +158,8 @@ public class TestOzoneContainer {
       // Get Key
       request = ContainerTestHelper.getKeyRequest(pipeline, putKeyRequest.getPutKey());
       response = client.sendCommand(request);
-      ContainerTestHelper.verifyGetKey(request, response);
+      int chunksCount = putKeyRequest.getPutKey().getKeyData().getChunksCount();
+      ContainerTestHelper.verifyGetKey(request, response, chunksCount);
 
 
       // Delete Key
@@ -331,7 +332,8 @@ public class TestOzoneContainer {
       request = ContainerTestHelper.getKeyRequest(client.getPipeline(),
           putKeyRequest.getPutKey());
       response = client.sendCommand(request);
-      ContainerTestHelper.verifyGetKey(request, response);
+      int chunksCount = putKeyRequest.getPutKey().getKeyData().getChunksCount();
+      ContainerTestHelper.verifyGetKey(request, response, chunksCount);
 
       // Delete Key must fail on a closed container.
       request =

+ 4 - 8
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java

@@ -174,9 +174,7 @@ public class BenchMarkDatanodeDispatcher {
   private ContainerCommandRequestProto getCreateContainerCommand(long containerID) {
     CreateContainerRequestProto.Builder createRequest =
         CreateContainerRequestProto.newBuilder();
-    createRequest.setContainerData(
-        ContainerData.newBuilder().setContainerID(
-            containerID).build());
+    createRequest.setContainerID(containerID).build();
 
     ContainerCommandRequestProto.Builder request =
         ContainerCommandRequestProto.newBuilder();
@@ -245,10 +243,9 @@ public class BenchMarkDatanodeDispatcher {
     return request.build();
   }
 
-  private ContainerCommandRequestProto getGetKeyCommand(
-      BlockID blockID, String chunkKey) {
+  private ContainerCommandRequestProto getGetKeyCommand(BlockID blockID) {
     GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto.newBuilder()
-        .setKeyData(getKeyData(blockID, chunkKey));
+        .setBlockID(blockID.getDatanodeBlockIDProtobuf());
     ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto
         .newBuilder()
         .setCmdType(ContainerProtos.Type.GetKey)
@@ -300,8 +297,7 @@ public class BenchMarkDatanodeDispatcher {
   @Benchmark
   public void getKey(BenchMarkDatanodeDispatcher bmdd) {
     BlockID blockID = getRandomBlockID();
-    String chunkKey = getNewChunkToWrite();
-    bmdd.dispatcher.dispatch(getGetKeyCommand(blockID, chunkKey));
+    bmdd.dispatcher.dispatch(getGetKeyCommand(blockID));
   }
 
   // Chunks writes from benchmark only reaches certain containers