Преглед изворни кода

HDDS-675. Add blocking buffer and use watchApi for flush/close in OzoneClient. Contributed by Shashikant Banerjee.

Shashikant Banerjee пре 6 година
родитељ
комит
671fd6524b
25 измењених фајлова са 1248 додато и 550 уклоњено
  1. 23 5
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
  2. 50 15
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
  3. 352 96
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
  4. 98 0
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java
  5. 9 3
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
  6. 55 2
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
  7. 16 8
      hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
  8. 18 8
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  9. 3 0
      hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
  10. 0 27
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
  11. 185 152
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
  12. 22 5
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
  13. 44 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
  14. 19 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
  15. 1 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
  16. 67 185
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
  17. 1 19
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
  18. 213 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
  19. 34 0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
  20. 1 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
  21. 2 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
  22. 1 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java
  23. 22 20
      hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
  24. 6 0
      hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
  25. 6 0
      hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java

+ 23 - 5
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
@@ -47,6 +48,7 @@ import java.util.Map;
 import java.util.HashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * A Client for the storageContainer protocol.
@@ -163,7 +165,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
         // In case the command gets retried on a 2nd datanode,
         // sendCommandAsyncCall will create a new channel and async stub
         // in case these don't exist for the specific datanode.
-        responseProto = sendCommandAsync(request, dn).get();
+        responseProto = sendCommandAsync(request, dn).getResponse().get();
         if (responseProto.getResult() == ContainerProtos.Result.SUCCESS) {
           break;
         }
@@ -197,13 +199,23 @@ public class XceiverClientGrpc extends XceiverClientSpi {
    * @throws IOException
    */
   @Override
-  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+  public XceiverClientAsyncReply sendCommandAsync(
       ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException {
-    return sendCommandAsync(request, pipeline.getFirstNode());
+    XceiverClientAsyncReply asyncReply =
+        sendCommandAsync(request, pipeline.getFirstNode());
+
+    // TODO : for now make this API sync in nature as async requests are
+    // served out of order over XceiverClientGrpc. This needs to be fixed
+    // if this API is to be used for I/O path. Currently, this is not
+    // used for Read/Write Operation but for tests.
+    if (!HddsUtils.isReadOnly(request)) {
+      asyncReply.getResponse().get();
+    }
+    return asyncReply;
   }
 
-  private CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+  private XceiverClientAsyncReply sendCommandAsync(
       ContainerCommandRequestProto request, DatanodeDetails dn)
       throws IOException, ExecutionException, InterruptedException {
     if (closed) {
@@ -257,7 +269,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
             });
     requestObserver.onNext(request);
     requestObserver.onCompleted();
-    return replyFuture;
+    return new XceiverClientAsyncReply(replyFuture);
   }
 
   private void reconnect(DatanodeDetails dn)
@@ -288,6 +300,12 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     // For stand alone pipeline, there is no notion called destroy pipeline.
   }
 
+  @Override
+  public void watchForCommit(long index, long timeout)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    // there is no notion of watch for commit index in standalone pipeline
+  };
+
   /**
    * Returns pipeline Type.
    *

+ 50 - 15
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -50,9 +50,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -192,9 +195,22 @@ public final class XceiverClientRatis extends XceiverClientSpi {
         getClient().sendAsync(() -> byteString);
   }
 
-  public void watchForCommit(long index, long timeout) throws Exception {
-    getClient().sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED)
-        .get(timeout, TimeUnit.MILLISECONDS);
+  @Override
+  public void watchForCommit(long index, long timeout)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    // TODO: Create a new Raft client instance to watch
+    CompletableFuture<RaftClientReply> replyFuture = getClient()
+        .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
+    try {
+      replyFuture.get(timeout, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException toe) {
+      LOG.warn("3 way commit failed ", toe);
+      getClient()
+          .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
+          .get(timeout, TimeUnit.MILLISECONDS);
+      LOG.info("Could not commit " + index + " to all the nodes."
+          + "Committed by majority.");
+    }
   }
   /**
    * Sends a given command to server gets a waitable future back.
@@ -204,18 +220,37 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    * @throws IOException
    */
   @Override
-  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
+  public XceiverClientAsyncReply sendCommandAsync(
       ContainerCommandRequestProto request) {
-    return sendRequestAsync(request).whenComplete((reply, e) ->
-          LOG.debug("received reply {} for request: {} exception: {}", request,
-              reply, e))
-        .thenApply(reply -> {
-          try {
-            return ContainerCommandResponseProto.parseFrom(
-                reply.getMessage().getContent());
-          } catch (InvalidProtocolBufferException e) {
-            throw new CompletionException(e);
-          }
-        });
+    XceiverClientAsyncReply asyncReply = new XceiverClientAsyncReply(null);
+    CompletableFuture<RaftClientReply> raftClientReply =
+        sendRequestAsync(request);
+    Collection<XceiverClientAsyncReply.CommitInfo> commitInfos =
+        new ArrayList<>();
+    CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
+        raftClientReply.whenComplete((reply, e) -> LOG
+            .debug("received reply {} for request: {} exception: {}", request,
+                reply, e))
+            .thenApply(reply -> {
+              try {
+                ContainerCommandResponseProto response =
+                    ContainerCommandResponseProto
+                        .parseFrom(reply.getMessage().getContent());
+                reply.getCommitInfos().forEach(e -> {
+                  XceiverClientAsyncReply.CommitInfo commitInfo =
+                      new XceiverClientAsyncReply.CommitInfo(
+                          e.getServer().getAddress(), e.getCommitIndex());
+                  commitInfos.add(commitInfo);
+                  asyncReply.setCommitInfos(commitInfos);
+                  asyncReply.setLogIndex(reply.getLogIndex());
+                });
+                return response;
+              } catch (InvalidProtocolBufferException e) {
+                throw new CompletionException(e);
+              }
+            });
+    asyncReply.setResponse(containerCommandResponse);
+    return asyncReply;
   }
+
 }

+ 352 - 96
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java

@@ -17,10 +17,10 @@
  */
 
 package org.apache.hadoop.hdds.scm.storage;
-
-
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -29,16 +29,24 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.client.BlockID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.UUID;
-
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
-    .putBlock;
+    .putBlockAsync;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
-    .writeChunk;
+    .writeChunkAsync;
 
 /**
  * An {@link OutputStream} used by the REST service in combination with the
@@ -57,6 +65,8 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
  * through to the container.
  */
 public class ChunkOutputStream extends OutputStream {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ChunkOutputStream.class);
 
   private BlockID blockID;
   private final String key;
@@ -64,67 +74,97 @@ public class ChunkOutputStream extends OutputStream {
   private final BlockData.Builder containerBlockData;
   private XceiverClientManager xceiverClientManager;
   private XceiverClientSpi xceiverClient;
-  private ByteBuffer buffer;
   private final String streamId;
   private int chunkIndex;
   private int chunkSize;
+  private final long streamBufferFlushSize;
+  private final long streamBufferMaxSize;
+  private final long watchTimeout;
+  private ByteBuffer buffer;
+  // The IOException will be set by response handling thread in case there is an
+  // exception received in the response. If the exception is set, the next
+  // request will fail upfront.
+  private IOException ioException;
+  private ExecutorService responseExecutor;
+
+  // position of the buffer where the last flush was attempted
+  private int lastFlushPos;
+
+  // position of the buffer till which the flush was successfully
+  // acknowledged by all nodes in pipeline
+  private int lastSuccessfulFlushIndex;
+
+  // list to hold up all putBlock futures
+  private List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+      futureList;
+  // list maintaining commit indexes for putBlocks
+  private List<Long> commitIndexList;
 
   /**
    * Creates a new ChunkOutputStream.
    *
-   * @param blockID block ID
-   * @param key chunk key
+   * @param blockID              block ID
+   * @param key                  chunk key
    * @param xceiverClientManager client manager that controls client
-   * @param xceiverClient client to perform container calls
-   * @param traceID container protocol call args
-   * @param chunkSize chunk size
+   * @param xceiverClient        client to perform container calls
+   * @param traceID              container protocol call args
+   * @param chunkSize            chunk size
    */
   public ChunkOutputStream(BlockID blockID, String key,
-       XceiverClientManager xceiverClientManager,
-       XceiverClientSpi xceiverClient, String traceID, int chunkSize) {
+      XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
+      String traceID, int chunkSize, long streamBufferFlushSize,
+      long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer) {
     this.blockID = blockID;
     this.key = key;
     this.traceID = traceID;
     this.chunkSize = chunkSize;
-    KeyValue keyValue = KeyValue.newBuilder()
-        .setKey("TYPE").setValue("KEY").build();
-    this.containerBlockData = BlockData.newBuilder()
-        .setBlockID(blockID.getDatanodeBlockIDProtobuf())
-        .addMetadata(keyValue);
+    KeyValue keyValue =
+        KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
+    this.containerBlockData =
+        BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
+            .addMetadata(keyValue);
     this.xceiverClientManager = xceiverClientManager;
     this.xceiverClient = xceiverClient;
-    this.buffer = ByteBuffer.allocate(chunkSize);
     this.streamId = UUID.randomUUID().toString();
     this.chunkIndex = 0;
-  }
+    this.streamBufferFlushSize = streamBufferFlushSize;
+    this.streamBufferMaxSize = streamBufferMaxSize;
+    this.watchTimeout = watchTimeout;
+    this.buffer = buffer;
+    this.ioException = null;
 
-  public ByteBuffer getBuffer() {
-    return buffer;
+    // A single-thread executor handles the responses of async requests
+    responseExecutor = Executors.newSingleThreadExecutor();
+    commitIndexList = new ArrayList<>();
+    lastSuccessfulFlushIndex = 0;
+    futureList = new ArrayList<>();
+    lastFlushPos = 0;
   }
 
   public BlockID getBlockID() {
     return blockID;
   }
 
+  public int getLastSuccessfulFlushIndex() {
+    return lastSuccessfulFlushIndex;
+  }
+
+
   @Override
   public void write(int b) throws IOException {
     checkOpen();
-    int rollbackPosition = buffer.position();
-    int rollbackLimit = buffer.limit();
-    buffer.put((byte)b);
-    if (buffer.position() == chunkSize) {
-      flushBufferToChunk(rollbackPosition, rollbackLimit);
-    }
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
   }
 
   @Override
-  public void write(byte[] b, int off, int len)
-      throws IOException {
+  public void write(byte[] b, int off, int len) throws IOException {
     if (b == null) {
       throw new NullPointerException();
     }
-    if ((off < 0) || (off > b.length) || (len < 0) ||
-        ((off + len) > b.length) || ((off + len) < 0)) {
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
       throw new IndexOutOfBoundsException();
     }
     if (len == 0) {
@@ -132,93 +172,300 @@ public class ChunkOutputStream extends OutputStream {
     }
     checkOpen();
     while (len > 0) {
-      int writeLen = Math.min(chunkSize - buffer.position(), len);
-      int rollbackPosition = buffer.position();
-      int rollbackLimit = buffer.limit();
+      int writeLen;
+      writeLen = Math.min(chunkSize - buffer.position() % chunkSize, len);
       buffer.put(b, off, writeLen);
-      if (buffer.position() == chunkSize) {
-        flushBufferToChunk(rollbackPosition, rollbackLimit);
+      if (buffer.position() % chunkSize == 0) {
+        int pos = buffer.position() - chunkSize;
+        int limit = buffer.position();
+        writeChunk(pos, limit);
       }
       off += writeLen;
       len -= writeLen;
+      if (buffer.position() >= streamBufferFlushSize
+          && buffer.position() % streamBufferFlushSize == 0) {
+
+        lastFlushPos = buffer.position();
+        futureList.add(handlePartialFlush());
+      }
+      if (buffer.position() >= streamBufferMaxSize
+          && buffer.position() % streamBufferMaxSize == 0) {
+        handleFullBuffer();
+      }
+    }
+  }
+
+  /**
+   * Will be called on the retry path in case of a
+   * closedContainerException/TimeoutException.
+   * @param len length of data to write
+   * @throws IOException if an error occurred
+   */
+
+  // In this case, the data is already cached in the buffer.
+  public void writeOnRetry(int len) throws IOException {
+    if (len == 0) {
+      return;
+    }
+    int off = 0;
+    checkOpen();
+    while (len > 0) {
+      int writeLen;
+      writeLen = Math.min(chunkSize, len);
+      if (writeLen == chunkSize) {
+        int pos = off;
+        int limit = pos + chunkSize;
+        writeChunk(pos, limit);
+      }
+      off += writeLen;
+      len -= writeLen;
+      if (off % streamBufferFlushSize == 0) {
+        lastFlushPos = off;
+        futureList.add(handlePartialFlush());
+      }
+      if (off % streamBufferMaxSize == 0) {
+        handleFullBuffer();
+      }
+    }
+  }
+
+  private void handleResponse(
+      ContainerProtos.ContainerCommandResponseProto response,
+      XceiverClientAsyncReply asyncReply) {
+    validateResponse(response);
+    discardBuffer(asyncReply);
+  }
+
+  private void discardBuffer(XceiverClientAsyncReply asyncReply) {
+    if (!commitIndexList.isEmpty()) {
+      long index = commitIndexList.get(0);
+      if (checkIfBufferDiscardRequired(asyncReply, index)) {
+        updateFlushIndex();
+      }
+    }
+  }
+
+  /**
+   * Just updates the lastSuccessfulFlushIndex. Since we have allocated
+   * a buffer larger than the streamBufferMaxSize, we can keep on writing
+   * to the buffer. In case of failure, we will read the data starting from
+   * lastSuccessfulFlushIndex.
+   */
+  private void updateFlushIndex() {
+    lastSuccessfulFlushIndex += streamBufferFlushSize;
+    LOG.debug("Discarding buffer till pos " + lastSuccessfulFlushIndex);
+    if (!commitIndexList.isEmpty()) {
+      commitIndexList.remove(0);
+      futureList.remove(0);
+    }
+
+  }
+  /**
+   * Checks if the last commitIndex stored at the beginning of the
+   * commitIndexList is less than or equal to all current commitInfo indexes.
+   * If it is true, the buffer has been successfully flushed till the
+   * last position where flush happened.
+   */
+  private boolean checkIfBufferDiscardRequired(
+      XceiverClientAsyncReply asyncReply, long commitIndex) {
+    if (asyncReply.getCommitInfos() != null) {
+      for (XceiverClientAsyncReply.CommitInfo info : asyncReply
+          .getCommitInfos()) {
+        if (info.getCommitIndex() < commitIndex) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * This is a blocking call. It will wait for the flush till the commit index
+   * at the head of the commitIndexList gets replicated to all or majority.
+   * @throws IOException
+   */
+  private void handleFullBuffer() throws IOException {
+    if (!commitIndexList.isEmpty()) {
+      watchForCommit(commitIndexList.get(0));
+    }
+  }
+
+  /**
+   * Calls the watchForCommit API of the Ratis client. For the Standalone
+   * client, it is a no-op.
+   * @param commitIndex log index to watch for
+   * @throws IOException IOException in case watch gets timed out
+   */
+  private void watchForCommit(long commitIndex) throws IOException {
+    checkOpen();
+    Preconditions.checkState(!commitIndexList.isEmpty());
+    try {
+      xceiverClient.watchForCommit(commitIndex, watchTimeout);
+    } catch (TimeoutException | InterruptedException | ExecutionException e) {
+      LOG.warn("watchForCommit failed for index " + commitIndex, e);
+      throw new IOException(
+          "Unexpected Storage Container Exception: " + e.toString(), e);
+    }
+  }
+
+  private CompletableFuture<ContainerProtos.
+      ContainerCommandResponseProto> handlePartialFlush()
+      throws IOException {
+    String requestId =
+        traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID;
+    try {
+      XceiverClientAsyncReply asyncReply =
+          putBlockAsync(xceiverClient, containerBlockData.build(), requestId);
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          asyncReply.getResponse();
+
+      return future.thenApplyAsync(e -> {
+        handleResponse(e, asyncReply);
+        // if the ioException is not set, putBlock is successful
+        if (ioException == null) {
+          LOG.debug(
+              "Adding index " + asyncReply.getLogIndex() + " commitList size "
+                  + commitIndexList.size());
+          BlockID responseBlockID = BlockID.getFromProtobuf(
+              e.getPutBlock().getCommittedBlockLength().getBlockID());
+          Preconditions.checkState(blockID.getContainerBlockID()
+              .equals(responseBlockID.getContainerBlockID()));
+          // updates the bcsId of the block
+          blockID = responseBlockID;
+          long index = asyncReply.getLogIndex();
+          // for standalone protocol, logIndex will always be 0.
+          if (index != 0) {
+            commitIndexList.add(index);
+          } else {
+            updateFlushIndex();
+          }
+        }
+        return e;
+      }, responseExecutor);
+    } catch (IOException | InterruptedException | ExecutionException e) {
+      throw new IOException(
+          "Unexpected Storage Container Exception: " + e.toString(), e);
     }
   }
 
   @Override
   public void flush() throws IOException {
-    checkOpen();
-    if (buffer.position() > 0) {
-      int rollbackPosition = buffer.position();
-      int rollbackLimit = buffer.limit();
-      flushBufferToChunk(rollbackPosition, rollbackLimit);
+    if (xceiverClientManager != null && xceiverClient != null
+        && buffer != null) {
+      checkOpen();
+      if (buffer.position() > 0 && lastSuccessfulFlushIndex != buffer
+          .position()) {
+        try {
+
+          // flush the last chunk data residing on the buffer
+          if (buffer.position() % chunkSize > 0) {
+            int pos = buffer.position() - (buffer.position() % chunkSize);
+            writeChunk(pos, buffer.position());
+          }
+          if (lastFlushPos != buffer.position()) {
+            lastFlushPos = buffer.position();
+            handlePartialFlush();
+          }
+          CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+              futureList.toArray(new CompletableFuture[futureList.size()]));
+          combinedFuture.get();
+          // just check again if the exception is hit while waiting for the
+          // futures to ensure flush has indeed succeeded
+          checkOpen();
+        } catch (InterruptedException | ExecutionException e) {
+          throw new IOException(
+              "Unexpected Storage Container Exception: " + e.toString(), e);
+        }
+      }
     }
   }
 
+  private void writeChunk(int pos, int limit) throws IOException {
+    // Please note : We are not flipping the slice when we write since
+    // the slices are pointing the buffer start and end as needed for
+    // the chunk write. Also please note, Duplicate does not create a
+    // copy of data, it only creates metadata that points to the data
+    // stream.
+    ByteBuffer chunk = buffer.duplicate();
+    chunk.position(pos);
+    chunk.limit(limit);
+    writeChunkToContainer(chunk);
+  }
+
   @Override
   public void close() throws IOException {
     if (xceiverClientManager != null && xceiverClient != null
         && buffer != null) {
-      if (buffer.position() > 0) {
-        writeChunkToContainer();
-      }
       try {
-        ContainerProtos.PutBlockResponseProto responseProto =
-            putBlock(xceiverClient, containerBlockData.build(), traceID);
-        BlockID responseBlockID = BlockID.getFromProtobuf(
-            responseProto.getCommittedBlockLength().getBlockID());
-        Preconditions.checkState(blockID.getContainerBlockID()
-            .equals(responseBlockID.getContainerBlockID()));
-        // updates the bcsId of the block
-        blockID = responseBlockID;
-      } catch (IOException e) {
+        if (buffer.position() > lastFlushPos) {
+          int pos = buffer.position() - (buffer.position() % chunkSize);
+          writeChunk(pos, buffer.position());
+          futureList.add(handlePartialFlush());
+        }
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            futureList.toArray(new CompletableFuture[futureList.size()]));
+
+        // wait for all the transactions to complete
+        combinedFuture.get();
+
+        // irrespective of whether the commitIndexList is empty or not,
+        // ensure there is no exception set (for Standalone Protocol)
+        checkOpen();
+        if (!commitIndexList.isEmpty()) {
+          // wait for the last commit index in the commitIndexList to get
+          // committed to all or majority of nodes in case timeout happens.
+          long lastIndex = commitIndexList.get(commitIndexList.size() - 1);
+          LOG.debug(
+              "waiting for last flush Index " + lastIndex + " to catch up");
+          watchForCommit(lastIndex);
+          updateFlushIndex();
+        }
+      } catch (InterruptedException | ExecutionException e) {
         throw new IOException(
             "Unexpected Storage Container Exception: " + e.toString(), e);
       } finally {
         cleanup();
       }
     }
+    // clear the buffer
+    buffer.clear();
+  }
+
+  private void validateResponse(
+      ContainerProtos.ContainerCommandResponseProto responseProto) {
+    try {
+      ContainerProtocolCalls.validateContainerResponse(responseProto);
+    } catch (StorageContainerException sce) {
+      ioException = new IOException(
+          "Unexpected Storage Container Exception: " + sce.toString(), sce);
+    }
   }
 
   public void cleanup() {
-    xceiverClientManager.releaseClient(xceiverClient);
+    if (xceiverClientManager != null) {
+      xceiverClientManager.releaseClient(xceiverClient);
+    }
     xceiverClientManager = null;
     xceiverClient = null;
-    buffer = null;
+    if (futureList != null) {
+      futureList.clear();
+    }
+    futureList = null;
+    commitIndexList = null;
+    responseExecutor.shutdown();
   }
 
   /**
-   * Checks if the stream is open.  If not, throws an exception.
+   * Checks that the stream is open and that no exception has occurred.
+   * Otherwise, throws an exception.
    *
    * @throws IOException if stream is closed
    */
   private void checkOpen() throws IOException {
     if (xceiverClient == null) {
       throw new IOException("ChunkOutputStream has been closed.");
-    }
-  }
-
-  /**
-   * Attempts to flush buffered writes by writing a new chunk to the container.
-   * If successful, then clears the buffer to prepare to receive writes for a
-   * new chunk.
-   *
-   * @param rollbackPosition position to restore in buffer if write fails
-   * @param rollbackLimit limit to restore in buffer if write fails
-   * @throws IOException if there is an I/O error while performing the call
-   */
-  private void flushBufferToChunk(int rollbackPosition,
-      int rollbackLimit) throws IOException {
-    boolean success = false;
-    try {
-      writeChunkToContainer();
-      success = true;
-    } finally {
-      if (success) {
-        buffer.clear();
-      } else {
-        buffer.position(rollbackPosition);
-        buffer.limit(rollbackLimit);
-      }
+    } else if (ioException != null) {
+      throw ioException;
     }
   }
 
@@ -228,23 +475,32 @@ public class ChunkOutputStream extends OutputStream {
    *
    * @throws IOException if there is an I/O error while performing the call
    */
-  private void writeChunkToContainer() throws IOException {
-    buffer.flip();
-    ByteString data = ByteString.copyFrom(buffer);
-    ChunkInfo chunk = ChunkInfo
-        .newBuilder()
-        .setChunkName(
-            DigestUtils.md5Hex(key) + "_stream_"
-                + streamId + "_chunk_" + ++chunkIndex)
-        .setOffset(0)
-        .setLen(data.size())
-        .build();
+  private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
+    int effectiveChunkSize = chunk.remaining();
+    ByteString data = ByteString.copyFrom(chunk);
+    ChunkInfo chunkInfo = ChunkInfo.newBuilder().setChunkName(
+        DigestUtils.md5Hex(key) + "_stream_" + streamId + "_chunk_"
+            + ++chunkIndex).setOffset(0).setLen(effectiveChunkSize).build();
+    // generate a unique requestId
+    String requestId =
+        traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo
+            .getChunkName();
     try {
-      writeChunk(xceiverClient, chunk, blockID, data, traceID);
-    } catch (IOException e) {
+      XceiverClientAsyncReply asyncReply =
+          writeChunkAsync(xceiverClient, chunkInfo, blockID, data, requestId);
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          asyncReply.getResponse();
+      future.thenApplyAsync(e -> {
+        handleResponse(e, asyncReply);
+        return e;
+      }, responseExecutor);
+    } catch (IOException | InterruptedException | ExecutionException e) {
       throw new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
     }
-    containerBlockData.addChunks(chunk);
+    LOG.debug(
+        "writing chunk " + chunkInfo.getChunkName() + " blockID " + blockID
+            + " length " + chunk.remaining());
+    containerBlockData.addChunks(chunkInfo);
   }
 }

+ 98 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java

@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This class represents the Async reply from XceiverClient. It carries the
+ * future of the container command response together with a log index and
+ * the commit information of the servers.
+ */
+public class XceiverClientAsyncReply {
+
+  private CompletableFuture<ContainerCommandResponseProto> response;
+  // kept as a primitive so that getLogIndex() can never fail on unboxing
+  private long logIndex;
+  private Collection<CommitInfo> commitInfos;
+
+  /**
+   * Creates a reply with log index 0 and no commit information.
+   *
+   * @param response future of the container command response
+   */
+  public XceiverClientAsyncReply(
+      CompletableFuture<ContainerCommandResponseProto> response) {
+    this(response, 0, null);
+  }
+
+  /**
+   * Creates a fully populated reply.
+   *
+   * @param response future of the container command response
+   * @param index log index associated with the request
+   * @param commitInfos commit information per server, may be null
+   */
+  public XceiverClientAsyncReply(
+      CompletableFuture<ContainerCommandResponseProto> response, long index,
+      Collection<CommitInfo> commitInfos) {
+    this.commitInfos = commitInfos;
+    this.logIndex = index;
+    this.response = response;
+  }
+
+  /**
+   * A class having details about latest commitIndex for each server in the
+   * Ratis pipeline. For Standalone pipeline, commitInfo will be null.
+   */
+  public static class CommitInfo {
+
+    private final String server;
+
+    private final long commitIndex;
+
+    public CommitInfo(String server, long commitIndex) {
+      this.server = server;
+      this.commitIndex = commitIndex;
+    }
+
+    public String getServer() {
+      return server;
+    }
+
+    public long getCommitIndex() {
+      return commitIndex;
+    }
+  }
+
+  public Collection<CommitInfo> getCommitInfos() {
+    return commitInfos;
+  }
+
+  public CompletableFuture<ContainerCommandResponseProto> getResponse() {
+    return response;
+  }
+
+  public long getLogIndex() {
+    return logIndex;
+  }
+
+  public void setCommitInfos(Collection<CommitInfo> commitInfos) {
+    this.commitInfos = commitInfos;
+  }
+
+  /**
+   * Sets the log index.
+   *
+   * @param logIndex the new log index, must not be null
+   * @throws NullPointerException if logIndex is null
+   */
+  public void setLogIndex(Long logIndex) {
+    // reject null here instead of failing later when the getter unboxes
+    if (logIndex == null) {
+      throw new NullPointerException("logIndex must not be null");
+    }
+    this.logIndex = logIndex;
+  }
+
+  public void setResponse(
+      CompletableFuture<ContainerCommandResponseProto> response) {
+    this.response = response;
+  }
+}

+ 9 - 3
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java

@@ -28,8 +28,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -98,7 +98,10 @@ public abstract class XceiverClientSpi implements Closeable {
   public ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request) throws IOException {
     try {
-      return sendCommandAsync(request).get();
+      XceiverClientAsyncReply reply;
+      reply = sendCommandAsync(request);
+      ContainerCommandResponseProto responseProto = reply.getResponse().get();
+      return responseProto;
     } catch (ExecutionException | InterruptedException e) {
       throw new IOException("Failed to command " + request, e);
     }
@@ -111,7 +114,7 @@ public abstract class XceiverClientSpi implements Closeable {
    * @return Response to the command
    * @throws IOException
    */
-  public abstract CompletableFuture<ContainerCommandResponseProto>
+  public abstract XceiverClientAsyncReply
       sendCommandAsync(ContainerCommandRequestProto request)
       throws IOException, ExecutionException, InterruptedException;
 
@@ -132,4 +135,7 @@ public abstract class XceiverClientSpi implements Closeable {
    * @return - {Stand_Alone, Ratis or Chained}
    */
   public abstract HddsProtos.ReplicationType getPipelineType();
+
+  public abstract void watchForCommit(long index, long timeout)
+      throws InterruptedException, ExecutionException, TimeoutException;
 }

+ 55 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
+import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .BlockNotCommittedException;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
 import org.apache.hadoop.hdds.client.BlockID;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Implementation of all container protocol calls performed by Container
@@ -162,6 +164,31 @@ public final class ContainerProtocolCalls  {
     return response.getPutBlock();
   }
 
+  /**
+   * Calls the container protocol to put a container block asynchronously.
+   * The PutBlock request is sent exactly once through sendCommandAsync.
+   *
+   * @param xceiverClient client to perform call
+   * @param containerBlockData block data to identify container
+   * @param traceID container protocol call args
+   * @return the async reply returned by the client for the PutBlock request
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws InterruptedException as declared by sendCommandAsync
+   * @throws ExecutionException as declared by sendCommandAsync
+   */
+  public static XceiverClientAsyncReply putBlockAsync(
+      XceiverClientSpi xceiverClient, BlockData containerBlockData,
+      String traceID)
+      throws IOException, InterruptedException, ExecutionException {
+    PutBlockRequestProto.Builder createBlockRequest =
+        PutBlockRequestProto.newBuilder().setBlockData(containerBlockData);
+    String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
+    ContainerCommandRequestProto request =
+        ContainerCommandRequestProto.newBuilder().setCmdType(Type.PutBlock)
+            .setContainerID(containerBlockData.getBlockID().getContainerID())
+            .setTraceID(traceID).setDatanodeUuid(id)
+            .setPutBlock(createBlockRequest).build();
+    // Do not call the blocking sendCommand() here as well: that would send
+    // the same PutBlock twice and make this "async" call wait for a reply.
+    return xceiverClient.sendCommandAsync(request);
+  }
+
   /**
    * Calls the container protocol to read a chunk.
    *
@@ -200,7 +227,7 @@ public final class ContainerProtocolCalls  {
    * @param blockID ID of the block
    * @param data the data of the chunk to write
    * @param traceID container protocol call args
-   * @throws IOException if there is an I/O error while performing the call
+   * @throws Exception if there is an error while performing the call
    */
   public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
       BlockID blockID, ByteString data, String traceID)
@@ -223,6 +250,32 @@ public final class ContainerProtocolCalls  {
     validateContainerResponse(response);
   }
 
+  /**
+   * Builds a WriteChunk container command and submits it through the
+   * asynchronous API of the given client.
+   *
+   * @param xceiverClient client to perform call
+   * @param chunk information about chunk to write
+   * @param blockID ID of the block
+   * @param data the data of the chunk to write
+   * @param traceID container protocol call args
+   * @return the async reply returned by the client for the request
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws ExecutionException as declared by sendCommandAsync
+   * @throws InterruptedException as declared by sendCommandAsync
+   */
+  public static XceiverClientAsyncReply writeChunkAsync(
+      XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID,
+      ByteString data, String traceID)
+      throws IOException, ExecutionException, InterruptedException {
+    // payload: which block, which chunk, and the bytes themselves
+    WriteChunkRequestProto.Builder chunkRequest =
+        WriteChunkRequestProto.newBuilder();
+    chunkRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
+    chunkRequest.setChunkData(chunk);
+    chunkRequest.setData(data);
+
+    // the command carries the uuid of the first node in the pipeline
+    String datanodeUuid =
+        xceiverClient.getPipeline().getFirstNode().getUuidString();
+
+    ContainerCommandRequestProto.Builder command =
+        ContainerCommandRequestProto.newBuilder();
+    command.setCmdType(Type.WriteChunk);
+    command.setContainerID(blockID.getContainerID());
+    command.setTraceID(traceID);
+    command.setDatanodeUuid(datanodeUuid);
+    command.setWriteChunk(chunkRequest);
+    return xceiverClient.sendCommandAsync(command.build());
+  }
+
   /**
    * Allows writing a small file using single RPC. This takes the container
    * name, block name and data to write sends all that data to the container
@@ -420,7 +473,7 @@ public final class ContainerProtocolCalls  {
    * @param response container protocol call response
    * @throws IOException if the container protocol call failed
    */
-  private static void validateContainerResponse(
+  public static void validateContainerResponse(
       ContainerCommandResponseProto response
   ) throws StorageContainerException {
     if (response.getResult() == ContainerProtos.Result.SUCCESS) {

+ 16 - 8
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -112,6 +112,22 @@ public final class OzoneConfigKeys {
   public static final String OZONE_CLIENT_PROTOCOL =
       "ozone.client.protocol";
 
+  public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE =
+      "ozone.client.stream.buffer.flush.size";
+
+  public static final long OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT = 64;
+
+  public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE =
+      "ozone.client.stream.buffer.max.size";
+
+  public static final long OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT = 128;
+
+  public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT =
+      "ozone.client.watch.request.timeout";
+
+  public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT =
+      "30s";
+
   // This defines the overall connection limit for the connection pool used in
   // RestClient.
   public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
@@ -192,14 +208,6 @@ public final class OzoneConfigKeys {
   public static final int
       OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10;
 
-  public static final String OZONE_CLIENT_MAX_RETRIES =
-      "ozone.client.max.retries";
-  public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 50;
-
-  public static final String OZONE_CLIENT_RETRY_INTERVAL =
-      "ozone.client.retry.interval";
-  public static final String OZONE_CLIENT_RETRY_INTERVAL_DEFAULT = "200ms";
-
   public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
       = ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
   public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT

+ 18 - 8
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -335,19 +335,29 @@
     </description>
   </property>
   <property>
-    <name>ozone.client.max.retries</name>
-    <value>50</value>
+    <name>ozone.client.stream.buffer.flush.size</name>
+    <value>64</value>
     <tag>OZONE, CLIENT</tag>
-    <description>Maximum number of retries by Ozone Client on encountering
-      exception while fetching committed block length.
+    <description>Size in MB which determines at what buffer position a partial
+      flush will be initiated during write. Ideally, it should be a multiple
+      of chunkSize.
     </description>
   </property>
   <property>
-    <name>ozone.client.retry.interval</name>
-    <value>200ms</value>
+    <name>ozone.client.stream.buffer.max.size</name>
+    <value>128</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>Size in MB which determines at what buffer position the
+      write call will be blocked until the first partial flush has been
+      acknowledged by all servers.
+    </description>
+  </property>
+  <property>
+    <name>ozone.client.watch.request.timeout</name>
+    <value>30s</value>
     <tag>OZONE, CLIENT</tag>
-    <description>Interval between retries by Ozone Client on encountering
-      exception while fetching committed block length.
+    <description>Timeout for the watch API in Ratis client to acknowledge
+      a particular request getting replayed to all servers.
     </description>
   </property>
   <property>

+ 3 - 0
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java

@@ -121,6 +121,9 @@ public class BlockManagerImpl implements BlockManager {
     container.updateBlockCommitSequenceId(bcsId);
     // Increment keycount here
     container.getContainerData().incrKeyCount();
+    LOG.debug(
+        "Block " + data.getBlockID() + " successfully committed with bcsId "
+            + bcsId + " chunk size " + data.getChunks().size());
     return data.getSize();
   }
 

+ 0 - 27
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java

@@ -17,23 +17,14 @@
  */
 package org.apache.hadoop.ozone.client;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.rest.response.*;
 
 import java.util.ArrayList;
 import java.util.List;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 /** A utility class for OzoneClient. */
 public final class OzoneClientUtils {
 
@@ -94,24 +85,6 @@ public final class OzoneClientUtils {
     return keyInfo;
   }
 
-  public static RetryPolicy createRetryPolicy(Configuration conf) {
-    int maxRetryCount =
-        conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
-            OZONE_CLIENT_MAX_RETRIES_DEFAULT);
-    long retryInterval = conf.getTimeDuration(OzoneConfigKeys.
-        OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys.
-        OZONE_CLIENT_RETRY_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
-    RetryPolicy basePolicy = RetryPolicies
-        .retryUpToMaximumCountWithFixedSleep(maxRetryCount, retryInterval,
-            TimeUnit.MILLISECONDS);
-    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
-        new HashMap<Class<? extends Exception>, RetryPolicy>();
-    exceptionToPolicyMap.put(BlockNotCommittedException.class, basePolicy);
-    RetryPolicy retryPolicy = RetryPolicies
-        .retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,
-            exceptionToPolicyMap);
-    return retryPolicy;
-  }
   /**
    * Returns a KeyInfoDetails object constructed using fields of the input
    * OzoneKeyDetails object.

+ 185 - 152
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java

@@ -24,11 +24,10 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -41,18 +40,17 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
 import org.apache.hadoop.hdds.scm.protocolPB
     .StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.storage.ChunkOutputStream;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.ListIterator;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Maintaining a list of ChunkInputStream. Write based on offset.
@@ -71,7 +69,6 @@ public class ChunkGroupOutputStream extends OutputStream {
   // array list's get(index) is O(1)
   private final ArrayList<ChunkOutputStreamEntry> streamEntries;
   private int currentStreamIndex;
-  private long byteOffset;
   private final OzoneManagerProtocolClientSideTranslatorPB omClient;
   private final
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient;
@@ -81,7 +78,11 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final int chunkSize;
   private final String requestID;
   private boolean closed;
-  private final RetryPolicy retryPolicy;
+  private final long streamBufferFlushSize;
+  private final long streamBufferMaxSize;
+  private final long watchTimeout;
+  private final long blockSize;
+  private ByteBuffer buffer;
   /**
    * A constructor for testing purpose only.
    */
@@ -96,7 +97,11 @@ public class ChunkGroupOutputStream extends OutputStream {
     chunkSize = 0;
     requestID = null;
     closed = false;
-    retryPolicy = null;
+    streamBufferFlushSize = 0;
+    streamBufferMaxSize = 0;
+    buffer = ByteBuffer.allocate(1);
+    watchTimeout = 0;
+    blockSize = 0;
   }
 
   /**
@@ -127,35 +132,54 @@ public class ChunkGroupOutputStream extends OutputStream {
           new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
               .setLength(streamEntry.currentPosition).setOffset(0)
               .build();
+      LOG.debug("block written " + streamEntry.blockID + ", length "
+          + streamEntry.currentPosition + " bcsID " + streamEntry.blockID
+          .getBlockCommitSequenceId());
       locationInfoList.add(info);
     }
     return locationInfoList;
   }
 
-  public ChunkGroupOutputStream(
-      OpenKeySession handler, XceiverClientManager xceiverClientManager,
+  public ChunkGroupOutputStream(OpenKeySession handler,
+      XceiverClientManager xceiverClientManager,
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
-      OzoneManagerProtocolClientSideTranslatorPB omClient,
-      int chunkSize, String requestId, ReplicationFactor factor,
-      ReplicationType type, RetryPolicy retryPolicy) throws IOException {
+      OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
+      String requestId, ReplicationFactor factor, ReplicationType type,
+      long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout) {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
-    this.byteOffset = 0;
     this.omClient = omClient;
     this.scmClient = scmClient;
     OmKeyInfo info = handler.getKeyInfo();
-    this.keyArgs = new OmKeyArgs.Builder()
-        .setVolumeName(info.getVolumeName())
-        .setBucketName(info.getBucketName())
-        .setKeyName(info.getKeyName())
-        .setType(type)
-        .setFactor(factor)
-        .setDataSize(info.getDataSize()).build();
+    this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
+        .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
+        .setType(type).setFactor(factor).setDataSize(info.getDataSize())
+        .build();
     this.openID = handler.getId();
     this.xceiverClientManager = xceiverClientManager;
     this.chunkSize = chunkSize;
     this.requestID = requestId;
-    this.retryPolicy = retryPolicy;
+    // the three sizes are passed in MB and kept internally in bytes
+    this.streamBufferFlushSize = bufferFlushSize * OzoneConsts.MB;
+    this.streamBufferMaxSize = bufferMaxSize * OzoneConsts.MB;
+    this.blockSize = size * OzoneConsts.MB;
+    this.watchTimeout = watchTimeout;
+
+    Preconditions.checkState(chunkSize > 0);
+    Preconditions.checkState(streamBufferFlushSize > 0);
+    Preconditions.checkState(streamBufferMaxSize > 0);
+    Preconditions.checkState(blockSize > 0);
+    // The buffer below is a single ByteBuffer, so the block size in bytes has
+    // to fit into an int; fail fast instead of truncating it in the cast.
+    Preconditions.checkState(blockSize <= Integer.MAX_VALUE,
+        "blockSize does not fit into a single ByteBuffer");
+    Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
+    Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
+    Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
+
+    // This byteBuffer will be used to cache data until all the blockCommits
+    // (putBlock) get replicated to all/majority servers. The idea here is to
+    // allocate the buffer of size blockSize so that, as and when a chunk is
+    // replicated to all servers, discarding the acknowledged data does not
+    // necessarily require running compaction (buffer.compact()) on the
+    // buffer. Compaction is inefficient, so it is better to avoid it on the
+    // happy I/O path.
+    this.buffer = ByteBuffer.allocate((int) blockSize);
   }
 
   /**
@@ -191,12 +215,13 @@ public class ChunkGroupOutputStream extends OutputStream {
         xceiverClientManager.acquireClient(containerWithPipeline.getPipeline());
     streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
         keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
-        chunkSize, subKeyInfo.getLength()));
+        chunkSize, subKeyInfo.getLength(), streamBufferFlushSize,
+        streamBufferMaxSize, watchTimeout, buffer));
   }
 
   @VisibleForTesting
   public long getByteOffset() {
-    return byteOffset;
+    return getKeyLength();
   }
 
 
@@ -223,21 +248,23 @@ public class ChunkGroupOutputStream extends OutputStream {
   public void write(byte[] b, int off, int len)
       throws IOException {
     checkNotClosed();
-    handleWrite(b, off, len);
+    handleWrite(b, off, len, false, buffer.position());
   }
 
-  private void handleWrite(byte[] b, int off, int len) throws IOException {
+  private void handleWrite(byte[] b, int off, int len, boolean retry,
+      int pos) throws IOException {
     if (b == null) {
       throw new NullPointerException();
     }
-    if ((off < 0) || (off > b.length) || (len < 0) ||
-        ((off + len) > b.length) || ((off + len) < 0)) {
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
       throw new IndexOutOfBoundsException();
     }
     if (len == 0) {
       return;
     }
     int succeededAllocates = 0;
+    int initialPos;
     while (len > 0) {
       if (streamEntries.size() <= currentStreamIndex) {
         Preconditions.checkNotNull(omClient);
@@ -247,8 +274,8 @@ public class ChunkGroupOutputStream extends OutputStream {
           allocateNewBlock(currentStreamIndex);
           succeededAllocates += 1;
         } catch (IOException ioe) {
-          LOG.error("Try to allocate more blocks for write failed, already " +
-              "allocated " + succeededAllocates + " blocks for this write.");
+          LOG.error("Try to allocate more blocks for write failed, already "
+              + "allocated " + succeededAllocates + " blocks for this write.");
           throw ioe;
         }
       }
@@ -257,12 +284,19 @@ public class ChunkGroupOutputStream extends OutputStream {
       Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
       ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
       int writeLen = Math.min(len, (int) current.getRemaining());
+      initialPos = pos < buffer.position() ? pos : buffer.position();
       try {
-        current.write(b, off, writeLen);
+        if (retry) {
+          current.writeOnRetry(len);
+        } else {
+          current.write(b, off, writeLen);
+        }
       } catch (IOException ioe) {
-        if (checkIfContainerIsClosed(ioe)) {
-          handleCloseContainerException(current, currentStreamIndex);
-          continue;
+        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) {
+          // for the current iteration, current pos - initialPos gives the
+          // amount of data already written to the buffer
+          writeLen = buffer.position() - initialPos;
+          handleException(current, currentStreamIndex);
         } else {
           throw ioe;
         }
@@ -274,57 +308,6 @@ public class ChunkGroupOutputStream extends OutputStream {
       }
       len -= writeLen;
       off += writeLen;
-      byteOffset += writeLen;
-    }
-  }
-
-  private long getCommittedBlockLength(ChunkOutputStreamEntry streamEntry)
-      throws IOException {
-    long blockLength;
-    ContainerProtos.GetCommittedBlockLengthResponseProto responseProto;
-    RetryPolicy.RetryAction action;
-    int numRetries = 0;
-    while (true) {
-      try {
-        responseProto = ContainerProtocolCalls
-            .getCommittedBlockLength(streamEntry.xceiverClient,
-                streamEntry.blockID, requestID);
-        blockLength = responseProto.getBlockLength();
-        return blockLength;
-      } catch (StorageContainerException sce) {
-        try {
-          action = retryPolicy.shouldRetry(sce, numRetries, 0, true);
-        } catch (Exception e) {
-          throw e instanceof IOException ? (IOException) e : new IOException(e);
-        }
-        if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
-          if (action.reason != null) {
-            LOG.error(
-                "GetCommittedBlockLength request failed. " + action.reason,
-                sce);
-          }
-          throw sce;
-        }
-
-        // Throw the exception if the thread is interrupted
-        if (Thread.currentThread().isInterrupted()) {
-          LOG.warn("Interrupted while trying for connection");
-          throw sce;
-        }
-        Preconditions.checkArgument(
-            action.action == RetryPolicy.RetryAction.RetryDecision.RETRY);
-        try {
-          Thread.sleep(action.delayMillis);
-        } catch (InterruptedException e) {
-          throw (IOException) new InterruptedIOException(
-              "Interrupted: action=" + action + ", retry policy=" + retryPolicy)
-              .initCause(e);
-        }
-        numRetries++;
-        LOG.trace("Retrying GetCommittedBlockLength request. Already tried "
-            + numRetries + " time(s); retry policy is " + retryPolicy);
-        continue;
-      }
     }
   }
 
@@ -373,55 +356,35 @@ public class ChunkGroupOutputStream extends OutputStream {
    *
    * @param streamEntry StreamEntry
    * @param streamIndex Index of the entry
-   * @throws IOException Throws IOexception if Write fails
+   * @throws IOException Throws IOException if Write fails
    */
-  private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
+  private void handleException(ChunkOutputStreamEntry streamEntry,
       int streamIndex) throws IOException {
-    long committedLength = 0;
-    ByteBuffer buffer = streamEntry.getBuffer();
-    if (buffer == null) {
-      // the buffer here will be null only when closeContainerException is
-      // hit while calling putKey during close on chunkOutputStream.
-      // Since closeContainer auto commit pending keys, no need to do
-      // anything here.
-      return;
-    }
-
-    // update currentStreamIndex in case of closed container exception. The
-    // current stream entry cannot be used for further writes because
-    // container is closed.
-    currentStreamIndex += 1;
+    int lastSuccessfulFlushIndex = streamEntry.getLastSuccessfulFlushIndex();
+    int currentPos = buffer.position();
 
-    // In case where not a single chunk of data has been written to the Datanode
-    // yet. This block does not yet exist on the datanode but cached on the
-    // outputStream buffer. No need to call GetCommittedBlockLength here
-    // for this block associated with the stream here.
-    if (streamEntry.currentPosition >= chunkSize
-        || streamEntry.currentPosition != buffer.position()) {
-      committedLength = getCommittedBlockLength(streamEntry);
-      // update the length of the current stream
-      streamEntry.currentPosition = committedLength;
+    // In case of a failure, read the data from the position till the last
+    // acknowledgement happened.
+    if (lastSuccessfulFlushIndex > 0) {
+      buffer.position(lastSuccessfulFlushIndex);
+      buffer.limit(currentPos);
+      buffer.compact();
     }
 
     if (buffer.position() > 0) {
+      //set the correct length for the current stream
+      streamEntry.currentPosition = lastSuccessfulFlushIndex;
       // If the data is still cached in the underlying stream, we need to
-      // allocate new block and write this data in the datanode. The cached
-      // data in the buffer does not exceed chunkSize.
-      Preconditions.checkState(buffer.position() < chunkSize);
-      // readjust the byteOffset value to the length actually been written.
-      byteOffset -= buffer.position();
-      handleWrite(buffer.array(), 0, buffer.position());
+      // allocate new block and write this data in the datanode.
+      currentStreamIndex += 1;
+      handleWrite(buffer.array(), 0, buffer.position(), true,
+          lastSuccessfulFlushIndex);
     }
 
-    // just clean up the current stream. Since the container is already closed,
-    // it will be auto committed. No need to call close again here.
+    // just clean up the current stream.
     streamEntry.cleanup();
-    // This case will arise when while writing the first chunk itself fails.
-    // In such case, the current block associated with the stream has no data
-    // written. Remove it from the current stream list.
-    if (committedLength == 0) {
+    if (lastSuccessfulFlushIndex == 0) {
       streamEntries.remove(streamIndex);
-      Preconditions.checkArgument(currentStreamIndex != 0);
       currentStreamIndex -= 1;
     }
     // discard subsequent pre allocated blocks from the streamEntries list
@@ -430,11 +393,15 @@ public class ChunkGroupOutputStream extends OutputStream {
   }
 
   private boolean checkIfContainerIsClosed(IOException ioe) {
-    return checkIfContainerNotOpenException(ioe) || Optional.of(ioe.getCause())
-        .filter(e -> e instanceof StorageContainerException)
-        .map(e -> (StorageContainerException) e)
-        .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO)
-        .isPresent();
+    if (ioe.getCause() != null) {
+      return checkIfContainerNotOpenException(ioe) || Optional
+          .of(ioe.getCause())
+          .filter(e -> e instanceof StorageContainerException)
+          .map(e -> (StorageContainerException) e)
+          .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO)
+          .isPresent();
+    }
+    return false;
   }
 
   private boolean checkIfContainerNotOpenException(IOException ioe) {
@@ -448,6 +415,15 @@ public class ChunkGroupOutputStream extends OutputStream {
     return false;
   }
 
+  private boolean checkIfTimeoutException(IOException ioe) {
+    if (ioe.getCause() != null) {
+      return Optional.of(ioe.getCause())
+          .filter(e -> e instanceof TimeoutException).isPresent();
+    } else {
+      return false;
+    }
+  }
+
   private long getKeyLength() {
     return streamEntries.parallelStream().mapToLong(e -> e.currentPosition)
         .sum();
@@ -495,11 +471,11 @@ public class ChunkGroupOutputStream extends OutputStream {
           entry.flush();
         }
       } catch (IOException ioe) {
-        if (checkIfContainerIsClosed(ioe)) {
+        if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) {
           // This call will allocate a new streamEntry and write the Data.
           // Close needs to be retried on the newly allocated streamEntry as
           // as well.
-          handleCloseContainerException(entry, streamIndex);
+          handleException(entry, streamIndex);
           handleFlushOrClose(close);
         } else {
           throw ioe;
@@ -519,16 +495,24 @@ public class ChunkGroupOutputStream extends OutputStream {
       return;
     }
     closed = true;
-    handleFlushOrClose(true);
-    if (keyArgs != null) {
-      // in test, this could be null
-      removeEmptyBlocks();
-      Preconditions.checkState(byteOffset == getKeyLength());
-      keyArgs.setDataSize(byteOffset);
-      keyArgs.setLocationInfoList(getLocationInfoList());
-      omClient.commitKey(keyArgs, openID);
-    } else {
-      LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
+    try {
+      handleFlushOrClose(true);
+      if (keyArgs != null) {
+        // in test, this could be null
+        removeEmptyBlocks();
+        keyArgs.setDataSize(getKeyLength());
+        keyArgs.setLocationInfoList(getLocationInfoList());
+        omClient.commitKey(keyArgs, openID);
+      } else {
+        LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
+      }
+    } catch (IOException ioe) {
+      throw ioe;
+    } finally {
+      if (buffer != null) {
+        buffer.clear();
+      }
+      buffer = null;
     }
   }
 
@@ -544,7 +528,10 @@ public class ChunkGroupOutputStream extends OutputStream {
     private String requestID;
     private ReplicationType type;
     private ReplicationFactor factor;
-    private RetryPolicy retryPolicy;
+    private long streamBufferFlushSize;
+    private long streamBufferMaxSize;
+    private long blockSize;
+    private long watchTimeout;
 
     public Builder setHandler(OpenKeySession handler) {
       this.openHandler = handler;
@@ -588,16 +575,31 @@ public class ChunkGroupOutputStream extends OutputStream {
       return this;
     }
 
-    public ChunkGroupOutputStream build() throws IOException {
-      return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
-          omClient, chunkSize, requestID, factor, type, retryPolicy);
+    public Builder setStreamBufferFlushSize(long size) {
+      this.streamBufferFlushSize = size;
+      return this;
+    }
+
+    public Builder setStreamBufferMaxSize(long size) {
+      this.streamBufferMaxSize = size;
+      return this;
+    }
+
+    public Builder setBlockSize(long size) {
+      this.blockSize = size;
+      return this;
     }
 
-    public Builder setRetryPolicy(RetryPolicy rPolicy) {
-      this.retryPolicy = rPolicy;
+    public Builder setWatchTimeout(long timeout) {
+      this.watchTimeout = timeout;
       return this;
     }
 
+    public ChunkGroupOutputStream build() throws IOException {
+      return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
+          omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
+          streamBufferMaxSize, blockSize, watchTimeout);
+    }
   }
 
   private static class ChunkOutputStreamEntry extends OutputStream {
@@ -613,10 +615,16 @@ public class ChunkGroupOutputStream extends OutputStream {
     // the current position of this stream 0 <= currentPosition < length
     private long currentPosition;
 
+    private final long streamBufferFlushSize;
+    private final long streamBufferMaxSize;
+    private final long watchTimeout;
+    private ByteBuffer buffer;
+
     ChunkOutputStreamEntry(BlockID blockID, String key,
         XceiverClientManager xceiverClientManager,
         XceiverClientSpi xceiverClient, String requestId, int chunkSize,
-        long length) {
+        long length, long streamBufferFlushSize, long streamBufferMaxSize,
+        long watchTimeout, ByteBuffer buffer) {
       this.outputStream = null;
       this.blockID = blockID;
       this.key = key;
@@ -627,6 +635,10 @@ public class ChunkGroupOutputStream extends OutputStream {
 
       this.length = length;
       this.currentPosition = 0;
+      this.streamBufferFlushSize = streamBufferFlushSize;
+      this.streamBufferMaxSize = streamBufferMaxSize;
+      this.watchTimeout = watchTimeout;
+      this.buffer = buffer;
     }
 
     /**
@@ -645,6 +657,10 @@ public class ChunkGroupOutputStream extends OutputStream {
 
       this.length = length;
       this.currentPosition = 0;
+      streamBufferFlushSize = 0;
+      streamBufferMaxSize = 0;
+      buffer = null;
+      watchTimeout = 0;
     }
 
     long getLength() {
@@ -657,9 +673,10 @@ public class ChunkGroupOutputStream extends OutputStream {
 
     private void checkStream() {
       if (this.outputStream == null) {
-        this.outputStream = new ChunkOutputStream(blockID,
-            key, xceiverClientManager, xceiverClient,
-            requestId, chunkSize);
+        this.outputStream =
+            new ChunkOutputStream(blockID, key, xceiverClientManager,
+                xceiverClient, requestId, chunkSize, streamBufferFlushSize,
+                streamBufferMaxSize, watchTimeout, buffer);
       }
     }
 
@@ -696,15 +713,21 @@ public class ChunkGroupOutputStream extends OutputStream {
       }
     }
 
-    ByteBuffer getBuffer() throws IOException {
+    int getLastSuccessfulFlushIndex() throws IOException {
       if (this.outputStream instanceof ChunkOutputStream) {
         ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
-        return out.getBuffer();
+        blockID = out.getBlockID();
+        return out.getLastSuccessfulFlushIndex();
+      } else if (outputStream == null) {
+        // For a pre allocated block for which no write has been initiated,
+        // the OutputStream will be null here.
+        // In such cases, the default blockCommitSequenceId will be 0
+        return 0;
       }
       throw new IOException("Invalid Output Stream for Key: " + key);
     }
 
-    public void cleanup() {
+    void cleanup() {
       checkStream();
       if (this.outputStream instanceof ChunkOutputStream) {
         ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
@@ -712,6 +735,16 @@ public class ChunkGroupOutputStream extends OutputStream {
       }
     }
 
+    // Hands len to ChunkOutputStream#writeOnRetry, then moves the position.
+    void writeOnRetry(int len) throws IOException {
+      checkStream();
+      if (!(this.outputStream instanceof ChunkOutputStream)) {
+        throw new IOException("Invalid Output Stream for Key: " + key);
+      }
+      ((ChunkOutputStream) this.outputStream).writeOnRetry(len);
+      // Reached only when the delegate call above returned normally.
+      this.currentPosition += len;
+    }
   }
 
   /**

+ 22 - 5
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

@@ -24,18 +24,17 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.*;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.VolumeArgs;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
 import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
@@ -72,6 +71,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -94,7 +94,10 @@ public class RpcClient implements ClientProtocol {
   private final UserGroupInformation ugi;
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
-  private final RetryPolicy retryPolicy;
+  private final long streamBufferFlushSize;
+  private final long streamBufferMaxSize;
+  private final long blockSize;
+  private final long watchTimeout;
 
    /**
     * Creates RpcClient instance with the given configuration.
@@ -135,7 +138,6 @@ public class RpcClient implements ClientProtocol {
                 Client.getRpcTimeout(conf)));
 
     this.xceiverClientManager = new XceiverClientManager(conf);
-    retryPolicy = OzoneClientUtils.createRetryPolicy(conf);
 
     int configuredChunkSize = conf.getInt(
         ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
@@ -149,6 +151,18 @@ public class RpcClient implements ClientProtocol {
     } else {
       chunkSize = configuredChunkSize;
     }
+    streamBufferFlushSize =
+        conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
+            OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT);
+    streamBufferMaxSize =
+        conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
+            OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT);
+    blockSize = conf.getLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB,
+        OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
+    watchTimeout =
+        conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
+            OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
   }
 
   private InetSocketAddress getScmAddressForClient() throws IOException {
@@ -468,7 +482,10 @@ public class RpcClient implements ClientProtocol {
             .setRequestID(requestId)
             .setType(HddsProtos.ReplicationType.valueOf(type.toString()))
             .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
-            .setRetryPolicy(retryPolicy)
+            .setStreamBufferFlushSize(streamBufferFlushSize)
+            .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setWatchTimeout(watchTimeout)
+            .setBlockSize(blockSize)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

+ 44 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java

@@ -230,7 +230,10 @@ public interface MiniOzoneCluster {
 
     protected Boolean ozoneEnabled = true;
     protected Boolean randomContainerPort = true;
-
+    protected Optional<Integer> chunkSize = Optional.empty();
+    protected Optional<Long> streamBufferFlushSize = Optional.empty();
+    protected Optional<Long> streamBufferMaxSize = Optional.empty();
+    protected Optional<Long> blockSize = Optional.empty();
     // Use relative smaller number of handlers for testing
     protected int numOfOmHandlers = 20;
     protected int numOfScmHandlers = 20;
@@ -358,6 +361,46 @@ public interface MiniOzoneCluster {
       return this;
     }
 
+    /**
+     * Sets the chunk size.
+     * @param size chunk size in units of OzoneConsts.MB
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setChunkSize(int size) {
+      chunkSize = Optional.of(size);
+      return this;
+    }
+
+    /**
+     * Sets the flush size for stream buffer.
+     * @param size flush size to record on this builder
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setStreamBufferFlushSize(long size) {
+      streamBufferFlushSize = Optional.of(size);
+      return this;
+    }
+
+    /**
+     * Sets the max size for stream buffer.
+     * @param size max size to record on this builder
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setStreamBufferMaxSize(long size) {
+      streamBufferMaxSize = Optional.of(size);
+      return this;
+    }
+
+    /**
+     * Sets the block size.
+     * @param size block size, stored under OZONE_SCM_BLOCK_SIZE_IN_MB
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setBlockSize(long size) {
+      blockSize = Optional.of(size);
+      return this;
+    }
+
     /**
      * Constructs and returns MiniOzoneCluster.
      *

+ 19 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java

@@ -391,6 +391,25 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
       Path metaDir = Paths.get(path, "ozone-meta");
       Files.createDirectories(metaDir);
       conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDir.toString());
+      if (!chunkSize.isPresent()) {
+        chunkSize = Optional.of(1);
+      }
+      if (!streamBufferFlushSize.isPresent()) {
+        streamBufferFlushSize = Optional.of((long)chunkSize.get());
+      }
+      if (!streamBufferMaxSize.isPresent()) {
+        streamBufferMaxSize = Optional.of(2 * streamBufferFlushSize.get());
+      }
+      if (!blockSize.isPresent()) {
+        blockSize = Optional.of(2 * streamBufferMaxSize.get());
+      }
+      conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
+          (int) (chunkSize.get() * OzoneConsts.MB));
+      conf.setLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
+          streamBufferFlushSize.get());
+      conf.setLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
+          streamBufferMaxSize.get());
+      conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, blockSize.get());
       configureTrace();
     }
 

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java

@@ -50,7 +50,7 @@ public interface RatisTestHelper {
 
   /** For testing Ozone with Ratis. */
   class RatisTestSuite implements Closeable {
-    static final RpcType RPC = SupportedRpcType.NETTY;
+    static final RpcType RPC = SupportedRpcType.GRPC;
     static final int NUM_DATANODES = 3;
 
     private final OzoneConfiguration conf;

+ 67 - 185
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java

@@ -17,7 +17,6 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
-import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -27,11 +26,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.ozone.HddsDatanodeService;
-import org.apache.hadoop.hdds.scm.container.common.helpers.
-    StorageContainerException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -55,15 +49,17 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.slf4j.event.Level;
 
 import java.io.IOException;
-import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
  * Tests Close Container Exception handling by Ozone Client.
@@ -79,7 +75,6 @@ public class TestCloseContainerHandlingByClient {
   private static String volumeName;
   private static String bucketName;
   private static String keyString;
-  private static int maxRetries;
 
   /**
    * Create a MiniOzoneCluster for testing.
@@ -91,15 +86,14 @@ public class TestCloseContainerHandlingByClient {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
-    maxRetries = 100;
-    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, maxRetries);
-    conf.set(OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL, "200ms");
     chunkSize = (int) OzoneConsts.MB;
     blockSize = 4 * chunkSize;
-    conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
     conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4));
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(3).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7).build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
@@ -121,44 +115,29 @@ public class TestCloseContainerHandlingByClient {
     }
   }
 
-  private static String fixedLengthString(String string, int length) {
-    return String.format("%1$" + length + "s", string);
-  }
-
   @Test
   public void testBlockWritesWithFlushAndClose() throws Exception {
     String keyName = "standalone";
-    OzoneOutputStream key =
-        createKey(keyName, ReplicationType.STAND_ALONE, 0);
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     // write data more than 1 chunk
-    byte[] data =
-        fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    byte[] data = ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
     key.write(data);
 
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
     key.write(data);
     key.flush();
     key.close();
     // read the key from OM again and match the length. The length will
     // still be equal to the original data size.
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
-    List<OmKeyLocationInfo> keyLocationInfos =
-        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
-    //we have written two blocks
-    Assert.assertEquals(2, keyLocationInfos.size());
-    OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
-    Assert.assertEquals(data.length - (data.length % chunkSize),
-        omKeyLocationInfo.getLength());
-    Assert.assertEquals(data.length + (data.length % chunkSize),
-        keyLocationInfos.get(1).getLength());
     Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
 
     // Written the same data twice
@@ -170,37 +149,24 @@ public class TestCloseContainerHandlingByClient {
   @Test
   public void testBlockWritesCloseConsistency() throws Exception {
     String keyName = "standalone2";
-    OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0);
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     // write data more than 1 chunk
-    byte[] data =
-        fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    byte[] data = ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
     key.write(data);
 
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
     key.close();
     // read the key from OM again and match the length. The length will
     // still be equal to the original data size.
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
-    List<OmKeyLocationInfo> keyLocationInfos =
-        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
-    // Though we have written only block initially, the close will hit
-    // closeContainerException and remaining data in the chunkOutputStream
-    // buffer will be copied into a different allocated block and will be
-    // committed.
-    Assert.assertEquals(2, keyLocationInfos.size());
-    OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
-    Assert.assertEquals(data.length - (data.length % chunkSize),
-        omKeyLocationInfo.getLength());
-    Assert.assertEquals(data.length % chunkSize,
-        keyLocationInfos.get(1).getLength());
     Assert.assertEquals(data.length, keyInfo.getDataSize());
     validateData(keyName, data);
   }
@@ -210,29 +176,30 @@ public class TestCloseContainerHandlingByClient {
 
     String keyName = "standalone3";
     OzoneOutputStream key =
-        createKey(keyName, ReplicationType.STAND_ALONE, (4 * blockSize));
+        createKey(keyName, ReplicationType.RATIS, (4 * blockSize));
     ChunkGroupOutputStream groupOutputStream =
         (ChunkGroupOutputStream) key.getOutputStream();
     // With the initial size provided, it should have preallocated 4 blocks
     Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
-    // write data for 3 blocks and 1 more chunk
-    byte[] data = fixedLengthString(keyString, (3 * blockSize)).getBytes();
+    // write data for 3 blocks
+    byte[] data =
+        ContainerTestHelper.getFixedLengthString(keyString, (3 * blockSize))
+            .getBytes();
     Assert.assertEquals(data.length, 3 * blockSize);
     key.write(data);
 
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key,
-        HddsProtos.ReplicationType.STAND_ALONE);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
     // write 1 more block worth of data. It will fail and new block will be
     // allocated
-    key.write(fixedLengthString(keyString, blockSize).getBytes());
+    key.write(ContainerTestHelper.getFixedLengthString(keyString, blockSize)
+        .getBytes());
 
     key.close();
     // read the key from OM again and match the length.The length will still
@@ -253,10 +220,10 @@ public class TestCloseContainerHandlingByClient {
 
   @Test
   public void testMultiBlockWrites2() throws Exception {
-    String keyName = "standalone4";
+    String keyName = "ratis2";
     long dataLength;
     OzoneOutputStream key =
-        createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize);
+        createKey(keyName, ReplicationType.RATIS, 4 * blockSize);
     ChunkGroupOutputStream groupOutputStream =
         (ChunkGroupOutputStream) key.getOutputStream();
 
@@ -264,21 +231,21 @@ public class TestCloseContainerHandlingByClient {
     // With the initial size provided, it should have pre allocated 4 blocks
     Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
     String dataString =
-        fixedLengthString(keyString, (3 * blockSize + chunkSize));
+        ContainerTestHelper.getFixedLengthString(keyString, (2 * blockSize));
     byte[] data = dataString.getBytes();
     key.write(data);
     // Data worth 2 blocks (2 * blockSize) has been written so far.
     // Data of length half of chunkSize resides in the chunkOutput stream buffer
-    String dataString2 = fixedLengthString(keyString, chunkSize * 1 / 2);
+    String dataString2 =
+        ContainerTestHelper.getFixedLengthString(keyString, chunkSize * 1 / 2);
     key.write(dataString2.getBytes());
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setType(HddsProtos.ReplicationType.STAND_ALONE)
-        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
 
     key.close();
     // read the key from OM again and match the length.The length will still
@@ -290,9 +257,8 @@ public class TestCloseContainerHandlingByClient {
     // closeContainerException and remaining data in the chunkOutputStream
     // buffer will be copied into a different allocated block and will be
     // committed.
-    Assert.assertEquals(5, keyLocationInfos.size());
-    dataLength = 3 * blockSize + (long) (1.5 * chunkSize);
-    Assert.assertEquals(dataLength, keyInfo.getDataSize());
+    Assert.assertEquals(dataString.concat(dataString2).getBytes().length,
+        keyInfo.getDataSize());
     validateData(keyName, dataString.concat(dataString2).getBytes());
   }
 
@@ -301,14 +267,14 @@ public class TestCloseContainerHandlingByClient {
 
     String keyName = "standalone5";
     int keyLen = 4 * blockSize;
-    OzoneOutputStream key =
-        createKey(keyName, ReplicationType.RATIS, keyLen);
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, keyLen);
     ChunkGroupOutputStream groupOutputStream =
         (ChunkGroupOutputStream) key.getOutputStream();
     // With the initial size provided, it should have preallocated 4 blocks
     Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
     // write data 3 blocks and one more chunk
-    byte[] writtenData = fixedLengthString(keyString, keyLen).getBytes();
+    byte[] writtenData =
+        ContainerTestHelper.getFixedLengthString(keyString, keyLen).getBytes();
     byte[] data = Arrays.copyOfRange(writtenData, 0, 3 * blockSize + chunkSize);
     Assert.assertEquals(data.length, 3 * blockSize + chunkSize);
     key.write(data);
@@ -316,17 +282,14 @@ public class TestCloseContainerHandlingByClient {
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName)
-        .setType(HddsProtos.ReplicationType.RATIS)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
         .build();
 
-    waitForContainerClose(keyName, key,
-        HddsProtos.ReplicationType.RATIS);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
     // write 3 more chunks worth of data. It will fail and new block will be
     // allocated. This write completes 4 blocks worth of data written to key
-    data = Arrays
-        .copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
+    data = Arrays.copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
     key.write(data);
 
     key.close();
@@ -345,8 +308,6 @@ public class TestCloseContainerHandlingByClient {
     // closeContainerException and remaining data in the chunkOutputStream
     // buffer will be copied into a different allocated block and will be
     // committed.
-    Assert.assertEquals(5, keyLocationInfos.size());
-    Assert.assertEquals(4 * blockSize, keyInfo.getDataSize());
     long length = 0;
     for (OmKeyLocationInfo locationInfo : keyLocationInfos) {
       length += locationInfo.getLength();
@@ -378,9 +339,9 @@ public class TestCloseContainerHandlingByClient {
       cluster.getStorageContainerManager().getEventQueue()
           .fireEvent(SCMEvents.CLOSE_CONTAINER,
               ContainerID.valueof(containerID));
-      ContainerInfo container = cluster.getStorageContainerManager()
-          .getContainerManager()
-          .getContainer(ContainerID.valueof(containerID));
+      ContainerInfo container =
+          cluster.getStorageContainerManager().getContainerManager()
+              .getContainer(ContainerID.valueof(containerID));
       Pipeline pipeline =
           cluster.getStorageContainerManager().getPipelineManager()
               .getPipeline(container.getPipelineID());
@@ -406,8 +367,8 @@ public class TestCloseContainerHandlingByClient {
           .isContainerPresent(cluster, containerID, dn))) {
         for (DatanodeDetails datanodeDetails : datanodes) {
           GenericTestUtils.waitFor(() -> ContainerTestHelper
-                  .isContainerClosed(cluster, containerID, datanodeDetails),
-              500, 15 * 1000);
+                  .isContainerClosed(cluster, containerID, datanodeDetails), 500,
+              15 * 1000);
           //double check if it's really closed
           // (waitFor also throws an exception)
           Assert.assertTrue(ContainerTestHelper
@@ -425,29 +386,31 @@ public class TestCloseContainerHandlingByClient {
   public void testDiscardPreallocatedBlocks() throws Exception {
     String keyName = "discardpreallocatedblocks";
     OzoneOutputStream key =
-        createKey(keyName, ReplicationType.STAND_ALONE, 2 * blockSize);
+        createKey(keyName, ReplicationType.RATIS, 2 * blockSize);
     ChunkGroupOutputStream groupOutputStream =
         (ChunkGroupOutputStream) key.getOutputStream();
 
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     // With the initial size provided, it should have pre allocated 2 blocks
     Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
-    String dataString = fixedLengthString(keyString, (1 * blockSize));
+    String dataString =
+        ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize));
     byte[] data = dataString.getBytes();
     key.write(data);
     List<OmKeyLocationInfo> locationInfos =
         new ArrayList<>(groupOutputStream.getLocationInfoList());
     long containerID = locationInfos.get(0).getContainerID();
-    ContainerInfo container = cluster.getStorageContainerManager()
-        .getContainerManager()
-        .getContainer(ContainerID.valueof(containerID));
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerID));
     Pipeline pipeline =
         cluster.getStorageContainerManager().getPipelineManager()
             .getPipeline(container.getPipelineID());
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(1, datanodes.size());
-    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
-    dataString = fixedLengthString(keyString, (1 * blockSize));
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
+    dataString =
+        ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize));
     data = dataString.getBytes();
     key.write(data);
     Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
@@ -466,40 +429,28 @@ public class TestCloseContainerHandlingByClient {
 
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
       long size) throws Exception {
-    ReplicationFactor factor =
-        type == ReplicationType.STAND_ALONE ? ReplicationFactor.ONE :
-            ReplicationFactor.THREE;
-    return objectStore.getVolume(volumeName).getBucket(bucketName)
-        .createKey(keyName, size, type, factor);
+    return ContainerTestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
   }
 
   private void validateData(String keyName, byte[] data) throws Exception {
-    byte[] readData = new byte[data.length];
-    OzoneInputStream is =
-        objectStore.getVolume(volumeName).getBucket(bucketName)
-            .readKey(keyName);
-    is.read(readData);
-    MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    sha1.update(data);
-    MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    sha2.update(readData);
-    Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest()));
-    is.close();
+    ContainerTestHelper
+        .validateData(keyName, data, objectStore, volumeName, bucketName);
   }
 
   @Test
   public void testBlockWriteViaRatis() throws Exception {
     String keyName = "ratis";
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
-    byte[] data =
-        fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    byte[] data = ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
     key.write(data);
 
     //get the name of a valid container
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
         setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(HddsProtos.ReplicationFactor.THREE)
-        .setKeyName(keyName).build();
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
 
     Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
     waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
@@ -510,79 +461,10 @@ public class TestCloseContainerHandlingByClient {
     // The write will fail but exception will be handled and length will be
     // updated correctly in OzoneManager once the stream is closed
     key.close();
-    // read the key from OM again and match the length.The length will still
-    // be the equal to the original data size.
     OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
-    List<OmKeyLocationInfo> keyLocationInfos =
-        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
-    //we have written two blocks
-    Assert.assertEquals(2, keyLocationInfos.size());
-    OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
-    Assert.assertEquals(data.length - (data.length % chunkSize),
-        omKeyLocationInfo.getLength());
-    Assert.assertEquals(data.length + (data.length % chunkSize),
-        keyLocationInfos.get(1).getLength());
-    Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
     String dataString = new String(data);
     dataString.concat(dataString);
+    Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
     validateData(keyName, dataString.getBytes());
   }
-
-  @Test
-  public void testRetriesOnBlockNotCommittedException() throws Exception {
-    String keyName = "blockcommitexceptiontest";
-    OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0);
-    ChunkGroupOutputStream groupOutputStream =
-        (ChunkGroupOutputStream) key.getOutputStream();
-    GenericTestUtils.setLogLevel(ChunkGroupOutputStream.LOG, Level.TRACE);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(ChunkGroupOutputStream.LOG);
-
-    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
-    String dataString = fixedLengthString(keyString, (3 * chunkSize));
-    key.write(dataString.getBytes());
-    List<OmKeyLocationInfo> locationInfos =
-        groupOutputStream.getLocationInfoList();
-    long containerID = locationInfos.get(0).getContainerID();
-    ContainerInfo container = cluster.getStorageContainerManager()
-        .getContainerManager()
-        .getContainer(ContainerID.valueof(containerID));
-    Pipeline pipeline =
-        cluster.getStorageContainerManager().getPipelineManager()
-            .getPipeline(container.getPipelineID());
-    List<DatanodeDetails> datanodes = pipeline.getNodes();
-    Assert.assertEquals(1, datanodes.size());
-    // move the container on the datanode to Closing state, this will ensure
-    // closing the key will hit BLOCK_NOT_COMMITTED_EXCEPTION while trying
-    // to fetch the committed length
-    for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) {
-      if (datanodes.get(0).equals(datanodeService.getDatanodeDetails())) {
-        datanodeService.getDatanodeStateMachine().getContainer()
-            .getContainerSet().getContainer(containerID).getContainerData()
-            .setState(ContainerProtos.ContainerDataProto.State.CLOSED);
-      }
-    }
-    dataString = fixedLengthString(keyString, (chunkSize * 1 / 2));
-    key.write(dataString.getBytes());
-    try {
-      key.close();
-      Assert.fail("Expected Exception not thrown");
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe instanceof StorageContainerException);
-      Assert.assertTrue(((StorageContainerException) ioe).getResult()
-          == ContainerProtos.Result.BLOCK_NOT_COMMITTED);
-    }
-    // It should retry only for max retries times
-    for (int i = 1; i <= maxRetries; i++) {
-      Assert.assertTrue(logCapturer.getOutput()
-          .contains("Retrying GetCommittedBlockLength request"));
-      Assert.assertTrue(logCapturer.getOutput().contains("Already tried " + i));
-    }
-    Assert.assertTrue(logCapturer.getOutput()
-        .contains("GetCommittedBlockLength request failed."));
-    Assert.assertTrue(logCapturer.getOutput().contains(
-        "retries get failed due to exceeded maximum allowed retries number"
-            + ": " + maxRetries));
-    logCapturer.stopCapturing();
-  }
 }

+ 1 - 19
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java

@@ -23,8 +23,6 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.
-    common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
@@ -141,15 +139,8 @@ public class TestContainerStateMachineFailures {
             .getContainer().getContainerSet()
             .getContainer(omKeyLocationInfo.getContainerID()).getContainerData()
             .getContainerPath()));
-    try {
-      // flush will throw an exception for the second write as the container
-      // dir has been deleted.
-      key.flush();
-      Assert.fail("Expected exception not thrown");
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe.getCause() instanceof StorageContainerException);
-    }
 
+    key.close();
     // Make sure the container is marked unhealthy
     Assert.assertTrue(
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
@@ -157,14 +148,5 @@ public class TestContainerStateMachineFailures {
             .getContainer(omKeyLocationInfo.getContainerID())
             .getContainerState()
             == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
-    try {
-      // subsequent requests will fail with unhealthy container exception
-      key.close();
-      Assert.fail("Expected exception not thrown");
-    } catch (IOException ioe) {
-      Assert.assertTrue(ioe instanceof StorageContainerException);
-      Assert.assertTrue(((StorageContainerException) ioe).getResult()
-          == ContainerProtos.Result.BLOCK_NOT_COMMITTED);
-    }
   }
 }

+ 213 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java

@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests how the Ozone client handles datanode failures while a key is being
+ * written through a Ratis pipeline.
+ */
+public class TestFailureHandlingByClient {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+
+  /**
+   * Creates a six-datanode MiniOzoneCluster, an Ozone client, and the volume
+   * and bucket that the tests write into.
+   * <p>
+   * TODO: a new MiniOzoneCluster is spawned for every test method. Need to
+   * use the same instance for all tests.
+   *
+   * @throws Exception if the cluster or the client cannot be set up
+   */
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    chunkSize = (int) OzoneConsts.MB;
+    blockSize = 4 * chunkSize;
+    conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize);
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE, 1);
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE, 2);
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5,
+        TimeUnit.SECONDS);
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setQuietMode(false);
+    // Keep in step with blockSize above (4 chunks of 1 MB each).
+    conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4));
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(6).build();
+    cluster.waitForClusterToBeReady();
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "datanodefailurehandlingtest";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Closes the client and shuts down the MiniOzoneCluster.
+   *
+   * @throws IOException if closing the client fails
+   */
+  @After
+  public void shutdown() throws IOException {
+    try {
+      if (client != null) {
+        client.close();
+      }
+    } finally {
+      // Always tear the cluster down, even when the client fails to close.
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  // TODO: currently, shutting down 2 datanodes in Ratis leads to the
+  // watchForCommit Api in RaftClient hanging forever. Once that gets
+  // fixed, we need to execute the tests with 2 node failures.
+
+  /**
+   * Writes one and a half chunks, stops one datanode of the pipeline, then
+   * closes the key and verifies the committed length and the data.
+   */
+  @Test
+  public void testBlockWritesWithDnFailures() throws Exception {
+    String keyName = "ratis3";
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        ContainerTestHelper
+        .getFixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    key.write(data);
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(1, locationInfoList.size());
+    List<DatanodeDetails> datanodes =
+        getPipelineNodes(locationInfoList.get(0).getContainerID());
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+    // cluster.shutdownHddsDatanode(datanodes.get(1));
+    // Closing the key goes through the pipeline that just lost a datanode.
+    // The client is expected to handle the failure and the length must be
+    // updated correctly in OzoneManager once the stream is closed.
+    key.close();
+    OmKeyInfo keyInfo = lookupKeyInfo(keyName);
+    Assert.assertEquals(data.length, keyInfo.getDataSize());
+    validateData(keyName, data);
+    cluster.restartHddsDatanode(datanodes.get(0), true);
+  }
+
+  /**
+   * Writes more than one block, stops one datanode of the second block's
+   * pipeline, writes the same data again, then verifies the committed length
+   * and the data after close.
+   */
+  @Test
+  public void testMultiBlockWritesWithDnFailures() throws Exception {
+    String keyName = "ratis3";
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    String data =
+        ContainerTestHelper
+        .getFixedLengthString(keyString, blockSize + chunkSize);
+    key.write(data.getBytes());
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(2, locationInfoList.size());
+    List<DatanodeDetails> datanodes =
+        getPipelineNodes(locationInfoList.get(1).getContainerID());
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+
+    //  cluster.shutdownHddsDatanode(datanodes.get(1));
+    // The write will fail but exception will be handled and length will be
+    // updated correctly in OzoneManager once the stream is closed
+    key.write(data.getBytes());
+    key.close();
+    OmKeyInfo keyInfo = lookupKeyInfo(keyName);
+    Assert.assertEquals(2 * data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.concat(data).getBytes());
+    cluster.restartHddsDatanode(datanodes.get(0), true);
+  }
+
+  /**
+   * Returns the datanodes of the pipeline that SCM has recorded for the
+   * given container.
+   */
+  private List<DatanodeDetails> getPipelineNodes(long containerId)
+      throws Exception {
+    ContainerInfo container = cluster.getStorageContainerManager()
+        .getContainerManager()
+        .getContainer(ContainerID.valueof(containerId));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    return pipeline.getNodes();
+  }
+
+  /**
+   * Looks up the key in OzoneManager as a RATIS / factor THREE key of the
+   * test volume and bucket.
+   */
+  private OmKeyInfo lookupKeyInfo(String keyName) throws Exception {
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    return cluster.getOzoneManager().lookupKey(keyArgs);
+  }
+
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return ContainerTestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+  }
+
+  private void validateData(String keyName, byte[] data) throws Exception {
+    ContainerTestHelper
+        .validateData(keyName, data, objectStore, volumeName, bucketName);
+  }
+}

+ 34 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -21,10 +21,14 @@ package org.apache.hadoop.ozone.container;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -632,4 +636,34 @@ public final class ContainerTestHelper {
     return false;
   }
 
+  /**
+   * Opens a key for writing in the given volume and bucket.
+   * <p>
+   * The replication factor is derived from the replication type: ONE for
+   * STAND_ALONE, THREE for every other type.
+   *
+   * @param keyName name of the key to create
+   * @param type replication type of the key
+   * @param size size passed through to the bucket's createKey call
+   * @param objectStore object store used to resolve the volume and bucket
+   * @param volumeName name of an existing volume
+   * @param bucketName name of an existing bucket in that volume
+   * @return the output stream returned by the bucket's createKey call
+   * @throws Exception if the volume, bucket or key cannot be obtained
+   */
+  public static OzoneOutputStream createKey(String keyName,
+      ReplicationType type, long size, ObjectStore objectStore,
+      String volumeName, String bucketName) throws Exception {
+    final org.apache.hadoop.hdds.client.ReplicationFactor factor;
+    if (type == ReplicationType.STAND_ALONE) {
+      factor = org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+    } else {
+      factor = org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
+    }
+    return objectStore
+        .getVolume(volumeName)
+        .getBucket(bucketName)
+        .createKey(keyName, size, type, factor);
+  }
+
+  /**
+   * Reads the key back and asserts that its first {@code data.length} bytes
+   * produce the same {@code OzoneConsts.FILE_HASH} digest as {@code data}.
+   * <p>
+   * If the key holds fewer bytes than {@code data}, the unread tail of the
+   * buffer stays zero-filled and the digests are compared as-is.
+   *
+   * @param keyName name of the key to read
+   * @param data expected content of the key
+   * @param objectStore object store used to resolve the volume and bucket
+   * @param volumeName name of the volume holding the key
+   * @param bucketName name of the bucket holding the key
+   * @throws Exception if the key cannot be read or the digest is unavailable
+   */
+  public static void validateData(String keyName, byte[] data,
+      ObjectStore objectStore, String volumeName, String bucketName)
+      throws Exception {
+    byte[] readData = new byte[data.length];
+    // try-with-resources releases the stream even when a read fails.
+    try (OzoneInputStream is =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .readKey(keyName)) {
+      // A single read() may return fewer bytes than requested, so keep
+      // reading until the buffer is full or the end of the key is reached.
+      int offset = 0;
+      while (offset < readData.length) {
+        int count = is.read(readData, offset, readData.length - offset);
+        if (count < 0) {
+          break;
+        }
+        offset += count;
+      }
+    }
+    MessageDigest expectedDigest =
+        MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    expectedDigest.update(data);
+    MessageDigest actualDigest =
+        MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    actualDigest.update(readData);
+    Assert.assertTrue(
+        Arrays.equals(expectedDigest.digest(), actualDigest.digest()));
+  }
+
+  /**
+   * Right-aligns {@code string} in a field of {@code length} characters by
+   * padding it on the left with spaces.
+   * <p>
+   * The length is handed to {@link String#format} as a field width, which is
+   * a minimum: a string already longer than {@code length} is returned
+   * unchanged rather than truncated.
+   *
+   * @param string the text to pad
+   * @param length the field width used in the format specifier
+   * @return the padded string
+   */
+  public static String getFixedLengthString(String string, int length) {
+    final String rightAlignedSpec = "%" + length + "s";
+    return String.format(rightAlignedSpec, string);
+  }
 }

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java

@@ -468,7 +468,7 @@ public class TestOzoneContainer {
             client.getPipeline(), blockID, 1024);
 
         CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-            response = client.sendCommandAsync(smallFileRequest);
+            response = client.sendCommandAsync(smallFileRequest).getResponse();
         computeResults.add(response);
       }
 

+ 2 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java

@@ -119,7 +119,8 @@ public class TestXceiverClientMetrics {
             smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest(
                 client.getPipeline(), blockID, 1024);
             CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-                response = client.sendCommandAsync(smallFileRequest);
+                response =
+                client.sendCommandAsync(smallFileRequest).getResponse();
             computeResults.add(response);
           }
 

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java

@@ -62,7 +62,7 @@ public class TestOzoneRestWithMiniCluster {
   private static OzoneConfiguration conf;
   private static ClientProtocol client;
   private static ReplicationFactor replicationFactor = ReplicationFactor.ONE;
-  private static ReplicationType replicationType = ReplicationType.STAND_ALONE;
+  private static ReplicationType replicationType = ReplicationType.RATIS;
 
   @Rule
   public ExpectedException exception = ExpectedException.none();

+ 22 - 20
hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java

@@ -22,8 +22,7 @@ import com.google.common.base.Strings;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -33,7 +32,6 @@ import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.OzoneConsts.Versioning;
 import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
@@ -63,6 +61,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A {@link StorageHandler} implementation that distributes object storage
@@ -80,10 +79,10 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final OzoneAcl.OzoneACLRights userRights;
   private final OzoneAcl.OzoneACLRights groupRights;
   private int chunkSize;
-  private final boolean useRatis;
-  private final HddsProtos.ReplicationType type;
-  private final HddsProtos.ReplicationFactor factor;
-  private final RetryPolicy retryPolicy;
+  private final long streamBufferFlushSize;
+  private final long streamBufferMaxSize;
+  private final long watchTimeout;
+  private final long blockSize;
 
   /**
    * Creates a new DistributedStorageHandler.
@@ -100,17 +99,6 @@ public final class DistributedStorageHandler implements StorageHandler {
     this.ozoneManagerClient = ozoneManagerClient;
     this.storageContainerLocationClient = storageContainerLocation;
     this.xceiverClientManager = new XceiverClientManager(conf);
-    this.useRatis = conf.getBoolean(
-        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
-        ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
-
-    if(useRatis) {
-      type = HddsProtos.ReplicationType.RATIS;
-      factor = HddsProtos.ReplicationFactor.THREE;
-    } else {
-      type = HddsProtos.ReplicationType.STAND_ALONE;
-      factor = HddsProtos.ReplicationFactor.ONE;
-    }
 
     chunkSize = conf.getInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
         ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT);
@@ -118,7 +106,6 @@ public final class DistributedStorageHandler implements StorageHandler {
         OMConfigKeys.OZONE_OM_USER_RIGHTS_DEFAULT);
     groupRights = conf.getEnum(OMConfigKeys.OZONE_OM_GROUP_RIGHTS,
         OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
-    retryPolicy = OzoneClientUtils.createRetryPolicy(conf);
     if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
       LOG.warn("The chunk size ({}) is not allowed to be more than"
               + " the maximum size ({}),"
@@ -126,6 +113,18 @@ public final class DistributedStorageHandler implements StorageHandler {
           chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
       chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
     }
+    streamBufferFlushSize =
+        conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
+            OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT);
+    streamBufferMaxSize =
+        conf.getLong(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
+            OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT);
+    blockSize = conf.getLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB,
+        OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
+    watchTimeout =
+        conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
+            OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -420,7 +419,10 @@ public final class DistributedStorageHandler implements StorageHandler {
             .setRequestID(args.getRequestID())
             .setType(xceiverClientManager.getType())
             .setFactor(xceiverClientManager.getFactor())
-            .setRetryPolicy(retryPolicy)
+            .setStreamBufferFlushSize(streamBufferFlushSize)
+            .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setBlockSize(blockSize)
+            .setWatchTimeout(watchTimeout)
             .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),

+ 6 - 0
hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -44,6 +45,7 @@ public class TestDataValidate {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(5).build();
     cluster.waitForClusterToBeReady();
@@ -86,6 +88,8 @@ public class TestDataValidate {
     randomKeyGenerator.setNumOfKeys(1);
     randomKeyGenerator.setKeySize(20971520);
     randomKeyGenerator.setValidateWrites(true);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
     randomKeyGenerator.call();
     Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
     Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
@@ -101,6 +105,8 @@ public class TestDataValidate {
     randomKeyGenerator.setNumOfBuckets(5);
     randomKeyGenerator.setNumOfKeys(10);
     randomKeyGenerator.setValidateWrites(true);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
     randomKeyGenerator.call();
     Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated());
     Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated());

+ 6 - 0
hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -44,6 +45,7 @@ public class TestRandomKeyGenerator {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
     cluster.waitForClusterToBeReady();
   }
@@ -65,6 +67,8 @@ public class TestRandomKeyGenerator {
     randomKeyGenerator.setNumOfVolumes(2);
     randomKeyGenerator.setNumOfBuckets(5);
     randomKeyGenerator.setNumOfKeys(10);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
     randomKeyGenerator.call();
     Assert.assertEquals(2, randomKeyGenerator.getNumberOfVolumesCreated());
     Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated());
@@ -81,6 +85,8 @@ public class TestRandomKeyGenerator {
     randomKeyGenerator.setNumOfKeys(10);
     randomKeyGenerator.setNumOfThreads(10);
     randomKeyGenerator.setKeySize(10240);
+    randomKeyGenerator.setFactor(ReplicationFactor.THREE);
+    randomKeyGenerator.setType(ReplicationType.RATIS);
     randomKeyGenerator.call();
     Assert.assertEquals(10, randomKeyGenerator.getNumberOfVolumesCreated());
     Assert.assertEquals(10, randomKeyGenerator.getNumberOfBucketsCreated());