Browse Source

HDFS-9960. OzoneHandler : Add localstorage support for keys. Contributed by Anu Engineer.

Chris Nauroth 9 years ago
parent
commit
7c55d7feb5

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java

@@ -20,8 +20,10 @@ package org.apache.hadoop.ozone.web;
 
 import org.apache.hadoop.ozone.web.exceptions.OzoneExceptionMapper;
 import org.apache.hadoop.ozone.web.handlers.BucketHandler;
+import org.apache.hadoop.ozone.web.handlers.KeyHandler;
 import org.apache.hadoop.ozone.web.handlers.ServiceFilter;
 import org.apache.hadoop.ozone.web.handlers.VolumeHandler;
+import org.apache.hadoop.ozone.web.messages.LengthInputStreamMessageBodyWriter;
 import org.apache.hadoop.ozone.web.messages.StringMessageBodyWriter;
 
 import javax.ws.rs.core.Application;
@@ -41,7 +43,9 @@ public class ObjectStoreApplication extends Application {
     HashSet<Class<?>> set = new HashSet<>();
     set.add(BucketHandler.class);
     set.add(VolumeHandler.class);
+    set.add(KeyHandler.class);
     set.add(OzoneExceptionMapper.class);
+    set.add(LengthInputStreamMessageBodyWriter.class);
     set.add(StringMessageBodyWriter.class);
     return set;
   }
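
JAX-RS wires in only the resources and providers returned from getClasses(), which is why both the new KeyHandler resource and the LengthInputStreamMessageBodyWriter provider must be registered here. For orientation, a minimal sketch of what such a body writer looks like — the class below is a hypothetical stand-in, not the shipped implementation:

    import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;

    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.MultivaluedMap;
    import javax.ws.rs.ext.MessageBodyWriter;
    import javax.ws.rs.ext.Provider;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.lang.annotation.Annotation;
    import java.lang.reflect.Type;

    /** Hypothetical sketch of a writer for LengthInputStream bodies. */
    @Provider
    @Produces(MediaType.APPLICATION_OCTET_STREAM)
    public class LengthInputStreamWriterSketch
        implements MessageBodyWriter<LengthInputStream> {

      @Override
      public boolean isWriteable(Class<?> type, Type genericType,
          Annotation[] annotations, MediaType mediaType) {
        return LengthInputStream.class.isAssignableFrom(type);
      }

      @Override
      public long getSize(LengthInputStream stream, Class<?> type,
          Type genericType, Annotation[] annotations, MediaType mediaType) {
        // Known up front, so the container can emit an exact Content-Length.
        return stream.getLength();
      }

      @Override
      public void writeTo(LengthInputStream stream, Class<?> type,
          Type genericType, Annotation[] annotations, MediaType mediaType,
          MultivaluedMap<String, Object> httpHeaders, OutputStream out)
          throws IOException {
        byte[] buf = new byte[8192];
        int n;
        while ((n = stream.read(buf)) != -1) {
          out.write(buf, 0, n);
        }
      }
    }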

+ 405 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java

@@ -18,19 +18,50 @@
 
 package org.apache.hadoop.ozone.web.client;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+import org.apache.hadoop.ozone.web.headers.Header;
 import org.apache.hadoop.ozone.web.request.OzoneAcl;
 import org.apache.hadoop.ozone.web.response.BucketInfo;
+import org.apache.hadoop.ozone.web.response.KeyInfo;
+import org.apache.hadoop.ozone.web.response.ListKeys;
 import org.apache.hadoop.ozone.web.utils.OzoneConsts;
-import org.apache.http.HttpException;
+import org.apache.http.HttpEntity;
 import org.apache.http.HttpRequest;
 import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.FileEntity;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.protocol.HTTP;
 import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
 
+import javax.ws.rs.core.HttpHeaders;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.util.LinkedList;
 import java.util.List;
 
+import static java.net.HttpURLConnection.HTTP_CREATED;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static org.apache.hadoop.ozone.web.utils.OzoneUtils.ENCODING;
+import static org.apache.hadoop.ozone.web.utils.OzoneUtils.ENCODING_NAME;
+
 /**
  * A Bucket class that represents an Ozone Bucket.
  */
@@ -109,19 +140,386 @@ public class OzoneBucket {
    *
    * @return StorageType Enum
    */
-  public StorageType getStorageClass() {
+  public StorageType getStorageType() {
     return bucketInfo.getStorageType();
   }
 
+  /**
+   * Puts an Object in Ozone bucket.
+   *
+   * @param keyName - Name of the key
+   * @param data    - Data that you want to put
+   * @throws OzoneException
+   */
+  public void putKey(String keyName, String data) throws OzoneException {
+    if ((keyName == null) || keyName.isEmpty()) {
+      throw new OzoneClientException("Invalid key Name.");
+    }
+
+    if (data == null) {
+      throw new OzoneClientException("Invalid data.");
+    }
+
+    try {
+      OzoneClient client = getVolume().getClient();
+
+      DefaultHttpClient httpClient = new DefaultHttpClient();
+
+      URIBuilder builder = new URIBuilder(client.getEndPointURI());
+      builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+          + "/" + keyName).build();
+
+      HttpPut putRequest = client.getHttpPut(builder.toString());
+
+      byte[] dataBytes = data.getBytes(ENCODING);
+      InputStream is = new ByteArrayInputStream(dataBytes);
+      putRequest.setEntity(new InputStreamEntity(is, dataBytes.length));
+      // Hash the byte array rather than the stream; md5Hex(InputStream)
+      // would consume the entity before it is sent.
+      putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(dataBytes));
+      putRequest.setHeader(HttpHeaders.CONTENT_LENGTH,
+          Long.toString(dataBytes.length));
+      executePutKey(putRequest, httpClient);
+
+    } catch (IOException | URISyntaxException ex) {
+      throw new OzoneClientException(ex.getMessage());
+    }
+  }
+
+  /**
+   * Puts an Object in Ozone Bucket.
+   *
+   * @param dataFile - File from which you want the data to be put. Key Name
+   *                 will be the same as the file name, devoid of any path.
+   * @throws OzoneException
+   */
+  public void putKey(File dataFile) throws OzoneException {
+    if (dataFile == null) {
+      throw new OzoneClientException("Invalid file object.");
+    }
+    String keyName = dataFile.getName();
+    putKey(keyName, dataFile);
+  }
+
+  /**
+   * Puts a Key in Ozone Bucket.
+   *
+   * @param keyName - Name of the Key
+   * @param file    - File that gets read and put into Ozone.
+   * @throws OzoneException
+   */
+  public void putKey(String keyName, File file)
+      throws OzoneException {
+
+    if ((keyName == null) || keyName.isEmpty()) {
+      throw new OzoneClientException("Invalid key Name");
+    }
+
+    if (file == null) {
+      throw new OzoneClientException("Invalid data stream");
+    }
+
+    try {
+      OzoneClient client = getVolume().getClient();
+
+      DefaultHttpClient httpClient = new DefaultHttpClient();
+      URIBuilder builder = new URIBuilder(client.getEndPointURI());
+      builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+          + "/" + keyName).build();
+
+      HttpPut putRequest = client.getHttpPut(builder.toString());
+
+      FileEntity fileEntity = new FileEntity(file, ContentType
+          .APPLICATION_OCTET_STREAM);
+      putRequest.setEntity(fileEntity);
+
+      // try-with-resources closes the stream even if hashing fails.
+      try (FileInputStream fis = new FileInputStream(file)) {
+        putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(fis));
+      }
+      putRequest.setHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(file
+          .length()));
+      httpClient.removeRequestInterceptorByClass(
+          org.apache.http.protocol.RequestContent.class);
+      executePutKey(putRequest, httpClient);
+
+    } catch (IOException | URISyntaxException ex) {
+      throw new OzoneClientException(ex.getMessage());
+    }
+  }
+
+  /**
+   * executePutKey executes the Put request against the Ozone Server.
+   *
+   * @param putRequest - Http Put Request
+   * @param httpClient - httpClient
+   * @throws OzoneException
+   * @throws IOException
+   */
+  private void executePutKey(HttpPut putRequest, DefaultHttpClient httpClient)
+      throws OzoneException, IOException {
+    HttpEntity entity = null;
+    try {
+
+      HttpResponse response = httpClient.execute(putRequest);
+      int errorCode = response.getStatusLine().getStatusCode();
+      entity = response.getEntity();
+
+      if ((errorCode == HTTP_OK) || (errorCode == HTTP_CREATED)) {
+        return;
+      }
+
+      if (entity == null) {
+        throw new OzoneClientException("Unexpected null in http payload");
+      }
+
+      throw OzoneException.parse(EntityUtils.toString(entity));
+    } finally {
+      if (entity != null) {
+        EntityUtils.consumeQuietly(entity);
+      }
+    }
+  }
+
+  /**
+   * Gets a key from the Ozone server and writes it to the file pointed
+   * to by the downloadTo path.
+   *
+   * @param keyName    - Key Name in Ozone.
+   * @param downloadTo - File name to download the key's data to.
+   */
+  public void getKey(String keyName, Path downloadTo) throws OzoneException {
+
+    if ((keyName == null) || keyName.isEmpty()) {
+      throw new OzoneClientException("Invalid key Name");
+    }
+
+    if (downloadTo == null) {
+      throw new OzoneClientException("Invalid download path");
+    }
+
+    try {
+      OzoneClient client = getVolume().getClient();
+
+      DefaultHttpClient httpClient = new DefaultHttpClient();
+      URIBuilder builder = new URIBuilder(client.getEndPointURI());
+      builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+          + "/" + keyName).build();
+
+      HttpGet getRequest = client.getHttpGet(builder.toString());
+      // try-with-resources closes the output file even if the GET fails.
+      try (FileOutputStream outPutFile =
+               new FileOutputStream(downloadTo.toFile())) {
+        executeGetKey(getRequest, httpClient, outPutFile);
+        outPutFile.flush();
+      }
+    } catch (IOException | URISyntaxException ex) {
+      throw new OzoneClientException(ex.getMessage());
+    }
+  }
+
+  /**
+   * Returns the data part of the key as a string.
+   *
+   * @param keyName - KeyName to get
+   * @return String - Data
+   * @throws OzoneException
+   */
+  public String getKey(String keyName) throws OzoneException {
+
+    if ((keyName == null) || keyName.isEmpty()) {
+      throw new OzoneClientException("Invalid key Name");
+    }
+
+    try {
+      OzoneClient client = getVolume().getClient();
+      ByteArrayOutputStream outPutStream = new ByteArrayOutputStream();
+
+      DefaultHttpClient httpClient = new DefaultHttpClient();
+      URIBuilder builder = new URIBuilder(client.getEndPointURI());
+
+      builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+          + "/" + keyName).build();
+
+      HttpGet getRequest = client.getHttpGet(builder.toString());
+      executeGetKey(getRequest, httpClient, outPutStream);
+      return outPutStream.toString(ENCODING_NAME);
+    } catch (IOException | URISyntaxException ex) {
+      throw new OzoneClientException(ex.getMessage());
+    }
+
+  }
+
+  /**
+   * Executes get key and returns the data.
+   *
+   * @param getRequest - http Get Request
+   * @param httpClient - Client
+   * @param stream     - Stream to write data to.
+   * @throws IOException
+   * @throws OzoneException
+   */
+  private void executeGetKey(HttpGet getRequest, DefaultHttpClient httpClient,
+                             OutputStream stream)
+      throws IOException, OzoneException {
+
+    HttpEntity entity = null;
+    try {
+
+      HttpResponse response = httpClient.execute(getRequest);
+      int errorCode = response.getStatusLine().getStatusCode();
+      entity = response.getEntity();
+
+      if (errorCode == HTTP_OK) {
+        entity.writeTo(stream);
+        return;
+      }
+
+      if (entity == null) {
+        throw new OzoneClientException("Unexpected null in http payload");
+      }
+
+      throw OzoneException.parse(EntityUtils.toString(entity));
+    } finally {
+      if (entity != null) {
+        EntityUtils.consumeQuietly(entity);
+      }
+    }
+  }
+
+  /**
+   * Deletes a key in this bucket.
+   *
+   * @param keyName - Name of the Key
+   * @throws OzoneException
+   */
+  public void deleteKey(String keyName) throws OzoneException {
+
+    if ((keyName == null) || keyName.isEmpty()) {
+      throw new OzoneClientException("Invalid key Name");
+    }
+
+    try {
+      OzoneClient client = getVolume().getClient();
+      DefaultHttpClient httpClient = new DefaultHttpClient();
+
+      URIBuilder builder = new URIBuilder(client.getEndPointURI());
+      builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+          + "/" + keyName).build();
+
+      HttpDelete deleteRequest = client.getHttpDelete(builder.toString());
+      executeDeleteKey(deleteRequest, httpClient);
+    } catch (IOException | URISyntaxException ex) {
+      throw new OzoneClientException(ex.getMessage());
+    }
+  }
+
+  /**
+   * Executes deleteKey.
+   *
+   * @param deleteRequest - http Delete Request
+   * @param httpClient    - Client
+   * @throws IOException
+   * @throws OzoneException
+   */
+  private void executeDeleteKey(HttpDelete deleteRequest,
+                                DefaultHttpClient httpClient)
+      throws IOException, OzoneException {
+
+    HttpEntity entity = null;
+    try {
+
+      HttpResponse response = httpClient.execute(deleteRequest);
+      int errorCode = response.getStatusLine().getStatusCode();
+      entity = response.getEntity();
+
+      if (errorCode == HTTP_OK) {
+        return;
+      }
+
+      if (entity == null) {
+        throw new OzoneClientException("Unexpected null in http payload");
+      }
+
+      throw OzoneException.parse(EntityUtils.toString(entity));
+    } finally {
+      if (entity != null) {
+        EntityUtils.consumeQuietly(entity);
+      }
+    }
+  }
+
+  /**
+   * List all keys in a bucket.
+   *
+   * @return List of OzoneKeys
+   */
+  public List<OzoneKey> listKeys() throws OzoneException {
+    try {
+      OzoneClient client = getVolume().getClient();
+      DefaultHttpClient httpClient = new DefaultHttpClient();
+      URIBuilder builder = new URIBuilder(client.getEndPointURI());
+      builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName())
+          .build();
+
+      HttpGet getRequest = client.getHttpGet(builder.toString());
+      return executeListKeys(getRequest, httpClient);
+
+    } catch (IOException | URISyntaxException e) {
+      throw new OzoneClientException(e.getMessage());
+    }
+  }
+
+  /**
+   * Executes the list keys request.
+   *
+   * @param getRequest - HttpGet
+   * @param httpClient - HttpClient
+   * @return List<OzoneKey>
+   * @throws IOException
+   * @throws OzoneException
+   */
+  private List<OzoneKey> executeListKeys(HttpGet getRequest,
+                                         DefaultHttpClient httpClient)
+      throws IOException, OzoneException {
+    HttpEntity entity = null;
+    List<OzoneKey> ozoneKeyList = new LinkedList<>();
+    try {
+      HttpResponse response = httpClient.execute(getRequest);
+      int errorCode = response.getStatusLine().getStatusCode();
+
+      entity = response.getEntity();
+
+      if (entity == null) {
+        throw new OzoneClientException("Unexpected null in http payload");
+      }
+      if (errorCode == HTTP_OK) {
+        String temp = EntityUtils.toString(entity);
+        ListKeys keyList = ListKeys.parse(temp);
+
+        for (KeyInfo info : keyList.getKeyList()) {
+          ozoneKeyList.add(new OzoneKey(info));
+        }
+        return ozoneKeyList;
+
+      } else {
+        throw OzoneException.parse(EntityUtils.toString(entity));
+      }
+    } finally {
+      if (entity != null) {
+        EntityUtils.consumeQuietly(entity);
+      }
+    }
+  }
+
+  /**
+   * Removes the Content-Length header before org.apache.http.protocol
+   * .RequestContent runs, since that interceptor throws if the header is
+   * already present.
+   */
   private static class ContentLengthHeaderRemover implements
       HttpRequestInterceptor {
     @Override
     public void process(HttpRequest request, HttpContext context)
-        throws HttpException, IOException {
-
-      // fighting org.apache.http.protocol
-      // .RequestContent's ProtocolException("Content-Length header
-      // already present");
+        throws IOException {
       request.removeHeaders(HTTP.CONTENT_LEN);
     }
   }
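
Together, the methods added above complete the client-side key lifecycle. A hedged usage sketch of the new API as a code fragment — the endpoint URI, volume/bucket names, quota, and ACLs are illustrative, and the client setup mirrors the TestKeys test below:

    // Illustrative end-to-end use of the new OzoneBucket key API.
    OzoneClient client = new OzoneClient("http://localhost:9864"); // URI illustrative
    client.setUserAuth("hdfs");

    OzoneVolume vol = client.createVolume("volume-example", "bilbo", "100TB");
    OzoneBucket bucket = vol.createBucket("bucket-example",
        new String[] {"user:frodo:rw"}, StorageType.DEFAULT);

    bucket.putKey("greeting", "hello ozone");  // put a key from a String
    String data = bucket.getKey("greeting");   // read it back as a String
    List<OzoneKey> keys = bucket.listKeys();   // enumerate keys in the bucket
    bucket.deleteKey("greeting");              // remove it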

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneKey.java

@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.web.client;
+
+import org.apache.hadoop.ozone.web.response.KeyInfo;
+
+/**
+ * Client side representation of an ozone Key.
+ */
+public class OzoneKey {
+  private KeyInfo keyInfo;
+
+  /**
+   * Constructor for Ozone Key.
+   * @param keyInfo - Key Info
+   */
+  public OzoneKey(KeyInfo keyInfo) {
+    this.keyInfo = keyInfo;
+  }
+
+  /**
+   * Returns Key Info.
+   * @return KeyInfo
+   */
+  public KeyInfo getObjectInfo() {
+    return keyInfo;
+  }
+
+}

+ 18 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/LocalStorageHandler.java

@@ -49,6 +49,8 @@ public class LocalStorageHandler implements StorageHandler {
 
   /**
    * Constructs LocalStorageHandler.
+   *
+   * @param conf - Ozone configuration.
    */
   public LocalStorageHandler(Configuration conf) {
     this.conf = conf;
@@ -285,7 +287,9 @@ public class LocalStorageHandler implements StorageHandler {
   @Override
   public OutputStream newKeyWriter(KeyArgs args) throws IOException,
       OzoneException {
-    return null;
+    OzoneMetadataManager oz =
+        OzoneMetadataManager.getOzoneMetadataManager(conf);
+    return oz.createKey(args);
   }
 
   /**
@@ -299,6 +303,9 @@ public class LocalStorageHandler implements StorageHandler {
   @Override
   public void commitKey(KeyArgs args, OutputStream stream) throws
       IOException, OzoneException {
+    OzoneMetadataManager oz =
+        OzoneMetadataManager.getOzoneMetadataManager(conf);
+    oz.commitKey(args, stream);
 
   }
 
@@ -312,7 +319,9 @@ public class LocalStorageHandler implements StorageHandler {
   @Override
   public LengthInputStream newKeyReader(KeyArgs args) throws IOException,
       OzoneException {
-    return null;
+    OzoneMetadataManager oz =
+        OzoneMetadataManager.getOzoneMetadataManager(conf);
+    return oz.newKeyReader(args);
   }
 
   /**
@@ -323,7 +332,9 @@ public class LocalStorageHandler implements StorageHandler {
    */
   @Override
   public void deleteKey(KeyArgs args) throws IOException, OzoneException {
-
+    OzoneMetadataManager oz =
+        OzoneMetadataManager.getOzoneMetadataManager(conf);
+    oz.deleteKey(args);
   }
 
   /**
@@ -335,7 +346,10 @@ public class LocalStorageHandler implements StorageHandler {
    */
   @Override
   public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
-    return null;
+    OzoneMetadataManager oz =
+        OzoneMetadataManager.getOzoneMetadataManager(conf);
+    return oz.listKeys(args);
   }
 
 }
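
Each key operation now simply acquires the OzoneMetadataManager singleton and delegates to it. From a REST handler's point of view, the write path this enables is a two-step protocol, sketched here (the KeyArgs construction and byte payload are illustrative):

    // Sketch of the write-then-commit contract behind newKeyWriter/commitKey.
    StorageHandler storageHandler = new LocalStorageHandler(conf);
    OutputStream out = storageHandler.newKeyWriter(keyArgs); // in-progress object
    out.write(payload);                                      // stream the key bytes
    out.close();
    storageHandler.commitKey(keyArgs, out); // the stream object is the commit token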

+ 273 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java

@@ -17,13 +17,18 @@
  */
 package org.apache.hadoop.ozone.web.localstorage;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.ozone.web.handlers.ListArgs;
+import org.apache.hadoop.ozone.web.response.KeyInfo;
+import org.apache.hadoop.ozone.web.response.ListKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
 import org.apache.hadoop.ozone.web.handlers.UserArgs;
 import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
 import org.apache.hadoop.ozone.web.request.OzoneAcl;
@@ -34,9 +39,13 @@ import org.apache.hadoop.ozone.web.response.VolumeInfo;
 import org.apache.hadoop.ozone.web.response.VolumeOwner;
 import org.apache.hadoop.ozone.web.utils.OzoneConsts;
 import org.iq80.leveldb.DBException;
+import org.apache.commons.codec.digest.DigestUtils;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.io.FileOutputStream;
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -44,6 +53,7 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Locale;
 import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -111,7 +121,7 @@ public final class OzoneMetadataManager {
       // //  stand-alone tests for the protocol and client code.
 
 */
-  static final Log LOG = LogFactory.getLog(OzoneMetadataManager.class);
+  static final Logger LOG =
+      LoggerFactory.getLogger(OzoneMetadataManager.class);
   private static final String USER_DB = "/user.db";
   private static final String META_DB = "/metadata.db";
   private static OzoneMetadataManager bm = null;
@@ -119,28 +129,37 @@ public final class OzoneMetadataManager {
   private OzoneLevelDBStore metadataDB;
   private ReadWriteLock lock;
   private Charset encoding = Charset.forName("UTF-8");
+  private String storageRoot;
+  private static final String OBJECT_DIR = "/_objects/";
+
+  // This table keeps a pointer to objects whose operations
+  // are in progress but not yet committed to the persistent store.
+  private ConcurrentHashMap<OutputStream, String> inProgressObjects;
 
   /**
    * Constructs OzoneMetadataManager.
    */
-  private OzoneMetadataManager(Configuration conf) {
+  private OzoneMetadataManager(Configuration conf) throws IOException {
 
     lock = new ReentrantReadWriteLock();
-    String storageRoot =
+    storageRoot =
         conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
             OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
 
-    File file = new File(storageRoot);
+    File file = new File(storageRoot + OBJECT_DIR);
 
     if (!file.exists() && !file.mkdirs()) {
-      LOG.fatal("Creation of Ozone root failed. " + file.toString());
+      LOG.error("Creation of Ozone root failed. " + file.toString());
+      throw new IOException("Creation of Ozone root failed.");
     }
 
     try {
       userDB = new OzoneLevelDBStore(new File(storageRoot + USER_DB), true);
       metadataDB = new OzoneLevelDBStore(new File(storageRoot + META_DB), true);
+      inProgressObjects = new ConcurrentHashMap<>();
     } catch (IOException ex) {
-      LOG.fatal("Cannot open db :" + ex.getMessage());
+      LOG.error("Cannot open db :" + ex.getMessage());
+      throw ex;
     }
   }
 
@@ -150,7 +169,7 @@ public final class OzoneMetadataManager {
    * @return OzoneMetadataManager
    */
   public static synchronized OzoneMetadataManager
-      getOzoneMetadataManager(Configuration conf) {
+      getOzoneMetadataManager(Configuration conf) throws IOException {
     if (bm == null) {
       bm = new OzoneMetadataManager(conf);
     }
@@ -440,8 +459,8 @@ public final class OzoneMetadataManager {
 
       if (args.getRemoveAcls() != null) {
         OzoneException ex = ErrorTable.newError(ErrorTable.MALFORMED_ACL, args);
-        ex.setMessage("Remove ACLs specified in bucket create. Please remove " +
-            "them and retry.");
+        ex.setMessage("Remove ACLs specified in bucket create. Please remove "
+            + "them and retry.");
         throw ex;
       }
 
@@ -680,6 +699,249 @@ public final class OzoneMetadataManager {
     }
   }
 
+  /**
+   * Creates a key and returns a stream to which the key's data can be
+   * written.
+   *
+   * @param args - KeyArgs
+   * @return - A stream into which the key's data can be written.
+   * @throws OzoneException
+   */
+  public OutputStream createKey(KeyArgs args) throws OzoneException {
+    lock.writeLock().lock();
+    try {
+      String fileNameHash = DigestUtils.sha256Hex(args.getResourceName());
+
+      // Please don't try trillion objects unless the physical file system
+      // is capable of doing that in a single directory.
+
+      String fullPath = storageRoot + OBJECT_DIR + fileNameHash;
+      File f = new File(fullPath);
+
+      // In real ozone it would not be this way, a file will be overwritten
+      // only if the upload is successful.
+      if (f.exists()) {
+        LOG.debug("we are overwriting a file. This is by design.");
+        if(!f.delete()) {
+          LOG.error("Unable to delete the file: {}", fullPath);
+          throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args);
+        }
+      }
+
+      FileOutputStream fsStream = new FileOutputStream(f);
+      inProgressObjects.put(fsStream, fullPath);
+
+      return fsStream;
+    } catch (IOException e) {
+      throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+
+  /**
+   * commitKey moves an in-progress object into the metadata store so
+   * that the key becomes visible in metadata operations from that point
+   * onwards.
+   *
+   * @param args Object args
+   *
+   * @throws OzoneException
+   */
+  public void commitKey(KeyArgs args, OutputStream stream)
+      throws OzoneException {
+    SimpleDateFormat format =
+        new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
+    lock.writeLock().lock();
+
+    try {
+      byte[] bucketInfo = metadataDB.get(args.getParentName()
+          .getBytes(encoding));
+      if (bucketInfo == null) {
+        throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
+      }
+      BucketInfo bInfo = BucketInfo.parse(new String(bucketInfo, encoding));
+
+      // remove() returns the parked path and clears the in-progress entry.
+      String fileNameHash = inProgressObjects.remove(stream);
+      if (fileNameHash == null) {
+        throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args);
+      }
+
+      ListKeys keyList;
+      byte[] bucketListBytes = userDB.get(args.getParentName()
+          .getBytes(encoding));
+      if (bucketListBytes == null) {
+        keyList = new ListKeys();
+      } else {
+        keyList = ListKeys.parse(new String(bucketListBytes, encoding));
+      }
+
+      KeyInfo keyInfo;
+
+      byte[] objectBytes = metadataDB.get(args.getResourceName()
+          .getBytes(encoding));
+
+      if (objectBytes != null) {
+        // we are overwriting an existing object.
+        // TODO : Emit info for Accounting
+        keyInfo = KeyInfo.parse(new String(objectBytes, encoding));
+        keyList.getKeyList().remove(keyInfo);
+      } else {
+        keyInfo = new KeyInfo();
+        // Only a brand-new key grows the bucket's key count; an overwrite
+        // replaces the existing entry.
+        bInfo.setKeyCount(bInfo.getKeyCount() + 1);
+      }
+
+      keyInfo.setCreatedOn(format.format(new Date(System.currentTimeMillis())));
+
+      // TODO : support version, we need to check if versioning
+      // is switched on the bucket and make appropriate calls.
+      keyInfo.setVersion(0);
+
+      keyInfo.setDataFileName(fileNameHash);
+      keyInfo.setKeyName(args.getKeyName());
+      keyInfo.setMd5hash(args.getHash());
+      keyInfo.setSize(args.getSize());
+
+      keyList.getKeyList().add(keyInfo);
+
+      // If the key exists, we overwrite it, since the earlier call to
+      // createKey has already overwritten the data file.
+
+      metadataDB.put(args.getResourceName().getBytes(encoding),
+          keyInfo.toDBString().getBytes(encoding));
+
+      metadataDB.put(args.getParentName().getBytes(encoding),
+          bInfo.toDBString().getBytes(encoding));
+
+      userDB.put(args.getParentName().getBytes(encoding),
+          keyList.toDBString().getBytes(encoding));
+
+    } catch (IOException e) {
+      throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Deletes a key from a given bucket.
+   *
+   * @param args - ObjectArgs
+   *
+   * @throws OzoneException
+   */
+  public void deleteKey(KeyArgs args) throws OzoneException {
+    lock.writeLock().lock();
+    try {
+      byte[] bucketInfo = metadataDB.get(args.getParentName()
+          .getBytes(encoding));
+      if (bucketInfo == null) {
+        throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
+      }
+      BucketInfo bInfo = BucketInfo.parse(new String(bucketInfo, encoding));
+      bInfo.setKeyCount(bInfo.getKeyCount() - 1);
+
+      byte[] bucketListBytes = userDB.get(args.getParentName()
+          .getBytes(encoding));
+      if (bucketListBytes == null) {
+        throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
+      }
+      ListKeys keyList = ListKeys.parse(new String(bucketListBytes, encoding));
+
+      byte[] objectBytes = metadataDB.get(args.getResourceName()
+          .getBytes(encoding));
+      if (objectBytes == null) {
+        throw ErrorTable.newError(ErrorTable.INVALID_KEY, args);
+      }
+
+      KeyInfo oInfo = KeyInfo.parse(new String(objectBytes, encoding));
+      keyList.getKeyList().remove(oInfo);
+
+      String fileNameHash = DigestUtils.sha256Hex(args.getResourceName());
+
+      String fullPath = storageRoot + OBJECT_DIR + fileNameHash;
+      File f = new File(fullPath);
+
+      if (f.exists()) {
+        if (!f.delete()) {
+          throw ErrorTable.newError(ErrorTable.KEY_OPERATION_CONFLICT, args);
+        }
+      } else {
+        throw ErrorTable.newError(ErrorTable.INVALID_KEY, args);
+      }
+
+      metadataDB.delete(args.getResourceName().getBytes(encoding));
+      metadataDB.put(args.getParentName().getBytes(encoding),
+          bInfo.toDBString().getBytes(encoding));
+      userDB.put(args.getParentName().getBytes(encoding),
+          keyList.toDBString().getBytes(encoding));
+    } catch (IOException e) {
+      throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Returns a Stream for the file.
+   *
+   * @param args - Object args
+   *
+   * @return Stream
+   *
+   * @throws IOException
+   * @throws OzoneException
+   */
+  public LengthInputStream newKeyReader(KeyArgs args)
+      throws IOException, OzoneException {
+    lock.readLock().lock();
+    try {
+      String fileNameHash = DigestUtils.sha256Hex(args.getResourceName());
+      String fullPath = storageRoot + OBJECT_DIR + fileNameHash;
+      File f = new File(fullPath);
+      if (!f.exists()) {
+        throw ErrorTable.newError(ErrorTable.INVALID_KEY, args);
+      }
+      long size = f.length();
+
+      FileInputStream fileStream = new FileInputStream(f);
+      return new LengthInputStream(fileStream, size);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the keys in a bucket.
+   *
+   * @param args - ListArgs
+   * @return List of keys.
+   * @throws IOException
+   * @throws OzoneException
+   */
+  public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
+    lock.readLock().lock();
+    try {
+      byte[] bucketInfo = metadataDB.get(args.getResourceName()
+          .getBytes(encoding));
+      if (bucketInfo == null) {
+        throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
+      }
+
+      byte[] bucketListBytes = userDB.get(args.getResourceName()
+          .getBytes(encoding));
+      if (bucketListBytes == null) {
+        throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
+      }
+      return ListKeys.parse(new String(bucketListBytes, encoding));
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   /**
    * This is used in updates to volume metadata.
    */
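
To make the bookkeeping in createKey, commitKey, and deleteKey above easier to follow, here is a hedged sketch of where a key's pieces live in this local store (the resource name below is illustrative):

    // Data file: named by the SHA-256 of the key's full resource name.
    String resourceName = "/volume/bucket/key";            // illustrative
    String dataFile = storageRoot + "/_objects/"
        + DigestUtils.sha256Hex(resourceName);
    // metadataDB: resourceName -> KeyInfo, parentName -> BucketInfo
    // userDB:     parentName   -> ListKeys (the bucket's key listing)
    // createKey opens a FileOutputStream on dataFile and parks it in
    // inProgressObjects; commitKey uses that stream as its lookup token.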

+ 14 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/response/ListKeys.java

@@ -41,13 +41,13 @@ public class ListKeys {
   private String prefix;
   private long maxKeys;
   private boolean truncated;
-  private List<KeyInfo> objectList;
+  private List<KeyInfo> keyList;
 
   /**
    * Default constructor needed for json serialization.
    */
   public ListKeys() {
-    this.objectList = new LinkedList<>();
+    this.keyList = new LinkedList<>();
   }
 
   /**
@@ -65,9 +65,9 @@ public class ListKeys {
 
   /**
    * Converts a Json string to POJO.
-   * @param jsonString
+   * @param jsonString - json string.
    * @return ListObject
-   * @throws IOException
+   * @throws IOException - Json conversion error.
    */
   public static ListKeys parse(String jsonString) throws IOException {
     ObjectMapper mapper = new ObjectMapper();
@@ -79,17 +79,17 @@ public class ListKeys {
    *
    * @return List of KeyInfo Objects.
    */
-  public List<KeyInfo> getObjectList() {
-    return objectList;
+  public List<KeyInfo> getKeyList() {
+    return keyList;
   }
 
   /**
    * Sets the list of Objects.
    *
-   * @param objectList
+   * @param keyList - List of keys.
    */
-  public void setObjectList(List<KeyInfo> objectList) {
-    this.objectList = objectList;
+  public void setKeyList(List<KeyInfo> keyList) {
+    this.keyList = keyList;
   }
 
   /**
@@ -142,6 +142,7 @@ public class ListKeys {
    * keyCount.
    *
    * @return String
+   * @throws  IOException - On json Errors.
    */
   public String toJsonString() throws IOException {
     String[] ignorableFieldNames = {"dataFileName"};
@@ -159,6 +160,9 @@ public class ListKeys {
 
   /**
    * Returns the Object as a Json String.
+   *
+   * @return String
+   * @throws IOException - on json errors.
    */
   public String toDBString() throws IOException {
     ObjectMapper mapper = new ObjectMapper();
@@ -170,7 +174,7 @@ public class ListKeys {
    * list of keys.
    */
   public void sort() {
-    Collections.sort(objectList);
+    Collections.sort(keyList);
   }
 
   /**

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.web.utils;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.handlers.UserArgs;
@@ -27,7 +28,7 @@ import org.apache.hadoop.ozone.web.headers.Header;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Request;
 import javax.ws.rs.core.Response;
-import java.io.InputStream;
+import javax.ws.rs.core.MediaType;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
@@ -45,7 +46,8 @@ import java.util.UUID;
 @InterfaceAudience.Private
 public final class OzoneUtils {
 
-  public static final Charset ENCODING = Charset.forName("UTF-8");
+  public static final String ENCODING_NAME = "UTF-8";
+  public static final Charset ENCODING = Charset.forName(ENCODING_NAME);
 
   private OzoneUtils() {
     // Never constructed
@@ -256,15 +258,17 @@ public final class OzoneUtils {
    * @return JAX-RS Response
    */
   public static Response getResponse(UserArgs args, int statusCode,
-                                     InputStream stream) {
+                                     LengthInputStream stream) {
     SimpleDateFormat format =
         new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US);
     format.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE));
     String date = format.format(new Date(System.currentTimeMillis()));
-    return Response.ok(stream)
+    return Response.ok(stream, MediaType.APPLICATION_OCTET_STREAM)
         .header(Header.OZONE_SERVER_NAME, args.getHostName())
         .header(Header.OZONE_REQUEST_ID, args.getRequestID())
         .header(HttpHeaders.DATE, date).status(statusCode)
-        .header(HttpHeaders.CONTENT_TYPE, "application/octet-stream").build();
+        .header(HttpHeaders.CONTENT_LENGTH, stream.getLength())
+        .build();
   }
 }
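
Switching the parameter from InputStream to LengthInputStream lets the response advertise an exact Content-Length. A hedged caller-side sketch (the handler and argument wiring are illustrative):

    // Serving a key read with the new signature.
    LengthInputStream stream = storageHandler.newKeyReader(keyArgs);
    Response response = OzoneUtils.getResponse(userArgs, HTTP_OK, stream);
    // Content-Length equals stream.getLength(), so clients see the exact
    // object size rather than a stream of unknown length.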

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestBuckets.java

@@ -59,7 +59,7 @@ public class TestBuckets {
     OzoneConfiguration conf = new OzoneConfiguration();
 
     URL p = conf.getClass().getResource("");
-    String path = p.getPath();
+    String path = p.getPath().concat(TestBuckets.class.getSimpleName());
     path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
         OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
 

+ 236 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java

@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.web.client;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
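+/**
+ * Tests Ozone key operations (put, get, delete, and list) end to end
+ * against the local storage handler.
+ */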
+public class TestKeys {
+  static MiniDFSCluster cluster = null;
+  static int port = 0;
+  static private String path;
+  private static OzoneClient client = null;
+
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   *
+   * Ozone is made active by setting DFS_OBJECTSTORE_ENABLED_KEY = true and
+   * DFS_STORAGE_HANDLER_TYPE_KEY = "local" , which uses a local
+   * directory to emulate Ozone backend.
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init()
+      throws IOException, OzoneException, URISyntaxException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    URL p = conf.getClass().getResource("");
+    path = p.getPath().concat(TestKeys.class.getSimpleName());
+    path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
+                            OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
+
+    conf.set(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, path);
+    conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
+    conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local");
+
+    conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY, true);
+    Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
+
+
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DataNode dataNode = cluster.getDataNodes().get(0);
+    port = dataNode.getInfoPort();
+    client = new OzoneClient(String.format("http://localhost:%d", port));
+  }
+
+  /**
+   * Shuts down the MiniDFSCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Creates a file with random data.
+   *
+   * @param fileName - name of the file to create.
+   * @param size     - number of random bytes to write.
+   * @return File.
+   */
+  private File createRandomDataFile(String fileName, long size) {
+    File tmpDir = new File(path);
+    if (!tmpDir.exists() && !tmpDir.mkdirs()) {
+      fail("Unable to create the test directory: " + path);
+    }
+    File tmpFile = new File(path + "/" + fileName);
+    // try-with-resources closes the stream even if a write fails.
+    try (FileOutputStream randFile = new FileOutputStream(tmpFile)) {
+      Random r = new Random();
+      for (int x = 0; x < size; x++) {
+        char c = (char) (r.nextInt(26) + 'a');
+        randFile.write(c);
+      }
+    } catch (IOException e) {
+      fail(e.getMessage());
+    }
+    return tmpFile;
+  }
+
+
+  private class PutHelper {
+    OzoneVolume vol;
+    OzoneBucket bucket;
+    File file;
+
+    public OzoneVolume getVol() {
+      return vol;
+    }
+
+    public OzoneBucket getBucket() {
+      return bucket;
+    }
+
+    public File getFile() {
+      return file;
+    }
+    /**
+     * This function is reused in all other tests.
+     *
+     * @return Returns the name of the new key that was created.
+     * @throws OzoneException
+     */
+    private String putKey() throws OzoneException {
+      String volumeName = OzoneUtils.getRequestID().toLowerCase();
+      client.setUserAuth("hdfs");
+
+      vol = client.createVolume(volumeName, "bilbo", "100TB");
+      String[] acls = {"user:frodo:rw", "user:samwise:rw"};
+
+      String bucketName = OzoneUtils.getRequestID().toLowerCase();
+      bucket = vol.createBucket(bucketName, acls, StorageType.DEFAULT);
+
+      String keyName = OzoneUtils.getRequestID().toLowerCase();
+      file = createRandomDataFile(keyName, 1024);
+
+      bucket.putKey(keyName, file);
+      return keyName;
+    }
+
+  }
+
+  @Test
+  public void testPutKey() throws OzoneException {
+    PutHelper helper = new PutHelper();
+    helper.putKey();
+    assertNotNull(helper.getBucket());
+    assertNotNull(helper.getFile());
+  }
+
+
+  @Test
+  public void testPutAndGetKey() throws OzoneException, IOException {
+
+    PutHelper helper = new PutHelper();
+    String keyName = helper.putKey();
+    assertNotNull(helper.getBucket());
+    assertNotNull(helper.getFile());
+
+    String newFileName = path + "/" + OzoneUtils.getRequestID().toLowerCase();
+    Path newPath = Paths.get(newFileName);
+    helper.getBucket().getKey(keyName, newPath);
+
+    String originalHash;
+    String downloadedHash;
+    try (FileInputStream original = new FileInputStream(helper.getFile());
+         FileInputStream downloaded = new FileInputStream(newPath.toFile())) {
+      originalHash = DigestUtils.sha256Hex(original);
+      downloadedHash = DigestUtils.sha256Hex(downloaded);
+    }
+
+    assertEquals(
+        "Sha256 does not match between original file and downloaded file.",
+        originalHash, downloadedHash);
+
+  }
+
+  @Test
+  public void testPutAndDeleteKey() throws OzoneException, IOException {
+
+    PutHelper helper = new PutHelper();
+    String keyName = helper.putKey();
+    assertNotNull(helper.getBucket());
+    assertNotNull(helper.getFile());
+    helper.getBucket().deleteKey(keyName);
+
+    try {
+      helper.getBucket().getKey(keyName);
+      fail("Get Key on a deleted key should have thrown");
+    } catch (OzoneException ex) {
+      assertEquals(ErrorTable.INVALID_KEY.getShortMessage(),
+          ex.getShortMessage());
+    }
+  }
+
+
+  @Test
+  public void testPutAndListKey() throws OzoneException, IOException {
+    PutHelper helper = new PutHelper();
+    helper.putKey();
+    assertNotNull(helper.getBucket());
+    assertNotNull(helper.getFile());
+
+    for (int x = 0; x < 10; x++) {
+      String newKeyName = OzoneUtils.getRequestID().toLowerCase();
+      helper.getBucket().putKey(newKeyName, helper.getFile());
+    }
+
+    List<OzoneKey> keyList = helper.getBucket().listKeys();
+    assertEquals(11, keyList.size());
+  }
+}