
HDFS-12794. Ozone: Parallelize ChunkOutputStream Writes to container. Contributed by Shashikant Banerjee.

Xiaoyu Yao, 7 years ago
parent commit 98eeabbdb3

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -107,6 +107,9 @@ public final class OzoneConfigKeys {
       "ozone.scm.block.size.in.mb";
   public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256;
 
+  public static final String OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB =
+      "ozone.output.stream.buffer.size.in.mb";
+  public static final long OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT = 256;
   /**
    * Ozone administrator users delimited by comma.
    * If not set, only the user who launches an ozone service will be the

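The new key is expressed in MB and converted to bytes at the point of use. A
minimal sketch of how a client reads it, assuming hadoop-common's
Configuration is on the classpath (the local MB constant stands in for
OzoneConsts.MB):

import org.apache.hadoop.conf.Configuration;

public class StreamBufferSizeSketch {
  // Mirrors the constants added above.
  static final String OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB =
      "ozone.output.stream.buffer.size.in.mb";
  static final long OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT = 256;
  static final long MB = 1024 * 1024; // stands in for OzoneConsts.MB

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Read the value in MB and convert to bytes -- the same pattern
    // RpcClient and DistributedStorageHandler use later in this commit.
    long streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
        OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * MB;
    System.out.println("stream buffer bytes = " + streamBufferSize);
  }
}
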
+ 48 - 6
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java

@@ -19,7 +19,10 @@ package org.apache.hadoop.ozone.client.io;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
@@ -46,6 +49,9 @@ import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
+
 /**
  * Maintaining a list of ChunkInputStream. Write based on offset.
  *
@@ -72,6 +78,7 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final XceiverClientManager xceiverClientManager;
   private final int chunkSize;
   private final String requestID;
+  private final long streamBufferSize;
 
   /**
    * A constructor for testing purpose only.
@@ -86,6 +93,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     xceiverClientManager = null;
     chunkSize = 0;
     requestID = null;
+    streamBufferSize = OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB;
   }
 
   /**
@@ -105,12 +113,26 @@ public class ChunkGroupOutputStream extends OutputStream {
     return streamEntries;
   }
 
-  public ChunkGroupOutputStream(
+  /**
+   * ChunkGroupOutputStream constructor; package-private since instances
+   * can be created only via the builder.
+   * @param handler  - Open Key state.
+   * @param xceiverClientManager - Communication Manager.
+   * @param scmClient - SCM protocol Client.
+   * @param ksmClient - KSM Protocol client
+   * @param chunkSize - Chunk size for I/O.
+   * @param requestId - Seed for trace ID generation.
+   * @param factor - Replication factor.
+   * @param type - Replication type - RATIS/Standalone etc.
+   * @param maxBufferSize - Maximum stream buffer size.
+   * @throws IOException - Thrown if there is an error.
+   */
+  ChunkGroupOutputStream(
       OpenKeySession handler, XceiverClientManager xceiverClientManager,
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
       KeySpaceManagerProtocolClientSideTranslatorPB ksmClient,
       int chunkSize, String requestId, ReplicationFactor factor,
-      ReplicationType type) throws IOException {
+      ReplicationType type, long maxBufferSize) throws IOException {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.byteOffset = 0;
@@ -130,6 +152,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     this.requestID = requestId;
     LOG.debug("Expecting open key with one block, but got" +
         info.getKeyLocationVersions().size());
+    this.streamBufferSize = maxBufferSize;
   }
 
   /**
@@ -184,7 +207,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     }
     streamEntries.add(new ChunkOutputStreamEntry(containerKey,
         keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
-        chunkSize, subKeyInfo.getLength()));
+        chunkSize, subKeyInfo.getLength(), this.streamBufferSize));
   }
 
 
@@ -324,6 +347,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     private String requestID;
     private ReplicationType type;
     private ReplicationFactor factor;
+    private long streamBufferSize;
 
     public Builder setHandler(OpenKeySession handler) {
       this.openHandler = handler;
@@ -367,9 +391,23 @@ public class ChunkGroupOutputStream extends OutputStream {
       return this;
     }
 
+    public Builder setStreamBufferSize(long streamBufferSize) {
+      this.streamBufferSize = streamBufferSize;
+      return this;
+    }
+
     public ChunkGroupOutputStream build() throws IOException {
+      Preconditions.checkNotNull(openHandler);
+      Preconditions.checkNotNull(xceiverManager);
+      Preconditions.checkNotNull(scmClient);
+      Preconditions.checkNotNull(ksmClient);
+      Preconditions.checkState(chunkSize > 0);
+      Preconditions.checkState(StringUtils.isNotEmpty(requestID));
+      Preconditions
+          .checkState(streamBufferSize > 0 && streamBufferSize > chunkSize);
+
       return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
-          ksmClient, chunkSize, requestID, factor, type);
+          ksmClient, chunkSize, requestID, factor, type, streamBufferSize);
     }
   }
 
@@ -385,11 +423,12 @@ public class ChunkGroupOutputStream extends OutputStream {
     private final long length;
     // the current position of this stream 0 <= currentPosition < length
     private long currentPosition;
+    private long streamBufferSize; // Maximum stream buffer size.
 
     ChunkOutputStreamEntry(String containerKey, String key,
         XceiverClientManager xceiverClientManager,
         XceiverClientSpi xceiverClient, String requestId, int chunkSize,
-        long length) {
+        long length, long streamBufferSize) {
       this.outputStream = null;
       this.containerKey = containerKey;
       this.key = key;
@@ -400,6 +439,7 @@ public class ChunkGroupOutputStream extends OutputStream {
 
       this.length = length;
       this.currentPosition = 0;
+      this.streamBufferSize = streamBufferSize;
     }
 
     /**
@@ -418,6 +458,8 @@ public class ChunkGroupOutputStream extends OutputStream {
 
       this.length = length;
       this.currentPosition = 0;
+      this.streamBufferSize =
+          OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB;
     }
 
     long getLength() {
@@ -432,7 +474,7 @@ public class ChunkGroupOutputStream extends OutputStream {
       if (this.outputStream == null) {
         this.outputStream = new ChunkOutputStream(containerKey,
             key, xceiverClientManager, xceiverClient,
-            requestId, chunkSize);
+            requestId, chunkSize, Ints.checkedCast(streamBufferSize));
       }
     }
 

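With the constructor now package-private, callers construct the stream via the
builder, whose build() validates its inputs: non-null dependencies, a positive
chunk size, a non-empty request ID, and a buffer size strictly larger than the
chunk size. A hypothetical caller might look like the sketch below; the
setters marked as assumed are not shown in this diff and are inferred from the
builder's fields:

// Sketch only, mirroring the usage in RpcClient later in this commit.
// openSession, clientManager, scm, ksm, chunkSize, requestId and
// streamBufferSize are assumed to be initialized by the caller.
ChunkGroupOutputStream out = new ChunkGroupOutputStream.Builder()
    .setHandler(openSession)
    .setXceiverClientManager(clientManager)   // assumed setter name
    .setScmClient(scm)                        // assumed setter name
    .setKsmClient(ksm)                        // assumed setter name
    .setChunkSize(chunkSize)                  // assumed setter name
    .setRequestID(requestId)
    .setType(ReplicationType.RATIS)
    .setFactor(ReplicationFactor.THREE)
    // Must be larger than chunkSize or build() fails its Preconditions
    // check; the long value is later narrowed with Ints.checkedCast, so
    // it must also fit in an int.
    .setStreamBufferSize(streamBufferSize)
    .build();
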
+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -74,6 +74,11 @@ import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
+
 /**
  * Ozone RPC Client Implementation, it connects to KSM, SCM and DataNode
  * to execute client calls. This uses RPC protocol for communication
@@ -94,6 +99,7 @@ public class RpcClient implements ClientProtocol {
   private final UserGroupInformation ugi;
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
+  private final long streamBufferSize;
 
    /**
     * Creates RpcClient instance with the given configuration.
@@ -148,6 +154,9 @@ public class RpcClient implements ClientProtocol {
     } else {
       chunkSize = configuredChunkSize;
     }
+    // By default, streamBufferSize equals the default SCM block size.
+    streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
+        OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB;
   }
 
   @Override
@@ -463,6 +472,7 @@ public class RpcClient implements ClientProtocol {
             .setRequestID(requestId)
             .setType(OzoneProtos.ReplicationType.valueOf(type.toString()))
             .setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue()))
+            .setStreamBufferSize(streamBufferSize)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

+ 184 - 73
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java

@@ -18,22 +18,32 @@
 
 package org.apache.hadoop.scm.storage;
 
-import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
-import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
-
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.util.Time;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .Result.SUCCESS;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
+import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk;
+
 
 /**
  * An {@link OutputStream} used by the REST service in combination with the
@@ -57,12 +67,12 @@ public class ChunkOutputStream extends OutputStream {
   private final String key;
   private final String traceID;
   private final KeyData.Builder containerKeyData;
+  private final String streamId;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
   private ByteBuffer buffer;
-  private final String streamId;
-  private int chunkIndex;
   private int chunkSize;
+  private int streamBufferSize;
 
   /**
    * Creates a new ChunkOutputStream.
@@ -73,14 +83,18 @@ public class ChunkOutputStream extends OutputStream {
    * @param xceiverClient client to perform container calls
    * @param traceID container protocol call args
    * @param chunkSize chunk size
+   * @param maxBufferSize maximum amount of memory allocated for data
+   * buffering in this stream
    */
   public ChunkOutputStream(String containerKey, String key,
       XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
-      String traceID, int chunkSize) {
+      String traceID, int chunkSize, int maxBufferSize) {
     this.containerKey = containerKey;
     this.key = key;
     this.traceID = traceID;
     this.chunkSize = chunkSize;
+    this.streamBufferSize = maxBufferSize;
+
     KeyValue keyValue = KeyValue.newBuilder()
         .setKey("TYPE").setValue("KEY").build();
     this.containerKeyData = KeyData.newBuilder()
@@ -89,22 +103,24 @@ public class ChunkOutputStream extends OutputStream {
         .addMetadata(keyValue);
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
-    this.buffer = ByteBuffer.allocate(chunkSize);
+    this.buffer = ByteBuffer.allocate(maxBufferSize);
     this.streamId = UUID.randomUUID().toString();
-    this.chunkIndex = 0;
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public synchronized void write(int b) throws IOException {
     checkOpen();
-    int rollbackPosition = buffer.position();
-    int rollbackLimit = buffer.limit();
-    buffer.put((byte)b);
-    if (buffer.position() == chunkSize) {
-      flushBufferToChunk(rollbackPosition, rollbackLimit);
-    }
+    byte[] c = new byte[1];
+    c[0] = (byte) b;
+    write(c, 0, 1);
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void write(byte[] b, int off, int len) throws IOException {
     if (b == null) {
@@ -118,17 +134,90 @@ public class ChunkOutputStream extends OutputStream {
       return;
     }
     checkOpen();
-    while (len > 0) {
-      int writeLen = Math.min(chunkSize - buffer.position(), len);
-      int rollbackPosition = buffer.position();
-      int rollbackLimit = buffer.limit();
-      buffer.put(b, off, writeLen);
-      if (buffer.position() == chunkSize) {
-        flushBufferToChunk(rollbackPosition, rollbackLimit);
+    int rollbackPosition = buffer.position();
+    int rollbackLimit = buffer.limit();
+    try {
+      List<ImmutablePair<CompletableFuture<ContainerProtos
+              .ContainerCommandResponseProto>, ChunkInfo>>
+          writeFutures = writeInParallel(b, off, len);
+      // This is a rendezvous point for this call: all chunk I/O for this
+      // block must complete before we can declare the call complete.
+
+      // Wait until all the futures complete; if any of the underlying calls
+      // ended with an exception, this call throws it as well.
+      // If writeFutures is null, the data only went into the buffer and no
+      // chunk writes were scheduled.
+      if (writeFutures != null) {
+        CompletableFuture.allOf(writeFutures.toArray(new
+            CompletableFuture[writeFutures.size()])).join();
+
+        // Wrote this data, we will clear this buffer now.
+        buffer.clear();
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      buffer.position(rollbackPosition);
+      buffer.limit(rollbackLimit);
+      throw new IOException("Unexpected error in write. ", e);
+    }
+  }
+
+  /**
+   * Writes a given block of data into many small chunks in parallel.
+   *
+   * @param b byte array containing the data to write
+   * @param off offset into the array
+   * @param len number of bytes to write
+   * @return a list of pending write futures paired with their chunk info,
+   * or null if the data only went into the buffer
+   * @throws IOException
+   * @throws ExecutionException
+   * @throws InterruptedException
+   */
+  public List<ImmutablePair<CompletableFuture<ContainerProtos
+          .ContainerCommandResponseProto>, ChunkInfo>>
+      writeInParallel(byte[] b, int off, int len)
+      throws IOException, ExecutionException, InterruptedException {
+
+    Preconditions.checkArgument(len <= streamBufferSize,
+        "A chunk write cannot be " + "larger than max buffer size limit.");
+    long newBlockCount = len / chunkSize;
+    buffer.put(b, off, len);
+    List<ImmutablePair<CompletableFuture<ContainerProtos
+            .ContainerCommandResponseProto>, ChunkInfo>>
+        writeFutures = new LinkedList<>();
+
+    // If we have at least a chunkSize of data ready to write, go ahead and
+    // start writing that data.
+    if (buffer.position() >= chunkSize) {
+      // Allocate byte slices that point to each chunk of data we want to
+      // write. The byte buffer is divided into individual chunks of at most
+      // chunkSize bytes, each assigned a chunkId; for each chunk an async
+      // write request is made, and the write call returns only after all of
+      // them have completed.
+      for (int chunkId = 0; chunkId < newBlockCount; chunkId++) {
+        // Please note: we are not flipping the slice when we write, since
+        // the slice's position and limit already frame the chunk we want to
+        // write. Also note that duplicate() does not copy the data; it only
+        // creates new metadata that points at the same backing buffer.
+        ByteBuffer chunk = buffer.duplicate();
+        Preconditions.checkState((chunkId * chunkSize) < buffer.limit(),
+            "Chunk offset cannot be beyond the limits of the buffer.");
+        chunk.position(chunkId * chunkSize);
+        // Min handles the case where the last chunk may be smaller than
+        // chunkSize.
+        chunk.limit(chunk.position() +
+            Math.min(chunkSize, chunk.remaining() - (chunkId * chunkSize)));
+
+        // Schedule all the writes. This is a non-blocking call that returns
+        // futures; the caller collects them and waits for all of them to
+        // complete before the write returns.
+        writeFutures.add(writeChunkToContainer(chunk, 0, chunkSize));
       }
-      off += writeLen;
-      len -= writeLen;
+      return writeFutures;
     }
+    // Nothing to do, return null.
+    return null;
   }
 
   @Override
@@ -137,7 +226,19 @@ public class ChunkOutputStream extends OutputStream {
     if (buffer.position() > 0) {
       int rollbackPosition = buffer.position();
       int rollbackLimit = buffer.limit();
-      flushBufferToChunk(rollbackPosition, rollbackLimit);
+      ByteBuffer chunk = buffer.duplicate();
+      try {
+        ImmutablePair<CompletableFuture<ContainerProtos
+            .ContainerCommandResponseProto>, ChunkInfo>
+            result = writeChunkToContainer(chunk, 0, chunkSize);
+        updateChunkInfo(result);
+        buffer.clear();
+      } catch (ExecutionException | InterruptedException e) {
+        buffer.position(rollbackPosition);
+        buffer.limit(rollbackLimit);
+        throw new IOException("Failure in flush", e);
+      }
     }
   }
 
@@ -147,10 +248,20 @@ public class ChunkOutputStream extends OutputStream {
         buffer != null) {
       try {
         if (buffer.position() > 0) {
-          writeChunkToContainer();
+          // This flip is needed since this is the real buffer to which we
+          // are writing and position will have moved each time we did a put.
+          buffer.flip();
+
+          // Call get() immediately to make this call synchronous.
+
+          ImmutablePair<CompletableFuture<ContainerProtos
+              .ContainerCommandResponseProto>, ChunkInfo>
+              result = writeChunkToContainer(buffer, 0, buffer.limit());
+          updateChunkInfo(result);
+          buffer.clear();
         }
         putKey(xceiverClient, containerKeyData.build(), traceID);
-      } catch (IOException e) {
+      } catch (IOException | InterruptedException | ExecutionException e) {
         throw new IOException(
             "Unexpected Storage Container Exception: " + e.toString(), e);
       } finally {
@@ -163,6 +274,24 @@ public class ChunkOutputStream extends OutputStream {
 
   }
 
+  private void updateChunkInfo(
+      ImmutablePair<
+          CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
+          ChunkInfo
+          > result) throws InterruptedException, ExecutionException {
+    // Wait for this call to complete.
+    ContainerProtos.ContainerCommandResponseProto response =
+        result.getLeft().get();
+
+    // If the write call to the chunk is successful, we need to add that
+    // chunk information to the containerKeyData.
+    // TODO: Clean up the garbage in case of failure.
+    if (response.getResult() == SUCCESS) {
+      ChunkInfo chunk = result.getRight();
+      containerKeyData.addChunks(chunk);
+    }
+  }
+
   /**
    * Checks if the stream is open.  If not, throws an exception.
    *
@@ -174,54 +303,36 @@ public class ChunkOutputStream extends OutputStream {
     }
   }
 
-  /**
-   * Attempts to flush buffered writes by writing a new chunk to the container.
-   * If successful, then clears the buffer to prepare to receive writes for a
-   * new chunk.
-   *
-   * @param rollbackPosition position to restore in buffer if write fails
-   * @param rollbackLimit limit to restore in buffer if write fails
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  private synchronized void flushBufferToChunk(int rollbackPosition,
-      int rollbackLimit) throws IOException {
-    boolean success = false;
-    try {
-      writeChunkToContainer();
-      success = true;
-    } finally {
-      if (success) {
-        buffer.clear();
-      } else {
-        buffer.position(rollbackPosition);
-        buffer.limit(rollbackLimit);
-      }
-    }
-  }
-
   /**
    * Writes buffered data as a new chunk to the container and saves chunk
    * information to be used later in putKey call.
    *
-   * @throws IOException if there is an I/O error while performing the call
+   * @param data data to write
+   * @param offset offset into the data buffer
+   * @param len length in bytes
+   * @return an ImmutablePair of a future that will contain the result of
+   * the operation, and the ChunkInfo that we wrote
+   *
+   * @throws IOException
+   * @throws ExecutionException
+   * @throws InterruptedException
    */
-  private synchronized void writeChunkToContainer() throws IOException {
-    buffer.flip();
-    ByteString data = ByteString.copyFrom(buffer);
-    ChunkInfo chunk = ChunkInfo
-        .newBuilder()
-        .setChunkName(
+  private ImmutablePair<
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
+      ChunkInfo>
+      writeChunkToContainer(ByteBuffer data, int offset, int len)
+      throws IOException, ExecutionException, InterruptedException {
+
+    ByteString dataString = ByteString.copyFrom(data);
+    ChunkInfo chunk = ChunkInfo.newBuilder().setChunkName(
             DigestUtils.md5Hex(key) + "_stream_"
-                + streamId + "_chunk_" + ++chunkIndex)
+                + streamId + "_chunk_" + Time.monotonicNowNanos())
         .setOffset(0)
-        .setLen(data.size())
+        .setLen(len)
         .build();
-    try {
-      writeChunk(xceiverClient, chunk, key, data, traceID);
-    } catch (IOException e) {
-      throw new IOException(
-          "Unexpected Storage Container Exception: " + e.toString(), e);
-    }
-    containerKeyData.addChunks(chunk);
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> response =
+        writeChunk(xceiverClient, chunk, key, dataString, traceID);
+    return new ImmutablePair<>(response, chunk);
   }
 }

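The heart of the change is the new write path: the buffer is carved into
chunk-sized views with duplicate(), each view is handed to an asynchronous
chunk write, and CompletableFuture.allOf serves as the rendezvous point. A
self-contained sketch of that pattern, with a stand-in for the async
container call:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ParallelChunkWriteSketch {
  static final int CHUNK_SIZE = 4;

  // Stand-in for the async container call; the real code goes through
  // ContainerProtocolCalls.writeChunk -> sendCommandAsync.
  static CompletableFuture<Integer> writeChunkAsync(ByteBuffer chunk) {
    return CompletableFuture.supplyAsync(chunk::remaining);
  }

  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(16);
    buffer.put("0123456789".getBytes(StandardCharsets.US_ASCII));

    // Only full chunks are scheduled; a partial tail stays buffered, just
    // as in writeInParallel above.
    int fullChunks = buffer.position() / CHUNK_SIZE;
    List<CompletableFuture<Integer>> futures = new ArrayList<>();
    for (int chunkId = 0; chunkId < fullChunks; chunkId++) {
      // duplicate() shares the backing data; only position/limit differ,
      // so each view frames one chunk without copying any bytes.
      ByteBuffer chunk = buffer.duplicate();
      chunk.position(chunkId * CHUNK_SIZE);
      chunk.limit(chunk.position() + CHUNK_SIZE);
      futures.add(writeChunkAsync(chunk));
    }
    // Rendezvous: return only after every chunk I/O has completed.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    System.out.println("wrote " + fullChunks + " chunks in parallel");
  }
}
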
+ 8 - 5
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java

@@ -53,6 +53,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
 import org.apache.hadoop.scm.XceiverClientSpi;
 
 /**
@@ -162,9 +165,10 @@ public final class ContainerProtocolCalls  {
    * @param traceID container protocol call args
    * @throws IOException if there is an I/O error while performing the call
    */
-  public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
-      String key, ByteString data, String traceID)
-      throws IOException {
+  public static CompletableFuture<ContainerCommandResponseProto> writeChunk(
+      XceiverClientSpi xceiverClient, ChunkInfo chunk, String key,
+      ByteString data, String traceID)
+      throws IOException, ExecutionException, InterruptedException {
     WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
         .newBuilder()
         .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
@@ -179,8 +183,7 @@ public final class ContainerProtocolCalls  {
         .setDatanodeID(id)
         .setWriteChunk(writeChunkRequest)
         .build();
-    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
-    validateContainerResponse(response);
+    return xceiverClient.sendCommandAsync(request);
   }
 
   /**

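Because writeChunk now returns the future from sendCommandAsync instead of
validating the response itself, blocking and checking the result becomes the
caller's job, as ChunkOutputStream.updateChunkInfo does above. A
self-contained sketch of that caller-side pattern, with stand-in types for
the protobuf response:

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AsyncWriteValidationSketch {
  // Stand-ins for ContainerProtos.ContainerCommandResponseProto and Result.
  enum Result { SUCCESS, IO_EXCEPTION }

  static class Response {
    final Result result;
    Response(Result result) { this.result = result; }
  }

  public static void main(String[] args)
      throws InterruptedException, ExecutionException, IOException {
    // The async call returns immediately with a future...
    CompletableFuture<Response> future =
        CompletableFuture.supplyAsync(() -> new Response(Result.SUCCESS));

    // ...so a caller that needs synchronous semantics blocks on get() and
    // validates the result itself, mirroring updateChunkInfo above.
    Response response = future.get();
    if (response.result != Result.SUCCESS) {
      throw new IOException("Chunk write failed: " + response.result);
    }
    System.out.println("chunk write acknowledged");
  }
}
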
+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -67,6 +67,11 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
+
 /**
  * A {@link StorageHandler} implementation that distributes object storage
  * across the nodes of an HDFS cluster.
@@ -86,6 +91,7 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final boolean useRatis;
   private final OzoneProtos.ReplicationType type;
   private final OzoneProtos.ReplicationFactor factor;
+  private final long streamBufferSize;
 
   /**
    * Creates a new DistributedStorageHandler.
@@ -127,6 +133,9 @@ public final class DistributedStorageHandler implements StorageHandler {
           chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
       chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
     }
+    // By default, streamBufferSize is set to the default SCM block size.
+    streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
+        OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB;
   }
 
   @Override
@@ -418,6 +427,7 @@ public final class DistributedStorageHandler implements StorageHandler {
             .setRequestID(args.getRequestID())
             .setType(xceiverClientManager.getType())
             .setFactor(xceiverClientManager.getFactor())
+            .setStreamBufferSize(streamBufferSize)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml

@@ -690,6 +690,15 @@
       Ozone block size.
     </description>
   </property>
+  <property>
+    <name>ozone.output.stream.buffer.size.in.mb</name>
+    <value>256</value>
+    <tag>OZONE</tag>
+    <description>
      The maximum size, in MB, of the buffer allocated for the Ozone output
      stream for writes. The default equals the default SCM block size.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.chunk.size</name>
     <value>16777216</value>

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java

@@ -1114,7 +1114,7 @@ public class TestKeySpaceManager {
           .getMetadataManager().getExpiredOpenKeys();
       Assert.assertEquals(0, openKeys.size());
 
-      Thread.sleep(2000);
+      //Thread.sleep(2000);
 
       openKeys = cluster.getKeySpaceManager().getMetadataManager()
           .getExpiredOpenKeys();