
HDFS-11004. Ozone : move Chunk IO and container protocol calls to hdfs-client. Contributed by Chen Liang.

Anu Engineer 8 years ago
parent
commit
ee119ff60a

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java

@@ -29,4 +29,7 @@ public final class ScmConfigKeys {
   public static final String DFS_CONTAINER_IPC_PORT =
       "dfs.container.ipc";
   public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011;
+
+  // TODO : this is copied from OzoneConsts, may need to move to a better place
+  public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
 }
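
With the constant relocated into ScmConfigKeys, code in hadoop-hdfs-client can size its chunk buffers without reaching back into the server-side OzoneConsts. A minimal sketch of that usage (the class name here is illustrative, not part of the patch):

    import java.nio.ByteBuffer;

    import org.apache.hadoop.scm.ScmConfigKeys;

    // Illustrative only: a client-side buffer sized from the relocated
    // constant rather than OzoneConsts.CHUNK_SIZE.
    public class ChunkBufferSketch {
      public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(ScmConfigKeys.CHUNK_SIZE);
        System.out.println("chunk buffer capacity: " + buffer.capacity());
      }
    }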

+ 10 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java → hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkInputStream.java

@@ -16,9 +16,9 @@
  *  limitations under the License.
  */
 
-package org.apache.hadoop.ozone.web.storage;
+package org.apache.hadoop.scm.storage;
 
-import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.*;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,23 +31,21 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResp
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.scm.XceiverClient;
 import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.ozone.web.exceptions.OzoneException;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
 
 /**
  * An {@link InputStream} used by the REST service in combination with the
- * {@link DistributedStorageHandler} to read the value of a key from a sequence
+ * SCMClient to read the value of a key from a sequence
  * of container chunks.  All bytes of the key value are stored in container
  * chunks.  Each chunk may contain multiple underlying {@link ByteBuffer}
  * instances.  This class encapsulates all state management for iterating
  * through the sequence of chunks and the sequence of buffers within each chunk.
  */
-class ChunkInputStream extends InputStream {
+public class ChunkInputStream extends InputStream {
 
   private static final int EOF = -1;
 
   private final String key;
-  private final UserArgs args;
+  private final String traceID;
   private XceiverClientManager xceiverClientManager;
   private XceiverClient xceiverClient;
   private List<ChunkInfo> chunks;
@@ -62,12 +60,12 @@ class ChunkInputStream extends InputStream {
    * @param xceiverClientManager client manager that controls client
    * @param xceiverClient client to perform container calls
    * @param chunks list of chunks to read
-   * @param args container protocol call args
+   * @param traceID container protocol call traceID
    */
   public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
-      XceiverClient xceiverClient, List<ChunkInfo> chunks, UserArgs args) {
+      XceiverClient xceiverClient, List<ChunkInfo> chunks, String traceID) {
     this.key = key;
-    this.args = args;
+    this.traceID = traceID;
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
     this.chunks = chunks;
@@ -182,8 +180,8 @@ class ChunkInputStream extends InputStream {
     final ReadChunkResponseProto readChunkResponse;
     try {
       readChunkResponse = readChunk(xceiverClient, chunks.get(readChunkOffset),
-          key, args);
-    } catch (OzoneException e) {
-      throw new IOException("Unexpected OzoneException", e);
+          key, traceID);
+    } catch (IOException e) {
+      throw new IOException("Unexpected Storage Container Exception", e);
     }
     chunkOffset = readChunkOffset;
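
The stream now takes a plain trace ID string in place of the REST-layer UserArgs, which is what lets it move into hdfs-client without dragging the web handler types along. A sketch of the new call shape (the helper method is hypothetical; the chunk list would normally come from a GetKey response):

    import java.io.InputStream;
    import java.util.List;

    import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
    import org.apache.hadoop.scm.XceiverClient;
    import org.apache.hadoop.scm.XceiverClientManager;
    import org.apache.hadoop.scm.storage.ChunkInputStream;

    public class ReadKeySketch {
      // Hypothetical helper: the trace ID is whatever the caller used to
      // carry inside UserArgs.getRequestID().
      static InputStream openKey(String key, XceiverClientManager manager,
          XceiverClient client, List<ChunkInfo> chunks, String traceID) {
        return new ChunkInputStream(key, manager, client, chunks, traceID);
      }
    }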

+ 31 - 28
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java → hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java

@@ -16,11 +16,10 @@
  *  limitations under the License.
  */
 
-package org.apache.hadoop.ozone.web.storage;
+package org.apache.hadoop.scm.storage;
 
-import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE;
-import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
-import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -31,15 +30,14 @@ import com.google.protobuf.ByteString;
 
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.XceiverClient;
 import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.ozone.web.exceptions.OzoneException;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
-import org.apache.hadoop.ozone.web.response.KeyInfo;
 
 /**
  * An {@link OutputStream} used by the REST service in combination with the
- * {@link DistributedStorageHandler} to write the value of a key to a sequence
+ * SCMClient to write the value of a key to a sequence
  * of container chunks.  Writes are buffered locally and periodically written to
  * the container as a new chunk.  In order to preserve the semantics that
  * replacement of a pre-existing key is atomic, each instance of the stream has
@@ -53,11 +51,11 @@ import org.apache.hadoop.ozone.web.response.KeyInfo;
  * This class encapsulates all state management for buffering and writing
  * through to the container.
  */
-class ChunkOutputStream extends OutputStream {
+public class ChunkOutputStream extends OutputStream {
 
   private final String containerKey;
-  private final KeyInfo key;
-  private final UserArgs args;
+  private final String key;
+  private final String traceID;
   private final KeyData.Builder containerKeyData;
   private XceiverClientManager xceiverClientManager;
   private XceiverClient xceiverClient;
@@ -72,19 +70,23 @@ class ChunkOutputStream extends OutputStream {
    * @param key chunk key
    * @param xceiverClientManager client manager that controls client
    * @param xceiverClient client to perform container calls
-   * @param args container protocol call args
+   * @param traceID container protocol call traceID
    */
-  public ChunkOutputStream(String containerKey, KeyInfo key,
+  public ChunkOutputStream(String containerKey, String key,
       XceiverClientManager xceiverClientManager, XceiverClient xceiverClient,
-      UserArgs args) {
+      String traceID) {
     this.containerKey = containerKey;
     this.key = key;
-    this.args = args;
-    this.containerKeyData = fromKeyToContainerKeyDataBuilder(
-        xceiverClient.getPipeline().getContainerName(), containerKey, key);
+    this.traceID = traceID;
+    KeyValue keyValue = KeyValue.newBuilder()
+        .setKey("TYPE").setValue("KEY").build();
+    this.containerKeyData = KeyData.newBuilder()
+        .setContainerName(xceiverClient.getPipeline().getContainerName())
+        .setName(containerKey)
+        .addMetadata(keyValue);
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
-    this.buffer = ByteBuffer.allocate(CHUNK_SIZE);
+    this.buffer = ByteBuffer.allocate(ScmConfigKeys.CHUNK_SIZE);
     this.streamId = UUID.randomUUID().toString();
     this.chunkIndex = 0;
   }
@@ -95,7 +97,7 @@ class ChunkOutputStream extends OutputStream {
     int rollbackPosition = buffer.position();
     int rollbackLimit = buffer.limit();
     buffer.put((byte)b);
-    if (buffer.position() == CHUNK_SIZE) {
+    if (buffer.position() == ScmConfigKeys.CHUNK_SIZE) {
       flushBufferToChunk(rollbackPosition, rollbackLimit);
     }
   }
@@ -114,11 +116,12 @@ class ChunkOutputStream extends OutputStream {
     }
     checkOpen();
     while (len > 0) {
-      int writeLen = Math.min(CHUNK_SIZE - buffer.position(), len);
+      int writeLen = Math.min(
+          ScmConfigKeys.CHUNK_SIZE - buffer.position(), len);
       int rollbackPosition = buffer.position();
       int rollbackLimit = buffer.limit();
       buffer.put(b, off, writeLen);
-      if (buffer.position() == CHUNK_SIZE) {
+      if (buffer.position() == ScmConfigKeys.CHUNK_SIZE) {
         flushBufferToChunk(rollbackPosition, rollbackLimit);
       }
       off += writeLen;
@@ -144,9 +147,9 @@ class ChunkOutputStream extends OutputStream {
         if (buffer.position() > 0) {
           writeChunkToContainer();
         }
-        putKey(xceiverClient, containerKeyData.build(), args);
-      } catch (OzoneException e) {
-        throw new IOException("Unexpected OzoneException", e);
+        putKey(xceiverClient, containerKeyData.build(), traceID);
+      } catch (IOException e) {
+        throw new IOException("Unexpected Storage Container Exception", e);
       } finally {
         xceiverClientManager.releaseClient(xceiverClient);
         xceiverClientManager = null;
@@ -205,14 +208,14 @@ class ChunkOutputStream extends OutputStream {
     ChunkInfo chunk = ChunkInfo
         .newBuilder()
         .setChunkName(
-            key.getKeyName() + "_stream_" + streamId + "_chunk_" + ++chunkIndex)
+            key + "_stream_" + streamId + "_chunk_" + ++chunkIndex)
         .setOffset(0)
         .setLen(data.size())
         .build();
     try {
-      writeChunk(xceiverClient, chunk, key.getKeyName(), data, args);
-    } catch (OzoneException e) {
-      throw new IOException("Unexpected OzoneException", e);
+      writeChunk(xceiverClient, chunk, key, data, traceID);
+    } catch (IOException e) {
+      throw new IOException("Unexpected Storage Container Exception", e);
     }
     containerKeyData.addChunks(chunk);
   }
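
As the class javadoc above describes, writes buffer locally and flush to the container one chunk at a time, and close() commits the accumulated chunk list with putKey so replacement of a pre-existing key stays atomic. A sketch of the new constructor shape (the helper is hypothetical):

    import java.io.OutputStream;

    import org.apache.hadoop.scm.XceiverClient;
    import org.apache.hadoop.scm.XceiverClientManager;
    import org.apache.hadoop.scm.storage.ChunkOutputStream;

    public class WriteKeySketch {
      // Hypothetical helper: the key name and trace ID are now plain
      // strings; KeyInfo and UserArgs stay behind in the REST layer.
      static OutputStream openForWrite(String containerKey, String keyName,
          XceiverClientManager manager, XceiverClient client, String traceID) {
        // Bytes written here buffer up to ScmConfigKeys.CHUNK_SIZE and are
        // flushed as discrete chunks; close() then commits them via putKey.
        return new ChunkOutputStream(containerKey, keyName, manager, client,
            traceID);
      }
    }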

+ 36 - 44
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java → hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java

@@ -16,7 +16,7 @@
  *  limitations under the License.
  */
 
-package org.apache.hadoop.ozone.web.storage;
+package org.apache.hadoop.scm.storage;
 
 import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
 import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
@@ -37,29 +37,24 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResp
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto;
 import org.apache.hadoop.scm.XceiverClient;
-import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
-import org.apache.hadoop.ozone.web.exceptions.OzoneException;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
 
 /**
  * Implementation of all container protocol calls performed by
- * {@link DistributedStorageHandler}.
+ * the storage container clients.
  */
-final class ContainerProtocolCalls {
+public final class ContainerProtocolCalls {
 
   /**
    * Calls the container protocol to get a container key.
    *
    * @param xceiverClient client to perform call
    * @param containerKeyData key data to identify container
-   * @param args container protocol call args
-   * @returns container protocol get key response
+   * @param traceID container protocol call traceID
+   * @return container protocol get key response
    * @throws IOException if there is an I/O error while performing the call
-   * @throws OzoneException if the container protocol call failed
    */
   public static GetKeyResponseProto getKey(XceiverClient xceiverClient,
-      KeyData containerKeyData, UserArgs args) throws IOException,
-      OzoneException {
+      KeyData containerKeyData, String traceID) throws IOException {
     GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
         .newBuilder()
         .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
@@ -67,11 +62,11 @@ final class ContainerProtocolCalls {
     ContainerCommandRequestProto request = ContainerCommandRequestProto
         .newBuilder()
         .setCmdType(Type.GetKey)
-        .setTraceID(args.getRequestID())
+        .setTraceID(traceID)
         .setGetKey(readKeyRequest)
         .build();
     ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response, args);
+    validateContainerResponse(response, traceID);
     return response.getGetKey();
   }
 
@@ -80,13 +75,11 @@ final class ContainerProtocolCalls {
    *
    * @param xceiverClient client to perform call
    * @param containerKeyData key data to identify container
-   * @param args container protocol call args
+   * @param traceID container protocol call traceID
    * @throws IOException if there is an I/O error while performing the call
-   * @throws OzoneException if the container protocol call failed
    */
   public static void putKey(XceiverClient xceiverClient,
-      KeyData containerKeyData, UserArgs args) throws IOException,
-      OzoneException {
+      KeyData containerKeyData, String traceID) throws IOException {
     PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
         .newBuilder()
         .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
@@ -94,11 +87,11 @@ final class ContainerProtocolCalls {
     ContainerCommandRequestProto request = ContainerCommandRequestProto
         .newBuilder()
         .setCmdType(Type.PutKey)
-        .setTraceID(args.getRequestID())
+        .setTraceID(traceID)
         .setPutKey(createKeyRequest)
         .build();
     ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response, args);
+    validateContainerResponse(response, traceID);
   }
 
   /**
@@ -107,14 +100,13 @@ final class ContainerProtocolCalls {
    * @param xceiverClient client to perform call
    * @param chunk information about chunk to read
    * @param key the key name
-   * @param args container protocol call args
-   * @returns container protocol read chunk response
+   * @param traceID container protocol call traceID
+   * @return container protocol read chunk response
    * @throws IOException if there is an I/O error while performing the call
-   * @throws OzoneException if the container protocol call failed
    */
   public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient,
-      ChunkInfo chunk, String key, UserArgs args)
-      throws IOException, OzoneException {
+      ChunkInfo chunk, String key, String traceID)
+      throws IOException {
     ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
         .newBuilder()
         .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
@@ -123,11 +115,11 @@ final class ContainerProtocolCalls {
     ContainerCommandRequestProto request = ContainerCommandRequestProto
         .newBuilder()
         .setCmdType(Type.ReadChunk)
-        .setTraceID(args.getRequestID())
+        .setTraceID(traceID)
         .setReadChunk(readChunkRequest)
         .build();
     ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response, args);
+    validateContainerResponse(response, traceID);
     return response.getReadChunk();
   }
 
@@ -138,13 +130,12 @@ final class ContainerProtocolCalls {
    * @param chunk information about chunk to write
    * @param key the key name
    * @param data the data of the chunk to write
-   * @param args container protocol call args
+   * @param traceID container protocol call traceID
    * @throws IOException if there is an I/O error while performing the call
-   * @throws OzoneException if the container protocol call failed
    */
   public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk,
-      String key, ByteString data, UserArgs args)
-      throws IOException, OzoneException {
+      String key, ByteString data, String traceID)
+      throws IOException {
     WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
         .newBuilder()
         .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
@@ -154,11 +145,11 @@ final class ContainerProtocolCalls {
     ContainerCommandRequestProto request = ContainerCommandRequestProto
         .newBuilder()
         .setCmdType(Type.WriteChunk)
-        .setTraceID(args.getRequestID())
+        .setTraceID(traceID)
         .setWriteChunk(writeChunkRequest)
         .build();
     ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response, args);
+    validateContainerResponse(response, traceID);
   }
 
   /**
@@ -166,27 +157,28 @@ final class ContainerProtocolCalls {
    * return code is mapped to a corresponding exception and thrown.
    *
    * @param response container protocol call response
-   * @param args container protocol call args
-   * @throws OzoneException if the container protocol call failed
+   * @param traceID container protocol call traceID
+   * @throws IOException if the container protocol call failed
    */
   private static void validateContainerResponse(
-      ContainerCommandResponseProto response, UserArgs args)
-      throws OzoneException {
+      ContainerCommandResponseProto response, String traceID)
+      throws IOException {
+    // TODO : throw the right type of exception
     switch (response.getResult()) {
     case SUCCESS:
       break;
     case MALFORMED_REQUEST:
-      throw ErrorTable.newError(new OzoneException(HTTP_BAD_REQUEST,
-          "badRequest", "Bad container request."), args);
+      throw new IOException(HTTP_BAD_REQUEST +
+          ":Bad container request: " + traceID);
     case UNSUPPORTED_REQUEST:
-      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
-          "internalServerError", "Unsupported container request."), args);
+      throw new IOException(HTTP_INTERNAL_ERROR +
+          "Unsupported container request: " + traceID);
     case CONTAINER_INTERNAL_ERROR:
-      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
-          "internalServerError", "Container internal error."), args);
+      throw new IOException(HTTP_INTERNAL_ERROR +
+          "Container internal error:" + traceID);
     default:
-      throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
-          "internalServerError", "Unrecognized container response."), args);
+      throw new IOException(HTTP_INTERNAL_ERROR +
+          "Unrecognized container response:" + traceID);
     }
   }
 

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/package-info.java

@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.scm.storage;
+
+/**
+ * This package contains client-side classes for reading and writing
+ * chunked key data in storage containers.
+ */

+ 12 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.ozone.web.storage;
 
-import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.*;
 import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
 
 import java.io.IOException;
@@ -57,6 +57,8 @@ import org.apache.hadoop.ozone.web.response.ListKeys;
 import org.apache.hadoop.ozone.web.response.ListVolumes;
 import org.apache.hadoop.ozone.web.response.VolumeInfo;
 import org.apache.hadoop.ozone.web.response.VolumeOwner;
+import org.apache.hadoop.scm.storage.ChunkInputStream;
+import org.apache.hadoop.scm.storage.ChunkOutputStream;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -95,7 +97,7 @@ public final class DistributedStorageHandler implements StorageHandler {
       volume.setCreatedBy(args.getAdminName());
       KeyData containerKeyData = fromVolumeToContainerKeyData(
           xceiverClient.getPipeline().getContainerName(), containerKey, volume);
-      putKey(xceiverClient, containerKeyData, args);
+      putKey(xceiverClient, containerKeyData, args.getRequestID());
     } finally {
       xceiverClientManager.releaseClient(xceiverClient);
     }
@@ -140,7 +142,7 @@ public final class DistributedStorageHandler implements StorageHandler {
       KeyData containerKeyData = containerKeyDataForRead(
           xceiverClient.getPipeline().getContainerName(), containerKey);
       GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
-          args);
+          args.getRequestID());
       return fromContainerKeyValueListToVolume(
           response.getKeyData().getMetadataList());
     } finally {
@@ -163,7 +165,7 @@ public final class DistributedStorageHandler implements StorageHandler {
       bucket.setStorageType(args.getStorageType());
       KeyData containerKeyData = fromBucketToContainerKeyData(
           xceiverClient.getPipeline().getContainerName(), containerKey, bucket);
-      putKey(xceiverClient, containerKeyData, args);
+      putKey(xceiverClient, containerKeyData, args.getRequestID());
     } finally {
       xceiverClientManager.releaseClient(xceiverClient);
     }
@@ -218,7 +220,7 @@ public final class DistributedStorageHandler implements StorageHandler {
       KeyData containerKeyData = containerKeyDataForRead(
           xceiverClient.getPipeline().getContainerName(), containerKey);
       GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
-          args);
+          args.getRequestID());
       return fromContainerKeyValueListToBucket(
           response.getKeyData().getMetadataList());
     } finally {
@@ -235,8 +237,8 @@ public final class DistributedStorageHandler implements StorageHandler {
     key.setKeyName(args.getKeyName());
     key.setCreatedOn(dateToString(new Date()));
     XceiverClient xceiverClient = acquireXceiverClient(containerKey);
-    return new ChunkOutputStream(containerKey, key, xceiverClientManager,
-        xceiverClient, args);
+    return new ChunkOutputStream(containerKey, key.getKeyName(),
+        xceiverClientManager, xceiverClient, args.getRequestID());
   }
 
   @Override
@@ -256,7 +258,7 @@ public final class DistributedStorageHandler implements StorageHandler {
       KeyData containerKeyData = containerKeyDataForRead(
           xceiverClient.getPipeline().getContainerName(), containerKey);
       GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
-          args);
+          args.getRequestID());
       long length = 0;
       List<ChunkInfo> chunks = response.getKeyData().getChunksList();
       for (ChunkInfo chunk : chunks) {
@@ -264,8 +266,8 @@ public final class DistributedStorageHandler implements StorageHandler {
       }
       success = true;
       return new LengthInputStream(new ChunkInputStream(
-          containerKey, xceiverClientManager, xceiverClient, chunks, args),
-          length);
+          containerKey, xceiverClientManager, xceiverClient,
+          chunks, args.getRequestID()), length);
     } finally {
       if (!success) {
         xceiverClientManager.releaseClient(xceiverClient);
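
Throughout this file the pattern is the same: UserArgs stays at the REST boundary and only args.getRequestID() crosses into the relocated client code as the trace ID. A sketch of that boundary (the wrapper method is hypothetical):

    import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;

    import java.io.IOException;

    import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
    import org.apache.hadoop.ozone.web.handlers.UserArgs;
    import org.apache.hadoop.scm.XceiverClient;

    public class HandlerBoundarySketch {
      // Hypothetical excerpt: UserArgs never crosses into hdfs-client;
      // only its request ID travels on as the trace ID.
      static void writeMetadata(XceiverClient client, KeyData keyData,
          UserArgs args) throws IOException {
        putKey(client, keyData, args.getRequestID());
      }
    }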