
Revert "HDDS-1395. Key write fails with BlockOutputStream has been closed exception (#749)"

This reverts commit d525633619f83a76bcb4d73b444bbfe8e3ab594b.
Shashikant Banerjee, 6 years ago
parent commit 01451a57de
17 changed files with 576 additions and 1193 deletions
  1. + 5 - 11
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
  2. + 1 - 69
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
  3. + 2 - 11
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
  4. + 1 - 0
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
  5. + 2 - 2
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
  6. + 2 - 2
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  7. + 1 - 1
      hadoop-hdds/pom.xml
  8. + 38 - 0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
  9. + 0 - 7
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
  10. + 0 - 344
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
  11. + 306 - 68
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
  12. + 203 - 153
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
  13. + 1 - 1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
  14. + 4 - 4
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
  15. + 0 - 501
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
  16. + 9 - 18
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
  17. + 1 - 1
      hadoop-ozone/pom.xml

+ 5 - 11
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -29,7 +29,6 @@ import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.thirdparty.com.google.protobuf
@@ -70,8 +69,7 @@ import java.util.stream.Collectors;
  * The underlying RPC mechanism can be chosen via the constructor.
  */
 public final class XceiverClientRatis extends XceiverClientSpi {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(XceiverClientRatis.class);
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
 
   public static XceiverClientRatis newXceiverClientRatis(
       org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
@@ -250,17 +248,13 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       return clientReply;
     }
     LOG.debug("commit index : {} watch timeout : {}", index, timeout);
+    CompletableFuture<RaftClientReply> replyFuture = getClient()
+        .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
     RaftClientReply reply;
     try {
-      CompletableFuture<RaftClientReply> replyFuture = getClient()
-          .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
       replyFuture.get(timeout, TimeUnit.MILLISECONDS);
-    } catch (Exception e) {
-      Throwable t = HddsClientUtils.checkForException(e);
-      LOG.warn("3 way commit failed ", e);
-      if (t instanceof GroupMismatchException) {
-        throw e;
-      }
+    } catch (TimeoutException toe) {
+      LOG.warn("3 way commit failed ", toe);
       reply = getClient()
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
           .get(timeout, TimeUnit.MILLISECONDS);
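
The hunk above restores a narrower fallback: only a TimeoutException on the ALL_COMMITTED watch triggers the MAJORITY_COMMITTED retry, instead of catching every exception and special-casing GroupMismatchException. A minimal sketch of the restored pattern, assuming a connected Ratis RaftClient (the wrapper method name watchForCommit is illustrative, not the class's actual signature):

  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;

  import org.apache.ratis.client.RaftClient;
  import org.apache.ratis.proto.RaftProtos;
  import org.apache.ratis.protocol.RaftClientReply;

  final class WatchForCommitSketch {
    // Wait until `index` is replicated on all replicas; if that times out,
    // settle for a majority commit rather than failing the write outright.
    static RaftClientReply watchForCommit(RaftClient client, long index,
        long timeoutMillis) throws Exception {
      CompletableFuture<RaftClientReply> allCommitted = client
          .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
      try {
        return allCommitted.get(timeoutMillis, TimeUnit.MILLISECONDS);
      } catch (TimeoutException toe) {
        // 3-way commit did not finish in time; a majority commit is still
        // durable, and the slow follower can catch up later.
        return client
            .sendWatchAsync(index,
                RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
            .get(timeoutMillis, TimeUnit.MILLISECONDS);
      }
    }
  }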

+ 1 - 69
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java

@@ -28,11 +28,8 @@ import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
@@ -43,10 +40,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
-import org.apache.ratis.protocol.AlreadyClosedException;
-import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.ratis.protocol.NotReplicatedException;
-import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,12 +50,8 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+
 
 /**
  * Utility methods for Ozone and Container Clients.
@@ -83,18 +72,6 @@ public final class HddsClientUtils {
   private HddsClientUtils() {
   }
 
-  private static final List<Class<? extends Exception>> EXCEPTION_LIST =
-      new ArrayList<Class<? extends Exception>>() {{
-        add(TimeoutException.class);
-        add(ContainerNotOpenException.class);
-        add(RaftRetryFailureException.class);
-        add(AlreadyClosedException.class);
-        add(GroupMismatchException.class);
-        // Not Replicated Exception will be thrown if watch For commit
-        // does not succeed
-        add(NotReplicatedException.class);
-      }};
-
   /**
    * Date format that used in ozone. Here the format is thread safe to use.
    */
@@ -313,49 +290,4 @@ public final class HddsClientUtils {
                 Client.getRpcTimeout(conf)));
     return scmSecurityClient;
   }
-
-  public static Throwable checkForException(Exception e) throws IOException {
-    Throwable t = e;
-    while (t != null) {
-      for (Class<? extends Exception> cls : getExceptionList()) {
-        if (cls.isInstance(t)) {
-          return t;
-        }
-      }
-      t = t.getCause();
-    }
-
-    throw e instanceof IOException ? (IOException)e : new IOException(e);
-  }
-
-  public static RetryPolicy createRetryPolicy(int maxRetryCount,
-      long retryInterval) {
-    // retry with fixed sleep between retries
-    return RetryPolicies.retryUpToMaximumCountWithFixedSleep(
-        maxRetryCount, retryInterval, TimeUnit.MILLISECONDS);
-  }
-
-  public static Map<Class<? extends Throwable>,
-      RetryPolicy> getRetryPolicyByException(int maxRetryCount,
-      long retryInterval) {
-    Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
-    for (Class<? extends Exception> ex : EXCEPTION_LIST) {
-      if (ex == TimeoutException.class
-          || ex == RaftRetryFailureException.class) {
-        // retry without sleep
-        policyMap.put(ex, createRetryPolicy(maxRetryCount, 0));
-      } else {
-        // retry with fixed sleep between retries
-        policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval));
-      }
-    }
-    // Default retry policy
-    policyMap
-        .put(Exception.class, createRetryPolicy(maxRetryCount, retryInterval));
-    return policyMap;
-  }
-
-  public static List<Class<? extends Exception>> getExceptionList() {
-    return EXCEPTION_LIST;
-  }
 }
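
This revert removes checkForException, createRetryPolicy, getRetryPolicyByException, and the exception allow-list from HddsClientUtils; equivalents reappear on the Ozone client side in OzoneClientUtils and KeyOutputStream below. The core technique is walking an exception's cause chain against an allow-list. A self-contained sketch of that unwrapping, with an abbreviated allow-list (the real one also holds ContainerNotOpenException, AlreadyClosedException, and GroupMismatchException):

  import java.io.IOException;
  import java.util.Arrays;
  import java.util.List;
  import java.util.concurrent.TimeoutException;

  import org.apache.ratis.protocol.RaftRetryFailureException;

  final class CauseUnwrapSketch {
    private static final List<Class<? extends Exception>> RETRIABLE =
        Arrays.asList(TimeoutException.class,
            RaftRetryFailureException.class);

    // Walk the cause chain and return the first retriable cause; if none is
    // found, rethrow as an IOException, mirroring the removed helper.
    static Throwable checkForException(Exception e) throws IOException {
      for (Throwable t = e; t != null; t = t.getCause()) {
        for (Class<? extends Exception> cls : RETRIABLE) {
          if (cls.isInstance(t)) {
            return t;
          }
        }
      }
      throw e instanceof IOException ? (IOException) e : new IOException(e);
    }
  }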

+ 2 - 11
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

@@ -80,7 +80,7 @@ public class BlockOutputStream extends OutputStream {
   public static final Logger LOG =
       LoggerFactory.getLogger(BlockOutputStream.class);
 
-  private volatile BlockID blockID;
+  private BlockID blockID;
   private final String key;
   private final String traceID;
   private final BlockData.Builder containerBlockData;
@@ -574,7 +574,7 @@ public class BlockOutputStream extends OutputStream {
    * @throws IOException if stream is closed
    */
   private void checkOpen() throws IOException {
-    if (isClosed()) {
+    if (xceiverClient == null) {
       throw new IOException("BlockOutputStream has been closed.");
     } else if (getIoException() != null) {
       adjustBuffersOnException();
@@ -582,10 +582,6 @@ public class BlockOutputStream extends OutputStream {
     }
   }
 
-  public boolean isClosed() {
-    return xceiverClient == null;
-  }
-
   /**
    * Writes buffered data as a new chunk to the container and saves chunk
    * information to be used later in putKey call.
@@ -639,9 +635,4 @@ public class BlockOutputStream extends OutputStream {
             + " length " + effectiveChunkSize);
     containerBlockData.addChunks(chunkInfo);
   }
-
-  @VisibleForTesting
-  public void setXceiverClient(XceiverClientSpi xceiverClient) {
-    this.xceiverClient = xceiverClient;
-  }
 }

+ 1 - 0
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java

@@ -188,6 +188,7 @@ public class CommitWatcher {
    */
   public XceiverClientReply watchForCommit(long commitIndex)
       throws IOException {
+    Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
     long index;
     try {
       XceiverClientReply reply =

+ 2 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

@@ -121,12 +121,12 @@ public final class ScmConfigKeys {
       TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
   public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
       "dfs.ratis.client.request.max.retries";
-  public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180;
+  public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20;
   public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY =
       "dfs.ratis.client.request.retry.interval";
   public static final TimeDuration
       DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT =
-      TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS);
+      TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
   public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY =
       "dfs.ratis.server.retry-cache.timeout.duration";
   public static final TimeDuration
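
The revert lowers the Ratis client retry budget from 180 retries at 1000 ms back to 20 retries at 500 ms. A short sketch of how a client would read these keys, assuming only the standard Hadoop Configuration accessors and Ratis TimeDuration.toLong:

  import java.util.concurrent.TimeUnit;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdds.scm.ScmConfigKeys;

  final class RetryConfigSketch {
    static void printRetrySettings(Configuration conf) {
      // 20 after this revert (was 180)
      int maxRetries = conf.getInt(
          ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY,
          ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT);
      // 500 ms after this revert (was 1000 ms)
      long retryIntervalMs = conf.getTimeDuration(
          ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
          ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT
              .toLong(TimeUnit.MILLISECONDS),
          TimeUnit.MILLISECONDS);
      System.out.println(
          maxRetries + " retries, " + retryIntervalMs + " ms apart");
    }
  }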

+ 2 - 2
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -237,13 +237,13 @@
   </property>
   <property>
     <name>dfs.ratis.client.request.max.retries</name>
-    <value>180</value>
+    <value>20</value>
     <tag>OZONE, RATIS, MANAGEMENT</tag>
     <description>Number of retries for ratis client request.</description>
   </property>
   <property>
     <name>dfs.ratis.client.request.retry.interval</name>
-    <value>1000ms</value>
+    <value>500ms</value>
     <tag>OZONE, RATIS, MANAGEMENT</tag>
     <description>Interval between successive retries for a ratis client request.
     </description>

+ 1 - 1
hadoop-hdds/pom.xml

@@ -47,7 +47,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.4.0-fe2b15d-SNAPSHOT</ratis.version>
+    <ratis.version>0.3.0</ratis.version>
 
     <bouncycastle.version>1.60</bouncycastle.version>
 

+ 38 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java

@@ -18,11 +18,15 @@
 package org.apache.hadoop.ozone.client;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -32,11 +36,23 @@ import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
 import org.apache.hadoop.ozone.client.rest.response.KeyLocation;
 import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
 import org.apache.hadoop.ozone.client.rest.response.VolumeOwner;
+import org.apache.ratis.protocol.AlreadyClosedException;
+import org.apache.ratis.protocol.GroupMismatchException;
+import org.apache.ratis.protocol.RaftRetryFailureException;
 
 /** A utility class for OzoneClient. */
 public final class OzoneClientUtils {
 
   private OzoneClientUtils() {}
+
+  private static final List<Class<? extends Exception>> EXCEPTION_LIST =
+      new ArrayList<Class<? extends Exception>>() {{
+        add(TimeoutException.class);
+        add(ContainerNotOpenException.class);
+        add(RaftRetryFailureException.class);
+        add(AlreadyClosedException.class);
+        add(GroupMismatchException.class);
+      }};
   /**
    * Returns a BucketInfo object constructed using fields of the input
    * OzoneBucket object.
@@ -125,4 +141,26 @@ public final class OzoneClientUtils {
         maxRetryCount, retryInterval, TimeUnit.MILLISECONDS);
   }
 
+  public static List<Class<? extends Exception>> getExceptionList() {
+    return EXCEPTION_LIST;
+  }
+
+  public static Map<Class<? extends Throwable>, RetryPolicy>
+      getRetryPolicyByException(int maxRetryCount, long retryInterval) {
+    Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
+    for (Class<? extends Exception> ex : EXCEPTION_LIST) {
+      if (ex == TimeoutException.class ||
+          ex == RaftRetryFailureException.class) {
+        // retry without sleep
+        policyMap.put(ex, createRetryPolicy(maxRetryCount, 0));
+      } else {
+        // retry with fixed sleep between retries
+        policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval));
+      }
+    }
+    // Default retry policy
+    policyMap.put(Exception.class, createRetryPolicy(
+        maxRetryCount, retryInterval));
+    return policyMap;
+  }
 }
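
A sketch of how a caller would consume the map built above, relying on Hadoop's RetryPolicy contract (shouldRetry returning a RetryAction); the driver method itself is illustrative, not code from this commit:

  import java.io.IOException;
  import java.util.Map;

  import org.apache.hadoop.io.retry.RetryPolicy;
  import org.apache.hadoop.ozone.client.OzoneClientUtils;

  final class RetryDriverSketch {
    // Decide whether to retry a failed write whose root cause is `cause`.
    static boolean shouldRetry(IOException failure, Throwable cause,
        int retryCount) throws Exception {
      Map<Class<? extends Throwable>, RetryPolicy> policyMap =
          OzoneClientUtils.getRetryPolicyByException(20, 500);
      RetryPolicy policy = policyMap.get(cause.getClass());
      if (policy == null) {
        policy = policyMap.get(Exception.class);  // default policy
      }
      RetryPolicy.RetryAction action =
          policy.shouldRetry(failure, retryCount, 0, true);
      if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
        if (action.delayMillis > 0) {
          Thread.sleep(action.delayMillis);  // fixed sleep between attempts
        }
        return true;
      }
      return false;  // FAIL (FAILOVER_AND_RETRY is unused here)
    }
  }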

+ 0 - 7
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java

@@ -149,13 +149,6 @@ public final class BlockOutputStreamEntry extends OutputStream {
     }
   }
 
-  boolean isClosed() {
-    if (outputStream != null) {
-      return  ((BlockOutputStream) outputStream).isClosed();
-    }
-    return false;
-  }
-
   long getTotalAckDataLength() {
     if (outputStream != null) {
       BlockOutputStream out = (BlockOutputStream) this.outputStream;

+ 0 - 344
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java

@@ -1,344 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.client.io;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.om.helpers.*;
-import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-
-/**
- * This class manages the stream entries list and handles block allocation
- * from OzoneManager.
- */
-public class BlockOutputStreamEntryPool {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(BlockOutputStreamEntryPool.class);
-
-  private final List<BlockOutputStreamEntry> streamEntries;
-  private int currentStreamIndex;
-  private final OzoneManagerProtocol omClient;
-  private final OmKeyArgs keyArgs;
-  private final XceiverClientManager xceiverClientManager;
-  private final int chunkSize;
-  private final String requestID;
-  private final long streamBufferFlushSize;
-  private final long streamBufferMaxSize;
-  private final long watchTimeout;
-  private final long blockSize;
-  private final int bytesPerChecksum;
-  private final ContainerProtos.ChecksumType checksumType;
-  private final BufferPool bufferPool;
-  private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
-  private final long openID;
-  private ExcludeList excludeList;
-
-  @SuppressWarnings("parameternumber")
-  public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient,
-      int chunkSize, String requestId, HddsProtos.ReplicationFactor factor,
-      HddsProtos.ReplicationType type, long bufferFlushSize, long bufferMaxSize,
-      long size, long watchTimeout, ContainerProtos.ChecksumType checksumType,
-      int bytesPerChecksum, String uploadID, int partNumber,
-      boolean isMultipart, OmKeyInfo info,
-      XceiverClientManager xceiverClientManager, long openID) {
-    streamEntries = new ArrayList<>();
-    currentStreamIndex = 0;
-    this.omClient = omClient;
-    this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
-        .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
-        .setType(type).setFactor(factor).setDataSize(info.getDataSize())
-        .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
-        .setMultipartUploadPartNumber(partNumber).build();
-    this.xceiverClientManager = xceiverClientManager;
-    this.chunkSize = chunkSize;
-    this.requestID = requestId;
-    this.streamBufferFlushSize = bufferFlushSize;
-    this.streamBufferMaxSize = bufferMaxSize;
-    this.blockSize = size;
-    this.watchTimeout = watchTimeout;
-    this.bytesPerChecksum = bytesPerChecksum;
-    this.checksumType = checksumType;
-    this.openID = openID;
-    this.excludeList = new ExcludeList();
-
-    Preconditions.checkState(chunkSize > 0);
-    Preconditions.checkState(streamBufferFlushSize > 0);
-    Preconditions.checkState(streamBufferMaxSize > 0);
-    Preconditions.checkState(blockSize > 0);
-    Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
-    Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
-    Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
-    this.bufferPool =
-        new BufferPool(chunkSize, (int) streamBufferMaxSize / chunkSize);
-  }
-
-  public BlockOutputStreamEntryPool() {
-    streamEntries = new ArrayList<>();
-    omClient = null;
-    keyArgs = null;
-    xceiverClientManager = null;
-    chunkSize = 0;
-    requestID = null;
-    streamBufferFlushSize = 0;
-    streamBufferMaxSize = 0;
-    bufferPool = new BufferPool(chunkSize, 1);
-    watchTimeout = 0;
-    blockSize = 0;
-    this.checksumType = ContainerProtos.ChecksumType.valueOf(
-        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
-    this.bytesPerChecksum = OzoneConfigKeys
-        .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
-    currentStreamIndex = 0;
-    openID = -1;
-  }
-
-  /**
-   * When a key is opened, it is possible that there are some blocks already
-   * allocated to it for this open session. In this case, to make use of these
-   * blocks, we need to add these blocks to stream entries. But, a key's version
-   * also includes blocks from previous versions, we need to avoid adding these
-   * old blocks to stream entries, because these old blocks should not be picked
-   * for write. To do this, the following method checks that, only those
-   * blocks created in this particular open version are added to stream entries.
-   *
-   * @param version the set of blocks that are pre-allocated.
-   * @param openVersion the version corresponding to the pre-allocation.
-   * @throws IOException
-   */
-  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
-      long openVersion) throws IOException {
-    // server may return any number of blocks, (0 to any)
-    // only the blocks allocated in this open session (block createVersion
-    // equals to open session version)
-    for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) {
-      if (subKeyInfo.getCreateVersion() == openVersion) {
-        addKeyLocationInfo(subKeyInfo);
-      }
-    }
-  }
-
-  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
-      throws IOException {
-    Preconditions.checkNotNull(subKeyInfo.getPipeline());
-    UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken());
-    BlockOutputStreamEntry.Builder builder =
-        new BlockOutputStreamEntry.Builder()
-            .setBlockID(subKeyInfo.getBlockID())
-            .setKey(keyArgs.getKeyName())
-            .setXceiverClientManager(xceiverClientManager)
-            .setPipeline(subKeyInfo.getPipeline())
-            .setRequestId(requestID)
-            .setChunkSize(chunkSize)
-            .setLength(subKeyInfo.getLength())
-            .setStreamBufferFlushSize(streamBufferFlushSize)
-            .setStreamBufferMaxSize(streamBufferMaxSize)
-            .setWatchTimeout(watchTimeout)
-            .setbufferPool(bufferPool)
-            .setChecksumType(checksumType)
-            .setBytesPerChecksum(bytesPerChecksum)
-            .setToken(subKeyInfo.getToken());
-    streamEntries.add(builder.build());
-  }
-
-  public List<OmKeyLocationInfo> getLocationInfoList()  {
-    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
-    for (BlockOutputStreamEntry streamEntry : streamEntries) {
-      long length = streamEntry.getCurrentPosition();
-
-      // Commit only those blocks to OzoneManager which are not empty
-      if (length != 0) {
-        OmKeyLocationInfo info =
-            new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
-                .setLength(streamEntry.getCurrentPosition()).setOffset(0)
-                .setToken(streamEntry.getToken())
-                .setPipeline(streamEntry.getPipeline()).build();
-        locationInfoList.add(info);
-      }
-      LOG.debug(
-          "block written " + streamEntry.getBlockID() + ", length " + length
-              + " bcsID " + streamEntry.getBlockID()
-              .getBlockCommitSequenceId());
-    }
-    return locationInfoList;
-  }
-
-  /**
-   * Discards the subsequent pre allocated blocks and removes the streamEntries
-   * from the streamEntries list for the container which is closed.
-   * @param containerID id of the closed container
-   * @param pipelineId id of the associated pipeline
-   */
-  void discardPreallocatedBlocks(long containerID, PipelineID pipelineId) {
-    // currentStreamIndex < streamEntries.size() signifies that, there are still
-    // pre allocated blocks available.
-
-    // This will be called only to discard the next subsequent unused blocks
-    // in the streamEntryList.
-    if (currentStreamIndex + 1 < streamEntries.size()) {
-      ListIterator<BlockOutputStreamEntry> streamEntryIterator =
-          streamEntries.listIterator(currentStreamIndex + 1);
-      while (streamEntryIterator.hasNext()) {
-        BlockOutputStreamEntry streamEntry = streamEntryIterator.next();
-        Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0);
-        if ((pipelineId != null && streamEntry.getPipeline().getId()
-            .equals(pipelineId)) || (containerID != -1
-            && streamEntry.getBlockID().getContainerID() == containerID)) {
-          streamEntryIterator.remove();
-        }
-      }
-    }
-  }
-
-  List<BlockOutputStreamEntry> getStreamEntries() {
-    return streamEntries;
-  }
-
-  XceiverClientManager getXceiverClientManager() {
-    return xceiverClientManager;
-  }
-
-  String getKeyName() {
-    return keyArgs.getKeyName();
-  }
-
-  long getKeyLength() {
-    return streamEntries.stream().mapToLong(e -> e.getCurrentPosition()).sum();
-  }
-  /**
-   * Contact OM to get a new block. Set the new block with the index (e.g.
-   * first block has index = 0, second has index = 1 etc.)
-   *
-   * The returned block is made to new BlockOutputStreamEntry to write.
-   *
-   * @throws IOException
-   */
-  private void allocateNewBlock() throws IOException {
-    OmKeyLocationInfo subKeyInfo =
-        omClient.allocateBlock(keyArgs, openID, excludeList);
-    addKeyLocationInfo(subKeyInfo);
-  }
-
-
-  void commitKey(long offset) throws IOException {
-    if (keyArgs != null) {
-      // in test, this could be null
-      long length = getKeyLength();
-      Preconditions.checkArgument(offset == length);
-      keyArgs.setDataSize(length);
-      keyArgs.setLocationInfoList(getLocationInfoList());
-      // When the key is multipart upload part file upload, we should not
-      // commit the key, as this is not an actual key, this is a just a
-      // partial key of a large file.
-      if (keyArgs.getIsMultipartKey()) {
-        commitUploadPartInfo =
-            omClient.commitMultipartUploadPart(keyArgs, openID);
-      } else {
-        omClient.commitKey(keyArgs, openID);
-      }
-    } else {
-      LOG.warn("Closing KeyOutputStream, but key args is null");
-    }
-  }
-
-  public BlockOutputStreamEntry getCurrentStreamEntry() {
-    if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) {
-      return null;
-    } else {
-      return streamEntries.get(currentStreamIndex);
-    }
-  }
-
-  BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException {
-    BlockOutputStreamEntry streamEntry = getCurrentStreamEntry();
-    if (streamEntry != null && streamEntry.isClosed()) {
-      // a stream entry gets closed either by :
-      // a. If the stream gets full
-      // b. it has encountered an exception
-      currentStreamIndex++;
-    }
-    if (streamEntries.size() <= currentStreamIndex) {
-      Preconditions.checkNotNull(omClient);
-      // allocate a new block, if a exception happens, log an error and
-      // throw exception to the caller directly, and the write fails.
-      int succeededAllocates = 0;
-      try {
-        allocateNewBlock();
-        succeededAllocates += 1;
-      } catch (IOException ioe) {
-        LOG.error("Try to allocate more blocks for write failed, already "
-            + "allocated " + succeededAllocates + " blocks for this write.");
-        throw ioe;
-      }
-    }
-    // in theory, this condition should never violate due the check above
-    // still do a sanity check.
-    Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
-    BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
-    return current;
-  }
-
-  long computeBufferData() {
-    return bufferPool.computeBufferData();
-  }
-
-  void cleanup() {
-    if (excludeList != null) {
-      excludeList.clear();
-      excludeList = null;
-    }
-    if (bufferPool != null) {
-      bufferPool.clearBufferPool();
-    }
-
-    if (streamEntries != null) {
-      streamEntries.clear();
-    }
-  }
-
-  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    return commitUploadPartInfo;
-  }
-
-  public ExcludeList getExcludeList() {
-    return excludeList;
-  }
-
-  public long getStreamBufferMaxSize() {
-    return streamBufferMaxSize;
-  }
-
-  boolean isEmpty() {
-    return streamEntries.isEmpty();
-  }
-}

+ 306 - 68
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java

@@ -23,18 +23,21 @@ import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
@@ -44,8 +47,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Collection;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
@@ -72,41 +77,84 @@ public class KeyOutputStream extends OutputStream {
   public static final Logger LOG =
       LoggerFactory.getLogger(KeyOutputStream.class);
 
+  // array list's get(index) is O(1)
+  private final ArrayList<BlockOutputStreamEntry> streamEntries;
+  private int currentStreamIndex;
+  private final OzoneManagerProtocol omClient;
+  private final OmKeyArgs keyArgs;
+  private final long openID;
+  private final XceiverClientManager xceiverClientManager;
+  private final int chunkSize;
+  private final String requestID;
   private boolean closed;
+  private final long streamBufferFlushSize;
+  private final long streamBufferMaxSize;
+  private final long watchTimeout;
+  private final long blockSize;
+  private final int bytesPerChecksum;
+  private final ChecksumType checksumType;
+  private final BufferPool bufferPool;
+  private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private FileEncryptionInfo feInfo;
+  private ExcludeList excludeList;
   private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
   private int retryCount;
   private long offset;
-  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
-
   /**
    * A constructor for testing purpose only.
    */
   @VisibleForTesting
+  @SuppressWarnings("parameternumber")
   public KeyOutputStream() {
+    streamEntries = new ArrayList<>();
+    omClient = null;
+    keyArgs = null;
+    openID = -1;
+    xceiverClientManager = null;
+    chunkSize = 0;
+    requestID = null;
     closed = false;
-    this.retryPolicyMap = HddsClientUtils.getExceptionList()
+    streamBufferFlushSize = 0;
+    streamBufferMaxSize = 0;
+    bufferPool = new BufferPool(chunkSize, 1);
+    watchTimeout = 0;
+    blockSize = 0;
+    this.checksumType = ChecksumType.valueOf(
+        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
+    this.bytesPerChecksum = OzoneConfigKeys
+        .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
+    this.retryPolicyMap = OzoneClientUtils.getExceptionList()
         .stream()
         .collect(Collectors.toMap(Function.identity(),
             e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
     retryCount = 0;
     offset = 0;
-    blockOutputStreamEntryPool = new BlockOutputStreamEntryPool();
   }
 
   @VisibleForTesting
   public List<BlockOutputStreamEntry> getStreamEntries() {
-    return blockOutputStreamEntryPool.getStreamEntries();
+    return streamEntries;
   }
-
   @VisibleForTesting
   public XceiverClientManager getXceiverClientManager() {
-    return blockOutputStreamEntryPool.getXceiverClientManager();
+    return xceiverClientManager;
   }
 
-  @VisibleForTesting
-  public List<OmKeyLocationInfo> getLocationInfoList() {
-    return blockOutputStreamEntryPool.getLocationInfoList();
+  public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
+    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+    for (BlockOutputStreamEntry streamEntry : streamEntries) {
+      OmKeyLocationInfo info =
+          new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
+              .setLength(streamEntry.getCurrentPosition()).setOffset(0)
+              .setToken(streamEntry.getToken())
+              .setPipeline(streamEntry.getPipeline())
+              .build();
+      LOG.debug("block written " + streamEntry.getBlockID() + ", length "
+          + streamEntry.getCurrentPosition() + " bcsID "
+          + streamEntry.getBlockID().getBlockCommitSequenceId());
+      locationInfoList.add(info);
+    }
+    return locationInfoList;
   }
 
   @VisibleForTesting
@@ -123,16 +171,41 @@ public class KeyOutputStream extends OutputStream {
       ChecksumType checksumType, int bytesPerChecksum,
       String uploadID, int partNumber, boolean isMultipart,
       int maxRetryCount, long retryInterval) {
+    this.streamEntries = new ArrayList<>();
+    this.currentStreamIndex = 0;
+    this.omClient = omClient;
     OmKeyInfo info = handler.getKeyInfo();
-    blockOutputStreamEntryPool =
-        new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor,
-            type, bufferFlushSize, bufferMaxSize, size, watchTimeout,
-            checksumType, bytesPerChecksum, uploadID, partNumber, isMultipart,
-            info, xceiverClientManager, handler.getId());
     // Retrieve the file encryption key info, null if file is not in
     // encrypted bucket.
     this.feInfo = info.getFileEncryptionInfo();
-    this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
+    this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
+        .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
+        .setType(type).setFactor(factor).setDataSize(info.getDataSize())
+        .setIsMultipartKey(isMultipart).setMultipartUploadID(
+            uploadID).setMultipartUploadPartNumber(partNumber)
+        .build();
+    this.openID = handler.getId();
+    this.xceiverClientManager = xceiverClientManager;
+    this.chunkSize = chunkSize;
+    this.requestID = requestId;
+    this.streamBufferFlushSize = bufferFlushSize;
+    this.streamBufferMaxSize = bufferMaxSize;
+    this.blockSize = size;
+    this.watchTimeout = watchTimeout;
+    this.bytesPerChecksum = bytesPerChecksum;
+    this.checksumType = checksumType;
+
+    Preconditions.checkState(chunkSize > 0);
+    Preconditions.checkState(streamBufferFlushSize > 0);
+    Preconditions.checkState(streamBufferMaxSize > 0);
+    Preconditions.checkState(blockSize > 0);
+    Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
+    Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
+    Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
+    this.bufferPool =
+        new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
+    this.excludeList = new ExcludeList();
+    this.retryPolicyMap = OzoneClientUtils.getRetryPolicyByException(
         maxRetryCount, retryInterval);
     this.retryCount = 0;
   }
@@ -152,7 +225,37 @@ public class KeyOutputStream extends OutputStream {
    */
   public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
       long openVersion) throws IOException {
-    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+    // server may return any number of blocks, (0 to any)
+    // only the blocks allocated in this open session (block createVersion
+    // equals to open session version)
+    for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) {
+      if (subKeyInfo.getCreateVersion() == openVersion) {
+        addKeyLocationInfo(subKeyInfo);
+      }
+    }
+  }
+
+  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
+      throws IOException {
+    Preconditions.checkNotNull(subKeyInfo.getPipeline());
+    UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken());
+    BlockOutputStreamEntry.Builder builder =
+        new BlockOutputStreamEntry.Builder()
+            .setBlockID(subKeyInfo.getBlockID())
+            .setKey(keyArgs.getKeyName())
+            .setXceiverClientManager(xceiverClientManager)
+            .setPipeline(subKeyInfo.getPipeline())
+            .setRequestId(requestID)
+            .setChunkSize(chunkSize)
+            .setLength(subKeyInfo.getLength())
+            .setStreamBufferFlushSize(streamBufferFlushSize)
+            .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setWatchTimeout(watchTimeout)
+            .setbufferPool(bufferPool)
+            .setChecksumType(checksumType)
+            .setBytesPerChecksum(bytesPerChecksum)
+            .setToken(subKeyInfo.getToken());
+    streamEntries.add(builder.build());
   }
 
   @Override
@@ -191,12 +294,34 @@ public class KeyOutputStream extends OutputStream {
     handleWrite(b, off, len, false);
   }
 
+  private long computeBufferData() {
+    return bufferPool.computeBufferData();
+  }
+
   private void handleWrite(byte[] b, int off, long len, boolean retry)
       throws IOException {
+    int succeededAllocates = 0;
     while (len > 0) {
       try {
-        BlockOutputStreamEntry current =
-            blockOutputStreamEntryPool.allocateBlockIfNeeded();
+        if (streamEntries.size() <= currentStreamIndex) {
+          Preconditions.checkNotNull(omClient);
+          // allocate a new block, if a exception happens, log an error and
+          // throw exception to the caller directly, and the write fails.
+          try {
+            allocateNewBlock(currentStreamIndex);
+            succeededAllocates += 1;
+          } catch (IOException ioe) {
+            LOG.error("Try to allocate more blocks for write failed, already "
+                + "allocated " + succeededAllocates
+                + " blocks for this write.");
+            throw ioe;
+          }
+        }
+        // in theory, this condition should never violate due the check above
+        // still do a sanity check.
+        Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+        BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
+
         // length(len) will be in int range if the call is happening through
         // write API of blockOutputStream. Length can be in long range if it
         // comes via Exception path.
@@ -217,8 +342,7 @@ public class KeyOutputStream extends OutputStream {
           // to or less than the max length of the buffer allocated.
           // The len specified here is the combined sum of the data length of
           // the buffers
-          Preconditions.checkState(!retry || len <= blockOutputStreamEntryPool
-              .getStreamBufferMaxSize());
+          Preconditions.checkState(!retry || len <= streamBufferMaxSize);
           int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
           writeLen = retry ? (int) len : dataWritten;
           // In retry path, the data written is already accounted in offset.
@@ -226,7 +350,7 @@ public class KeyOutputStream extends OutputStream {
             offset += writeLen;
           }
           LOG.debug("writeLen {}, total len {}", writeLen, len);
-          handleException(current, ioe);
+          handleException(current, currentStreamIndex, ioe);
         }
         if (current.getRemaining() <= 0) {
           // since the current block is already written close the stream.
@@ -241,19 +365,80 @@ public class KeyOutputStream extends OutputStream {
     }
   }
 
+  /**
+   * Discards the subsequent pre allocated blocks and removes the streamEntries
+   * from the streamEntries list for the container which is closed.
+   * @param containerID id of the closed container
+   * @param pipelineId id of the associated pipeline
+   * @param streamIndex index of the stream
+   */
+  private void discardPreallocatedBlocks(long containerID,
+      PipelineID pipelineId, int streamIndex) {
+    // streamIndex < streamEntries.size() signifies that, there are still
+    // pre allocated blocks available.
+
+    // This will be called only to discard the next subsequent unused blocks
+    // in the streamEntryList.
+    if (streamIndex < streamEntries.size()) {
+      ListIterator<BlockOutputStreamEntry> streamEntryIterator =
+          streamEntries.listIterator(streamIndex);
+      while (streamEntryIterator.hasNext()) {
+        BlockOutputStreamEntry streamEntry = streamEntryIterator.next();
+        Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0);
+        if (((pipelineId != null && streamEntry.getPipeline().getId()
+            .equals(pipelineId)) || (containerID != -1
+            && streamEntry.getBlockID().getContainerID() == containerID))) {
+          streamEntryIterator.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * It might be possible that the blocks pre allocated might never get written
+   * while the stream gets closed normally. In such cases, it would be a good
+   * idea to trim down the locationInfoList by removing the unused blocks if any
+   * so as only the used block info gets updated on OzoneManager during close.
+   */
+  private void removeEmptyBlocks() {
+    if (currentStreamIndex < streamEntries.size()) {
+      ListIterator<BlockOutputStreamEntry> streamEntryIterator =
+          streamEntries.listIterator(currentStreamIndex);
+      while (streamEntryIterator.hasNext()) {
+        if (streamEntryIterator.next().getCurrentPosition() == 0) {
+          streamEntryIterator.remove();
+        }
+      }
+    }
+  }
+
+  private void cleanup() {
+    if (excludeList != null) {
+      excludeList.clear();
+      excludeList = null;
+    }
+    if (bufferPool != null) {
+      bufferPool.clearBufferPool();
+    }
+
+    if (streamEntries != null) {
+      streamEntries.clear();
+    }
+  }
   /**
    * It performs following actions :
    * a. Updates the committed length at datanode for the current stream in
-   * datanode.
+   *    datanode.
    * b. Reads the data from the underlying buffer and writes it the next stream.
    *
    * @param streamEntry StreamEntry
-   * @param exception   actual exception that occurred
+   * @param streamIndex Index of the entry
+   * @param exception actual exception that occurred
    * @throws IOException Throws IOException if Write fails
    */
   private void handleException(BlockOutputStreamEntry streamEntry,
-      IOException exception) throws IOException {
-    Throwable t = HddsClientUtils.checkForException(exception);
+      int streamIndex, IOException exception) throws IOException {
+    Throwable t = checkForException(exception);
     boolean retryFailure = checkForRetryFailure(t);
     boolean closedContainerException = false;
     if (!retryFailure) {
@@ -263,19 +448,15 @@ public class KeyOutputStream extends OutputStream {
     long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
     //set the correct length for the current stream
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
-    long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData();
-    LOG.debug(
-        "Encountered exception {}. The last committed block length is {}, "
+    long bufferedDataLen = computeBufferData();
+    LOG.warn("Encountered exception {}. The last committed block length is {}, "
             + "uncommitted data length is {} retry count {}", exception,
         totalSuccessfulFlushedData, bufferedDataLen, retryCount);
-    Preconditions.checkArgument(
-        bufferedDataLen <= blockOutputStreamEntryPool.getStreamBufferMaxSize());
-    Preconditions.checkArgument(
-        offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen);
+    Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
+    Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen);
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
-    ExcludeList excludeList = blockOutputStreamEntryPool.getExcludeList();
     if (!failedServers.isEmpty()) {
       excludeList.addDatanodes(failedServers);
     }
@@ -289,42 +470,45 @@ public class KeyOutputStream extends OutputStream {
     // just clean up the current stream.
     streamEntry.cleanup(retryFailure);
 
-    // discard all subsequent blocks the containers and pipelines which
+    // discard all sunsequent blocks the containers and pipelines which
     // are in the exclude list so that, the very next retry should never
     // write data on the  closed container/pipeline
     if (closedContainerException) {
       // discard subsequent pre allocated blocks from the streamEntries list
       // from the closed container
-      blockOutputStreamEntryPool
-          .discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
-              null);
+      discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null,
+          streamIndex + 1);
     } else {
       // In case there is timeoutException or Watch for commit happening over
       // majority or the client connection failure to the leader in the
-      // pipeline, just discard all the pre allocated blocks on this pipeline.
+      // pipeline, just discard all the preallocated blocks on this pipeline.
       // Next block allocation will happen with excluding this specific pipeline
       // This will ensure if 2 way commit happens , it cannot span over multiple
       // blocks
-      blockOutputStreamEntryPool
-          .discardPreallocatedBlocks(-1, pipelineId);
+      discardPreallocatedBlocks(-1, pipelineId, streamIndex + 1);
     }
     if (bufferedDataLen > 0) {
       // If the data is still cached in the underlying stream, we need to
       // allocate new block and write this data in the datanode.
+      currentStreamIndex += 1;
       handleRetry(exception, bufferedDataLen);
       // reset the retryCount after handling the exception
       retryCount = 0;
     }
+    if (totalSuccessfulFlushedData == 0) {
+      streamEntries.remove(streamIndex);
+      currentStreamIndex -= 1;
+    }
   }
 
   private void markStreamClosed() {
-    blockOutputStreamEntryPool.cleanup();
+    cleanup();
     closed = true;
   }
 
   private void handleRetry(IOException exception, long len) throws IOException {
-    RetryPolicy retryPolicy = retryPolicyMap
-        .get(HddsClientUtils.checkForException(exception).getClass());
+    RetryPolicy retryPolicy =
+        retryPolicyMap.get(checkForException(exception).getClass());
     if (retryPolicy == null) {
       retryPolicy = retryPolicyMap.get(Exception.class);
     }
@@ -360,11 +544,10 @@ public class KeyOutputStream extends OutputStream {
       }
     }
     retryCount++;
-    LOG.trace("Retrying Write request. Already tried " + retryCount
-        + " time(s); retry policy is " + retryPolicy);
+    LOG.trace("Retrying Write request. Already tried "
+        + retryCount + " time(s); retry policy is " + retryPolicy);
     handleWrite(null, 0, len, true);
   }
-
   /**
    * Checks if the provided exception signifies retry failure in ratis client.
    * In case of retry failure, ratis client throws RaftRetryFailureException
@@ -379,6 +562,40 @@ public class KeyOutputStream extends OutputStream {
     return t instanceof ContainerNotOpenException;
   }
 
+  public Throwable checkForException(IOException ioe) throws IOException {
+    Throwable t = ioe.getCause();
+    while (t != null) {
+      for (Class<? extends Exception> cls : OzoneClientUtils
+          .getExceptionList()) {
+        if (cls.isInstance(t)) {
+          return t;
+        }
+      }
+      t = t.getCause();
+    }
+    throw ioe;
+  }
+
+  private long getKeyLength() {
+    return streamEntries.stream().mapToLong(e -> e.getCurrentPosition())
+        .sum();
+  }
+
+  /**
+   * Contact OM to get a new block. Set the new block with the index (e.g.
+   * first block has index = 0, second has index = 1 etc.)
+   *
+   * The returned block is made to new BlockOutputStreamEntry to write.
+   *
+   * @param index the index of the block.
+   * @throws IOException
+   */
+  private void allocateNewBlock(int index) throws IOException {
+    OmKeyLocationInfo subKeyInfo =
+        omClient.allocateBlock(keyArgs, openID, excludeList);
+    addKeyLocationInfo(subKeyInfo);
+  }
+
   @Override
   public void flush() throws IOException {
     checkNotClosed();
@@ -395,19 +612,20 @@ public class KeyOutputStream extends OutputStream {
    * written to new stream , it will be at max half full. In such cases, we
    * should just write the data and not close the stream as the block won't be
    * completely full.
-   *
    * @param op Flag which decides whether to call close or flush on the
-   *           outputStream.
+   *              outputStream.
    * @throws IOException In case, flush or close fails with exception.
    */
   private void handleFlushOrClose(StreamAction op) throws IOException {
-    if (blockOutputStreamEntryPool.isEmpty()) {
+    if (streamEntries.size() == 0) {
       return;
     }
     while (true) {
       try {
-        BlockOutputStreamEntry entry =
-            blockOutputStreamEntryPool.getCurrentStreamEntry();
+        int size = streamEntries.size();
+        int streamIndex =
+            currentStreamIndex >= size ? size - 1 : currentStreamIndex;
+        BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
         if (entry != null) {
           try {
             Collection<DatanodeDetails> failedServers =
@@ -415,8 +633,7 @@ public class KeyOutputStream extends OutputStream {
             // failed servers can be null in case there is no data written in
             // the stream
             if (failedServers != null && !failedServers.isEmpty()) {
-              blockOutputStreamEntryPool.getExcludeList()
-                  .addDatanodes(failedServers);
+              excludeList.addDatanodes(failedServers);
             }
             switch (op) {
             case CLOSE:
@@ -425,6 +642,7 @@ public class KeyOutputStream extends OutputStream {
             case FULL:
               if (entry.getRemaining() == 0) {
                 entry.close();
+                currentStreamIndex++;
               }
               break;
             case FLUSH:
@@ -434,7 +652,7 @@ public class KeyOutputStream extends OutputStream {
               throw new IOException("Invalid Operation");
             }
           } catch (IOException ioe) {
-            handleException(entry, ioe);
+            handleException(entry, streamIndex, ioe);
             continue;
           }
         }
@@ -459,16 +677,34 @@ public class KeyOutputStream extends OutputStream {
     closed = true;
     try {
       handleFlushOrClose(StreamAction.CLOSE);
-      blockOutputStreamEntryPool.commitKey(offset);
+      if (keyArgs != null) {
+        // in test, this could be null
+        removeEmptyBlocks();
+        long length = getKeyLength();
+        Preconditions.checkArgument(offset == length);
+        keyArgs.setDataSize(length);
+        keyArgs.setLocationInfoList(getLocationInfoList());
+        // When the key is multipart upload part file upload, we should not
+        // commit the key, as this is not an actual key, this is a just a
+        // partial key of a large file.
+        if (keyArgs.getIsMultipartKey()) {
+          commitUploadPartInfo = omClient.commitMultipartUploadPart(keyArgs,
+              openID);
+        } else {
+          omClient.commitKey(keyArgs, openID);
+        }
+      } else {
+        LOG.warn("Closing KeyOutputStream, but key args is null");
+      }
     } catch (IOException ioe) {
       throw ioe;
     } finally {
-      blockOutputStreamEntryPool.cleanup();
+      cleanup();
     }
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    return blockOutputStreamEntryPool.getCommitUploadPartInfo();
+    return commitUploadPartInfo;
   }
 
   public FileEncryptionInfo getFileEncryptionInfo() {
@@ -477,7 +713,7 @@ public class KeyOutputStream extends OutputStream {
 
   @VisibleForTesting
   public ExcludeList getExcludeList() {
-    return blockOutputStreamEntryPool.getExcludeList();
+    return excludeList;
   }
 
   /**
@@ -503,6 +739,7 @@ public class KeyOutputStream extends OutputStream {
     private int maxRetryCount;
     private long retryInterval;
 
+
     public Builder setMultipartUploadID(String uploadID) {
       this.multipartUploadID = uploadID;
       return this;
@@ -523,7 +760,8 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
-    public Builder setOmClient(OzoneManagerProtocol client) {
+    public Builder setOmClient(
+        OzoneManagerProtocol client) {
       this.omClient = client;
       return this;
     }
@@ -568,12 +806,12 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
-    public Builder setChecksumType(ChecksumType cType) {
+    public Builder setChecksumType(ChecksumType cType){
       this.checksumType = cType;
       return this;
     }
 
-    public Builder setBytesPerChecksum(int bytes) {
+    public Builder setBytesPerChecksum(int bytes){
       this.bytesPerChecksum = bytes;
       return this;
     }
@@ -593,9 +831,9 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
-    public KeyOutputStream build() {
-      return new KeyOutputStream(openHandler, xceiverManager, omClient,
-          chunkSize, requestID, factor, type, streamBufferFlushSize,
+    public KeyOutputStream build() throws IOException {
+      return new KeyOutputStream(openHandler, xceiverManager,
+          omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
           streamBufferMaxSize, blockSize, watchTimeout, checksumType,
           bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
           maxRetryCount, retryInterval);
@@ -610,8 +848,8 @@ public class KeyOutputStream extends OutputStream {
   private void checkNotClosed() throws IOException {
     if (closed) {
       throw new IOException(
-          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
-              + blockOutputStreamEntryPool.getKeyName());
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs
+              .getKeyName());
     }
   }
 }
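
The largest behavioral piece restored in this file is handleException: it trims the entry to the acked length, excludes failed datanodes, then discards preallocated blocks either for one closed container or for the whole failed pipeline before re-driving the buffered data. A distilled, self-contained sketch of the discard step (Entry is a hypothetical stand-in for BlockOutputStreamEntry, carrying just the fields the filter reads):

  import java.util.ArrayList;
  import java.util.List;
  import java.util.ListIterator;

  final class DiscardSketch {
    static final class Entry {
      final long containerId;
      final String pipelineId;
      final long position;  // bytes written so far; 0 for unused entries
      Entry(long containerId, String pipelineId, long position) {
        this.containerId = containerId;
        this.pipelineId = pipelineId;
        this.position = position;
      }
    }

    // Drop unused preallocated entries from `fromIndex` on that sit either
    // on the closed container (containerId != -1) or on the failed pipeline
    // (pipelineId != null), mirroring discardPreallocatedBlocks above.
    static void discardPreallocated(List<Entry> entries, int fromIndex,
        long containerId, String pipelineId) {
      if (fromIndex < entries.size()) {
        ListIterator<Entry> it = entries.listIterator(fromIndex);
        while (it.hasNext()) {
          Entry e = it.next();
          if ((pipelineId != null && pipelineId.equals(e.pipelineId))
              || (containerId != -1 && e.containerId == containerId)) {
            it.remove();
          }
        }
      }
    }

    public static void main(String[] args) {
      List<Entry> entries = new ArrayList<>();
      entries.add(new Entry(7, "p1", 4096));  // already written, kept
      entries.add(new Entry(7, "p1", 0));     // unused, on closed container 7
      entries.add(new Entry(8, "p2", 0));     // unused, different container
      discardPreallocated(entries, 1, 7, null);  // closed-container case
      System.out.println(entries.size());  // prints 2
    }
  }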

+ 203 - 153
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java

@@ -24,8 +24,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -75,23 +75,27 @@ public class TestBlockOutputStreamWithFailures {
    *
    * @throws IOException
    */
-  @Before public void init() throws Exception {
+  @Before
+  public void init() throws Exception {
     chunkSize = 100;
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
-    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "1s");
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
-        .setBlockSize(blockSize).setChunkSize(chunkSize)
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
         .setStreamBufferMaxSize(maxFlushSize)
-        .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
@@ -110,24 +114,25 @@ public class TestBlockOutputStreamWithFailures {
   /**
    * Shutdown MiniDFSCluster.
    */
-  @After public void shutdown() {
+  @After
+  public void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
 
-  @Test public void testWatchForCommitWithCloseContainerException()
-      throws Exception {
+  @Test
+  public void testWatchForCommitWithCloseContainerException() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
-    long putBlockCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
@@ -150,14 +155,15 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
 
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream =
-        keyOutputStream.getStreamEntries().get(0).getOutputStream();
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
@@ -193,7 +199,8 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
 
     // flush is a sync call, all pending operations will complete
     Assert.assertEquals(pendingWriteChunkCount,
@@ -226,8 +233,9 @@ public class TestBlockOutputStreamWithFailures {
     // rewritten plus one partial chunk plus two putBlocks for flushSize
     // and one flush for partial chunk
     key.flush();
+
     Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
 
     // Make sure the retryCount is reset after the exception is handled
@@ -239,7 +247,8 @@ public class TestBlockOutputStreamWithFailures {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
@@ -250,23 +259,25 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 8,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 22,
+        metrics.getTotalOpCount());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
 
-  @Test public void testWatchForCommitDatanodeFailure() throws Exception {
+  @Test
+  public void testWatchForCommitDatanodeFailure() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
-    long putBlockCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
@@ -288,13 +299,14 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream =
-        keyOutputStream.getStreamEntries().get(0).getOutputStream();
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
@@ -332,7 +344,8 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
 
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -363,7 +376,8 @@ public class TestBlockOutputStreamWithFailures {
     Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
-    Assert.assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
+    Assert
+        .assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // make sure the bufferPool is empty
@@ -382,23 +396,25 @@ public class TestBlockOutputStreamWithFailures {
     // 4 flushes at flushSize boundaries + 2 flush for partial chunks
     Assert.assertEquals(putBlockCount + 6,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 16, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 16,
+        metrics.getTotalOpCount());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
 
-  @Test public void test2DatanodesFailure() throws Exception {
+  @Test
+  public void test2DatanodesFailure() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
-    long putBlockCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
@@ -420,13 +436,14 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream =
-        keyOutputStream.getStreamEntries().get(0).getOutputStream();
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
@@ -462,7 +479,8 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
 
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -494,7 +512,7 @@ public class TestBlockOutputStreamWithFailures {
     // rewritten plus one partial chunk plus two putBlocks for flushSize
     // and one flush for partial chunk
     key.flush();
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof RaftRetryFailureException);
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
@@ -504,7 +522,8 @@ public class TestBlockOutputStreamWithFailures {
     key.close();
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -514,27 +533,30 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 8,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(totalOpCount + 22,
+        metrics.getTotalOpCount());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
-  @Test public void testFailureWithPrimeSizedData() throws Exception {
+  @Test
+  public void testFailureWithPrimeSizedData() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
-    long putBlockCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
@@ -555,21 +577,24 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 1,
+        metrics.getTotalOpCount());
 
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream =
-        keyOutputStream.getStreamEntries().get(0).getOutputStream();
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
+
     Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-    Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertEquals(0,
+        blockOutputStream.getTotalDataFlushedLength());
 
     Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
 
@@ -588,7 +613,8 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 1,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 3,
+        metrics.getTotalOpCount());
 
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -615,7 +641,7 @@ public class TestBlockOutputStreamWithFailures {
     key.flush();
 
     Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
@@ -627,7 +653,8 @@ public class TestBlockOutputStreamWithFailures {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -637,24 +664,26 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount());
-    Assert.assertTrue(keyOutputStream.getLocationInfoList().size() == 0);
+    Assert.assertEquals(totalOpCount + 9,
+        metrics.getTotalOpCount());
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
 
-  @Test public void testExceptionDuringClose() throws Exception {
+  @Test
+  public void testExceptionDuringClose() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
-    long putBlockCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
@@ -675,21 +704,24 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 1,
+        metrics.getTotalOpCount());
 
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream =
-        keyOutputStream.getStreamEntries().get(0).getOutputStream();
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
+
     Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-    Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertEquals(0,
+        blockOutputStream.getTotalDataFlushedLength());
 
     Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
 
@@ -708,7 +740,8 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 1,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 3,
+        metrics.getTotalOpCount());
 
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -734,14 +767,15 @@ public class TestBlockOutputStreamWithFailures {
     // now close the stream, It will hit exception
     key.close();
 
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -751,24 +785,26 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 9,
+        metrics.getTotalOpCount());
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
 
-  @Test public void testWatchForCommitWithSingleNodeRatis() throws Exception {
+  @Test
+  public void testWatchForCommitWithSingleNodeRatis() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
-    long putBlockCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key =
@@ -792,14 +828,15 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
 
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream =
-        keyOutputStream.getStreamEntries().get(0).getOutputStream();
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
@@ -835,7 +872,8 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
 
     // flush is a sync call, all pending operations will complete
     Assert.assertEquals(pendingWriteChunkCount,
@@ -869,7 +907,7 @@ public class TestBlockOutputStreamWithFailures {
     // and one flush for partial chunk
     key.flush();
 
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
@@ -881,9 +919,10 @@ public class TestBlockOutputStreamWithFailures {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -892,23 +931,25 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 8,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 22,
+        metrics.getTotalOpCount());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
 
-  @Test public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
+  @Test
+  public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
-    long putBlockCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key =
@@ -931,13 +972,14 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream =
-        keyOutputStream.getStreamEntries().get(0).getOutputStream();
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
@@ -973,7 +1015,8 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
 
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -1001,7 +1044,7 @@ public class TestBlockOutputStreamWithFailures {
 
     key.flush();
 
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof RaftRetryFailureException);
     Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
     // Make sure the retryCount is reset after the exception is handled
@@ -1009,7 +1052,8 @@ public class TestBlockOutputStreamWithFailures {
     Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
@@ -1029,25 +1073,27 @@ public class TestBlockOutputStreamWithFailures {
     // flush failed + 3 more flushes for the next block
     Assert.assertEquals(putBlockCount + 8,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
-    Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
+    Assert.assertEquals(totalOpCount + 22,
+        metrics.getTotalOpCount());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
 
-  @Test public void testDatanodeFailureWithPreAllocation() throws Exception {
+  @Test
+  public void testDatanodeFailureWithPreAllocation() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
-    long putBlockCount =
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount =
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key =
@@ -1071,13 +1117,14 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 3);
-    OutputStream stream =
-        keyOutputStream.getStreamEntries().get(0).getOutputStream();
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
@@ -1113,7 +1160,8 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
 
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -1140,7 +1188,7 @@ public class TestBlockOutputStreamWithFailures {
 
     key.flush();
 
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof RaftRetryFailureException);
 
     // Make sure the retryCount is reset after the exception is handled
@@ -1149,12 +1197,13 @@ public class TestBlockOutputStreamWithFailures {
 
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -1170,7 +1219,8 @@ public class TestBlockOutputStreamWithFailures {
     // flush failed + 3 more flushes for the next block
     Assert.assertEquals(putBlockCount + 8,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 22,
+        metrics.getTotalOpCount());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
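
One idiom the test above leans on throughout: every case snapshots the XceiverClientMetrics counters before doing any I/O and then asserts deltas (writeChunkCount + 5, totalOpCount + 8, and so on) rather than absolute values, so counters accumulated by earlier tests in the same JVM cannot break the assertions. A condensed, illustrative sketch of that idiom using the same metrics API the test calls; the helper class and its parameters are invented:

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.junit.Assert;

final class MetricDeltaIdiom {
  // expectedChunks/expectedOps are whatever the scenario under test is
  // known to produce; io is the write/flush/close sequence being measured.
  static void assertDeltas(long expectedChunks, long expectedOps,
      Runnable io) {
    XceiverClientMetrics metrics =
        XceiverClientManager.getXceiverClientMetrics();
    // Snapshot baselines before any I/O happens.
    long writeChunkCount = metrics.getContainerOpCountMetrics(
        ContainerProtos.Type.WriteChunk);
    long totalOpCount = metrics.getTotalOpCount();
    io.run();
    // Assert relative growth, never absolute counter values.
    Assert.assertEquals(writeChunkCount + expectedChunks,
        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
    Assert.assertEquals(totalOpCount + expectedOps, metrics.getTotalOpCount());
  }
}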

+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java

@@ -291,7 +291,7 @@ public class TestCloseContainerHandlingByClient {
         (KeyOutputStream) key.getOutputStream();
     // With the initial size provided, it should have preallocated 4 blocks
     Assert.assertEquals(4, keyOutputStream.getStreamEntries().size());
-    // write data 4 blocks and one more chunk
+    // write data 3 blocks and one more chunk
     byte[] writtenData =
         ContainerTestHelper.getFixedLengthString(keyString, keyLen)
             .getBytes(UTF_8);
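
The comment fix above is pure arithmetic: a key whose data spans three full blocks plus one extra chunk still needs four pre-allocated blocks, which is exactly what the assertEquals(4, ...) two lines earlier checks. A worked example with illustrative sizes; the actual values come from that test's setup and are not part of this hunk:

final class PreallocationCheck {
  public static void main(String[] args) {
    long blockSize = 800;                     // illustrative: 2 * maxFlushSize
    long chunkSize = 100;                     // illustrative chunk size
    long keyLen = 3 * blockSize + chunkSize;  // "3 blocks and one more chunk"
    long blocks = (keyLen + blockSize - 1) / blockSize;  // ceiling division
    System.out.println(blocks);               // prints 4
  }
}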

+ 4 - 4
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java

@@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
@@ -51,6 +50,7 @@ import java.util.concurrent.TimeUnit;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
  * Tests failure detection and handling in BlockOutputStream Class.
@@ -85,7 +85,7 @@ public class TestOzoneClientRetriesOnException {
     blockSize = 2 * maxFlushSize;
     conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
-   // conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
     conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
     conf.setQuietMode(false);
@@ -150,7 +150,7 @@ public class TestOzoneClientRetriesOnException {
             .getPipeline(container.getPipelineID());
     ContainerTestHelper.waitForPipelineClose(key, cluster, true);
     key.flush();
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof GroupMismatchException);
     Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
         .contains(pipeline.getId()));
@@ -201,7 +201,7 @@ public class TestOzoneClientRetriesOnException {
       key.write(data1);
       Assert.fail("Expected exception not thrown");
     } catch (IOException ioe) {
-      Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+      Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
           .getIoException()) instanceof ContainerNotOpenException);
       Assert.assertTrue(ioe.getMessage().contains(
           "Retry request failed. retries get failed due to exceeded maximum "

+ 0 - 501
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java

@@ -1,501 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.client.rpc;
-
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.client.ReplicationType;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.*;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.io.KeyOutputStream;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.ratis.protocol.RaftRetryFailureException;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-
-/**
- * This class verifies the watchForCommit Handling by xceiverClient.
- */
-public class TestWatchForCommit {
-
-  private MiniOzoneCluster cluster;
-  private OzoneClient client;
-  private ObjectStore objectStore;
-  private String volumeName;
-  private String bucketName;
-  private String keyString;
-  private int chunkSize;
-  private int flushSize;
-  private int maxFlushSize;
-  private int blockSize;
-  private StorageContainerLocationProtocolClientSideTranslatorPB
-      storageContainerLocationClient;
-  private static String containerOwner = "OZONE";
-
-  /**
-   * Create a MiniDFSCluster for testing.
-   * <p>
-   * Ozone is made active by setting OZONE_ENABLED = true
-   *
-   * @throws IOException
-   */
-  private void startCluster(OzoneConfiguration conf) throws Exception {
-    chunkSize = 100;
-    flushSize = 2 * chunkSize;
-    maxFlushSize = 2 * flushSize;
-    blockSize = 2 * maxFlushSize;
-
-    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(
-        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
-        1, TimeUnit.SECONDS);
-
-    conf.setQuietMode(false);
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(7)
-        .setBlockSize(blockSize)
-        .setChunkSize(chunkSize)
-        .setStreamBufferFlushSize(flushSize)
-        .setStreamBufferMaxSize(maxFlushSize)
-        .setStreamBufferSizeUnit(StorageUnit.BYTES)
-        .build();
-    cluster.waitForClusterToBeReady();
-    //the easiest way to create an open container is creating a key
-    client = OzoneClientFactory.getClient(conf);
-    objectStore = client.getObjectStore();
-    keyString = UUID.randomUUID().toString();
-    volumeName = "watchforcommithandlingtest";
-    bucketName = volumeName;
-    objectStore.createVolume(volumeName);
-    objectStore.getVolume(volumeName).createBucket(bucketName);
-    storageContainerLocationClient = cluster
-        .getStorageContainerLocationClient();
-  }
-
-
-  /**
-   * Shutdown MiniDFSCluster.
-   */
-  private void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  private String getKeyName() {
-    return UUID.randomUUID().toString();
-  }
-
-  @Test
-  public void testWatchForCommitWithKeyWrite() throws Exception {
-    // in this case, watch request should fail with RaftRetryFailureException
-    // and will be captured in keyOutputStream and the failover will happen
-    // to a different block
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 5);
-    startCluster(conf);
-    XceiverClientMetrics metrics =
-        XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.PutBlock);
-    long totalOpCount = metrics.getTotalOpCount();
-    String keyName = getKeyName();
-    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
-    int dataLength = maxFlushSize + 50;
-    // write data more than 1 chunk
-    byte[] data1 =
-        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
-            .getBytes(UTF_8);
-    key.write(data1);
-    // since it's hitting the full buffer condition, it will call watchForCommit
-    // and completes at least putBlock for the first flushSize worth of data
-    Assert.assertTrue(
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
-            <= pendingWriteChunkCount + 2);
-    Assert.assertTrue(
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
-            <= pendingPutBlockCount + 1);
-    Assert.assertEquals(writeChunkCount + 4,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 2,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6,
-        metrics.getTotalOpCount());
-    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
-
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
-        .getOutputStream();
-    Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
-
-    // we have just written more data than flushSize (2 chunks); at this time
-    // the buffer pool will have 4 buffers allocated, each worth chunk size
-
-    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
-    // writtenDataLength as well as flushedDataLength will be updated here
-    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-    Assert.assertEquals(maxFlushSize,
-        blockOutputStream.getTotalDataFlushedLength());
-
-    // since data equal to maxBufferSize is written, this will be a blocking
-    // call and hence will wait for at least flushSize worth of data to get
-    // acked by all servers right here
-    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
-
-    // watchForCommit will clean up at least one entry from the map where each
-    // entry corresponds to flushSize worth of data
-    Assert.assertTrue(
-        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
-
-    // Now do a flush. This will flush the data and update the flush length and
-    // the map.
-    key.flush();
-
-    Assert.assertEquals(pendingWriteChunkCount,
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount,
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(writeChunkCount + 5,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 3,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8,
-        metrics.getTotalOpCount());
-
-    // Since the data in the buffer is already flushed, flush here will have
-    // no impact on the counters and data structures
-
-    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
-    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-    Assert.assertEquals(dataLength,
-        blockOutputStream.getTotalDataFlushedLength());
-    // flush will make sure one more entry gets updated in the map
-    Assert.assertTrue(
-        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
-
-    XceiverClientRatis raftClient =
-        (XceiverClientRatis) blockOutputStream.getXceiverClient();
-    Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
-    Pipeline pipeline = raftClient.getPipeline();
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
-    // again write data with more than max buffer limit. This will call
-    // watchForCommit again. Since the commit will happen 2 way, the
-    // commitInfoMap will get updated for servers which are alive
-
-    // 4 writeChunks = maxFlushSize + 2 putBlocks  will be discarded here
-    // once exception is hit
-    key.write(data1);
-
-    // As a part of handling the exception, 4 failed writeChunks  will be
-    // rewritten plus one partial chunk plus two putBlocks for flushSize
-    // and one flush for partial chunk
-    key.flush();
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
-        .getIoException()) instanceof RaftRetryFailureException);
-    // Make sure the retryCount is reset after the exception is handled
-    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
-    // now close the stream, It will update the ack length after watchForCommit
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
-    key.close();
-    Assert
-        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
-    Assert.assertEquals(pendingWriteChunkCount,
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(pendingPutBlockCount,
-        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(writeChunkCount + 14,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
-    Assert.assertEquals(putBlockCount + 8,
-        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22,
-        metrics.getTotalOpCount());
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    // make sure the bufferPool is empty
-    Assert
-        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    validateData(keyName, data1);
-    shutdown();
-  }
-
-  @Test
-  public void testWatchForCommitWithSmallerTimeoutValue() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
-    startCluster(conf);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-    ContainerWithPipeline container1 = storageContainerLocationClient
-        .allocateContainer(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE, containerOwner);
-    XceiverClientSpi xceiverClient = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, xceiverClient.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        xceiverClient.getPipeline());
-    Pipeline pipeline = xceiverClient.getPipeline();
-    XceiverClientReply reply = xceiverClient.sendCommandAsync(
-        ContainerTestHelper.getCreateContainerRequest(
-            container1.getContainerInfo().getContainerID(),
-            xceiverClient.getPipeline()));
-    reply.getResponse().get();
-    long index = reply.getLogIndex();
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
-    try {
-      // just watch for a log index which is not updated in the commitInfo Map
-      xceiverClient.watchForCommit(index + 1, 3000);
-      Assert.fail("expected exception not thrown");
-    } catch (Exception e) {
-      Assert.assertTrue(
-          HddsClientUtils.checkForException(e) instanceof TimeoutException);
-    }
-    // After releasing the xceiverClient, this connection should be closed
-    // and any container operations should fail
-    clientManager.releaseClient(xceiverClient, false);
-    shutdown();
-  }
-
-  @Test
-  public void testWatchForCommitForRetryfailure() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
-        100, TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
-    startCluster(conf);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-    ContainerWithPipeline container1 = storageContainerLocationClient
-        .allocateContainer(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE, containerOwner);
-    XceiverClientSpi xceiverClient = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, xceiverClient.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        xceiverClient.getPipeline());
-    Pipeline pipeline = xceiverClient.getPipeline();
-    XceiverClientReply reply = xceiverClient.sendCommandAsync(
-        ContainerTestHelper.getCreateContainerRequest(
-            container1.getContainerInfo().getContainerID(),
-            xceiverClient.getPipeline()));
-    reply.getResponse().get();
-    long index = reply.getLogIndex();
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
-    try {
-      // just watch for a log index which is not updated in the commitInfo Map
-      xceiverClient.watchForCommit(index + 1, 20000);
-      Assert.fail("expected exception not thrown");
-    } catch (Exception e) {
-      Assert.assertTrue(HddsClientUtils
-          .checkForException(e) instanceof RaftRetryFailureException);
-    }
-    clientManager.releaseClient(xceiverClient, false);
-    shutdown();
-  }
-
-  @Test
-  public void test2WayCommitForRetryfailure() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 8);
-    startCluster(conf);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-
-    ContainerWithPipeline container1 = storageContainerLocationClient
-        .allocateContainer(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE, containerOwner);
-    XceiverClientSpi xceiverClient = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, xceiverClient.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        xceiverClient.getPipeline());
-    Pipeline pipeline = xceiverClient.getPipeline();
-    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
-    XceiverClientReply reply = xceiverClient.sendCommandAsync(
-        ContainerTestHelper.getCreateContainerRequest(
-            container1.getContainerInfo().getContainerID(),
-            xceiverClient.getPipeline()));
-    reply.getResponse().get();
-    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    reply = xceiverClient.sendCommandAsync(ContainerTestHelper
-        .getCloseContainer(pipeline,
-            container1.getContainerInfo().getContainerID()));
-    reply.getResponse().get();
-    xceiverClient.watchForCommit(reply.getLogIndex(), 20000);
-
-    // commitInfo Map will be reduced to 2 here
-    Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
-    clientManager.releaseClient(xceiverClient, false);
-    Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
-    Assert
-        .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
-    logCapturer.stopCapturing();
-    shutdown();
-  }
-
-  @Test
-  public void test2WayCommitForTimeoutException() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
-    startCluster(conf);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-
-    ContainerWithPipeline container1 = storageContainerLocationClient
-        .allocateContainer(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE, containerOwner);
-    XceiverClientSpi xceiverClient = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, xceiverClient.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        xceiverClient.getPipeline());
-    Pipeline pipeline = xceiverClient.getPipeline();
-    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
-    XceiverClientReply reply = xceiverClient.sendCommandAsync(
-        ContainerTestHelper.getCreateContainerRequest(
-            container1.getContainerInfo().getContainerID(),
-            xceiverClient.getPipeline()));
-    reply.getResponse().get();
-    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
-    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-    reply = xceiverClient.sendCommandAsync(ContainerTestHelper
-        .getCloseContainer(pipeline,
-            container1.getContainerInfo().getContainerID()));
-    reply.getResponse().get();
-    xceiverClient.watchForCommit(reply.getLogIndex(), 3000);
-
-    // commitInfo Map will be reduced to 2 here
-    Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
-    clientManager.releaseClient(xceiverClient, false);
-    Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
-    Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException"));
-    Assert
-        .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
-    logCapturer.stopCapturing();
-    shutdown();
-  }
-
-  @Test
-  public void testWatchForCommitForGroupMismatchException() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
-        TimeUnit.SECONDS);
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
-
-    // mark the node stale early so that the pipeline gets destroyed quickly
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
-    startCluster(conf);
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
-    XceiverClientManager clientManager = new XceiverClientManager(conf);
-
-    ContainerWithPipeline container1 = storageContainerLocationClient
-        .allocateContainer(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE, containerOwner);
-    XceiverClientSpi xceiverClient = clientManager
-        .acquireClient(container1.getPipeline());
-    Assert.assertEquals(1, xceiverClient.getRefcount());
-    Assert.assertEquals(container1.getPipeline(),
-        xceiverClient.getPipeline());
-    Pipeline pipeline = xceiverClient.getPipeline();
-    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
-    long containerId = container1.getContainerInfo().getContainerID();
-    XceiverClientReply reply = xceiverClient.sendCommandAsync(
-        ContainerTestHelper.getCreateContainerRequest(containerId,
-            xceiverClient.getPipeline()));
-    reply.getResponse().get();
-    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
-    List<Pipeline> pipelineList = new ArrayList<>();
-    pipelineList.add(pipeline);
-    ContainerTestHelper.waitForPipelineClose(pipelineList, cluster);
-    try {
-      // just watch for a log index which is not updated in the commitInfo Map
-      xceiverClient.watchForCommit(reply.getLogIndex() + 1, 20000);
-      Assert.fail("Expected exception not thrown");
-    } catch (Exception e) {
-      Assert.assertTrue(HddsClientUtils
-          .checkForException(e) instanceof GroupMismatchException);
-    }
-    clientManager.releaseClient(xceiverClient, false);
-    shutdown();
-  }
-
-  private OzoneOutputStream createKey(String keyName, ReplicationType type,
-      long size) throws Exception {
-    return ContainerTestHelper
-        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
-  }
-
-  private void validateData(String keyName, byte[] data) throws Exception {
-    ContainerTestHelper
-        .validateData(keyName, data, objectStore, volumeName, bucketName);
-  }
-}
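
For readers skimming the revert: the deleted tests above all exercise the same pattern, namely shut down a majority of a three-node Ratis pipeline, watch for a log index that can never reach ALL_COMMITTED, and assert on the unwrapped cause. A minimal sketch of that pattern follows; it is not code from this patch, and the class and method names (WatchForCommitPattern, expectWatchFailure) are hypothetical stand-ins for the MiniOzoneCluster scaffolding.

import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.ratis.protocol.RaftRetryFailureException;

/**
 * Sketch only: mirrors the watch-and-classify logic of the deleted tests.
 * The caller is assumed to have already shut down two of the three
 * datanodes in the client's pipeline.
 */
public final class WatchForCommitPattern {

  static void expectWatchFailure(XceiverClientSpi client, long lastIndex,
      long timeoutMillis) throws Exception {
    try {
      // lastIndex + 1 was never replicated, so with a majority of the
      // pipeline down, ALL_COMMITTED can never be reached.
      client.watchForCommit(lastIndex + 1, timeoutMillis);
      throw new AssertionError("expected exception not thrown");
    } catch (Exception e) {
      Throwable cause = HddsClientUtils.checkForException(e);
      // A short watch timeout surfaces as TimeoutException, while
      // exhausting the Ratis retry policy surfaces as
      // RaftRetryFailureException; anything else is unexpected.
      if (!(cause instanceof TimeoutException)
          && !(cause instanceof RaftRetryFailureException)) {
        throw e;
      }
    }
  }

  private WatchForCommitPattern() {
  }
}

The two-way-commit tests rely on the same watch call, but with only one follower down the watch downgrades to majority commit instead of failing, which is why they assert on the "Committed by majority" log line rather than an exception.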

+ 9 - 18
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -57,7 +57,6 @@ import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -69,6 +68,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.base.Preconditions;
@@ -723,11 +723,11 @@ public final class ContainerTestHelper {
       MiniOzoneCluster cluster) throws Exception {
     KeyOutputStream keyOutputStream =
         (KeyOutputStream) outputStream.getOutputStream();
-    List<BlockOutputStreamEntry> streamEntryList =
-        keyOutputStream.getStreamEntries();
+    List<OmKeyLocationInfo> locationInfoList =
+        keyOutputStream.getLocationInfoList();
     List<Long> containerIdList = new ArrayList<>();
-    for (BlockOutputStreamEntry entry : streamEntryList) {
-      long id = entry.getBlockID().getContainerID();
+    for (OmKeyLocationInfo info : locationInfoList) {
+      long id = info.getContainerID();
       if (!containerIdList.contains(id)) {
         containerIdList.add(id);
       }
@@ -741,14 +741,11 @@ public final class ContainerTestHelper {
       throws Exception {
     KeyOutputStream keyOutputStream =
         (KeyOutputStream) outputStream.getOutputStream();
-    List<BlockOutputStreamEntry> streamEntryList =
-        keyOutputStream.getStreamEntries();
+    List<OmKeyLocationInfo> locationInfoList =
+        keyOutputStream.getLocationInfoList();
     List<Long> containerIdList = new ArrayList<>();
-    for (BlockOutputStreamEntry entry : streamEntryList) {
-      long id = entry.getBlockID().getContainerID();
-      if (!containerIdList.contains(id)) {
-        containerIdList.add(id);
-      }
+    for (OmKeyLocationInfo info : locationInfoList) {
+      containerIdList.add(info.getContainerID());
     }
     Assert.assertTrue(!containerIdList.isEmpty());
     waitForPipelineClose(cluster, waitForContainerCreation,
@@ -787,12 +784,6 @@ public final class ContainerTestHelper {
         }
       }
     }
-    waitForPipelineClose(pipelineList, cluster);
-  }
-
-  public static void waitForPipelineClose(List<Pipeline> pipelineList,
-      MiniOzoneCluster cluster)
-      throws TimeoutException, InterruptedException, IOException {
     for (Pipeline pipeline1 : pipelineList) {
       // issue pipeline destroy command
       cluster.getStorageContainerManager().getPipelineManager()

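The hunks above switch the helper back from BlockOutputStreamEntry to OmKeyLocationInfo. Note that the second hunk also drops the duplicate check, so containerIdList may now hold the same container ID more than once. A set-based variant avoids that; this is a hedged sketch rather than the patch's code, and the class and method names (ContainerIdCollector, collectContainerIds) are ours.

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;

// Sketch: collect each container ID exactly once, preserving encounter
// order, instead of appending possible duplicates to a List.
final class ContainerIdCollector {

  static Set<Long> collectContainerIds(List<OmKeyLocationInfo> locations) {
    Set<Long> ids = new LinkedHashSet<>();
    for (OmKeyLocationInfo info : locations) {
      // Set membership replaces the explicit contains() check that the
      // first hunk still performs on a List.
      ids.add(info.getContainerID());
    }
    return ids;
  }

  private ContainerIdCollector() {
  }
}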
+ 1 - 1
hadoop-ozone/pom.xml

@@ -29,7 +29,7 @@
     <hadoop.version>3.2.0</hadoop.version>
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
     <ozone.version>0.5.0-SNAPSHOT</ozone.version>
-    <ratis.version>0.4.0-fe2b15d-SNAPSHOT</ratis.version>
+    <ratis.version>0.3.0</ratis.version>
     <bouncycastle.version>1.60</bouncycastle.version>
     <ozone.release>Crater Lake</ozone.release>
     <declared.ozone.version>${ozone.version}</declared.ozone.version>
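
This pins ratis.version back to the released 0.3.0, matching the same change in hadoop-hdds/pom.xml, presumably because the reverted change depended on the newer 0.4.0 snapshot. After a version pin like this it can be worth confirming which Ratis jar Maven actually resolved at runtime; the following one-off check is our suggestion, not part of the patch.

import org.apache.ratis.client.RaftClient;

// Prints the implementation version recorded in the resolved
// ratis-client jar's manifest (may be null if the manifest lacks
// version attributes).
public final class RatisVersionCheck {
  public static void main(String[] args) {
    Package p = RaftClient.class.getPackage();
    System.out.println("ratis-client implementation version: "
        + p.getImplementationVersion());
  }
}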