Browse Source

HDFS-10250. Ozone: Add key Persistence. Contributed by Anu Engineer.

Chris Nauroth 9 years ago
parent
commit
b3044db407
17 changed files with 1080 additions and 101 deletions
  1. 4 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  2. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
  3. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
  4. 160 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java
  5. 98 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
  6. 22 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
  7. 123 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
  8. 145 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
  9. 20 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
  10. 63 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
  11. 111 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
  12. 4 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/LevelDBStore.java
  13. 6 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
  14. 55 64
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto
  15. 90 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
  16. 151 19
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
  17. 23 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -54,6 +54,10 @@ public final class OzoneConfigKeys {
   public static final String DFS_OZONE_METADATA_DIRS =
       "dfs.ozone.metadata.dirs";
 
+  public static final String DFS_OZONE_KEY_CACHE = "dfs.ozone.key.cache.size";
+  public static final int DFS_OZONE_KEY_CACHE_DEFAULT = 1024;
+
+
 
   /**
    * There is no need to instantiate this class.

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java

@@ -216,7 +216,7 @@ public final class ChunkUtils {
    * Reads data from an existing chunk file.
    *
    * @param chunkFile - file where data lives.
-   * @param data      - chunk defintion.
+   * @param data      - chunk definition.
    * @return ByteBuffer
    * @throws IOException
    * @throws ExecutionException
@@ -284,8 +284,8 @@ public final class ChunkUtils {
                            byte[] data, ChunkInfo info) {
     Preconditions.checkNotNull(msg);
 
-    ContainerProtos.ReadChunkReponseProto.Builder response =
-        ContainerProtos.ReadChunkReponseProto.newBuilder();
+    ContainerProtos.ReadChunkResponseProto.Builder response =
+        ContainerProtos.ReadChunkResponseProto.newBuilder();
     response.setChunkData(info.getProtoBufMessage());
     response.setData(ByteString.copyFrom(data));
     response.setPipeline(msg.getReadChunk().getPipeline());

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java

@@ -170,9 +170,9 @@ public class ContainerData {
   }
 
   /**
-   * This function serves as the generic key for OzoneCache class. Both
+   * This function serves as the generic key for ContainerCache class. Both
    * ContainerData and ContainerKeyData overrides this function to appropriately
-   * return the right name that can  be used in OzoneCache.
+   * return the right name that can  be used in ContainerCache.
    *
    * @return String Name.
    */

+ 160 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyData.java

@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Helper class to convert Protobuf to Java classes.
+ */
+public class KeyData {
+  private final String containerName;
+  private final String keyName;
+  private final Map<String, String> metadata;
+
+  /**
+   * Please note : when we are working with keys, we don't care what they point
+   * to. So we We don't read chunkinfo nor validate them. It is responsibility
+   * of higher layer like ozone. We just read and write data from network.
+   */
+  private List<ContainerProtos.ChunkInfo> chunks;
+
+  /**
+   * Constructs a KeyData Object.
+   *
+   * @param containerName
+   * @param keyName
+   */
+  public KeyData(String containerName, String keyName) {
+    this.containerName = containerName;
+    this.keyName = keyName;
+    this.metadata = new TreeMap<>();
+  }
+
+  /**
+   * Returns a keyData object from the protobuf data.
+   *
+   * @param data - Protobuf data.
+   * @return - KeyData
+   * @throws IOException
+   */
+  public static KeyData getFromProtoBuf(ContainerProtos.KeyData data) throws
+      IOException {
+    KeyData keyData = new KeyData(data.getContainerName(), data.getName());
+    for (int x = 0; x < data.getMetadataCount(); x++) {
+      keyData.addMetadata(data.getMetadata(x).getKey(),
+          data.getMetadata(x).getValue());
+    }
+    keyData.setChunks(data.getChunksList());
+    return keyData;
+  }
+
+  /**
+   * Returns a Protobuf message from KeyData.
+   * @return Proto Buf Message.
+   */
+  public ContainerProtos.KeyData getProtoBufMessage() {
+    ContainerProtos.KeyData.Builder builder =
+        ContainerProtos.KeyData.newBuilder();
+    builder.setContainerName(this.containerName);
+    builder.setName(this.getKeyName());
+    builder.addAllChunks(this.chunks);
+    for (Map.Entry<String, String> entry : metadata.entrySet()) {
+      ContainerProtos.KeyValue.Builder keyValBuilder =
+          ContainerProtos.KeyValue.newBuilder();
+      builder.addMetadata(keyValBuilder.setKey(entry.getKey())
+          .setValue(entry.getValue()).build());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Adds metadata.
+   *
+   * @param key   - Key
+   * @param value - Value
+   * @throws IOException
+   */
+  public synchronized void addMetadata(String key, String value) throws
+      IOException {
+    if (this.metadata.containsKey(key)) {
+      throw new IOException("This key already exists. Key " + key);
+    }
+    metadata.put(key, value);
+  }
+
+  public synchronized Map<String, String> getMetadata() {
+    return Collections.unmodifiableMap(this.metadata);
+  }
+
+  /**
+   * Returns value of a key.
+   */
+  public synchronized String getValue(String key) {
+    return metadata.get(key);
+  }
+
+  /**
+   * Deletes a metadata entry from the map.
+   *
+   * @param key - Key
+   */
+  public synchronized void deleteKey(String key) {
+    metadata.remove(key);
+  }
+
+  /**
+   * Returns chunks list.
+   *
+   * @return list of chunkinfo.
+   */
+  public List<ContainerProtos.ChunkInfo> getChunks() {
+    return chunks;
+  }
+
+  /**
+   * Returns container Name.
+   * @return String.
+   */
+  public String getContainerName() {
+    return containerName;
+  }
+
+  /**
+   * Returns KeyName.
+   * @return String.
+   */
+  public String getKeyName() {
+    return keyName;
+  }
+
+  /**
+   * Sets Chunk list.
+   *
+   * @param chunks - List of chunks.
+   */
+  public void setChunks(List<ContainerProtos.ChunkInfo> chunks) {
+    this.chunks = chunks;
+  }
+}

+ 98 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java

@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
+import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+/**
+ * Utils functions to help key functions.
+ */
+public final class KeyUtils {
+  public static final String ENCODING_NAME = "UTF-8";
+  public static final Charset ENCODING = Charset.forName(ENCODING_NAME);
+
+  /**
+   * Never Constructed.
+   */
+  private KeyUtils() {
+  }
+
+  /**
+   * Returns a file handle to LevelDB.
+   *
+   * @param dbPath - DbPath.
+   * @return LevelDB
+   */
+  public static LevelDBStore getDB(String dbPath) throws IOException {
+    Preconditions.checkNotNull(dbPath);
+    Preconditions.checkState(!dbPath.isEmpty());
+    return new LevelDBStore(new File(dbPath), false);
+  }
+
+  /**
+   * This function is called with  containerManager ReadLock held.
+   *
+   * @param container - container.
+   * @param cache     - cache
+   * @return LevelDB handle.
+   * @throws IOException
+   */
+  public static LevelDBStore getDB(ContainerData container,
+                                   ContainerCache cache) throws IOException {
+    Preconditions.checkNotNull(container);
+    Preconditions.checkNotNull(cache);
+    LevelDBStore db = cache.getDB(container.getContainerName());
+    if (db == null) {
+      db = getDB(container.getDBPath());
+      cache.putDB(container.getContainerName(), db);
+    }
+    return db;
+  }
+
+  /**
+   * Returns successful keyResponse.
+   * @param msg - Request.
+   * @return Response.
+   */
+  public static ContainerProtos.ContainerCommandResponseProto
+      getKeyResponse(ContainerProtos.ContainerCommandRequestProto msg) {
+    return ContainerUtils.getContainerResponse(msg);
+  }
+
+
+  public static ContainerProtos.ContainerCommandResponseProto
+      getKeyDataResponse(ContainerProtos.ContainerCommandRequestProto msg
+                       , KeyData data) {
+    ContainerProtos.GetKeyResponseProto.Builder getKey = ContainerProtos
+        .GetKeyResponseProto.newBuilder();
+    getKey.setKeyData(data.getProtoBufMessage());
+    ContainerProtos.ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
+            .SUCCESS, "");
+    builder.setGetKey(getKey);
+    return  builder.build();
+  }
+
+}

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerLocationManager;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +73,7 @@ public class ContainerManagerImpl implements ContainerManager {
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
   private ContainerLocationManager locationManager;
   private ChunkManager chunkManager;
+  private KeyManager keyManager;
 
   /**
    * Init call that sets up a container Manager.
@@ -464,6 +466,26 @@ public class ContainerManagerImpl implements ContainerManager {
     return this.chunkManager;
   }
 
+  /**
+   * Sets the Key Manager.
+   *
+   * @param keyManager - Key Manager.
+   */
+  @Override
+  public void setKeyManager(KeyManager keyManager) {
+    this.keyManager = keyManager;
+  }
+
+  /**
+   * Gets the Key Manager.
+   *
+   * @return KeyManager.
+   */
+  @Override
+  public KeyManager getKeyManager() {
+    return this.keyManager;
+  }
+
   /**
    * Filter out only container files from the container metadata dir.
    */

+ 123 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java

@@ -20,13 +20,17 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
 import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
@@ -69,6 +73,12 @@ public class Dispatcher implements ContainerDispatcher {
       return containerProcessHandler(msg);
     }
 
+    if ((cmdType == Type.PutKey) ||
+        (cmdType == Type.GetKey) ||
+        (cmdType == Type.DeleteKey) ||
+        (cmdType == Type.ListKey)) {
+      return keyProcessHandler(msg);
+    }
 
     if ((cmdType == Type.WriteChunk) ||
         (cmdType == Type.ReadChunk) ||
@@ -126,6 +136,48 @@ public class Dispatcher implements ContainerDispatcher {
     }
   }
 
+  /**
+   * Handles the all key related functionality.
+   *
+   * @param msg - command
+   * @return - response
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto keyProcessHandler(
+      ContainerCommandRequestProto msg) throws IOException {
+    try {
+      switch (msg.getCmdType()) {
+      case PutKey:
+        return handlePutKey(msg);
+
+      case GetKey:
+        return handleGetKey(msg);
+
+      case DeleteKey:
+        return handleDeleteKey(msg);
+
+      case ListKey:
+        return ContainerUtils.unsupportedRequest(msg);
+
+      default:
+        return ContainerUtils.unsupportedRequest(msg);
+
+      }
+    } catch (IOException ex) {
+      LOG.warn("Container operation failed. " +
+              "Container: {} Operation: {}  trace ID: {} Error: {}",
+          msg.getCreateContainer().getContainerData().getName(),
+          msg.getCmdType().name(),
+          msg.getTraceID(),
+          ex.toString());
+
+      // TODO : Replace with finer error codes.
+      return ContainerUtils.getContainerResponse(msg,
+          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
+          ex.toString()).build();
+    }
+  }
+
   /**
    * Handles the all chunk related functionality.
    *
@@ -136,7 +188,6 @@ public class Dispatcher implements ContainerDispatcher {
   private ContainerCommandResponseProto chunkProcessHandler(
       ContainerCommandRequestProto msg) throws IOException {
     try {
-
       switch (msg.getCmdType()) {
       case WriteChunk:
         return handleWriteChunk(msg);
@@ -327,4 +378,73 @@ public class Dispatcher implements ContainerDispatcher {
     return ChunkUtils.getChunkResponse(msg);
   }
 
+  /**
+   * Put Key handler.
+   *
+   * @param msg - Request.
+   * @return - Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handlePutKey(
+      ContainerCommandRequestProto msg) throws IOException {
+    if(!msg.hasPutKey()){
+      LOG.debug("Malformed put key request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+    Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getPutKey().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+    KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
+    Preconditions.checkNotNull(keyData);
+    this.containerManager.getKeyManager().putKey(pipeline, keyData);
+    return KeyUtils.getKeyResponse(msg);
+  }
+
+  /**
+   * Handle Get Key.
+   *
+   * @param msg - Request.
+   * @return - Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleGetKey(
+      ContainerCommandRequestProto msg) throws IOException {
+    if(!msg.hasGetKey()){
+      LOG.debug("Malformed get key request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+    KeyData keyData = KeyData.getFromProtoBuf(msg.getGetKey().getKeyData());
+    Preconditions.checkNotNull(keyData);
+    KeyData responseData =
+        this.containerManager.getKeyManager().getKey(keyData);
+    return KeyUtils.getKeyDataResponse(msg, responseData);
+  }
+
+  /**
+   * Handle Delete Key.
+   *
+   * @param msg - Request.
+   * @return - Response.
+   * @throws IOException
+   */
+  private ContainerCommandResponseProto handleDeleteKey(
+      ContainerCommandRequestProto msg) throws IOException {
+    if(!msg.hasDeleteKey()){
+      LOG.debug("Malformed delete key request. trace ID: {}",
+          msg.getTraceID());
+      return ContainerUtils.malformedRequest(msg);
+    }
+
+    Pipeline pipeline =
+        Pipeline.getFromProtoBuf(msg.getDeleteKey().getPipeline());
+    Preconditions.checkNotNull(pipeline);
+    String keyName = msg.getDeleteKey().getName();
+    Preconditions.checkNotNull(keyName);
+    Preconditions.checkState(!keyName.isEmpty());
+
+    this.containerManager.getKeyManager().deleteKey(pipeline, keyName);
+    return KeyUtils.getKeyResponse(msg);
+  }
+
 }

+ 145 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java

@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
+import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Key Manager impl.
+ */
+public class KeyManagerImpl implements KeyManager {
+  private static final float LOAD_FACTOR = 0.75f;
+  private final ContainerManager containerManager;
+  private final ContainerCache containerCache;
+
+  /**
+   * Constructs a key Manager.
+   *
+   * @param containerManager - Container Manager.
+   */
+  public KeyManagerImpl(ContainerManager containerManager, Configuration conf) {
+    Preconditions.checkNotNull(containerManager);
+    Preconditions.checkNotNull(conf);
+    int cacheSize = conf.getInt(OzoneConfigKeys.DFS_OZONE_KEY_CACHE,
+        OzoneConfigKeys.DFS_OZONE_KEY_CACHE_DEFAULT);
+    this.containerManager = containerManager;
+    containerCache = new ContainerCache(cacheSize, LOAD_FACTOR, true);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void putKey(Pipeline pipeline, KeyData data) throws IOException {
+    containerManager.readLock();
+    try {
+      // We are not locking the key manager since LevelDb serializes all actions
+      // against a single DB. We rely on DB level locking to avoid conflicts.
+      Preconditions.checkNotNull(pipeline);
+      Preconditions.checkNotNull(pipeline.getContainerName());
+      ContainerData cData = containerManager.readContainer(
+          pipeline.getContainerName());
+      LevelDBStore db = KeyUtils.getDB(cData, containerCache);
+      Preconditions.checkNotNull(db);
+      db.put(data.getKeyName().getBytes(KeyUtils.ENCODING), data
+          .getProtoBufMessage().toByteArray());
+    } finally {
+      containerManager.readUnlock();
+    }
+
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public KeyData getKey(KeyData data) throws IOException {
+    containerManager.readLock();
+    try {
+      Preconditions.checkNotNull(data);
+      Preconditions.checkNotNull(data.getContainerName());
+      ContainerData cData = containerManager.readContainer(data
+          .getContainerName());
+      LevelDBStore db = KeyUtils.getDB(cData, containerCache);
+      Preconditions.checkNotNull(db);
+      byte[] kData = db.get(data.getKeyName().getBytes(KeyUtils.ENCODING));
+      if(kData == null) {
+        throw new IOException("Unable to find the key.");
+      }
+      ContainerProtos.KeyData keyData =
+          ContainerProtos.KeyData.parseFrom(kData);
+      return KeyData.getFromProtoBuf(keyData);
+    } finally {
+      containerManager.readUnlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deleteKey(Pipeline pipeline, String keyName) throws IOException {
+    containerManager.readLock();
+    try {
+      Preconditions.checkNotNull(pipeline);
+      Preconditions.checkNotNull(pipeline.getContainerName());
+      ContainerData cData = containerManager.readContainer(pipeline
+          .getContainerName());
+      LevelDBStore db = KeyUtils.getDB(cData, containerCache);
+      Preconditions.checkNotNull(db);
+
+      // Note : There is a race condition here, since get and delete
+      // are not atomic. Leaving it here since the impact is refusing
+      // to delete a key which might have just gotten inserted after
+      // the get check.
+
+      byte[] kData = db.get(keyName.getBytes(KeyUtils.ENCODING));
+      if(kData == null) {
+        throw new IOException("Unable to find the key.");
+      }
+      db.delete(keyName.getBytes(KeyUtils.ENCODING));
+    } finally {
+      containerManager.readUnlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<KeyData> listKey(Pipeline pipeline, String prefix, String
+      prevKey, int count) {
+    // TODO :
+    return null;
+  }
+}

+ 20 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java

@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.util.List;
 
-
 /**
  * Interface for container operations.
  */
@@ -41,9 +40,9 @@ public interface ContainerManager extends RwLock {
   /**
    * Init call that sets up a container Manager.
    *
-   * @param config  - Configuration.
+   * @param config        - Configuration.
    * @param containerDirs - List of Metadata Container locations.
-   * @param dataset - FSDataset.
+   * @param dataset       - FSDataset.
    * @throws IOException
    */
   void init(Configuration config, List<Path> containerDirs,
@@ -74,8 +73,8 @@ public interface ContainerManager extends RwLock {
    * As simple interface for container Iterations.
    *
    * @param prevKey - Starting KeyValue
-   * @param count - how many to return
-   * @param data  - Actual containerData
+   * @param count   - how many to return
+   * @param data    - Actual containerData
    * @throws IOException
    */
   void listContainer(String prevKey, long count, List<ContainerData> data)
@@ -99,15 +98,30 @@ public interface ContainerManager extends RwLock {
 
   /**
    * Sets the Chunk Manager.
+   *
    * @param chunkManager - ChunkManager.
    */
   void setChunkManager(ChunkManager chunkManager);
 
   /**
    * Gets the Chunk Manager.
-   * @return  ChunkManager.
+   *
+   * @return ChunkManager.
    */
   ChunkManager getChunkManager();
 
+  /**
+   * Sets the Key Manager.
+   *
+   * @param keyManager - Key Manager.
+   */
+  void setKeyManager(KeyManager keyManager);
+
+  /**
+   * Gets the Key Manager.
+   *
+   * @return KeyManager.
+   */
+  KeyManager getKeyManager();
 
 }

+ 63 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java

@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * KeyManager deals with Key Operations in the container Level.
+ */
+public interface KeyManager {
+  /**
+   * Puts or overwrites a key.
+   * @param pipeline - Pipeline.
+   * @param data - Key Data.
+   */
+  void putKey(Pipeline pipeline, KeyData data) throws IOException;
+
+  /**
+   * Gets an existing key.
+   * @param data - Key Data.
+   * @return Key Data.
+   */
+  KeyData getKey(KeyData data) throws IOException;
+
+  /**
+   * Deletes an existing Key.
+   * @param pipeline  - Pipeline.
+   * @param keyName Key Data.
+   */
+  void deleteKey(Pipeline pipeline, String keyName) throws IOException;
+
+  /**
+   * List keys in a container.
+   * @param pipeline - pipeline.
+   * @param prefix - Prefix in needed.
+   * @param prevKey - Key to Start from, EMPTY_STRING to begin.
+   * @param count - Number of keys to return.
+   * @return List of Keys that match the criteria.
+   */
+  List<KeyData> listKey(Pipeline pipeline, String prefix, String prevKey, int
+      count);
+
+
+}

+ 111 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java

@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * container cache is a LRUMap that maintains the DB handles.
+ */
+public class ContainerCache extends LRUMap {
+  static final Log LOG = LogFactory.getLog(ContainerCache.class);
+  private final Lock lock = new ReentrantLock();
+
+  /**
+   * Constructs a cache that holds DBHandle references.
+   */
+  public ContainerCache(int maxSize, float loadFactor, boolean
+      scanUntilRemovable) {
+    super(maxSize, loadFactor, scanUntilRemovable);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected boolean removeLRU(LinkEntry entry) {
+    lock.lock();
+    try {
+      LevelDBStore db = (LevelDBStore) entry.getValue();
+      db.close();
+    } catch (IOException e) {
+      LOG.error("Error closing DB. Container: " + entry.getKey().toString(), e);
+    } finally {
+      lock.unlock();
+    }
+    return true;
+  }
+
+  /**
+   * Returns a DB handle if available, null otherwise.
+   *
+   * @param containerName - Name of the container.
+   * @return OzoneLevelDBStore.
+   */
+  public LevelDBStore getDB(String containerName) {
+    Preconditions.checkNotNull(containerName);
+    Preconditions.checkState(!containerName.isEmpty());
+    lock.lock();
+    try {
+      return (LevelDBStore) this.get(containerName);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Add a new DB to the cache.
+   *
+   * @param containerName - Name of the container
+   * @param db            - DB handle
+   */
+  public void putDB(String containerName, LevelDBStore db) {
+    Preconditions.checkNotNull(containerName);
+    Preconditions.checkState(!containerName.isEmpty());
+    lock.lock();
+    try {
+      this.put(containerName, db);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Remove an entry from the cache.
+   *
+   * @param containerName - Name of the container.
+   */
+  public void removeDB(String containerName) {
+    Preconditions.checkNotNull(containerName);
+    Preconditions.checkState(!containerName.isEmpty());
+    lock.lock();
+    try {
+      this.remove(containerName);
+    } finally {
+      lock.unlock();
+    }
+  }
+}

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/LevelDBStore.java

@@ -22,6 +22,7 @@ import org.fusesource.leveldbjni.JniDBFactory;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
 import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteOptions;
 
 import java.io.File;
 import java.io.IOException;
@@ -58,7 +59,9 @@ public class LevelDBStore {
    * @param value - value
    */
   public void put(byte[] key, byte[] value) {
-    db.put(key, value);
+    WriteOptions options = new WriteOptions();
+    options.sync(true);
+    db.put(key, value, options);
   }
 
   /**

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java

@@ -24,9 +24,11 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
 import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
 import org.apache.hadoop.ozone.container.common.impl.Dispatcher;
+import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl;
 import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +53,7 @@ public class OzoneContainer {
   private final ContainerManager manager;
   private final XceiverServer server;
   private final ChunkManager chunkManager;
+  private final KeyManager keyManager;
 
   /**
    * Creates a network endpoint and enables Ozone container.
@@ -80,6 +83,9 @@ public class OzoneContainer {
     this.chunkManager = new ChunkManagerImpl(manager);
     manager.setChunkManager(this.chunkManager);
 
+    this.keyManager = new KeyManagerImpl(manager, ozoneConfig);
+    manager.setKeyManager(this.keyManager);
+
     this.dispatcher = new Dispatcher(manager);
     server = new XceiverServer(this.ozoneConfig, this.dispatcher);
   }

+ 55 - 64
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeContainerProtocol.proto

@@ -49,25 +49,24 @@ import "hdfs.proto";
  *  5. ListContainer - Returns the list of containers on this
  *     datanode. This will be used by tests and tools.
  *
- *  6. CreateKey - Given a valid container, creates a key.
+ *  6. PutKey - Given a valid container, creates a key.
  *
- *  7. ReadKey - Allows user to read the metadata of a Key.
+ *  7. GetKey - Allows user to read the metadata of a Key.
  *
- *  8. UpdateKey - Updates the metadata of a Key.
+ *  8. DeleteKey - Deletes a given key.
  *
- *  9. DeleteKey - Deletes a given key.
- *
- *  10. ListKey - Returns a list of keys that are present inside
+ *  9. ListKey - Returns a list of keys that are present inside
  *      a given container.
  *
- *  11. ReadChunk - Allows us to read a chunk.
+ *  10. ReadChunk - Allows us to read a chunk.
  *
- *  12. DeleteChunk - Delete an unused chunk.
+ *  11. DeleteChunk - Delete an unused chunk.
  *
- *  13. WriteChunk - Allows us to write a chunk
+ *  12. WriteChunk - Allows us to write a chunk
  *
- *  14. ListChunk - Given a Container/Key returns the list of Chunks.
+ *  13. ListChunk - Given a Container/Key returns the list of Chunks.
  *
+ *  14. CompactChunk - Re-writes a chunk based on Offsets.
  */
 
 enum Type {
@@ -77,16 +76,16 @@ enum Type {
    DeleteContainer = 4;
    ListContainer = 5;
 
-   CreateKey = 6;
-   Readkey = 7;
-   UpdateKey = 8;
-   DeleteKey = 9;
-   ListKey = 10;
+   PutKey = 6;
+   GetKey = 7;
+   DeleteKey = 8;
+   ListKey = 9;
 
-   ReadChunk = 11;
-   DeleteChunk = 12;
-   WriteChunk = 13;
-   ListChunk = 14;
+   ReadChunk = 10;
+   DeleteChunk = 11;
+   WriteChunk = 12;
+   ListChunk = 13;
+   CompactChunk = 14;
 }
 
 
@@ -95,7 +94,6 @@ enum Result {
   UNSUPPORTED_REQUEST = 2;
   MALFORMED_REQUEST = 3;
   CONTAINER_INTERNAL_ERROR = 4;
-
 }
 
 message ContainerCommandRequestProto {
@@ -115,16 +113,15 @@ message ContainerCommandRequestProto {
   optional   DeleteContainerRequestProto deleteContainer = 6;
   optional   ListContainerRequestProto listContainer = 7;
 
-  optional   CreateKeyRequestProto createKey = 8;
-  optional   ReadKeyRequestProto readKey = 9;
-  optional   UpdateKeyRequestProto updateKey = 10;
-  optional   DeleteKeyRequestProto deleteKey = 11;
-  optional   ListKeyRequestProto listKey = 12;
+  optional   PutKeyRequestProto putKey = 8;
+  optional   GetKeyRequestProto getKey = 9;
+  optional   DeleteKeyRequestProto deleteKey = 10;
+  optional   ListKeyRequestProto listKey = 11;
 
-  optional   ReadChunkRequestProto readChunk = 13;
-  optional   WriteChunkRequestProto writeChunk = 14;
-  optional   DeleteChunkRequestProto deleteChunk = 15;
-  optional   ListChunkRequestProto listChunk = 16;
+  optional   ReadChunkRequestProto readChunk = 12;
+  optional   WriteChunkRequestProto writeChunk = 13;
+  optional   DeleteChunkRequestProto deleteChunk = 14;
+  optional   ListChunkRequestProto listChunk = 15;
 }
 
 message ContainerCommandResponseProto {
@@ -137,16 +134,15 @@ message ContainerCommandResponseProto {
   optional   DeleteContainerResponseProto deleteContainer = 6;
   optional   ListContainerResponseProto listContainer = 7;
 
-  optional   CreateKeyResponseProto createKey = 8;
-  optional   ReadKeyResponeProto readKey = 9;
-  optional   UpdateKeyResponseProto updateKey = 10;
-  optional   DeleteKeyResponeProto deleteKey = 11;
-  optional   ListKeyResponeProto listKey = 12;
+  optional   PutKeyResponseProto putKey = 8;
+  optional   GetKeyResponseProto getKey = 9;
+  optional   DeleteKeyResponseProto deleteKey = 10;
+  optional   ListKeyResponseProto listKey = 11;
 
-  optional  WriteChunkReponseProto writeChunk = 13;
-  optional  ReadChunkReponseProto readChunk = 14;
-  optional  DeleteChunkResponseProto deleteChunk = 15;
-  optional  ListChunkResponseProto listChunk = 16;
+  optional  WriteChunkResponseProto writeChunk = 12;
+  optional  ReadChunkResponseProto readChunk = 13;
+  optional  DeleteChunkResponseProto deleteChunk = 14;
+  optional  ListChunkResponseProto listChunk = 15;
 
   required Result result = 17;
   optional string message = 18;
@@ -222,37 +218,30 @@ message  ListContainerResponseProto {
 }
 
 
-message ContainerKeyData {
-  optional string containerName = 1;
+message KeyData {
+  required string containerName = 1;
   required string name = 2;
-  repeated KeyValue metadata = 3;
+  optional int64 flags = 3; // for future use.
+  repeated KeyValue metadata = 4;
+  repeated ChunkInfo chunks = 5;
 }
 
 // Key Messages.
-message  CreateKeyRequestProto {
+message  PutKeyRequestProto {
   required Pipeline pipeline = 1;
-  required ContainerKeyData containerKeyData = 2;
+  required KeyData keyData = 2;
 }
 
-message  CreateKeyResponseProto {
+message  PutKeyResponseProto {
 }
 
-message  ReadKeyRequestProto  {
+message  GetKeyRequestProto  {
   required Pipeline pipeline = 1;
-  required ContainerKeyData containerKeyData = 2;
+  required KeyData keyData = 2;
 }
 
-message  ReadKeyResponeProto  {
-  repeated KeyValue metadata = 1;
-  repeated ChunkInfo chunkData = 2;
-}
-
-message  UpdateKeyRequestProto {
-  required Pipeline pipeline = 1;
-  required ContainerKeyData containerKeyData = 2;
-}
-
-message  UpdateKeyResponseProto {
+message  GetKeyResponseProto  {
+  required KeyData keyData = 1;
 }
 
 
@@ -261,17 +250,19 @@ message  DeleteKeyRequestProto {
   required string name = 2;
 }
 
-message   DeleteKeyResponeProto {
+message   DeleteKeyResponseProto {
 }
 
 message  ListKeyRequestProto {
   required Pipeline pipeline = 1;
-  required string prevKey = 2;
-  required uint32 count = 3;
+  optional string prefix = 2; // if specified returns keys that match prefix.
+  required string prevKey = 3;
+  required uint32 count = 4;
+
 }
 
-message  ListKeyResponeProto {
-  repeated ContainerKeyData containerKeyData = 1;
+message  ListKeyResponseProto {
+  repeated KeyData keyData = 1;
 }
 
 // Chunk Operations
@@ -291,7 +282,7 @@ message  WriteChunkRequestProto  {
   required bytes data = 4;
 }
 
-message  WriteChunkReponseProto {
+message  WriteChunkResponseProto {
 }
 
 message  ReadChunkRequestProto  {
@@ -300,7 +291,7 @@ message  ReadChunkRequestProto  {
   required ChunkInfo chunkData = 3;
 }
 
-message  ReadChunkReponseProto {
+message  ReadChunkResponseProto {
   required Pipeline pipeline = 1;
   required ChunkInfo chunkData = 2;
   required bytes data = 3;

+ 90 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -20,8 +20,6 @@ package org.apache.hadoop.ozone.container;
 
 import com.google.protobuf.ByteString;
 import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.io.IOExceptionWithCause;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
     .ContainerCommandRequestProto;
@@ -31,11 +29,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.junit.Assert;
 
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
@@ -231,7 +233,7 @@ public class ContainerTestHelper {
    * @return ContainerCommandRequestProto.
    */
   public static ContainerCommandResponseProto
-    getCreateContainerResponse(ContainerCommandRequestProto request) throws
+  getCreateContainerResponse(ContainerCommandRequestProto request) throws
       IOException {
     ContainerProtos.CreateContainerResponseProto.Builder createResponse =
         ContainerProtos.CreateContainerResponseProto.newBuilder();
@@ -244,4 +246,89 @@ public class ContainerTestHelper {
     response.setResult(ContainerProtos.Result.SUCCESS);
     return response.build();
   }
+
+  /**
+   * Returns the PutKeyRequest for test purpose.
+   *
+   * @param writeRequest - Write Chunk Request.
+   * @return - Request
+   */
+  public static ContainerCommandRequestProto getPutKeyRequest(
+      ContainerProtos.WriteChunkRequestProto writeRequest) {
+    ContainerProtos.PutKeyRequestProto.Builder putRequest =
+        ContainerProtos.PutKeyRequestProto.newBuilder();
+
+    putRequest.setPipeline(writeRequest.getPipeline());
+    KeyData keyData = new KeyData(writeRequest.getPipeline().getContainerName(),
+        writeRequest.getKeyName());
+    List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
+    newList.add(writeRequest.getChunkData());
+    keyData.setChunks(newList);
+    putRequest.setKeyData(keyData.getProtoBufMessage());
+
+    ContainerCommandRequestProto.Builder request =
+        ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.PutKey);
+    request.setPutKey(putRequest);
+    return request.build();
+  }
+
+  /**
+   * Gets a GetKeyRequest for test purpose.
+   *
+   * @param putKeyRequest - putKeyRequest.
+   * @return - Request
+   */
+  public static ContainerCommandRequestProto getKeyRequest(
+      ContainerProtos.PutKeyRequestProto putKeyRequest) {
+    ContainerProtos.GetKeyRequestProto.Builder getRequest =
+        ContainerProtos.GetKeyRequestProto.newBuilder();
+    ContainerProtos.KeyData.Builder keyData = ContainerProtos.KeyData
+        .newBuilder();
+    keyData.setContainerName(putKeyRequest.getPipeline().getContainerName());
+    keyData.setName(putKeyRequest.getKeyData().getName());
+    getRequest.setKeyData(keyData);
+    getRequest.setPipeline(putKeyRequest.getPipeline());
+
+    ContainerCommandRequestProto.Builder request =
+        ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.GetKey);
+    request.setGetKey(getRequest);
+    return request.build();
+  }
+
+  /**
+   *  Verify the response against the request.
+   * @param request  - Request
+   * @param response  - Response
+   */
+  public static void verifyGetKey(ContainerCommandRequestProto request,
+                             ContainerCommandResponseProto response) {
+    Assert.assertEquals(request.getTraceID(), response.getTraceID());
+    Assert.assertEquals(response.getResult(), ContainerProtos.Result.SUCCESS);
+    ContainerProtos.PutKeyRequestProto putKey = request.getPutKey();
+    ContainerProtos. GetKeyRequestProto getKey = request.getGetKey();
+    Assert.assertEquals(putKey.getKeyData().getChunksCount(),
+                        getKey.getKeyData().getChunksCount());
+  }
+
+
+  /**
+   *
+   * @param putKeyRequest - putKeyRequest.
+   * @return - Request
+   */
+  public static ContainerCommandRequestProto getDeleteKeyRequest(
+      ContainerProtos.PutKeyRequestProto putKeyRequest) {
+    ContainerProtos.DeleteKeyRequestProto.Builder delRequest =
+        ContainerProtos.DeleteKeyRequestProto.newBuilder();
+    delRequest.setPipeline(putKeyRequest.getPipeline());
+    delRequest.setName(putKeyRequest.getKeyData().getName());
+    ContainerCommandRequestProto.Builder request =
+        ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.DeleteKey);
+    request.setDeleteKey(delRequest);
+    return request.build();
+  }
+
 }

+ 151 - 19
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.impl;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
@@ -28,6 +29,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
@@ -60,6 +62,7 @@ import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper
     .setDataChecksum;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.fail;
 
 /**
@@ -70,6 +73,7 @@ public class TestContainerPersistence {
   static String path;
   static ContainerManagerImpl containerManager;
   static ChunkManagerImpl chunkManager;
+  static KeyManagerImpl keyManager;
   static OzoneConfiguration conf;
   static FsDatasetSpi fsDataSet;
   static MiniDFSCluster cluster;
@@ -103,6 +107,8 @@ public class TestContainerPersistence {
     containerManager = new ContainerManagerImpl();
     chunkManager = new ChunkManagerImpl(containerManager);
     containerManager.setChunkManager(chunkManager);
+    keyManager = new KeyManagerImpl(containerManager, conf);
+    containerManager.setKeyManager(keyManager);
 
   }
 
@@ -176,9 +182,11 @@ public class TestContainerPersistence {
     ContainerData data = new ContainerData(containerName);
     data.addMetadata("VOLUME", "shire");
     data.addMetadata("owner)", "bilbo");
-    containerManager.createContainer(createSingleNodePipeline(containerName), data);
+    containerManager.createContainer(createSingleNodePipeline(containerName),
+        data);
     try {
-      containerManager.createContainer(createSingleNodePipeline(containerName), data);
+      containerManager.createContainer(createSingleNodePipeline
+          (containerName), data);
       fail("Expected Exception not thrown.");
     } catch (IOException ex) {
       Assert.assertNotNull(ex);
@@ -194,12 +202,14 @@ public class TestContainerPersistence {
     ContainerData data = new ContainerData(containerName1);
     data.addMetadata("VOLUME", "shire");
     data.addMetadata("owner)", "bilbo");
-    containerManager.createContainer(createSingleNodePipeline(containerName1), data);
+    containerManager.createContainer(createSingleNodePipeline(containerName1)
+        , data);
 
     data = new ContainerData(containerName2);
     data.addMetadata("VOLUME", "shire");
     data.addMetadata("owner)", "bilbo");
-    containerManager.createContainer(createSingleNodePipeline(containerName2), data);
+    containerManager.createContainer(createSingleNodePipeline(containerName2)
+        , data);
 
 
     Assert.assertTrue(containerManager.getContainerMap()
@@ -218,7 +228,8 @@ public class TestContainerPersistence {
     data = new ContainerData(containerName1);
     data.addMetadata("VOLUME", "shire");
     data.addMetadata("owner)", "bilbo");
-    containerManager.createContainer(createSingleNodePipeline(containerName1), data);
+    containerManager.createContainer(createSingleNodePipeline(containerName1)
+        , data);
 
     // Assert we still have both containers.
     Assert.assertTrue(containerManager.getContainerMap()
@@ -246,7 +257,8 @@ public class TestContainerPersistence {
       ContainerData data = new ContainerData(containerName);
       data.addMetadata("VOLUME", "shire");
       data.addMetadata("owner)", "bilbo");
-      containerManager.createContainer(createSingleNodePipeline(containerName), data);
+      containerManager.createContainer(createSingleNodePipeline
+          (containerName), data);
       testMap.put(containerName, data);
     }
 
@@ -271,19 +283,10 @@ public class TestContainerPersistence {
     Assert.assertTrue(testMap.isEmpty());
   }
 
-  /**
-   * Writes a single chunk.
-   *
-   * @throws IOException
-   * @throws NoSuchAlgorithmException
-   */
-  @Test
-  public void testWriteChunk() throws IOException, NoSuchAlgorithmException {
+  private ChunkInfo writeChunkHelper(String containerName, String keyName,
+                                     Pipeline pipeline) throws IOException,
+      NoSuchAlgorithmException {
     final int datalen = 1024;
-    String containerName = OzoneUtils.getRequestID();
-    String keyName = OzoneUtils.getRequestID();
-    Pipeline pipeline = createSingleNodePipeline(containerName);
-
     pipeline.setContainerName(containerName);
     ContainerData cData = new ContainerData(containerName);
     cData.addMetadata("VOLUME", "shire");
@@ -293,6 +296,23 @@ public class TestContainerPersistence {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(pipeline, keyName, info, data);
+    return info;
+
+  }
+
+  /**
+   * Writes a single chunk.
+   *
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  @Test
+  public void testWriteChunk() throws IOException,
+      NoSuchAlgorithmException {
+    String containerName = OzoneUtils.getRequestID();
+    String keyName = OzoneUtils.getRequestID();
+    Pipeline pipeline = createSingleNodePipeline(containerName);
+    writeChunkHelper(containerName, keyName, pipeline);
   }
 
   /**
@@ -389,7 +409,7 @@ public class TestContainerPersistence {
     chunkManager.writeChunk(pipeline, keyName, info, data);
     try {
       chunkManager.writeChunk(pipeline, keyName, info, data);
-    } catch(IOException ex) {
+    } catch (IOException ex) {
       Assert.assertTrue(ex.getMessage().contains(
           "Rejecting write chunk request. OverWrite flag required."));
     }
@@ -469,4 +489,116 @@ public class TestContainerPersistence {
     exception.expectMessage("Unable to find the chunk file.");
     chunkManager.readChunk(pipeline, keyName, info);
   }
+
+  /**
+   * Tests a put key and read key.
+   *
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  @Test
+  public void testPutKey() throws IOException, NoSuchAlgorithmException {
+    String containerName = OzoneUtils.getRequestID();
+    String keyName = OzoneUtils.getRequestID();
+    Pipeline pipeline = createSingleNodePipeline(containerName);
+    ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline);
+    KeyData keyData = new KeyData(containerName, keyName);
+    List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+    chunkList.add(info.getProtoBufMessage());
+    keyData.setChunks(chunkList);
+    keyManager.putKey(pipeline, keyData);
+    KeyData readKeyData = keyManager.getKey(keyData);
+    ChunkInfo readChunk =
+        ChunkInfo.getFromProtoBuf(readKeyData.getChunks().get(0));
+    Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
+  }
+
+  /**
+   * Tests a put key and read key.
+   *
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  @Test
+  public void testPutKeyWithLotsOfChunks() throws IOException,
+      NoSuchAlgorithmException {
+    final int chunkCount = 1024;
+    final int datalen = 1024;
+    String containerName = OzoneUtils.getRequestID();
+    String keyName = OzoneUtils.getRequestID();
+    Pipeline pipeline = createSingleNodePipeline(containerName);
+    List<ChunkInfo> chunkList = new LinkedList<>();
+    ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline);
+    chunkList.add(info);
+    for (int x = 1; x < chunkCount; x++) {
+      info = getChunk(keyName, x, x * datalen, datalen);
+      byte[] data = getData(datalen);
+      setDataChecksum(info, data);
+      chunkManager.writeChunk(pipeline, keyName, info, data);
+      chunkList.add(info);
+    }
+
+    KeyData keyData = new KeyData(containerName, keyName);
+    List<ContainerProtos.ChunkInfo> chunkProtoList = new LinkedList<>();
+    for (ChunkInfo i : chunkList) {
+      chunkProtoList.add(i.getProtoBufMessage());
+    }
+    keyData.setChunks(chunkProtoList);
+    keyManager.putKey(pipeline, keyData);
+    KeyData readKeyData = keyManager.getKey(keyData);
+    ChunkInfo lastChunk = chunkList.get(chunkList.size() - 1);
+    ChunkInfo readChunk =
+        ChunkInfo.getFromProtoBuf(readKeyData.getChunks().get(readKeyData
+            .getChunks().size() - 1));
+    Assert.assertEquals(lastChunk.getChecksum(), readChunk.getChecksum());
+  }
+
+  /**
+   * Deletes a key and tries to read it back.
+   *
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  @Test
+  public void testDeleteKey() throws IOException, NoSuchAlgorithmException {
+    String containerName = OzoneUtils.getRequestID();
+    String keyName = OzoneUtils.getRequestID();
+    Pipeline pipeline = createSingleNodePipeline(containerName);
+    ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline);
+    KeyData keyData = new KeyData(containerName, keyName);
+    List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+    chunkList.add(info.getProtoBufMessage());
+    keyData.setChunks(chunkList);
+    keyManager.putKey(pipeline, keyData);
+    keyManager.deleteKey(pipeline, keyName);
+    exception.expect(IOException.class);
+    exception.expectMessage("Unable to find the key.");
+    keyManager.getKey(keyData);
+  }
+
+  /**
+   * Tries to Deletes a key twice.
+   *
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  @Test
+  public void testDeleteKeyTwice() throws IOException,
+      NoSuchAlgorithmException {
+    String containerName = OzoneUtils.getRequestID();
+    String keyName = OzoneUtils.getRequestID();
+    Pipeline pipeline = createSingleNodePipeline(containerName);
+    ChunkInfo info = writeChunkHelper(containerName, keyName, pipeline);
+    KeyData keyData = new KeyData(containerName, keyName);
+    List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+    chunkList.add(info.getProtoBufMessage());
+    keyData.setChunks(chunkList);
+    keyManager.putKey(pipeline, keyData);
+    keyManager.deleteKey(pipeline, keyName);
+    exception.expect(IOException.class);
+    exception.expectMessage("Unable to find the key.");
+    keyManager.deleteKey(pipeline, keyName);
+  }
+
+
 }

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java

@@ -135,6 +135,29 @@ public class TestOzoneContainer {
     Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
     Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
 
+    // Put Key
+    ContainerProtos.ContainerCommandRequestProto putKeyRequest =
+        ContainerTestHelper.getPutKeyRequest(writeChunkRequest.getWriteChunk());
+
+    response = client.sendCommand(putKeyRequest);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+    Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+
+    // Get Key
+    request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey());
+    response = client.sendCommand(request);
+    ContainerTestHelper.verifyGetKey(request, response);
+
+
+    // Delete Key
+    request =
+        ContainerTestHelper.getDeleteKeyRequest(putKeyRequest.getPutKey());
+    response = client.sendCommand(request);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+    Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
+
     //Delete Chunk
     request = ContainerTestHelper.getDeleteChunkRequest(writeChunkRequest
         .getWriteChunk());