Selaa lähdekoodia

HDFS-10268. Ozone: end-to-end integration for create/get volumes, buckets and keys. Contributed by Chris Nauroth.

Anu Engineer 9 vuotta sitten
vanhempi
commit
fedb22d9b6
21 muutettua tiedostoa jossa 1695 lisäystä ja 114 poistoa
  1. 15 10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  2. 54 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
  3. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
  4. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
  5. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
  6. 13 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java
  7. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java
  8. 83 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java
  9. 24 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java
  10. 24 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java
  11. 97 13
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java
  12. 6 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java
  13. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java
  14. 10 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java
  15. 193 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java
  16. 193 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java
  17. 198 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java
  18. 209 57
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
  19. 261 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java
  20. 53 18
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
  21. 253 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java

+ 15 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1846,16 +1846,6 @@ public class DataNode extends ReconfigurableBase
   public void shutdown() {
     stopMetricsLogger();
 
-    if(this.ozoneEnabled) {
-      if(ozoneServer != null) {
-        try {
-          ozoneServer.stop();
-        } catch (Exception e) {
-          LOG.error("Error is ozone shutdown. ex {}", e.toString());
-        }
-      }
-    }
-
     if (plugins != null) {
       for (ServicePlugin p : plugins) {
         try {
@@ -1914,6 +1904,21 @@ public class DataNode extends ReconfigurableBase
       }
     }
 
+    // Stop the object store handler
+    if (this.objectStoreHandler != null) {
+      this.objectStoreHandler.close();
+    }
+
+    if(this.ozoneEnabled) {
+      if(ozoneServer != null) {
+        try {
+          ozoneServer.stop();
+        } catch (Exception e) {
+          LOG.error("Error is ozone shutdown. ex {}", e.toString());
+        }
+      }
+    }
+
     if (pauseMonitor != null) {
       pauseMonitor.stop();
     }

+ 54 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java

@@ -17,36 +17,58 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS;
-import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_DEFAULT_PORT;
+import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS;
+import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
 import com.sun.jersey.api.container.ContainerFactory;
 import com.sun.jersey.api.core.ApplicationAdapter;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.ozone.web.handlers.ServiceFilter;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
 import org.apache.hadoop.ozone.web.ObjectStoreApplication;
 import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer;
 import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler;
 import org.apache.hadoop.ozone.web.localstorage.LocalStorageHandler;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * Implements object store handling within the DataNode process.  This class is
  * responsible for initializing and maintaining the RPC clients and servers and
  * the web application required for the object store implementation.
  */
-public final class ObjectStoreHandler {
+public final class ObjectStoreHandler implements Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ObjectStoreJerseyContainer.class);
 
   private final ObjectStoreJerseyContainer objectStoreJerseyContainer;
+  private final StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
 
   /**
    * Creates a new ObjectStoreHandler.
@@ -57,14 +79,32 @@ public final class ObjectStoreHandler {
   public ObjectStoreHandler(Configuration conf) throws IOException {
     String shType = conf.getTrimmed(DFS_STORAGE_HANDLER_TYPE_KEY,
         DFS_STORAGE_HANDLER_TYPE_DEFAULT);
+    LOG.info("ObjectStoreHandler initializing with {}: {}",
+        DFS_STORAGE_HANDLER_TYPE_KEY, shType);
     boolean ozoneTrace = conf.getBoolean(DFS_OBJECTSTORE_TRACE_ENABLED_KEY,
         DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT);
     final StorageHandler storageHandler;
+
+    // Initialize Jersey container for object store web application.
     if ("distributed".equalsIgnoreCase(shType)) {
-      storageHandler = new DistributedStorageHandler();
+      RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
+          ProtobufRpcEngine.class);
+      long version =
+          RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
+      InetSocketAddress address = conf.getSocketAddr(
+          DFS_STORAGE_RPC_BIND_HOST_KEY, DFS_STORAGE_RPC_ADDRESS_KEY,
+          DFS_STORAGE_RPC_ADDRESS_DEFAULT, DFS_STORAGE_RPC_DEFAULT_PORT);
+      this.storageContainerLocationClient =
+          new StorageContainerLocationProtocolClientSideTranslatorPB(
+              RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
+              address, UserGroupInformation.getCurrentUser(), conf,
+              NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf)));
+      storageHandler = new DistributedStorageHandler(new OzoneConfiguration(),
+          this.storageContainerLocationClient);
     } else {
       if ("local".equalsIgnoreCase(shType)) {
         storageHandler = new LocalStorageHandler(conf);
+        this.storageContainerLocationClient = null;
       } else {
         throw new IllegalArgumentException(
             String.format("Unrecognized value for %s: %s",
@@ -91,4 +131,12 @@ public final class ObjectStoreHandler {
   public ObjectStoreJerseyContainer getObjectStoreJerseyContainer() {
     return this.objectStoreJerseyContainer;
   }
+
+  @Override
+  public void close() {
+    LOG.info("Closing ObjectStoreHandler.");
+    if (this.storageContainerLocationClient != null) {
+      this.storageContainerLocationClient.close();
+    }
+  }
 }

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java

@@ -68,6 +68,8 @@ public final class OzoneConsts {
   public static final String FILE_HASH = "SHA-256";
   public final static String CHUNK_OVERWRITE = "OverWriteRequested";
 
+  public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
+
   /**
    * Supports Bucket Versioning.
    */

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java

@@ -169,7 +169,8 @@ public final class ChunkUtils {
               StandardOpenOption.SPARSE,
               StandardOpenOption.SYNC);
       lock = file.lock().get();
-      if (!chunkInfo.getChecksum().isEmpty()) {
+      if (chunkInfo.getChecksum() != null &&
+          !chunkInfo.getChecksum().isEmpty()) {
         verifyChecksum(chunkInfo, data, log);
       }
       int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java

@@ -127,7 +127,7 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getCreateContainer().getContainerData().getName(),
           msg.getCmdType().name(),
           msg.getTraceID(),
-          ex.toString());
+          ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
       return ContainerUtils.getContainerResponse(msg,
@@ -169,7 +169,7 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getCreateContainer().getContainerData().getName(),
           msg.getCmdType().name(),
           msg.getTraceID(),
-          ex.toString());
+          ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
       return ContainerUtils.getContainerResponse(msg,
@@ -210,7 +210,7 @@ public class Dispatcher implements ContainerDispatcher {
           msg.getCreateContainer().getContainerData().getName(),
           msg.getCmdType().name(),
           msg.getTraceID(),
-          ex.toString());
+          ex.toString(), ex);
 
       // TODO : Replace with finer error codes.
       return ContainerUtils.getContainerResponse(msg,

+ 13 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java

@@ -34,12 +34,13 @@ import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 
 /**
  * A Client for the storageContainer protocol.
  */
-public class XceiverClient {
+public class XceiverClient implements Closeable {
   static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
   private final Pipeline pipeline;
   private final Configuration config;
@@ -92,6 +93,7 @@ public class XceiverClient {
   /**
    * Close the client.
    */
+  @Override
   public void close() {
     if(group != null) {
       group.shutdownGracefully();
@@ -102,6 +104,16 @@ public class XceiverClient {
     }
   }
 
+  /**
+   * Returns the pipeline of machines that host the container used by this
+   * client.
+   *
+   * @return pipeline of machines that host the container
+   */
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
   /**
    * Sends a given command to server and gets the reply back.
    * @param request Request

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java

@@ -94,7 +94,7 @@ public class XceiverClientHandler extends
     ContainerProtos.ContainerCommandResponseProto response;
     channel.writeAndFlush(request);
     boolean interrupted = false;
-    for (; ; ) {
+    for (;;) {
       try {
         response = responses.take();
         break;

+ 83 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java

@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.client;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+
+/**
+ * XceiverClientManager is responsible for the lifecycle of XceiverClient
+ * instances.  Callers use this class to acquire an XceiverClient instance
+ * connected to the desired container pipeline.  When done, the caller also uses
+ * this class to release the previously acquired XceiverClient instance.
+ *
+ * This class may evolve to implement efficient lifecycle management policies by
+ * caching container location information and pooling connected client instances
+ * for reuse without needing to reestablish a socket connection.  The current
+ * implementation simply allocates and closes a new instance every time.
+ */
+public class XceiverClientManager {
+
+  private final OzoneConfiguration conf;
+
+  /**
+   * Creates a new XceiverClientManager.
+   *
+   * @param conf configuration
+   */
+  public XceiverClientManager(OzoneConfiguration conf) {
+    Preconditions.checkNotNull(conf);
+    this.conf = conf;
+  }
+
+  /**
+   * Acquires a XceiverClient connected to a container capable of storing the
+   * specified key.
+   *
+   * @param pipeline the container pipeline for the client connection
+   * @return XceiverClient connected to a container
+   * @throws IOException if an XceiverClient cannot be acquired
+   */
+  public XceiverClient acquireClient(Pipeline pipeline) throws IOException {
+    Preconditions.checkNotNull(pipeline);
+    Preconditions.checkArgument(pipeline.getMachines() != null);
+    Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
+    XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
+    try {
+      xceiverClient.connect();
+    } catch (Exception e) {
+      throw new IOException("Exception connecting XceiverClient.", e);
+    }
+    return xceiverClient;
+  }
+
+  /**
+   * Releases an XceiverClient after use.
+   *
+   * @param xceiverClient client to release
+   */
+  public void releaseClient(XceiverClient xceiverClient) {
+    Preconditions.checkNotNull(xceiverClient);
+    xceiverClient.close();
+  }
+}

+ 24 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java

@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.client;
+
+/**
+ * This package contains classes for the client of the storage container
+ * protocol.
+ */

+ 24 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java

@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+/**
+ * This package contains classes for the server of the storage container
+ * protocol.
+ */

+ 97 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java

@@ -48,10 +48,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.CreateContainerRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -81,6 +86,10 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
 import org.apache.hadoop.ozone.protocol.LocatedContainer;
 import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
@@ -94,11 +103,14 @@ import org.apache.hadoop.util.StringUtils;
  *
  * The current implementation is a stub suitable to begin end-to-end testing of
  * Ozone service interactions.  DataNodes report to StorageContainerManager
- * using the existing heartbeat messages.  StorageContainerManager tells clients
- * container locations by reporting that all registered nodes are a viable
- * location.  This will evolve from a stub to a full-fledged implementation
- * capable of partitioning the keyspace across multiple containers, with
- * appropriate distribution across nodes.
+ * using the existing heartbeat messages.  StorageContainerManager lazily
+ * initializes a single storage container to be served by those DataNodes.
+ * All subsequent requests for container locations will reply with that single
+ * pipeline, using all registered nodes.
+ *
+ * This will evolve from a stub to a full-fledged implementation capable of
+ * partitioning the keyspace across multiple containers, with appropriate
+ * distribution across nodes.
  */
 @InterfaceAudience.Private
 public class StorageContainerManager
@@ -109,6 +121,8 @@ public class StorageContainerManager
 
   private final StorageContainerNameService ns;
   private final BlockManager blockManager;
+  private final XceiverClientManager xceiverClientManager;
+  private Pipeline singlePipeline;
 
   /** The RPC server that listens to requests from DataNodes. */
   private final RPC.Server serviceRpcServer;
@@ -128,11 +142,12 @@ public class StorageContainerManager
    *
    * @param conf configuration
    */
-  public StorageContainerManager(Configuration conf)
+  public StorageContainerManager(OzoneConfiguration conf)
       throws IOException {
     ns = new StorageContainerNameService();
     boolean haEnabled = false;
     blockManager = new BlockManager(ns, haEnabled, conf);
+    xceiverClientManager = new XceiverClientManager(conf);
 
     RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
         ProtobufRpcEngine.class);
@@ -193,20 +208,20 @@ public class StorageContainerManager
   public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
       throws IOException {
     LOG.trace("getStorageContainerLocations keys = {}", keys);
+    Pipeline pipeline = initSingleContainerPipeline();
     List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
     blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
     if (liveNodes.isEmpty()) {
       throw new IOException("Storage container locations not found.");
     }
-    String containerName = UUID.randomUUID().toString();
     Set<DatanodeInfo> locations =
         Sets.<DatanodeInfo>newLinkedHashSet(liveNodes);
     DatanodeInfo leader = liveNodes.get(0);
     Set<LocatedContainer> locatedContainers =
         Sets.newLinkedHashSetWithExpectedSize(keys.size());
     for (String key: keys) {
-      locatedContainers.add(new LocatedContainer(key, key, containerName,
-          locations, leader));
+      locatedContainers.add(new LocatedContainer(key, key,
+          pipeline.getContainerName(), locations, leader));
     }
     LOG.trace("getStorageContainerLocations keys = {}, locatedContainers = {}",
         keys, locatedContainers);
@@ -415,6 +430,56 @@ public class StorageContainerManager
     }
   }
 
+  /**
+   * Lazily initializes a single container pipeline using all registered
+   * DataNodes via a synchronous call to the container protocol.  This single
+   * container pipeline will be reused for container requests for the lifetime
+   * of this StorageContainerManager.
+   *
+   * @throws IOException if there is an I/O error
+   */
+  private synchronized Pipeline initSingleContainerPipeline()
+      throws IOException {
+    if (singlePipeline == null) {
+      List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
+      blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
+      if (liveNodes.isEmpty()) {
+        throw new IOException("Storage container locations not found.");
+      }
+      Pipeline newPipeline = newPipelineFromNodes(liveNodes,
+          UUID.randomUUID().toString());
+      XceiverClient xceiverClient =
+          xceiverClientManager.acquireClient(newPipeline);
+      try {
+        ContainerData containerData = ContainerData
+            .newBuilder()
+            .setName(newPipeline.getContainerName())
+            .build();
+        CreateContainerRequestProto createContainerRequest =
+            CreateContainerRequestProto.newBuilder()
+            .setPipeline(newPipeline.getProtobufMessage())
+            .setContainerData(containerData)
+            .build();
+        ContainerCommandRequestProto request = ContainerCommandRequestProto
+            .newBuilder()
+            .setCmdType(Type.CreateContainer)
+            .setCreateContainer(createContainerRequest)
+            .build();
+        ContainerCommandResponseProto response = xceiverClient.sendCommand(
+            request);
+        Result result = response.getResult();
+        if (result != Result.SUCCESS) {
+          throw new IOException(
+              "Failed to initialize container due to result code: " + result);
+        }
+        singlePipeline = newPipeline;
+      } finally {
+        xceiverClientManager.releaseClient(xceiverClient);
+      }
+    }
+    return singlePipeline;
+  }
+
   /**
    * Builds a message for logging startup information about an RPC server.
    *
@@ -429,6 +494,25 @@ public class StorageContainerManager
         String.format("%s not started", description);
   }
 
+  /**
+   * Translates a list of nodes, ordered such that the first is the leader, into
+   * a corresponding {@link Pipeline} object.
+   *
+   * @param nodes list of nodes
+   * @param containerName container name
+   * @return pipeline corresponding to nodes
+   */
+  private static Pipeline newPipelineFromNodes(List<DatanodeDescriptor> nodes,
+      String containerName) {
+    String leaderId = nodes.get(0).getDatanodeUuid();
+    Pipeline pipeline = new Pipeline(leaderId);
+    for (DatanodeDescriptor node : nodes) {
+      pipeline.addMember(node);
+    }
+    pipeline.setContainerName(containerName);
+    return pipeline;
+  }
+
   /**
    * Starts an RPC server, if configured.
    *
@@ -443,7 +527,7 @@ public class StorageContainerManager
    * @return RPC server, or null if addr is null
    * @throws IOException if there is an I/O error while creating RPC server
    */
-  private static RPC.Server startRpcServer(Configuration conf,
+  private static RPC.Server startRpcServer(OzoneConfiguration conf,
       InetSocketAddress addr, Class<?> protocol, BlockingService instance,
       String bindHostKey, String handlerCountKey, int handlerCountDefault)
       throws IOException {
@@ -480,7 +564,7 @@ public class StorageContainerManager
    * @param rpcServer started RPC server.  If null, then the server was not
    *     started, and this method is a no-op.
    */
-  private static InetSocketAddress updateListenAddress(Configuration conf,
+  private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
       String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
     if (rpcServer == null) {
       return null;
@@ -502,7 +586,7 @@ public class StorageContainerManager
     StringUtils.startupShutdownMessage(
         StorageContainerManager.class, argv, LOG);
     StorageContainerManager scm = new StorageContainerManager(
-        new Configuration());
+        new OzoneConfiguration());
     scm.start();
     scm.join();
   }

+ 6 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java

@@ -174,9 +174,12 @@ public class OzoneBucket {
 
       InputStream is = new ByteArrayInputStream(data.getBytes(ENCODING));
       putRequest.setEntity(new InputStreamEntity(is, data.length()));
-      putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is));
-      putRequest
-          .setHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(data.length()));
+      is.mark(data.length());
+      try {
+        putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is));
+      } finally {
+        is.reset();
+      }
       executePutKey(putRequest, httpClient);
 
     } catch (IOException | URISyntaxException ex) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java

@@ -92,7 +92,7 @@ public class OzoneException extends Exception {
    */
   public OzoneException(long httpCode, String shortMessage, String message) {
     this.shortMessage = shortMessage;
-    this.resource = message;
+    this.message = message;
     this.httpCode = httpCode;
   }
 

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java

@@ -77,6 +77,16 @@ public class OzoneQuota {
     this.unit = unit;
   }
 
+  /**
+   * Formats a quota as a string.
+   *
+   * @param quota the quota to format
+   * @return string representation of quota
+   */
+  public static String formatQuota(OzoneQuota quota) {
+    return String.valueOf(quota.size) + quota.unit;
+  }
+
   /**
    * Parses a user provided string and returns the
    * Quota Object.

+ 193 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java

@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web.storage;
+
+import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+
+/**
+ * An {@link InputStream} used by the REST service in combination with the
+ * {@link DistributedStorageHandler} to read the value of a key from a sequence
+ * of container chunks.  All bytes of the key value are stored in container
+ * chunks.  Each chunk may contain multiple underlying {@link ByteBuffer}
+ * instances.  This class encapsulates all state management for iterating
+ * through the sequence of chunks and the sequence of buffers within each chunk.
+ */
+class ChunkInputStream extends InputStream {
+
+  private static final int EOF = -1;
+
+  private final String key;
+  private final UserArgs args;
+  private XceiverClientManager xceiverClientManager;
+  private XceiverClient xceiverClient;
+  private List<ChunkInfo> chunks;
+  private int chunkOffset;
+  private List<ByteBuffer> buffers;
+  private int bufferOffset;
+
+  /**
+   * Creates a new ChunkInputStream.
+   *
+   * @param key chunk key
+   * @param xceiverClientManager client manager that controls client
+   * @param xceiverClient client to perform container calls
+   * @param chunks list of chunks to read
+   * @param args container protocol call args
+   */
+  public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
+      XceiverClient xceiverClient, List<ChunkInfo> chunks, UserArgs args) {
+    this.key = key;
+    this.args = args;
+    this.xceiverClientManager = xceiverClientManager;
+    this.xceiverClient = xceiverClient;
+    this.chunks = chunks;
+    this.chunkOffset = 0;
+    this.buffers = null;
+    this.bufferOffset = 0;
+  }
+
+  @Override
+  public synchronized int read()
+      throws IOException {
+    checkOpen();
+    int available = prepareRead(1);
+    return available == EOF ? EOF : buffers.get(bufferOffset).get();
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) throws IOException {
+    // According to the JavaDocs for InputStream, it is recommended that
+    // subclasses provide an override of bulk read if possible for performance
+    // reasons.  In addition to performance, we need to do it for correctness
+    // reasons.  The Ozone REST service uses PipedInputStream and
+    // PipedOutputStream to relay HTTP response data between a Jersey thread and
+    // a Netty thread.  It turns out that PipedInputStream/PipedOutputStream
+    // have a subtle dependency (bug?) on the wrapped stream providing separate
+    // implementations of single-byte read and bulk read.  Without this, get key
+    // responses might close the connection before writing all of the bytes
+    // advertised in the Content-Length.
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    checkOpen();
+    int available = prepareRead(len);
+    if (available == EOF) {
+      return EOF;
+    }
+    buffers.get(bufferOffset).get(b, off, available);
+    return available;
+  }
+
+  @Override
+  public synchronized void close() {
+    if (xceiverClientManager != null && xceiverClient != null) {
+      xceiverClientManager.releaseClient(xceiverClient);
+      xceiverClientManager = null;
+      xceiverClient = null;
+    }
+  }
+
+  /**
+   * Checks if the stream is open.  If not, throws an exception.
+   *
+   * @throws IOException if stream is closed
+   */
+  private synchronized void checkOpen() throws IOException {
+    if (xceiverClient == null) {
+      throw new IOException("ChunkInputStream has been closed.");
+    }
+  }
+
+  /**
+   * Prepares to read by advancing through chunks and buffers as needed until it
+   * finds data to return or encounters EOF.
+   *
+   * @param len desired length of data to read
+   * @return length of data available to read, possibly less than desired length
+   */
+  private synchronized int prepareRead(int len) throws IOException {
+    for (;;) {
+      if (chunks == null || chunks.isEmpty()) {
+        // This must be an empty key.
+        return EOF;
+      } else if (buffers == null) {
+        // The first read triggers fetching the first chunk.
+        readChunkFromContainer(0);
+      } else if (!buffers.isEmpty() &&
+          buffers.get(bufferOffset).hasRemaining()) {
+        // Data is available from the current buffer.
+        ByteBuffer bb = buffers.get(bufferOffset);
+        return len > bb.remaining() ? bb.remaining() : len;
+      } else if (!buffers.isEmpty() &&
+          !buffers.get(bufferOffset).hasRemaining() &&
+          bufferOffset < buffers.size() - 1) {
+        // There are additional buffers available.
+        ++bufferOffset;
+      } else if (chunkOffset < chunks.size() - 1) {
+        // There are additional chunks available.
+        readChunkFromContainer(chunkOffset + 1);
+      } else {
+        // All available input has been consumed.
+        return EOF;
+      }
+    }
+  }
+
+  /**
+   * Attempts to read the chunk at the specified offset in the chunk list.  If
+   * successful, then the data of the read chunk is saved so that its bytes can
+   * be returned from subsequent read calls.
+   *
+   * @param readChunkOffset offset in the chunk list of which chunk to read
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void readChunkFromContainer(int readChunkOffset)
+      throws IOException {
+    final ReadChunkResponseProto readChunkResponse;
+    try {
+      readChunkResponse = readChunk(xceiverClient, chunks.get(readChunkOffset),
+          key, args);
+    } catch (OzoneException e) {
+      throw new IOException("Unexpected OzoneException", e);
+    }
+    chunkOffset = readChunkOffset;
+    ByteString byteString = readChunkResponse.getData();
+    buffers = byteString.asReadOnlyByteBufferList();
+  }
+}

+ 193 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java

@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web.storage;
+
+import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE;
+import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
+import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.response.KeyInfo;
+
+/**
+ * An {@link OutputStream} used by the REST service in combination with the
+ * {@link DistributedStorageHandler} to write the value of a key to a sequence
+ * of container chunks.  Writes are buffered locally and periodically written to
+ * the container as a new chunk.  In order to preserve the semantics that
+ * replacement of a pre-existing key is atomic, each instance of the stream has
+ * an internal unique identifier.  This unique identifier and a monotonically
+ * increasing chunk index form a composite key that is used as the chunk name.
+ * After all data is written, a putKey call creates or updates the corresponding
+ * container key, and this call includes the full list of chunks that make up
+ * the key data.  The list of chunks is updated all at once.  Therefore, a
+ * concurrent reader never can see an intermediate state in which different
+ * chunks of data from different versions of the key data are interleaved.
+ * This class encapsulates all state management for buffering and writing
+ * through to the container.
+ */
+class ChunkOutputStream extends OutputStream {
+
+  private final String containerKey;
+  private final KeyInfo key;
+  private final UserArgs args;
+  private final KeyData.Builder containerKeyData;
+  private XceiverClientManager xceiverClientManager;
+  private XceiverClient xceiverClient;
+  private ByteBuffer buffer;
+  private final String streamId;
+  private int chunkIndex;
+
+  /**
+   * Creates a new ChunkOutputStream.
+   *
+   * @param containerKey container key
+   * @param key chunk key
+   * @param xceiverClientManager client manager that controls client
+   * @param xceiverClient client to perform container calls
+   * @param args container protocol call args
+   */
+  public ChunkOutputStream(String containerKey, KeyInfo key,
+      XceiverClientManager xceiverClientManager, XceiverClient xceiverClient,
+      UserArgs args) {
+    this.containerKey = containerKey;
+    this.key = key;
+    this.args = args;
+    this.containerKeyData = fromKeyToContainerKeyDataBuilder(
+        xceiverClient.getPipeline().getContainerName(), containerKey, key);
+    this.xceiverClientManager = xceiverClientManager;
+    this.xceiverClient = xceiverClient;
+    this.buffer = ByteBuffer.allocate(CHUNK_SIZE);
+    this.streamId = UUID.randomUUID().toString();
+    this.chunkIndex = 0;
+  }
+
+  @Override
+  public synchronized void write(int b) throws IOException {
+    checkOpen();
+    int rollbackPosition = buffer.position();
+    int rollbackLimit = buffer.limit();
+    buffer.put((byte)b);
+    if (buffer.position() == CHUNK_SIZE) {
+      flushBufferToChunk(rollbackPosition, rollbackLimit);
+    }
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    checkOpen();
+    if (buffer.position() > 0) {
+      int rollbackPosition = buffer.position();
+      int rollbackLimit = buffer.limit();
+      flushBufferToChunk(rollbackPosition, rollbackLimit);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (xceiverClientManager != null && xceiverClient != null &&
+        buffer != null) {
+      try {
+        if (buffer.position() > 0) {
+          writeChunkToContainer();
+        }
+        putKey(xceiverClient, containerKeyData.build(), args);
+      } catch (OzoneException e) {
+        throw new IOException("Unexpected OzoneException", e);
+      } finally {
+        xceiverClientManager.releaseClient(xceiverClient);
+        xceiverClientManager = null;
+        xceiverClient = null;
+        buffer = null;
+      }
+    }
+
+  }
+
+  /**
+   * Checks if the stream is open.  If not, throws an exception.
+   *
+   * @throws IOException if stream is closed
+   */
+  private synchronized void checkOpen() throws IOException {
+    if (xceiverClient == null) {
+      throw new IOException("ChunkOutputStream has been closed.");
+    }
+  }
+
+  /**
+   * Attempts to flush buffered writes by writing a new chunk to the container.
+   * If successful, then clears the buffer to prepare to receive writes for a
+   * new chunk.
+   *
+   * @param rollbackPosition position to restore in buffer if write fails
+   * @param rollbackLimit limit to restore in buffer if write fails
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void flushBufferToChunk(int rollbackPosition,
+      int rollbackLimit) throws IOException {
+    boolean success = false;
+    try {
+      writeChunkToContainer();
+      success = true;
+    } finally {
+      if (success) {
+        buffer.clear();
+      } else {
+        buffer.position(rollbackPosition);
+        buffer.limit(rollbackLimit);
+      }
+    }
+  }
+
+  /**
+   * Writes buffered data as a new chunk to the container and saves chunk
+   * information to be used later in putKey call.
+   *
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void writeChunkToContainer() throws IOException {
+    buffer.flip();
+    ByteString data = ByteString.copyFrom(buffer);
+    ChunkInfo chunk = ChunkInfo
+        .newBuilder()
+        .setChunkName(
+            key.getKeyName() + "_stream_" + streamId + "_chunk_" + ++chunkIndex)
+        .setOffset(0)
+        .setLen(data.size())
+        .build();
+    try {
+      writeChunk(xceiverClient, chunk, key.getKeyName(), data, args);
+    } catch (OzoneException e) {
+      throw new IOException("Unexpected OzoneException", e);
+    }
+    containerKeyData.addChunks(chunk);
+  }
+}

+ 198 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java

@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web.storage;
+
+import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+
+import java.io.IOException;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
+import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+
+/**
+ * Implementation of all container protocol calls performed by
+ * {@link DistributedStorageHandler}.
+ */
+final class ContainerProtocolCalls {
+
+  /**
+   * Calls the container protocol to get a container key.
+   *
+   * @param xceiverClient client to perform call
+   * @param containerKeyData key data to identify container
+   * @param args container protocol call args
+   * @returns container protocol get key response
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws OzoneException if the container protocol call failed
+   */
+  public static GetKeyResponseProto getKey(XceiverClient xceiverClient,
+      KeyData containerKeyData, UserArgs args) throws IOException,
+      OzoneException {
+    GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyData(containerKeyData);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.GetKey)
+        .setTraceID(args.getRequestID())
+        .setGetKey(readKeyRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, args);
+    return response.getGetKey();
+  }
+
+  /**
+   * Calls the container protocol to put a container key.
+   *
+   * @param xceiverClient client to perform call
+   * @param containerKeyData key data to identify container
+   * @param args container protocol call args
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws OzoneException if the container protocol call failed
+   */
+  public static void putKey(XceiverClient xceiverClient,
+      KeyData containerKeyData, UserArgs args) throws IOException,
+      OzoneException {
+    PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyData(containerKeyData);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.PutKey)
+        .setTraceID(args.getRequestID())
+        .setPutKey(createKeyRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, args);
+  }
+
+  /**
+   * Calls the container protocol to read a chunk.
+   *
+   * @param xceiverClient client to perform call
+   * @param chunk information about chunk to read
+   * @param key the key name
+   * @param args container protocol call args
+   * @returns container protocol read chunk response
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws OzoneException if the container protocol call failed
+   */
+  public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient,
+      ChunkInfo chunk, String key, UserArgs args)
+      throws IOException, OzoneException {
+    ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyName(key)
+        .setChunkData(chunk);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.ReadChunk)
+        .setTraceID(args.getRequestID())
+        .setReadChunk(readChunkRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, args);
+    return response.getReadChunk();
+  }
+
+  /**
+   * Calls the container protocol to write a chunk.
+   *
+   * @param xceiverClient client to perform call
+   * @param chunk information about chunk to write
+   * @param key the key name
+   * @param data the data of the chunk to write
+   * @param args container protocol call args
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws OzoneException if the container protocol call failed
+   */
+  public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk,
+      String key, ByteString data, UserArgs args)
+      throws IOException, OzoneException {
+    WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
+        .newBuilder()
+        .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
+        .setKeyName(key)
+        .setChunkData(chunk)
+        .setData(data);
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.WriteChunk)
+        .setTraceID(args.getRequestID())
+        .setWriteChunk(writeChunkRequest)
+        .build();
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response, args);
+  }
+
+  /**
+   * Validates a response from a container protocol call.  Any non-successful
+   * return code is mapped to a corresponding exception and thrown.
+   *
+   * @param response container protocol call response
+   * @param args container protocol call args
+   * @throws OzoneException if the container protocol call failed
+   */
+  private static void validateContainerResponse(
+      ContainerCommandResponseProto response, UserArgs args)
+      throws OzoneException {
+    switch (response.getResult()) {
+    case SUCCESS:
+      break;
+    case MALFORMED_REQUEST:
+      throw ErrorTable.newError(new OzoneException(HTTP_BAD_REQUEST,
+          "badRequest", "Bad container request."), args);
+    case UNSUPPORTED_REQUEST:
+      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
+          "internalServerError", "Unsupported container request."), args);
+    case CONTAINER_INTERNAL_ERROR:
+      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
+          "internalServerError", "Container internal error."), args);
+    default:
+      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
+          "internalServerError", "Unrecognized container response."), args);
+    }
+  }
+
+  /**
+   * There is no need to instantiate this class.
+   */
+  private ContainerProtocolCalls() {
+  }
+}

+ 209 - 57
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -18,7 +18,32 @@
 
 package org.apache.hadoop.ozone.web.storage;
 
+import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
+import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.TimeZone;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
+import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
+import org.apache.hadoop.ozone.protocol.LocatedContainer;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.handlers.BucketArgs;
 import org.apache.hadoop.ozone.web.handlers.KeyArgs;
@@ -27,13 +52,13 @@ import org.apache.hadoop.ozone.web.handlers.UserArgs;
 import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
 import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
 import org.apache.hadoop.ozone.web.response.BucketInfo;
+import org.apache.hadoop.ozone.web.response.KeyInfo;
 import org.apache.hadoop.ozone.web.response.ListBuckets;
 import org.apache.hadoop.ozone.web.response.ListKeys;
 import org.apache.hadoop.ozone.web.response.ListVolumes;
 import org.apache.hadoop.ozone.web.response.VolumeInfo;
-
-import java.io.IOException;
-import java.io.OutputStream;
+import org.apache.hadoop.ozone.web.response.VolumeOwner;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A {@link StorageHandler} implementation that distributes object storage
@@ -41,156 +66,283 @@ import java.io.OutputStream;
  */
 public final class DistributedStorageHandler implements StorageHandler {
 
-  @Override
-  public void createVolume(VolumeArgs args) throws
-      IOException, OzoneException {
+  private final StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocation;
+  private final XceiverClientManager xceiverClientManager;
+
+  /**
+   * Creates a new DistributedStorageHandler.
+   *
+   * @param conf configuration
+   * @param storageContainerLocation StorageContainerLocationProtocol proxy
+   */
+  public DistributedStorageHandler(OzoneConfiguration conf,
+      StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocation) {
+    this.storageContainerLocation = storageContainerLocation;
+    this.xceiverClientManager = new XceiverClientManager(conf);
+  }
 
+  @Override
+  public void createVolume(VolumeArgs args) throws IOException, OzoneException {
+    String containerKey = buildContainerKey(args.getVolumeName());
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    try {
+      VolumeInfo volume = new VolumeInfo();
+      volume.setVolumeName(args.getVolumeName());
+      volume.setQuota(args.getQuota());
+      volume.setOwner(new VolumeOwner(args.getUserName()));
+      volume.setCreatedOn(dateToString(new Date()));
+      volume.setCreatedBy(args.getAdminName());
+      KeyData containerKeyData = fromVolumeToContainerKeyData(
+          xceiverClient.getPipeline().getContainerName(), containerKey, volume);
+      putKey(xceiverClient, containerKeyData, args);
+    } finally {
+      xceiverClientManager.releaseClient(xceiverClient);
+    }
   }
 
   @Override
   public void setVolumeOwner(VolumeArgs args) throws
       IOException, OzoneException {
-
+    throw new UnsupportedOperationException("setVolumeOwner not implemented");
   }
 
   @Override
   public void setVolumeQuota(VolumeArgs args, boolean remove)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException("setVolumeQuota not implemented");
   }
 
   @Override
   public boolean checkVolumeAccess(VolumeArgs args)
       throws IOException, OzoneException {
-    return false;
+    throw new UnsupportedOperationException("checkVolumeAccessnot implemented");
   }
 
   @Override
   public ListVolumes listVolumes(UserArgs args)
       throws IOException, OzoneException {
-    return null;
+    throw new UnsupportedOperationException("listVolumes not implemented");
   }
 
   @Override
   public void deleteVolume(VolumeArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException("deleteVolume not implemented");
   }
 
   @Override
   public VolumeInfo getVolumeInfo(VolumeArgs args)
       throws IOException, OzoneException {
-    return null;
+    String containerKey = buildContainerKey(args.getVolumeName());
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    try {
+      KeyData containerKeyData = containerKeyDataForRead(
+          xceiverClient.getPipeline().getContainerName(), containerKey);
+      GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
+          args);
+      return fromContainerKeyValueListToVolume(
+          response.getKeyData().getMetadataList());
+    } finally {
+      xceiverClientManager.releaseClient(xceiverClient);
+    }
   }
 
   @Override
-  public void createBucket(BucketArgs args)
+  public void createBucket(final BucketArgs args)
       throws IOException, OzoneException {
-
+    String containerKey = buildContainerKey(args.getVolumeName(),
+        args.getBucketName());
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    try {
+      BucketInfo bucket = new BucketInfo();
+      bucket.setVolumeName(args.getVolumeName());
+      bucket.setBucketName(args.getBucketName());
+      bucket.setAcls(args.getAddAcls());
+      bucket.setVersioning(args.getVersioning());
+      bucket.setStorageType(args.getStorageType());
+      KeyData containerKeyData = fromBucketToContainerKeyData(
+          xceiverClient.getPipeline().getContainerName(), containerKey, bucket);
+      putKey(xceiverClient, containerKeyData, args);
+    } finally {
+      xceiverClientManager.releaseClient(xceiverClient);
+    }
   }
 
   @Override
   public void setBucketAcls(BucketArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException("setBucketAcls not implemented");
   }
 
   @Override
   public void setBucketVersioning(BucketArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException(
+        "setBucketVersioning not implemented");
   }
 
   @Override
   public void setBucketStorageClass(BucketArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException(
+        "setBucketStorageClass not implemented");
   }
 
   @Override
   public void deleteBucket(BucketArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException("deleteBucket not implemented");
   }
 
   @Override
   public void checkBucketAccess(BucketArgs args)
       throws IOException, OzoneException {
-
+    throw new UnsupportedOperationException(
+        "checkBucketAccess not implemented");
   }
 
   @Override
   public ListBuckets listBuckets(VolumeArgs args)
       throws IOException, OzoneException {
-    return null;
+    throw new UnsupportedOperationException("listBuckets not implemented");
   }
 
   @Override
   public BucketInfo getBucketInfo(BucketArgs args)
       throws IOException, OzoneException {
-    return null;
+    String containerKey = buildContainerKey(args.getVolumeName(),
+        args.getBucketName());
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    try {
+      KeyData containerKeyData = containerKeyDataForRead(
+          xceiverClient.getPipeline().getContainerName(), containerKey);
+      GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
+          args);
+      return fromContainerKeyValueListToBucket(
+          response.getKeyData().getMetadataList());
+    } finally {
+      xceiverClientManager.releaseClient(xceiverClient);
+    }
   }
 
-  /**
-   * Writes a key in an existing bucket.
-   *
-   * @param args KeyArgs
-   * @return InputStream
-   * @throws OzoneException
-   */
   @Override
   public OutputStream newKeyWriter(KeyArgs args) throws IOException,
       OzoneException {
-    return null;
+    String containerKey = buildContainerKey(args.getVolumeName(),
+        args.getBucketName(), args.getKeyName());
+    KeyInfo key = new KeyInfo();
+    key.setKeyName(args.getKeyName());
+    key.setCreatedOn(dateToString(new Date()));
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    return new ChunkOutputStream(containerKey, key, xceiverClientManager,
+        xceiverClient, args);
   }
 
-  /**
-   * Tells the file system that the object has been written out completely and
-   * it can do any house keeping operation that needs to be done.
-   *
-   * @param args   Key Args
-   * @param stream
-   * @throws IOException
-   */
   @Override
   public void commitKey(KeyArgs args, OutputStream stream) throws
       IOException, OzoneException {
+    stream.close();
+  }
+
+  @Override
+  public LengthInputStream newKeyReader(KeyArgs args) throws IOException,
+      OzoneException {
+    String containerKey = buildContainerKey(args.getVolumeName(),
+        args.getBucketName(), args.getKeyName());
+    XceiverClient xceiverClient = acquireXceiverClient(containerKey);
+    boolean success = false;
+    try {
+      KeyData containerKeyData = containerKeyDataForRead(
+          xceiverClient.getPipeline().getContainerName(), containerKey);
+      GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
+          args);
+      long length = 0;
+      List<ChunkInfo> chunks = response.getKeyData().getChunksList();
+      for (ChunkInfo chunk : chunks) {
+        length += chunk.getLen();
+      }
+      success = true;
+      return new LengthInputStream(new ChunkInputStream(
+          containerKey, xceiverClientManager, xceiverClient, chunks, args),
+          length);
+    } finally {
+      if (!success) {
+        xceiverClientManager.releaseClient(xceiverClient);
+      }
+    }
+  }
+
+  @Override
+  public void deleteKey(KeyArgs args) throws IOException, OzoneException {
+    throw new UnsupportedOperationException("deleteKey not implemented");
+  }
 
+  @Override
+  public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
+    throw new UnsupportedOperationException("listKeys not implemented");
   }
 
   /**
-   * Reads a key from an existing bucket.
+   * Acquires an {@link XceiverClient} connected to a {@link Pipeline} of nodes
+   * capable of serving container protocol operations.  The container is
+   * selected based on the specified container key.
    *
-   * @param args KeyArgs
-   * @return LengthInputStream
-   * @throws IOException
+   * @param containerKey container key
+   * @return XceiverClient connected to a container
+   * @throws IOException if an XceiverClient cannot be acquired
    */
-  @Override
-  public LengthInputStream newKeyReader(KeyArgs args) throws IOException,
-      OzoneException {
-    return null;
+  private XceiverClient acquireXceiverClient(String containerKey)
+      throws IOException {
+    Set<LocatedContainer> locatedContainers =
+        storageContainerLocation.getStorageContainerLocations(
+            new HashSet<>(Arrays.asList(containerKey)));
+    Pipeline pipeline = newPipelineFromLocatedContainer(
+        locatedContainers.iterator().next());
+    return xceiverClientManager.acquireClient(pipeline);
   }
 
   /**
-   * Deletes an existing key.
+   * Creates a container key from any number of components by combining all
+   * components with a delimiter.
    *
-   * @param args KeyArgs
-   * @throws OzoneException
+   * @param parts container key components
+   * @return container key
    */
-  @Override
-  public void deleteKey(KeyArgs args) throws IOException, OzoneException {
+  private static String buildContainerKey(String... parts) {
+    return '/' + StringUtils.join('/', parts);
+  }
 
+  /**
+   * Formats a date in the expected string format.
+   *
+   * @param date the date to format
+   * @return formatted string representation of date
+   */
+  private static String dateToString(Date date) {
+    SimpleDateFormat sdf =
+        new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US);
+    sdf.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE));
+    return sdf.format(date);
   }
 
   /**
-   * Returns a list of Key.
+   * Translates a set of container locations, ordered such that the first is the
+   * leader, into a corresponding {@link Pipeline} object.
    *
-   * @param args KeyArgs
-   * @return BucketList
-   * @throws IOException
+   * @param locatedContainer container location
+   * @return pipeline corresponding to container locations
    */
-  @Override
-  public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
-    return null;
+  private static Pipeline newPipelineFromLocatedContainer(
+      LocatedContainer locatedContainer) {
+    Set<DatanodeInfo> locations = locatedContainer.getLocations();
+    String leaderId = locations.iterator().next().getDatanodeUuid();
+    Pipeline pipeline = new Pipeline(leaderId);
+    for (DatanodeInfo location : locations) {
+      pipeline.addMember(location);
+    }
+    pipeline.setContainerName(locatedContainer.getContainerName());
+    return pipeline;
   }
 }

+ 261 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java

@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web.storage;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+import org.apache.hadoop.ozone.web.request.OzoneQuota;
+import org.apache.hadoop.ozone.web.response.BucketInfo;
+import org.apache.hadoop.ozone.web.response.KeyInfo;
+import org.apache.hadoop.ozone.web.response.VolumeInfo;
+import org.apache.hadoop.ozone.web.response.VolumeOwner;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class contains methods that define the translation between the Ozone
+ * domain model and the storage container domain model.
+ */
+final class OzoneContainerTranslation {
+
+  private static final String ACLS = "ACLS";
+  private static final String BUCKET = "BUCKET";
+  private static final String BUCKET_NAME = "BUCKET_NAME";
+  private static final String CREATED_BY = "CREATED_BY";
+  private static final String CREATED_ON = "CREATED_ON";
+  private static final String KEY = "KEY";
+  private static final String OWNER = "OWNER";
+  private static final String QUOTA = "QUOTA";
+  private static final String STORAGE_TYPE = "STORAGE_TYPE";
+  private static final String TYPE = "TYPE";
+  private static final String VERSIONING = "VERSIONING";
+  private static final String VOLUME = "VOLUME";
+  private static final String VOLUME_NAME = "VOLUME_NAME";
+
+  /**
+   * Creates key data intended for reading a container key.
+   *
+   * @param containerName container name
+   * @param containerKey container key
+   * @return KeyData intended for reading the container key
+   */
+  public static KeyData containerKeyDataForRead(String containerName,
+      String containerKey) {
+    return KeyData
+        .newBuilder()
+        .setContainerName(containerName)
+        .setName(containerKey)
+        .build();
+  }
+
+  /**
+   * Translates a bucket to its container representation.
+   *
+   * @param containerName container name
+   * @param containerKey container key
+   * @param bucket the bucket to translate
+   * @return KeyData representation of bucket
+   */
+  public static KeyData fromBucketToContainerKeyData(
+      String containerName, String containerKey, BucketInfo bucket) {
+    KeyData.Builder containerKeyData = KeyData
+        .newBuilder()
+        .setContainerName(containerName)
+        .setName(containerKey)
+        .addMetadata(newKeyValue(TYPE, BUCKET))
+        .addMetadata(newKeyValue(VOLUME_NAME, bucket.getVolumeName()))
+        .addMetadata(newKeyValue(BUCKET_NAME, bucket.getBucketName()));
+
+    if (bucket.getAcls() != null) {
+      containerKeyData.addMetadata(newKeyValue(ACLS,
+          StringUtils.join(',', bucket.getAcls())));
+    }
+
+    if (bucket.getVersioning() != null &&
+        bucket.getVersioning() != Versioning.NOT_DEFINED) {
+      containerKeyData.addMetadata(newKeyValue(VERSIONING,
+          bucket.getVersioning().name()));
+    }
+
+    if (bucket.getStorageType() != StorageType.RAM_DISK) {
+      containerKeyData.addMetadata(newKeyValue(STORAGE_TYPE,
+          bucket.getStorageType().name()));
+    }
+
+    return containerKeyData.build();
+  }
+
+  /**
+   * Translates a bucket from its container representation.
+   *
+   * @param metadata container metadata representing the bucket
+   * @return bucket translated from container representation
+   */
+  public static BucketInfo fromContainerKeyValueListToBucket(
+      List<KeyValue> metadata) {
+    BucketInfo bucket = new BucketInfo();
+    for (KeyValue keyValue : metadata) {
+      switch (keyValue.getKey()) {
+      case VOLUME_NAME:
+        bucket.setVolumeName(keyValue.getValue());
+        break;
+      case BUCKET_NAME:
+        bucket.setBucketName(keyValue.getValue());
+        break;
+      case VERSIONING:
+        bucket.setVersioning(
+            Enum.valueOf(Versioning.class, keyValue.getValue()));
+        break;
+      case STORAGE_TYPE:
+        bucket.setStorageType(
+            Enum.valueOf(StorageType.class, keyValue.getValue()));
+        break;
+      default:
+        break;
+      }
+    }
+    return bucket;
+  }
+
+  /**
+   * Translates a volume from its container representation.
+   *
+   * @param metadata container metadata representing the volume
+   * @return volume translated from container representation
+   */
+  public static VolumeInfo fromContainerKeyValueListToVolume(
+      List<KeyValue> metadata) {
+    VolumeInfo volume = new VolumeInfo();
+    for (KeyValue keyValue : metadata) {
+      switch (keyValue.getKey()) {
+      case VOLUME_NAME:
+        volume.setVolumeName(keyValue.getValue());
+        break;
+      case CREATED_BY:
+        volume.setCreatedBy(keyValue.getValue());
+        break;
+      case CREATED_ON:
+        volume.setCreatedOn(keyValue.getValue());
+        break;
+      case OWNER:
+        volume.setOwner(new VolumeOwner(keyValue.getValue()));
+        break;
+      case QUOTA:
+        volume.setQuota(OzoneQuota.parseQuota(keyValue.getValue()));
+        break;
+      default:
+        break;
+      }
+    }
+    return volume;
+  }
+
+  /**
+   * Translates a key to its container representation.
+   *
+   * @param containerName container name
+   * @param containerKey container key
+   * @param keyInfo key information received from call
+   * @return KeyData intended for reading the container key
+   */
+  public static KeyData fromKeyToContainerKeyData(String containerName,
+      String containerKey, KeyInfo key) {
+    return KeyData
+        .newBuilder()
+        .setContainerName(containerName)
+        .setName(containerKey)
+        .addMetadata(newKeyValue(TYPE, KEY))
+        .build();
+  }
+
+  /**
+   * Translates a key to its container representation.  The return value is a
+   * builder that can be manipulated further before building the result.
+   *
+   * @param containerName container name
+   * @param containerKey container key
+   * @param keyInfo key information received from call
+   * @return KeyData builder
+   */
+  public static KeyData.Builder fromKeyToContainerKeyDataBuilder(
+      String containerName, String containerKey, KeyInfo key) {
+    return KeyData
+        .newBuilder()
+        .setContainerName(containerName)
+        .setName(containerKey)
+        .addMetadata(newKeyValue(TYPE, KEY));
+  }
+
+  /**
+   * Translates a volume to its container representation.
+   *
+   * @param containerName container name
+   * @param containerKey container key
+   * @param volume the volume to translate
+   * @return KeyData representation of volume
+   */
+  public static KeyData fromVolumeToContainerKeyData(
+      String containerName, String containerKey, VolumeInfo volume) {
+    KeyData.Builder containerKeyData = KeyData
+        .newBuilder()
+        .setContainerName(containerName)
+        .setName(containerKey)
+        .addMetadata(newKeyValue(TYPE, VOLUME))
+        .addMetadata(newKeyValue(VOLUME_NAME, volume.getVolumeName()))
+        .addMetadata(newKeyValue(CREATED_ON, volume.getCreatedOn()));
+
+    if (volume.getQuota() != null && volume.getQuota().sizeInBytes() != -1L) {
+      containerKeyData.addMetadata(newKeyValue(QUOTA,
+          OzoneQuota.formatQuota(volume.getQuota())));
+    }
+
+    if (volume.getOwner() != null && volume.getOwner().getName() != null &&
+        !volume.getOwner().getName().isEmpty()) {
+      containerKeyData.addMetadata(newKeyValue(OWNER,
+          volume.getOwner().getName()));
+    }
+
+    if (volume.getCreatedBy() != null && !volume.getCreatedBy().isEmpty()) {
+      containerKeyData.addMetadata(
+          newKeyValue(CREATED_BY, volume.getCreatedBy()));
+    }
+
+    return containerKeyData.build();
+  }
+
+  /**
+   * Translates a key-value pair to its container representation.
+   *
+   * @param key the key
+   * @param value the value
+   * @return container representation of key-value pair
+   */
+  private static KeyValue newKeyValue(String key, Object value) {
+    return KeyValue.newBuilder().setKey(key).setValue(value.toString()).build();
+  }
+
+  /**
+   * There is no need to instantiate this class.
+   */
+  private OzoneContainerTranslation() {
+  }
+}

+ 53 - 18
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java

@@ -23,6 +23,10 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KE
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.util.Random;
+
+import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +41,8 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.storage.StorageContainerManager;
+import org.apache.hadoop.ozone.web.client.OzoneClient;
+import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 
@@ -53,6 +59,8 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
   private static final Logger LOG =
       LoggerFactory.getLogger(MiniOzoneCluster.class);
 
+  private static final String USER_AUTH = "hdfs";
+
   private final OzoneConfiguration conf;
   private final StorageContainerManager scm;
 
@@ -126,24 +134,26 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
   }
 
   /**
-   * Waits for the Ozone cluster to be ready for processing requests.
+   * Creates an {@link OzoneClient} connected to this cluster's REST service.
+   * Callers take ownership of the client and must close it when done.
+   *
+   * @return OzoneClient connected to this cluster's REST service
+   * @throws OzoneException if Ozone encounters an error creating the client
    */
-  public void waitOzoneReady() {
-    long begin = Time.monotonicNow();
-    while (scm.getDatanodeReport(DatanodeReportType.LIVE).length <
-        numDataNodes) {
-      if (Time.monotonicNow() - begin > 20000) {
-        throw new IllegalStateException(
-            "Timed out waiting for Ozone cluster to become ready.");
-      }
-      LOG.info("Waiting for Ozone cluster to become ready");
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IllegalStateException(
-            "Interrupted while waiting for Ozone cluster to become ready.");
-      }
+  public OzoneClient createOzoneClient() throws OzoneException {
+    Preconditions.checkState(!getDataNodes().isEmpty(),
+        "Cannot create OzoneClient if the cluster has no DataNodes.");
+    // An Ozone request may originate at any DataNode, so pick one at random.
+    int dnIndex = new Random().nextInt(getDataNodes().size());
+    String uri = String.format("http://127.0.0.1:%d",
+        getDataNodes().get(dnIndex).getInfoPort());
+    LOG.info("Creating Ozone client to DataNode {} with URI {} and user {}",
+        dnIndex, uri, USER_AUTH);
+    try {
+      return new OzoneClient(uri, USER_AUTH);
+    } catch (URISyntaxException e) {
+      // We control the REST service URI, so it should never be invalid.
+      throw new IllegalStateException("Unexpected URISyntaxException", e);
     }
   }
 
@@ -155,14 +165,39 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
    * @return RPC proxy for accessing container location information
    * @throws IOException if there is an I/O error
    */
-  protected StorageContainerLocationProtocolClientSideTranslatorPB
+  public StorageContainerLocationProtocolClientSideTranslatorPB
       createStorageContainerLocationClient() throws IOException {
     long version = RPC.getProtocolVersion(
         StorageContainerLocationProtocolPB.class);
     InetSocketAddress address = scm.getStorageContainerLocationRpcAddress();
+    LOG.info(
+        "Creating StorageContainerLocationProtocol RPC client with address {}",
+        address);
     return new StorageContainerLocationProtocolClientSideTranslatorPB(
         RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
         address, UserGroupInformation.getCurrentUser(), conf,
         NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf)));
   }
+
+  /**
+   * Waits for the Ozone cluster to be ready for processing requests.
+   */
+  public void waitOzoneReady() {
+    long begin = Time.monotonicNow();
+    while (scm.getDatanodeReport(DatanodeReportType.LIVE).length <
+        numDataNodes) {
+      if (Time.monotonicNow() - begin > 20000) {
+        throw new IllegalStateException(
+            "Timed out waiting for Ozone cluster to become ready.");
+      }
+      LOG.info("Waiting for Ozone cluster to become ready");
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(
+            "Interrupted while waiting for Ozone cluster to become ready.");
+      }
+    }
+  }
 }

+ 253 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java

@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE;
+import static org.junit.Assert.*;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.web.client.OzoneBucket;
+import org.apache.hadoop.ozone.web.client.OzoneClient;
+import org.apache.hadoop.ozone.web.client.OzoneVolume;
+import org.apache.hadoop.ozone.web.request.OzoneQuota;
+
+/**
+ * End-to-end testing of Ozone REST operations.
+ */
+public class TestOzoneRestWithMiniCluster {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static int idSuffix;
+  private static OzoneClient ozoneClient;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
+    conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "distributed");
+    conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY, true);
+    cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitOzoneReady();
+    ozoneClient = cluster.createOzoneClient();
+  }
+
+  @AfterClass
+  public static void shutdown() throws InterruptedException {
+    IOUtils.cleanup(null, ozoneClient, cluster);
+  }
+
+  @Test
+  public void testCreateAndGetVolume() throws Exception {
+    String volumeName = nextId("volume");
+    OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
+    assertNotNull(volume);
+    assertEquals(volumeName, volume.getVolumeName());
+    assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
+    assertEquals("bilbo", volume.getOwnerName());
+    assertNotNull(volume.getQuota());
+    assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
+        volume.getQuota().sizeInBytes());
+    volume = ozoneClient.getVolume(volumeName);
+    assertNotNull(volume);
+    assertEquals(volumeName, volume.getVolumeName());
+    assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
+    assertEquals("bilbo", volume.getOwnerName());
+    assertNotNull(volume.getQuota());
+    assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
+        volume.getQuota().sizeInBytes());
+  }
+
+  @Test
+  public void testCreateAndGetBucket() throws Exception {
+    String volumeName = nextId("volume");
+    String bucketName = nextId("bucket");
+    OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
+    assertNotNull(volume);
+    assertEquals(volumeName, volume.getVolumeName());
+    assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
+    assertEquals("bilbo", volume.getOwnerName());
+    assertNotNull(volume.getQuota());
+    assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
+        volume.getQuota().sizeInBytes());
+    OzoneBucket bucket = volume.createBucket(bucketName);
+    assertNotNull(bucket);
+    assertEquals(bucketName, bucket.getBucketName());
+    bucket = volume.getBucket(bucketName);
+    assertNotNull(bucket);
+    assertEquals(bucketName, bucket.getBucketName());
+  }
+
+  @Test
+  public void testPutAndGetKey() throws Exception {
+    String volumeName = nextId("volume");
+    String bucketName = nextId("bucket");
+    String keyName = nextId("key");
+    String keyData = nextId("data");
+    OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
+    assertNotNull(volume);
+    assertEquals(volumeName, volume.getVolumeName());
+    assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
+    assertEquals("bilbo", volume.getOwnerName());
+    assertNotNull(volume.getQuota());
+    assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
+        volume.getQuota().sizeInBytes());
+    OzoneBucket bucket = volume.createBucket(bucketName);
+    assertNotNull(bucket);
+    assertEquals(bucketName, bucket.getBucketName());
+    bucket.putKey(keyName, keyData);
+    assertEquals(keyData, bucket.getKey(keyName));
+  }
+
+  @Test
+  public void testPutAndGetEmptyKey() throws Exception {
+    String volumeName = nextId("volume");
+    String bucketName = nextId("bucket");
+    String keyName = nextId("key");
+    String keyData = "";
+    OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
+    assertNotNull(volume);
+    assertEquals(volumeName, volume.getVolumeName());
+    assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
+    assertEquals("bilbo", volume.getOwnerName());
+    assertNotNull(volume.getQuota());
+    assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
+        volume.getQuota().sizeInBytes());
+    OzoneBucket bucket = volume.createBucket(bucketName);
+    assertNotNull(bucket);
+    assertEquals(bucketName, bucket.getBucketName());
+    bucket.putKey(keyName, keyData);
+    assertEquals(keyData, bucket.getKey(keyName));
+  }
+
+  @Test
+  public void testPutAndGetMultiChunkKey() throws Exception {
+    String volumeName = nextId("volume");
+    String bucketName = nextId("bucket");
+    String keyName = nextId("key");
+    int keyDataLen = 3 * CHUNK_SIZE;
+    String keyData = buildKeyData(keyDataLen);
+    OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
+    assertNotNull(volume);
+    assertEquals(volumeName, volume.getVolumeName());
+    assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
+    assertEquals("bilbo", volume.getOwnerName());
+    assertNotNull(volume.getQuota());
+    assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
+        volume.getQuota().sizeInBytes());
+    OzoneBucket bucket = volume.createBucket(bucketName);
+    assertNotNull(bucket);
+    assertEquals(bucketName, bucket.getBucketName());
+    bucket.putKey(keyName, keyData);
+    assertEquals(keyData, bucket.getKey(keyName));
+  }
+
+  @Test
+  public void testPutAndGetMultiChunkKeyLastChunkPartial() throws Exception {
+    String volumeName = nextId("volume");
+    String bucketName = nextId("bucket");
+    String keyName = nextId("key");
+    int keyDataLen = (int)(2.5 * CHUNK_SIZE);
+    String keyData = buildKeyData(keyDataLen);
+    OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
+    assertNotNull(volume);
+    assertEquals(volumeName, volume.getVolumeName());
+    assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
+    assertEquals("bilbo", volume.getOwnerName());
+    assertNotNull(volume.getQuota());
+    assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
+        volume.getQuota().sizeInBytes());
+    OzoneBucket bucket = volume.createBucket(bucketName);
+    assertNotNull(bucket);
+    assertEquals(bucketName, bucket.getBucketName());
+    bucket.putKey(keyName, keyData);
+    assertEquals(keyData, bucket.getKey(keyName));
+  }
+
+  @Test
+  public void testReplaceKey() throws Exception {
+    String volumeName = nextId("volume");
+    String bucketName = nextId("bucket");
+    String keyName = nextId("key");
+    int keyDataLen = (int)(2.5 * CHUNK_SIZE);
+    String keyData = buildKeyData(keyDataLen);
+    OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
+    assertNotNull(volume);
+    assertEquals(volumeName, volume.getVolumeName());
+    assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
+    assertEquals("bilbo", volume.getOwnerName());
+    assertNotNull(volume.getQuota());
+    assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
+        volume.getQuota().sizeInBytes());
+    OzoneBucket bucket = volume.createBucket(bucketName);
+    assertNotNull(bucket);
+    assertEquals(bucketName, bucket.getBucketName());
+    bucket.putKey(keyName, keyData);
+    assertEquals(keyData, bucket.getKey(keyName));
+
+    // Replace key with data consisting of fewer chunks.
+    keyDataLen = (int)(1.5 * CHUNK_SIZE);
+    keyData = buildKeyData(keyDataLen);
+    bucket.putKey(keyName, keyData);
+    assertEquals(keyData, bucket.getKey(keyName));
+
+    // Replace key with data consisting of more chunks.
+    keyDataLen = (int)(3.5 * CHUNK_SIZE);
+    keyData = buildKeyData(keyDataLen);
+    bucket.putKey(keyName, keyData);
+    assertEquals(keyData, bucket.getKey(keyName));
+  }
+
+  /**
+   * Creates sample key data of the specified length.  The data is a string of
+   * printable ASCII characters.  This makes it easy to debug through visual
+   * inspection of the chunk files if a test fails.
+   *
+   * @param keyDataLen desired length of key data
+   * @return string of printable ASCII characters of the specified length
+   */
+  private static String buildKeyData(int keyDataLen) {
+    return new String(dataset(keyDataLen, 33, 93), UTF_8);
+  }
+
+  /**
+   * Generates identifiers unique enough for use in tests, so that individual
+   * tests don't collide on each others' data in the shared mini-cluster.
+   *
+   * @param idPrefix prefix to put in front of ID
+   * @return unique ID generated by appending a suffix to the given prefix
+   */
+  private static String nextId(String idPrefix) {
+    return idPrefix + ++idSuffix;
+  }
+}