
HDDS-870. Avoid creating block sized buffer in ChunkGroupOutputStream. Contributed by Shashikant Banerjee.

Jitendra Pandey 6 years ago
parent commit 1afba83f2c

+ 5 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java

@@ -290,12 +290,16 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   }
 
   @Override
-  public void watchForCommit(long index, long timeout)
+  public long watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException {
     // there is no notion of watch for commit index in standalone pipeline
+    return 0;
   };
 
+  public long getReplicatedMinCommitIndex() {
+    return 0;
+  }
+
   /**
    * Returns pipeline Type.
    *

+ 86 - 24
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.hdds.scm;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.apache.ratis.retry.RetryPolicy;
@@ -42,15 +44,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Objects;
-import java.util.Collection;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * An abstract implementation of {@link XceiverClientSpi} using Ratis.
@@ -79,6 +80,12 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   private final int maxOutstandingRequests;
   private final RetryPolicy retryPolicy;
 
+  // Map to track commit index at every server
+  private final ConcurrentHashMap<String, Long> commitInfoMap;
+
+  // create a separate RaftClient for watchForCommit API
+  private RaftClient watchClient;
+
   /**
    * Constructs a client.
    */
@@ -89,6 +96,30 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     this.rpcType = rpcType;
     this.maxOutstandingRequests = maxOutStandingChunks;
     this.retryPolicy = retryPolicy;
+    commitInfoMap = new ConcurrentHashMap<>();
+    watchClient = null;
+  }
+
+  private void updateCommitInfosMap(
+      Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
+    // if the commitInfo map is empty, just update the commit indexes for each
+    // of the servers
+    if (commitInfoMap.isEmpty()) {
+      commitInfoProtos.forEach(proto -> commitInfoMap
+          .put(proto.getServer().getAddress(), proto.getCommitIndex()));
+    } else {
+      // In case the commit is happening 2 way, just update the commitIndex
+      // for the servers which have been successfully updating the commit
+      // indexes. This is important because getReplicatedMinCommitIndex()
+      // should always return the min commit index out of the nodes which
+      // have been replicating data successfully.
+      commitInfoProtos.forEach(proto -> commitInfoMap
+          .computeIfPresent(proto.getServer().getAddress(),
+              (address, index) -> proto.getCommitIndex()));
+    }
   }
 
   /**
@@ -125,6 +156,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     if (c != null) {
       closeRaftClient(c);
     }
+    if (watchClient != null) {
+      closeRaftClient(watchClient);
+    }
   }
 
   private void closeRaftClient(RaftClient raftClient) {
@@ -148,39 +182,73 @@ public final class XceiverClientRatis extends XceiverClientSpi {
         getClient().sendAsync(() -> byteString);
   }
 
+  // gets the minimum log index replicated to all servers
+  @Override
+  public long getReplicatedMinCommitIndex() {
+    return commitInfoMap.values().stream()
+        .mapToLong(Long::longValue).min().orElse(0);
+  }
+
   @Override
-  public void watchForCommit(long index, long timeout)
+  public long watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException {
+    long commitIndex = getReplicatedMinCommitIndex();
+    if (commitIndex >= index) {
+      // return the min commit index till which the log has been replicated to
+      // all servers
+      return commitIndex;
+    }
     LOG.debug("commit index : {} watch timeout : {}", index, timeout);
     // create a new RaftClient instance for watch request
-    RaftClient raftClient =
-        RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy);
-    CompletableFuture<RaftClientReply> replyFuture = raftClient
+    if (watchClient == null) {
+      watchClient =
+          RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy);
+    }
+    CompletableFuture<RaftClientReply> replyFuture = watchClient
         .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
+    RaftClientReply reply;
     try {
-      replyFuture.get(timeout, TimeUnit.MILLISECONDS);
+      reply = replyFuture.get(timeout, TimeUnit.MILLISECONDS);
     } catch (TimeoutException toe) {
       LOG.warn("3 way commit failed ", toe);
 
-      closeRaftClient(raftClient);
+      closeRaftClient(watchClient);
       // generate a new raft client instance again so that next watch request
       // does not get blocked for the previous one
 
       // TODO : need to remove the code to create the new RaftClient instance
       // here once the watch request bypassing sliding window in Raft Client
       // gets fixed.
-      raftClient =
+      watchClient =
           RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy);
-      raftClient
+      reply = watchClient
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
           .get(timeout, TimeUnit.MILLISECONDS);
-      LOG.info("Could not commit " + index + " to all the nodes."
-          + "Committed by majority.");
-    } finally {
-      closeRaftClient(raftClient);
+      Optional<RaftProtos.CommitInfoProto> proto =
+          reply.getCommitInfos().stream().min(Comparator
+              .comparing(RaftProtos.CommitInfoProto::getCommitIndex));
+      Preconditions.checkState(proto.isPresent());
+      String address = proto.get().getServer().getAddress();
+      // since 3 way commit has failed, the updated map from now on will
+      // only store entries for those datanodes which have had successful
+      // replication.
+      commitInfoMap.remove(address);
+      LOG.info(
+          "Could not commit " + index + " to all the nodes. Server " + address
+              + " has failed. Committed by majority.");
     }
+    return index;
   }
+
   /**
    * Sends a given command to server gets a waitable future back.
    *
@@ -193,8 +261,6 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     XceiverClientAsyncReply asyncReply = new XceiverClientAsyncReply(null);
     CompletableFuture<RaftClientReply> raftClientReply =
         sendRequestAsync(request);
-    Collection<XceiverClientAsyncReply.CommitInfo> commitInfos =
-        new ArrayList<>();
     CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
         raftClientReply.whenComplete((reply, e) -> LOG.debug(
             "received reply {} for request: cmdType={} containerID={}"
@@ -212,14 +278,10 @@ public final class XceiverClientRatis extends XceiverClientSpi {
                 ContainerCommandResponseProto response =
                     ContainerCommandResponseProto
                         .parseFrom(reply.getMessage().getContent());
-                reply.getCommitInfos().forEach(e -> {
-                  XceiverClientAsyncReply.CommitInfo commitInfo =
-                      new XceiverClientAsyncReply.CommitInfo(
-                          e.getServer().getAddress(), e.getCommitIndex());
-                  commitInfos.add(commitInfo);
-                  asyncReply.setCommitInfos(commitInfos);
+                if (response.getResult() == ContainerProtos.Result.SUCCESS) {
+                  updateCommitInfosMap(reply.getCommitInfos());
                   asyncReply.setLogIndex(reply.getLogIndex());
-                });
+                }
                 return response;
               } catch (InvalidProtocolBufferException e) {
                 throw new CompletionException(e);
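
For reference, the commitInfoMap bookkeeping introduced above can be modeled in isolation. A minimal sketch of the intended semantics (plain Java, no Ratis types; class and method names are illustrative, not part of the patch):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /** Hedged model of the commitInfoMap semantics in XceiverClientRatis. */
    class CommitTracker {
      private final Map<String, Long> commitInfoMap = new ConcurrentHashMap<>();

      // The first reply seeds every server; later replies only refresh servers
      // already present, so a datanode evicted after a watch timeout stays out.
      void update(Map<String, Long> replyCommitInfos) {
        if (commitInfoMap.isEmpty()) {
          commitInfoMap.putAll(replyCommitInfos);
        } else {
          replyCommitInfos.forEach((addr, idx) ->
              commitInfoMap.computeIfPresent(addr, (a, old) -> idx));
        }
      }

      // Called when the 3 way commit times out and the majority watch succeeds.
      void evictFailedServer(String address) {
        commitInfoMap.remove(address);
      }

      // Min commit index across the servers still replicating successfully.
      long getReplicatedMinCommitIndex() {
        return commitInfoMap.values().stream()
            .mapToLong(Long::longValue).min().orElse(0);
      }
    }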

+ 267 - 159
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java

@@ -37,15 +37,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.List;
 import java.util.ArrayList;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
     .putBlockAsync;
 import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
@@ -84,25 +82,30 @@ public class ChunkOutputStream extends OutputStream {
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
   private final long watchTimeout;
-  private ByteBuffer buffer;
+  private List<ByteBuffer> bufferList;
   // The IOException will be set by response handling thread in case there is an
   // exception received in the response. If the exception is set, the next
   // request will fail upfront.
   private IOException ioException;
   private ExecutorService responseExecutor;
 
-  // position of the buffer where the last flush was attempted
-  private int lastFlushPos;
+  // the effective length of data flushed so far
+  private long totalDataFlushedLength;
+
+  // effective data write attempted so far for the block
+  private long writtenDataLength;
 
-  // position of the buffer till which the flush was successfully
-  // acknowledged by all nodes in pipeline
-  private int lastSuccessfulFlushIndex;
+  // total data which has been successfully flushed and acknowledged
+  // by all servers
+  private long totalAckDataLength;
 
   // list to hold up all putBlock futures
   private List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
       futureList;
-  // list maintaining commit indexes for putBlocks
-  private List<Long> commitIndexList;
+  // map containing mapping from putBlock logIndex to flushedDataLength.
+  private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
+
+  private int currentBufferIndex;
 
   /**
    * Creates a new ChunkOutputStream.
@@ -113,12 +116,17 @@ public class ChunkOutputStream extends OutputStream {
    * @param xceiverClient        client to perform container calls
    * @param traceID              container protocol call args
    * @param chunkSize            chunk size
+   * @param bufferList           list of byte buffers
+   * @param streamBufferFlushSize flush size
+   * @param streamBufferMaxSize   max size of the currentBuffer
+   * @param watchTimeout          watch timeout
+   * @param checksum              checksum
    */
   public ChunkOutputStream(BlockID blockID, String key,
       XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
       String traceID, int chunkSize, long streamBufferFlushSize,
-      long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer,
-      Checksum checksum) {
+      long streamBufferMaxSize, long watchTimeout,
+      List<ByteBuffer> bufferList, Checksum checksum) {
     this.blockID = blockID;
     this.key = key;
     this.traceID = traceID;
@@ -135,24 +143,36 @@ public class ChunkOutputStream extends OutputStream {
     this.streamBufferFlushSize = streamBufferFlushSize;
     this.streamBufferMaxSize = streamBufferMaxSize;
     this.watchTimeout = watchTimeout;
-    this.buffer = buffer;
-    this.ioException = null;
+    this.bufferList = bufferList;
     this.checksum = checksum;
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitIndexList = new ArrayList<>();
-    lastSuccessfulFlushIndex = 0;
+    commitIndex2flushedDataMap = new ConcurrentHashMap<>();
+    totalAckDataLength = 0;
     futureList = new ArrayList<>();
-    lastFlushPos = 0;
+    totalDataFlushedLength = 0;
+    currentBufferIndex = 0;
+    writtenDataLength = 0;
   }
 
   public BlockID getBlockID() {
     return blockID;
   }
 
-  public int getLastSuccessfulFlushIndex() {
-    return lastSuccessfulFlushIndex;
+  public long getTotalSuccessfulFlushedData() {
+    return totalAckDataLength;
+  }
+
+  public long getWrittenDataLength() {
+    return writtenDataLength;
+  }
+
+  private long computeBufferData() {
+    int dataLength =
+        bufferList.stream().mapToInt(Buffer::position).sum();
+    Preconditions.checkState(dataLength <= streamBufferMaxSize);
+    return dataLength;
   }
 
 
@@ -176,139 +196,172 @@ public class ChunkOutputStream extends OutputStream {
     if (len == 0) {
       return;
     }
-    checkOpen();
     while (len > 0) {
+      checkOpen();
       int writeLen;
-      writeLen = Math.min(chunkSize - buffer.position() % chunkSize, len);
-      buffer.put(b, off, writeLen);
-      if (buffer.position() % chunkSize == 0) {
-        int pos = buffer.position() - chunkSize;
-        int limit = buffer.position();
+      allocateBuffer();
+      ByteBuffer currentBuffer = getCurrentBuffer();
+      writeLen =
+          Math.min(chunkSize - currentBuffer.position() % chunkSize, len);
+      currentBuffer.put(b, off, writeLen);
+      if (currentBuffer.position() % chunkSize == 0) {
+        int pos = currentBuffer.position() - chunkSize;
+        int limit = currentBuffer.position();
         writeChunk(pos, limit);
       }
       off += writeLen;
       len -= writeLen;
-      if (buffer.position() >= streamBufferFlushSize
-          && buffer.position() % streamBufferFlushSize == 0) {
-
-        lastFlushPos = buffer.position();
-        futureList.add(handlePartialFlush());
+      writtenDataLength += writeLen;
+      if (currentBuffer.position() == streamBufferFlushSize) {
+        totalDataFlushedLength += streamBufferFlushSize;
+        handlePartialFlush();
       }
-      if (buffer.position() >= streamBufferMaxSize
-          && buffer.position() % streamBufferMaxSize == 0) {
+      long bufferedData = computeBufferData();
+      // Data in the bufferList can not exceed streamBufferMaxSize
+      if (bufferedData == streamBufferMaxSize) {
         handleFullBuffer();
       }
     }
   }
 
+  private ByteBuffer getCurrentBuffer() {
+    ByteBuffer buffer = bufferList.get(currentBufferIndex);
+    if (!buffer.hasRemaining()) {
+      currentBufferIndex = (currentBufferIndex + 1) % getMaxNumBuffers();
+    }
+    return bufferList.get(currentBufferIndex);
+  }
+
+  private int getMaxNumBuffers() {
+    return (int) (streamBufferMaxSize / streamBufferFlushSize);
+  }
+
+  private void allocateBuffer() {
+    for (int i = bufferList.size(); i < getMaxNumBuffers(); i++) {
+      bufferList.add(ByteBuffer.allocate((int) streamBufferFlushSize));
+    }
+  }
+
   /**
    * Will be called on the retryPath in case closedContainerException/
    * TimeoutException.
    * @param len length of data to write
-   * @throws IOException if error occured
+   * @throws IOException if error occurred
    */
 
-  // In this case, the data is already cached in the buffer.
-  public void writeOnRetry(int len) throws IOException {
+  // In this case, the data is already cached in the currentBuffer.
+  public void writeOnRetry(long len) throws IOException {
     if (len == 0) {
       return;
     }
     int off = 0;
-    checkOpen();
+    int pos = off;
     while (len > 0) {
-      int writeLen;
+      long writeLen;
       writeLen = Math.min(chunkSize, len);
       if (writeLen == chunkSize) {
-        int pos = off;
         int limit = pos + chunkSize;
         writeChunk(pos, limit);
       }
       off += writeLen;
       len -= writeLen;
+      writtenDataLength += writeLen;
       if (off % streamBufferFlushSize == 0) {
-        lastFlushPos = off;
-        futureList.add(handlePartialFlush());
+        // reset the position to zero as now we will be reading the next
+        // buffer in the list
+        pos = 0;
+        totalDataFlushedLength += streamBufferFlushSize;
+        handlePartialFlush();
       }
-      if (off % streamBufferMaxSize == 0) {
+      if (computeBufferData() % streamBufferMaxSize == 0) {
         handleFullBuffer();
       }
     }
   }
 
-  private void handleResponse(
-      ContainerProtos.ContainerCommandResponseProto response,
-      XceiverClientAsyncReply asyncReply) {
-    validateResponse(response);
-    discardBuffer(asyncReply);
-  }
-
-  private void discardBuffer(XceiverClientAsyncReply asyncReply) {
-    if (!commitIndexList.isEmpty()) {
-      long index = commitIndexList.get(0);
-      if (checkIfBufferDiscardRequired(asyncReply, index)) {
-        updateFlushIndex();
-      }
-    }
-  }
-
   /**
-   * just update the lastSuccessfulFlushIndex. Since we have allocated
-   * the buffer more than the streamBufferMaxSize, we can keep on writing
-   * to the buffer. In case of failure, we will read the data starting from
-   * lastSuccessfulFlushIndex.
+   * Updates totalAckDataLength with the flush length acknowledged by all
+   * servers, recycles the acknowledged buffer to the tail of bufferList and
+   * trims the futureList. In case of failure, we will read the data starting
+   * from totalAckDataLength.
    */
-  private void updateFlushIndex() {
-    lastSuccessfulFlushIndex += streamBufferFlushSize;
-    LOG.debug("Discarding buffer till pos " + lastSuccessfulFlushIndex);
-    if (!commitIndexList.isEmpty()) {
-      commitIndexList.remove(0);
+  private void updateFlushIndex(long index) {
+    if (!commitIndex2flushedDataMap.isEmpty()) {
+      Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
+      totalAckDataLength = commitIndex2flushedDataMap.remove(index);
+      LOG.debug("Total data successfully replicated: " + totalAckDataLength);
       futureList.remove(0);
-    }
-
-  }
-  /**
-   * Check if the last commitIndex stored at the beginning of the
-   * commitIndexList is less than equal to current commitInfo indexes.
-   * If its true, the buffer has been successfully flushed till the
-   * last position where flush happened.
-   */
-  private boolean checkIfBufferDiscardRequired(
-      XceiverClientAsyncReply asyncReply, long commitIndex) {
-    if (asyncReply.getCommitInfos() != null) {
-      for (XceiverClientAsyncReply.CommitInfo info : asyncReply
-          .getCommitInfos()) {
-        if (info.getCommitIndex() < commitIndex) {
-          return false;
-        }
+      // Flush has been committed to required servers successfully.
+      // just swap the bufferList head and tail after clearing.
+      ByteBuffer currentBuffer = bufferList.remove(0);
+      currentBuffer.clear();
+      if (currentBufferIndex != 0) {
+        currentBufferIndex--;
       }
+      bufferList.add(currentBuffer);
     }
-    return true;
   }
 
   /**
-   * This is a blocking call.It will wait for the flush till the commit index
-   * at the head of the commitIndexList gets replicated to all or majority.
+   * This is a blocking call. It will wait for the flush till the commit index
+   * at the head of the commitIndex2flushedDataMap gets replicated to all or
+   * majority.
    * @throws IOException
    */
   private void handleFullBuffer() throws IOException {
-    if (!commitIndexList.isEmpty()) {
-      watchForCommit(commitIndexList.get(0));
+    try {
+      checkOpen();
+      if (!futureList.isEmpty()) {
+        waitOnFlushFutures();
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      adjustBuffersOnException();
+      throw new IOException(
+          "Unexpected Storage Container Exception: " + e.toString(), e);
+    }
+    if (!commitIndex2flushedDataMap.isEmpty()) {
+      watchForCommit(
+          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
+              .min().getAsLong());
     }
   }
 
+  private void adjustBuffers(long commitIndex) {
+    commitIndex2flushedDataMap.keySet().forEach(index -> {
+      if (index <= commitIndex) {
+        updateFlushIndex(index);
+      }
+    });
+  }
+
+  // It may happen that once the exception is encountered, we still might
+  // have successfully flushed up to a certain index. Make sure the buffers
+  // only contain data which has not been sufficiently replicated.
+  private void adjustBuffersOnException() {
+    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+  }
+
   /**
    * calls watchForCommit API of the Ratis Client. For Standalone client,
    * it is a no op.
    * @param commitIndex log index to watch for
    * @throws IOException IOException in case watch gets timed out
    */
   private void watchForCommit(long commitIndex) throws IOException {
     checkOpen();
-    Preconditions.checkState(!commitIndexList.isEmpty());
+    Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
     try {
-      xceiverClient.watchForCommit(commitIndex, watchTimeout);
+      long index =
+          xceiverClient.watchForCommit(commitIndex, watchTimeout);
+      adjustBuffers(index);
     } catch (TimeoutException | InterruptedException | ExecutionException e) {
       LOG.warn("watchForCommit failed for index " + commitIndex, e);
+      adjustBuffersOnException();
       throw new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
     }
@@ -317,68 +370,79 @@ public class ChunkOutputStream extends OutputStream {
   private CompletableFuture<ContainerProtos.
       ContainerCommandResponseProto> handlePartialFlush()
       throws IOException {
+    checkOpen();
+    long flushPos = totalDataFlushedLength;
     String requestId =
         traceID + ContainerProtos.Type.PutBlock + chunkIndex + blockID;
+    CompletableFuture<ContainerProtos.
+        ContainerCommandResponseProto> flushFuture;
     try {
       XceiverClientAsyncReply asyncReply =
           putBlockAsync(xceiverClient, containerBlockData.build(), requestId);
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
           asyncReply.getResponse();
-
-      return future.thenApplyAsync(e -> {
-        handleResponse(e, asyncReply);
+      flushFuture = future.thenApplyAsync(e -> {
+        try {
+          validateResponse(e);
+        } catch (IOException sce) {
+          future.completeExceptionally(sce);
+          return e;
+        }
         // if the ioException is not set, putBlock is successful
         if (ioException == null) {
           LOG.debug(
-              "Adding index " + asyncReply.getLogIndex() + " commitList size "
-                  + commitIndexList.size());
+              "Adding index " + asyncReply.getLogIndex() + " commitMap size "
+                  + commitIndex2flushedDataMap.size());
           BlockID responseBlockID = BlockID.getFromProtobuf(
               e.getPutBlock().getCommittedBlockLength().getBlockID());
           Preconditions.checkState(blockID.getContainerBlockID()
               .equals(responseBlockID.getContainerBlockID()));
           // updates the bcsId of the block
           blockID = responseBlockID;
-          long index = asyncReply.getLogIndex();
           // for standalone protocol, logIndex will always be 0.
-          if (index != 0) {
-            commitIndexList.add(index);
-          } else {
-            updateFlushIndex();
-          }
+          commitIndex2flushedDataMap.put(asyncReply.getLogIndex(), flushPos);
         }
         return e;
-      }, responseExecutor);
+      }, responseExecutor).exceptionally(e -> {
+        LOG.debug(
+            "putBlock failed for blockID " + blockID + " with exception " + e
+                .getLocalizedMessage());
+        CompletionException ce = new CompletionException(e);
+        setIoException(ce);
+        throw ce;
+      });
     } catch (IOException | InterruptedException | ExecutionException e) {
       throw new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
     }
+    futureList.add(flushFuture);
+    return flushFuture;
   }
 
   @Override
   public void flush() throws IOException {
     if (xceiverClientManager != null && xceiverClient != null
-        && buffer != null) {
+        && bufferList != null) {
       checkOpen();
-      if (buffer.position() > 0 && lastSuccessfulFlushIndex != buffer
-          .position()) {
+      int bufferSize = bufferList.size();
+      if (bufferSize > 0) {
         try {
-
-          // flush the last chunk data residing on the buffer
-          if (buffer.position() % chunkSize > 0) {
-            int pos = buffer.position() - (buffer.position() % chunkSize);
-            writeChunk(pos, buffer.position());
-          }
-          if (lastFlushPos != buffer.position()) {
-            lastFlushPos = buffer.position();
+          // flush the last chunk data residing on the currentBuffer
+          if (totalDataFlushedLength < writtenDataLength) {
+            ByteBuffer currentBuffer = getCurrentBuffer();
+            int pos = currentBuffer.position() - (currentBuffer.position()
+                % chunkSize);
+            int limit = currentBuffer.position() - pos;
+            writeChunk(pos, currentBuffer.position());
+            totalDataFlushedLength += limit;
             handlePartialFlush();
           }
-          CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-              futureList.toArray(new CompletableFuture[futureList.size()]));
-          combinedFuture.get();
+          waitOnFlushFutures();
           // just check again if the exception is hit while waiting for the
           // futures to ensure flush has indeed succeeded
           checkOpen();
         } catch (InterruptedException | ExecutionException e) {
+          adjustBuffersOnException();
           throw new IOException(
               "Unexpected Storage Container Exception: " + e.toString(), e);
         }
@@ -388,11 +452,11 @@ public class ChunkOutputStream extends OutputStream {
 
   private void writeChunk(int pos, int limit) throws IOException {
     // Please note : We are not flipping the slice when we write since
-    // the slices are pointing the buffer start and end as needed for
+    // the slices are pointing to the currentBuffer start and end as needed for
     // the chunk write. Also please note, Duplicate does not create a
     // copy of data, it only creates metadata that points to the data
     // stream.
-    ByteBuffer chunk = buffer.duplicate();
+    ByteBuffer chunk = bufferList.get(currentBufferIndex).duplicate();
     chunk.position(pos);
     chunk.limit(limit);
     writeChunkToContainer(chunk);
@@ -401,49 +465,78 @@ public class ChunkOutputStream extends OutputStream {
   @Override
   public void close() throws IOException {
     if (xceiverClientManager != null && xceiverClient != null
-        && buffer != null) {
-      try {
-        if (buffer.position() > lastFlushPos) {
-          int pos = buffer.position() - (buffer.position() % chunkSize);
-          writeChunk(pos, buffer.position());
-          futureList.add(handlePartialFlush());
-        }
-        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-            futureList.toArray(new CompletableFuture[futureList.size()]));
-
-        // wait for all the transactions to complete
-        combinedFuture.get();
-
-        // irrespective of whether the commitIndexList is empty or not,
-        // ensure there is no exception set(For Standalone Protocol)
-        checkOpen();
-        if (!commitIndexList.isEmpty()) {
-          // wait for the last commit index in the commitIndexList to get
-          // committed to all or majority of nodes in case timeout happens.
-          long lastIndex = commitIndexList.get(commitIndexList.size() - 1);
-          LOG.debug(
-              "waiting for last flush Index " + lastIndex + " to catch up");
-          watchForCommit(lastIndex);
-          updateFlushIndex();
+        && bufferList != null) {
+      int bufferSize = bufferList.size();
+      if (bufferSize > 0) {
+        try {
+          // flush the last chunk data residing on the currentBuffer
+          if (totalDataFlushedLength < writtenDataLength) {
+            ByteBuffer currentBuffer = getCurrentBuffer();
+            int pos = currentBuffer.position() - (currentBuffer.position()
+                % chunkSize);
+            int limit = currentBuffer.position() - pos;
+            writeChunk(pos, currentBuffer.position());
+            totalDataFlushedLength += limit;
+            handlePartialFlush();
+          }
+          waitOnFlushFutures();
+          // irrespective of whether the commitIndex2flushedDataMap is empty
+          // or not, ensure there is no exception set
+          checkOpen();
+          if (!commitIndex2flushedDataMap.isEmpty()) {
+            // wait for the last commit index in the commitIndex2flushedDataMap
+            // to get committed to all or majority of nodes in case timeout
+            // happens.
+            long lastIndex =
+                commitIndex2flushedDataMap.keySet().stream()
+                    .mapToLong(v -> v).max().getAsLong();
+            LOG.debug(
+                "waiting for last flush Index " + lastIndex + " to catch up");
+            watchForCommit(lastIndex);
+          }
+        } catch (InterruptedException | ExecutionException e) {
+          adjustBuffersOnException();
+          throw new IOException(
+              "Unexpected Storage Container Exception: " + e.toString(), e);
+        } finally {
+          cleanup();
         }
-      } catch (InterruptedException | ExecutionException e) {
-        throw new IOException(
-            "Unexpected Storage Container Exception: " + e.toString(), e);
-      } finally {
-        cleanup();
       }
+      // clear all the buffers in the bufferList
+      bufferList.stream().forEach(ByteBuffer::clear);
     }
-    // clear the buffer
-    buffer.clear();
+  }
+
+  private void waitOnFlushFutures()
+      throws InterruptedException, ExecutionException {
+    CompletableFuture<Void> combinedFuture = CompletableFuture
+        .allOf(futureList.toArray(new CompletableFuture[futureList.size()]));
+    // wait for all the transactions to complete
+    combinedFuture.get();
   }
 
   private void validateResponse(
-      ContainerProtos.ContainerCommandResponseProto responseProto) {
+      ContainerProtos.ContainerCommandResponseProto responseProto)
+      throws IOException {
     try {
+      // if the ioException is already set, a previous request has failed;
+      // just throw the exception. The current operation will fail with the
+      // original error
+      if (ioException != null) {
+        throw ioException;
+      }
       ContainerProtocolCalls.validateContainerResponse(responseProto);
     } catch (StorageContainerException sce) {
-      ioException = new IOException(
-          "Unexpected Storage Container Exception: " + sce.toString(), sce);
+      LOG.error("Unexpected Storage Container Exception: ", sce);
+      setIoException(sce);
+      throw sce;
+    }
+  }
+
+  private void setIoException(Exception e) {
+    if (ioException == null) {
+      ioException = new IOException(
+          "Unexpected Storage Container Exception: " + e.toString(), e);
     }
   }
 
@@ -457,7 +550,10 @@ public class ChunkOutputStream extends OutputStream {
       futureList.clear();
     }
     futureList = null;
-    commitIndexList = null;
+    if (commitIndex2flushedDataMap != null) {
+      commitIndex2flushedDataMap.clear();
+    }
+    commitIndex2flushedDataMap = null;
     responseExecutor.shutdown();
   }
 
@@ -471,6 +567,7 @@ public class ChunkOutputStream extends OutputStream {
     if (xceiverClient == null) {
       throw new IOException("ChunkOutputStream has been closed.");
     } else if (ioException != null) {
+      adjustBuffersOnException();
       throw ioException;
     }
   }
@@ -504,16 +601,27 @@ public class ChunkOutputStream extends OutputStream {
       CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
           asyncReply.getResponse();
       future.thenApplyAsync(e -> {
-        handleResponse(e, asyncReply);
+        try {
+          validateResponse(e);
+        } catch (IOException sce) {
+          future.completeExceptionally(sce);
+        }
         return e;
-      }, responseExecutor);
+      }, responseExecutor).exceptionally(e -> {
+        LOG.debug(
+            "writing chunk failed " + chunkInfo.getChunkName() + " blockID "
+                + blockID + " with exception " + e.getLocalizedMessage());
+        CompletionException ce = new CompletionException(e);
+        setIoException(ce);
+        throw ce;
+      });
     } catch (IOException | InterruptedException | ExecutionException e) {
       throw new IOException(
           "Unexpected Storage Container Exception: " + e.toString(), e);
     }
     LOG.debug(
         "writing chunk " + chunkInfo.getChunkName() + " blockID " + blockID
-            + " length " + chunk.remaining());
+            + " length " + effectiveChunkSize);
     containerBlockData.addChunks(chunkInfo);
   }
 }
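
Taken together, the changes above mean ChunkOutputStream no longer needs a block-sized buffer: it cycles through streamBufferMaxSize / streamBufferFlushSize fixed buffers (two 64MB buffers with the defaults) and recycles a buffer as soon as the putBlock covering it is acknowledged. A minimal model of that rotation (plain Java; names are illustrative, and the buffers are allocated eagerly here where the patch allocates them lazily):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    /** Hedged model of the buffer rotation in ChunkOutputStream. */
    class BufferRing {
      private final List<ByteBuffer> bufferList = new ArrayList<>();
      private int currentBufferIndex = 0;

      BufferRing(int flushSize, int maxSize) {
        for (int i = 0; i < maxSize / flushSize; i++) {
          bufferList.add(ByteBuffer.allocate(flushSize));
        }
      }

      // Advance to the next buffer once the current one fills up.
      ByteBuffer current() {
        if (!bufferList.get(currentBufferIndex).hasRemaining()) {
          currentBufferIndex = (currentBufferIndex + 1) % bufferList.size();
        }
        return bufferList.get(currentBufferIndex);
      }

      // On a fully acknowledged flush, clear the head buffer and move it to
      // the tail, mirroring updateFlushIndex() in the patch.
      void releaseHead() {
        ByteBuffer done = bufferList.remove(0);
        done.clear();
        if (currentBufferIndex != 0) {
          currentBufferIndex--;
        }
        bufferList.add(done);
      }
    }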

+ 1 - 43
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientAsyncReply.java

@@ -21,8 +21,6 @@ package org.apache.hadoop.hdds.scm;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
-
-import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -32,49 +30,13 @@ public class XceiverClientAsyncReply {
 
   private CompletableFuture<ContainerCommandResponseProto> response;
   private Long logIndex;
-  private Collection<CommitInfo> commitInfos;
 
   public XceiverClientAsyncReply(
       CompletableFuture<ContainerCommandResponseProto> response) {
-    this(response, 0, null);
-  }
-
-  public XceiverClientAsyncReply(
-      CompletableFuture<ContainerCommandResponseProto> response, long index,
-      Collection<CommitInfo> commitInfos) {
-    this.commitInfos = commitInfos;
-    this.logIndex = index;
+    this.logIndex = 0L;
     this.response = response;
   }
 
-  /**
-   * A class having details about latest commitIndex for each server in the
-   * Ratis pipeline. For Standalone pipeline, commitInfo will be null.
-   */
-  public static class CommitInfo {
-
-    private final String server;
-
-    private final Long commitIndex;
-
-    public CommitInfo(String server, long commitIndex) {
-      this.server = server;
-      this.commitIndex = commitIndex;
-    }
-
-    public String getServer() {
-      return server;
-    }
-
-    public long getCommitIndex() {
-      return commitIndex;
-    }
-  }
-
-  public Collection<CommitInfo> getCommitInfos() {
-    return commitInfos;
-  }
-
   public CompletableFuture<ContainerCommandResponseProto> getResponse() {
     return response;
   }
@@ -83,10 +45,6 @@ public class XceiverClientAsyncReply {
     return logIndex;
   }
 
-  public void setCommitInfos(Collection<CommitInfo> commitInfos) {
-    this.commitInfos = commitInfos;
-  }
-
   public void setLogIndex(Long logIndex) {
     this.logIndex = logIndex;
   }

+ 18 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java

@@ -125,7 +125,24 @@ public abstract class XceiverClientSpi implements Closeable {
    */
   public abstract HddsProtos.ReplicationType getPipelineType();
 
-  public abstract void watchForCommit(long index, long timeout)
+  /**
+   * Checks if a specific commitIndex is replicated to majority/all servers.
+   * @param index index to watch for
+   * @param timeout timeout provided for the watch operation to complete
+   * @return the min commit index replicated to all servers, or to a
+   *         majority of servers in case of a failure
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws TimeoutException
+   * @throws IOException
+   */
+  public abstract long watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException;
+
+  /**
+   * Returns the min commit index replicated to all servers.
+   * @return min commit index replicated to all servers.
+   */
+  public abstract long getReplicatedMinCommitIndex();
 }

+ 0 - 1
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java

@@ -189,7 +189,6 @@ public final class ContainerProtocolCalls  {
             .setContainerID(containerBlockData.getBlockID().getContainerID())
             .setTraceID(traceID).setDatanodeUuid(id)
             .setPutBlock(createBlockRequest).build();
-    xceiverClient.sendCommand(request);
     return xceiverClient.sendCommandAsync(request);
   }
 

+ 4 - 4
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -365,8 +365,8 @@
     <name>ozone.client.stream.buffer.flush.size</name>
     <value>64MB</value>
     <tag>OZONE, CLIENT</tag>
-    <description>Size in mb which determines at what buffer position , a partial
-      flush will be initiated during write. It should be ideally a mutiple
+    <description>Size which determines the buffer position at which a partial
+      flush will be initiated during write. It should ideally be a multiple
       of chunkSize.
     </description>
   </property>
@@ -374,8 +374,8 @@
     <name>ozone.client.stream.buffer.max.size</name>
     <value>128MB</value>
     <tag>OZONE, CLIENT</tag>
-    <description>Size in mb which determines at what buffer position ,
-      write call be blocked till acknowledgement of the fisrt partial flush
+    <description>Size which determines the buffer position at which the
+      write call will be blocked till acknowledgement of the first partial flush
       happens by all servers.
     </description>
   </property>
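
These two sizes interact with the chunk size and the block size through the Preconditions checks in ChunkGroupOutputStream: the flush size must be a multiple of the chunk size, the max size a multiple of the flush size, and the block size a multiple of the max size. A quick standalone check with illustrative values (the 16MB chunk and 256MB block figures are assumptions, not values from this file):

    /** Hedged check of the buffer sizing invariants enforced client-side. */
    public class BufferSizingCheck {
      public static void main(String[] args) {
        long chunkSize = 16L << 20;              // assumed chunk size, 16MB
        long streamBufferFlushSize = 64L << 20;  // ozone.client.stream.buffer.flush.size
        long streamBufferMaxSize = 128L << 20;   // ozone.client.stream.buffer.max.size
        long blockSize = 256L << 20;             // assumed block size

        // a partial flush always lands on a chunk boundary
        check(streamBufferFlushSize % chunkSize == 0);
        // the buffer list holds a whole number of flush-sized buffers (here 2)
        check(streamBufferMaxSize % streamBufferFlushSize == 0);
        // a block spans a whole number of full buffer cycles
        check(blockSize % streamBufferMaxSize == 0);
      }

      private static void check(boolean ok) {
        if (!ok) {
          throw new IllegalStateException("invalid stream buffer sizing");
        }
      }
    }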

+ 2 - 2
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java

@@ -21,7 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * DispatcherContext class holds transport protocol specfic context info
+ * DispatcherContext class holds transport protocol specific context info
  * required for execution of container commands over the container dispatcher.
  */
 @InterfaceAudience.Private
@@ -121,7 +121,7 @@ public class DispatcherContext {
     }
 
     /**
-     * Builds and returns DatanodeDetails instance.
+     * Builds and returns DispatcherContext instance.
      *
      * @return DispatcherContext
      */

+ 60 - 59
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java

@@ -83,8 +83,8 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final long streamBufferMaxSize;
   private final long watchTimeout;
   private final long blockSize;
-  private ByteBuffer buffer;
   private final Checksum checksum;
+  private List<ByteBuffer> bufferList;
   /**
    * A constructor for testing purpose only.
    */
@@ -101,7 +101,9 @@ public class ChunkGroupOutputStream extends OutputStream {
     closed = false;
     streamBufferFlushSize = 0;
     streamBufferMaxSize = 0;
-    buffer = ByteBuffer.allocate(1);
+    bufferList = new ArrayList<>(1);
+    ByteBuffer buffer = ByteBuffer.allocate(1);
+    bufferList.add(buffer);
     watchTimeout = 0;
     blockSize = 0;
     this.checksum = new Checksum();
@@ -177,15 +179,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
     Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
     Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
-
-    // This byteBuffer will be used to cache data until all the blockCommits
-    // (putBlock) gets replicated to all/majority servers. The idea here is to
-    // allocate the buffer of size blockSize so that as and when a chunk is
-    // is replicated to all servers, as a part of discarding the buffer, we
-    // don't necessarily need to run compaction(buffer.compact() on the buffer
-    // to actually discard the acknowledged data. Compaction is inefficient so
-    // it would be a better choice to avoid compaction on the happy I/O path.
-    this.buffer = ByteBuffer.allocate((int) blockSize);
+    this.bufferList = new ArrayList<>();
   }
 
   /**
@@ -222,12 +216,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
         keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
         chunkSize, subKeyInfo.getLength(), streamBufferFlushSize,
-        streamBufferMaxSize, watchTimeout, buffer, checksum));
-  }
-
-  @VisibleForTesting
-  public long getByteOffset() {
-    return getKeyLength();
+        streamBufferMaxSize, watchTimeout, bufferList, checksum));
   }
 
 
@@ -254,11 +243,6 @@ public class ChunkGroupOutputStream extends OutputStream {
   public void write(byte[] b, int off, int len)
       throws IOException {
     checkNotClosed();
-    handleWrite(b, off, len, false, buffer.position());
-  }
-
-  private void handleWrite(byte[] b, int off, int len, boolean retry,
-      int pos) throws IOException {
     if (b == null) {
       throw new NullPointerException();
     }
@@ -269,8 +253,17 @@ public class ChunkGroupOutputStream extends OutputStream {
     if (len == 0) {
       return;
     }
+    handleWrite(b, off, len, false);
+  }
+
+  private long computeBufferData() {
+    return bufferList.stream().mapToInt(value -> value.position())
+        .sum();
+  }
+
+  private void handleWrite(byte[] b, int off, long len, boolean retry)
+      throws IOException {
     int succeededAllocates = 0;
-    int initialPos;
     while (len > 0) {
       if (streamEntries.size() <= currentStreamIndex) {
         Preconditions.checkNotNull(omClient);
@@ -289,8 +282,12 @@ public class ChunkGroupOutputStream extends OutputStream {
       // still do a sanity check.
       Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
       ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
-      int writeLen = Math.min(len, (int) current.getRemaining());
-      initialPos = pos < buffer.position() ? pos : buffer.position();
+
+      // length(len) will be in int range if the call is happening through
+      // the write API of ChunkGroupOutputStream. Length can be in long range
+      // if it comes via the exception/retry path.
+      int writeLen = Math.min((int) len, (int) current.getRemaining());
+      long currentPos = current.getWrittenDataLength();
       try {
         if (retry) {
           current.writeOnRetry(len);
@@ -299,9 +296,10 @@ public class ChunkGroupOutputStream extends OutputStream {
         }
       } catch (IOException ioe) {
         if (checkIfContainerIsClosed(ioe) || checkIfTimeoutException(ioe)) {
-          // for the current iteration, current pos - initialPos gives the
+          // for the current iteration, writtenDataLength - currentPos gives the
           // amount of data already written to the buffer
-          writeLen = buffer.position() - initialPos;
+          writeLen = (int) (current.getWrittenDataLength() - currentPos);
+          LOG.debug("writeLen {}, total len {}", writeLen, len);
           handleException(current, currentStreamIndex);
         } else {
           throw ioe;
@@ -366,30 +364,20 @@ public class ChunkGroupOutputStream extends OutputStream {
    */
   private void handleException(ChunkOutputStreamEntry streamEntry,
       int streamIndex) throws IOException {
-    int lastSuccessfulFlushIndex = streamEntry.getLastSuccessfulFlushIndex();
-    int currentPos = buffer.position();
-
-    // In case of a failure, read the data from the position till the last
-    // acknowledgement happened.
-    if (lastSuccessfulFlushIndex > 0) {
-      buffer.position(lastSuccessfulFlushIndex);
-      buffer.limit(currentPos);
-      buffer.compact();
-    }
-
-    if (buffer.position() > 0) {
-      //set the correct length for the current stream
-      streamEntry.currentPosition = lastSuccessfulFlushIndex;
+    long totalSuccessfulFlushedData =
+        streamEntry.getTotalSuccessfulFlushedData();
+    //set the correct length for the current stream
+    streamEntry.currentPosition = totalSuccessfulFlushedData;
+    long bufferedDataLen = computeBufferData();
+    // just clean up the current stream.
+    streamEntry.cleanup();
+    if (bufferedDataLen > 0) {
       // If the data is still cached in the underlying stream, we need to
       // allocate new block and write this data in the datanode.
       currentStreamIndex += 1;
-      handleWrite(buffer.array(), 0, buffer.position(), true,
-          lastSuccessfulFlushIndex);
+      handleWrite(null, 0, bufferedDataLen, true);
     }
-
-    // just clean up the current stream.
-    streamEntry.cleanup();
-    if (lastSuccessfulFlushIndex == 0) {
+    if (totalSuccessfulFlushedData == 0) {
       streamEntries.remove(streamIndex);
       currentStreamIndex -= 1;
     }
@@ -433,7 +421,7 @@ public class ChunkGroupOutputStream extends OutputStream {
   }
 
   private long getKeyLength() {
-    return streamEntries.parallelStream().mapToLong(e -> e.currentPosition)
+    return streamEntries.stream().mapToLong(e -> e.currentPosition)
         .sum();
   }
 
@@ -517,10 +505,10 @@ public class ChunkGroupOutputStream extends OutputStream {
     } catch (IOException ioe) {
       throw ioe;
     } finally {
-      if (buffer != null) {
-        buffer.clear();
+      if (bufferList != null) {
+        bufferList.stream().forEach(e -> e.clear());
       }
-      buffer = null;
+      bufferList = null;
     }
   }
 
@@ -633,13 +621,13 @@ public class ChunkGroupOutputStream extends OutputStream {
     private final long streamBufferFlushSize;
     private final long streamBufferMaxSize;
     private final long watchTimeout;
-    private ByteBuffer buffer;
+    private List<ByteBuffer> bufferList;
 
     ChunkOutputStreamEntry(BlockID blockID, String key,
         XceiverClientManager xceiverClientManager,
         XceiverClientSpi xceiverClient, String requestId, int chunkSize,
         long length, long streamBufferFlushSize, long streamBufferMaxSize,
-        long watchTimeout, ByteBuffer buffer, Checksum checksum) {
+        long watchTimeout, List<ByteBuffer> bufferList, Checksum checksum) {
       this.outputStream = null;
       this.blockID = blockID;
       this.key = key;
@@ -653,8 +641,8 @@ public class ChunkGroupOutputStream extends OutputStream {
       this.streamBufferFlushSize = streamBufferFlushSize;
       this.streamBufferMaxSize = streamBufferMaxSize;
       this.watchTimeout = watchTimeout;
-      this.buffer = buffer;
       this.checksum = checksum;
+      this.bufferList = bufferList;
     }
 
     /**
@@ -676,7 +664,7 @@ public class ChunkGroupOutputStream extends OutputStream {
       this.currentPosition = 0;
       streamBufferFlushSize = 0;
       streamBufferMaxSize = 0;
-      buffer = null;
+      bufferList = null;
       watchTimeout = 0;
       this.checksum = checksum;
     }
@@ -694,7 +682,7 @@ public class ChunkGroupOutputStream extends OutputStream {
         this.outputStream =
             new ChunkOutputStream(blockID, key, xceiverClientManager,
                 xceiverClient, requestId, chunkSize, streamBufferFlushSize,
-                streamBufferMaxSize, watchTimeout, buffer, checksum);
+                streamBufferMaxSize, watchTimeout, bufferList, checksum);
       }
     }
 
@@ -731,11 +719,24 @@ public class ChunkGroupOutputStream extends OutputStream {
       }
     }
 
-    int getLastSuccessfulFlushIndex() throws IOException {
+    long getTotalSuccessfulFlushedData() throws IOException {
       if (this.outputStream instanceof ChunkOutputStream) {
         ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
         blockID = out.getBlockID();
-        return out.getLastSuccessfulFlushIndex();
+        return out.getTotalSuccessfulFlushedData();
+      } else if (outputStream == null) {
+        // For a pre allocated block for which no write has been initiated,
+        // the OutputStream will be null here.
+        // In such cases, the default blockCommitSequenceId will be 0
+        return 0;
+      }
+      throw new IOException("Invalid Output Stream for Key: " + key);
+    }
+
+    long getWrittenDataLength() throws IOException {
+      if (this.outputStream instanceof ChunkOutputStream) {
+        ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
+        return out.getWrittenDataLength();
       } else if (outputStream == null) {
         // For a pre allocated block for which no write has been initiated,
         // the OutputStream will be null here.
@@ -753,7 +754,7 @@ public class ChunkGroupOutputStream extends OutputStream {
       }
     }
 
-    void writeOnRetry(int len) throws IOException {
+    void writeOnRetry(long len) throws IOException {
       checkStream();
       if (this.outputStream instanceof ChunkOutputStream) {
         ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
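
The reworked handleException above pins a failed entry's length to the bytes acknowledged by all replicas and replays whatever is still cached in bufferList into a freshly allocated block. A compact model of that accounting (plain Java; datanode I/O and block allocation are stubbed out, names are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    /** Hedged model of ChunkGroupOutputStream's failure accounting. */
    class RetryAccountingModel {
      static class Entry {
        long currentPosition;  // length the OzoneManager will record
        long ackedBytes;       // getTotalSuccessfulFlushedData()
      }

      private final List<Entry> streamEntries = new ArrayList<>();
      private int currentStreamIndex = 0;

      void addEntry(Entry e) {
        streamEntries.add(e);
      }

      void handleException(Entry failed, int streamIndex, long bufferedDataLen) {
        // keep only the bytes every replica acknowledged
        failed.currentPosition = failed.ackedBytes;
        if (bufferedDataLen > 0) {
          // unacknowledged data still cached in bufferList: advance to a new
          // block; the real code replays it via handleWrite(..., retry=true)
          currentStreamIndex += 1;
        }
        if (failed.ackedBytes == 0) {
          // nothing was acknowledged for this block: discard the entry
          streamEntries.remove(streamIndex);
          currentStreamIndex -= 1;
        }
      }
    }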

+ 11 - 8
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java

@@ -109,6 +109,10 @@ public class TestCloseContainerHandlingByClient {
     objectStore.getVolume(volumeName).createBucket(bucketName);
   }
 
+  private String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
   /**
    * Shutdown MiniDFSCluster.
    */
@@ -121,7 +125,7 @@ public class TestCloseContainerHandlingByClient {
 
   @Test
   public void testBlockWritesWithFlushAndClose() throws Exception {
-    String keyName = "standalone";
+    String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     // write data more than 1 chunk
     byte[] data = ContainerTestHelper
@@ -153,7 +157,7 @@ public class TestCloseContainerHandlingByClient {
 
   @Test
   public void testBlockWritesCloseConsistency() throws Exception {
-    String keyName = "standalone2";
+    String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     // write data more than 1 chunk
     byte[] data = ContainerTestHelper
@@ -181,7 +185,7 @@ public class TestCloseContainerHandlingByClient {
   @Test
   public void testMultiBlockWrites() throws Exception {
 
-    String keyName = "standalone3";
+    String keyName = getKeyName();
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.RATIS, (4 * blockSize));
     ChunkGroupOutputStream groupOutputStream =
@@ -227,8 +231,7 @@ public class TestCloseContainerHandlingByClient {
 
   @Test
   public void testMultiBlockWrites2() throws Exception {
-    String keyName = "ratis2";
-    long dataLength;
+    String keyName = getKeyName();
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.RATIS, 4 * blockSize);
     ChunkGroupOutputStream groupOutputStream =
@@ -272,7 +275,7 @@ public class TestCloseContainerHandlingByClient {
   @Test
   public void testMultiBlockWrites3() throws Exception {
 
-    String keyName = "standalone5";
+    String keyName = getKeyName();
     int keyLen = 4 * blockSize;
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, keyLen);
     ChunkGroupOutputStream groupOutputStream =
@@ -391,7 +394,7 @@ public class TestCloseContainerHandlingByClient {
   // on the datanode.
   @Test
   public void testDiscardPreallocatedBlocks() throws Exception {
-    String keyName = "discardpreallocatedblocks";
+    String keyName = getKeyName();
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.RATIS, 2 * blockSize);
     ChunkGroupOutputStream groupOutputStream =
@@ -447,7 +450,7 @@ public class TestCloseContainerHandlingByClient {
 
   @Test
   public void testBlockWriteViaRatis() throws Exception {
-    String keyName = "ratis";
+    String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     byte[] data = ContainerTestHelper
         .getFixedLengthString(keyString, chunkSize + chunkSize / 2)

+ 51 - 5
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java

@@ -65,10 +65,6 @@ public class TestFailureHandlingByClient {
   private static String keyString;
   private static int maxRetries;
 
-  /**
-   * TODO: we will spawn new MiniOzoneCluster every time for each unit test
-   * invocation. Need to use the same instance for all tests.
-   */
   /**
    * Create a MiniDFSCluster for testing.
    * <p>
@@ -86,6 +82,11 @@ public class TestFailureHandlingByClient {
         TimeUnit.SECONDS);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 5);
+    conf.setTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
+        1, TimeUnit.SECONDS);
+
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(6).build();
@@ -112,7 +113,7 @@ public class TestFailureHandlingByClient {
 
   @Test
   public void testBlockWritesWithDnFailures() throws Exception {
-    String keyName = "ratis3";
+    String keyName = UUID.randomUUID().toString();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
     byte[] data =
         ContainerTestHelper
@@ -189,6 +190,51 @@ public class TestFailureHandlingByClient {
     validateData(keyName, data.concat(data).getBytes());
   }
 
+  @Test
+  public void testMultiBlockWritesWithIntermittentDnFailures()
+      throws Exception {
+    String keyName = UUID.randomUUID().toString();
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, 6 * blockSize);
+    String data = ContainerTestHelper
+        .getFixedLengthString(keyString, blockSize + chunkSize);
+    key.write(data.getBytes());
+
+    // get the name of a valid container
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    Assert.assertEquals(6, locationInfoList.size());
+    long containerId = locationInfoList.get(1).getContainerID();
+    ContainerInfo container =
+        cluster.getStorageContainerManager().getContainerManager()
+            .getContainer(ContainerID.valueof(containerId));
+    Pipeline pipeline =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipeline(container.getPipelineID());
+    List<DatanodeDetails> datanodes = pipeline.getNodes();
+    cluster.shutdownHddsDatanode(datanodes.get(0));
+
+    // The write will fail but the exception will be handled and the length
+    // will be updated correctly in OzoneManager once the stream is closed
+    key.write(data.getBytes());
+
+    // shutdown the second datanode
+    cluster.shutdownHddsDatanode(datanodes.get(1));
+    key.write(data.getBytes());
+    key.close();
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
+        .build();
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize());
+    validateData(keyName, data.concat(data).concat(data).getBytes());
+  }
+
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
       long size) throws Exception {
     return ContainerTestHelper

+ 0 - 78
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java

@@ -18,15 +18,12 @@ package org.apache.hadoop.ozone.om;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
-import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 
@@ -41,81 +38,9 @@ public class TestChunkStreams {
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
-  /**
-   * This test uses ByteArrayOutputStream as the underlying stream to test
-   * the correctness of ChunkGroupOutputStream.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testWriteGroupOutputStream() throws Exception {
-    try (ChunkGroupOutputStream groupOutputStream =
-             new ChunkGroupOutputStream()) {
-      ArrayList<OutputStream> outputStreams = new ArrayList<>();
-
-      // 5 byte streams, each 100 bytes. write 500 bytes means writing to each
-      // of them with 100 bytes.
-      for (int i = 0; i < 5; i++) {
-        ByteArrayOutputStream out = new ByteArrayOutputStream(100);
-        outputStreams.add(out);
-        groupOutputStream.addStream(out, 100);
-      }
-      assertEquals(0, groupOutputStream.getByteOffset());
-
-      String dataString = RandomStringUtils.randomAscii(500);
-      byte[] data = dataString.getBytes(UTF_8);
-      groupOutputStream.write(data, 0, data.length);
-      assertEquals(500, groupOutputStream.getByteOffset());
-
-      String res = "";
-      int offset = 0;
-      for (OutputStream stream : outputStreams) {
-        String subString = stream.toString();
-        res += subString;
-        assertEquals(dataString.substring(offset, offset + 100), subString);
-        offset += 100;
-      }
-      assertEquals(dataString, res);
-    }
-  }
-
-  @Test
-  public void testErrorWriteGroupOutputStream() throws Exception {
-    try (ChunkGroupOutputStream groupOutputStream =
-             new ChunkGroupOutputStream()) {
-      ArrayList<OutputStream> outputStreams = new ArrayList<>();
-
-      // 5 byte streams, each 100 bytes. write 500 bytes means writing to each
-      // of them with 100 bytes. all 5 streams makes up a ChunkGroupOutputStream
-      // with a total of 500 bytes in size
-      for (int i = 0; i < 5; i++) {
-        ByteArrayOutputStream out = new ByteArrayOutputStream(100);
-        outputStreams.add(out);
-        groupOutputStream.addStream(out, 100);
-      }
-      assertEquals(0, groupOutputStream.getByteOffset());
-
-      // first writes of 100 bytes should succeed
-      groupOutputStream.write(RandomStringUtils.randomAscii(100)
-          .getBytes(UTF_8));
-      assertEquals(100, groupOutputStream.getByteOffset());
-
-      // second writes of 500 bytes should fail, as there should be only 400
-      // bytes space left
-      // TODO : if we decide to take the 400 bytes instead in the future,
-      // other add more informative error code rather than exception, need to
-      // change this part.
-      exception.expect(Exception.class);
-      groupOutputStream.write(RandomStringUtils.randomAscii(500)
-          .getBytes(UTF_8));
-      assertEquals(100, groupOutputStream.getByteOffset());
-    }
-  }
-
   @Test
   public void testReadGroupInputStream() throws Exception {
     try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
-      ArrayList<ChunkInputStream> inputStreams = new ArrayList<>();
 
       String dataString = RandomStringUtils.randomAscii(500);
       byte[] buf = dataString.getBytes(UTF_8);
@@ -157,7 +82,6 @@ public class TestChunkStreams {
                 return readLen;
               }
             };
-        inputStreams.add(in);
         offset += 100;
         groupInputStream.addStream(in, 100);
       }
@@ -173,7 +97,6 @@ public class TestChunkStreams {
   @Test
   public void testErrorReadGroupInputStream() throws Exception {
     try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
-      ArrayList<ChunkInputStream> inputStreams = new ArrayList<>();
 
       String dataString = RandomStringUtils.randomAscii(500);
       byte[] buf = dataString.getBytes(UTF_8);
@@ -215,7 +138,6 @@ public class TestChunkStreams {
                 return readLen;
               }
             };
-        inputStreams.add(in);
         offset += 100;
         groupInputStream.addStream(in, 100);
       }