
HDDS-1395. Key write fails with BlockOutputStream has been closed exception (#749). Contributed by Shashikant Banerjee

Shashikant Banerjee 6 years ago
parent
commit
7f0e2c67e0
17 changed files with 1,193 additions and 576 deletions
  1. +11 -5
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
  2. +69 -1
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
  3. +11 -2
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
  4. +0 -1
      hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
  5. +2 -2
      hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
  6. +2 -2
      hadoop-hdds/common/src/main/resources/ozone-default.xml
  7. +1 -1
      hadoop-hdds/pom.xml
  8. +0 -38
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java
  9. +7 -0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
  10. +344 -0
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
  11. +68 -306
      hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
  12. +153 -203
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
  13. +1 -1
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
  14. +4 -4
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
  15. +501 -0
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
  16. +18 -9
      hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
  17. +1 -1
      hadoop-ozone/pom.xml

+ 11 - 5
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java

@@ -29,6 +29,7 @@ import io.opentracing.util.GlobalTracer;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.thirdparty.com.google.protobuf
@@ -69,7 +70,8 @@ import java.util.stream.Collectors;
  * The underlying RPC mechanism can be chosen via the constructor.
  */
 public final class XceiverClientRatis extends XceiverClientSpi {
-  static final Logger LOG = LoggerFactory.getLogger(XceiverClientRatis.class);
+  public static final Logger LOG =
+      LoggerFactory.getLogger(XceiverClientRatis.class);
 
   public static XceiverClientRatis newXceiverClientRatis(
       org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
@@ -248,13 +250,17 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       return clientReply;
     }
     LOG.debug("commit index : {} watch timeout : {}", index, timeout);
-    CompletableFuture<RaftClientReply> replyFuture = getClient()
-        .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
     RaftClientReply reply;
     try {
+      CompletableFuture<RaftClientReply> replyFuture = getClient()
+          .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
       replyFuture.get(timeout, TimeUnit.MILLISECONDS);
-    } catch (TimeoutException toe) {
-      LOG.warn("3 way commit failed ", toe);
+    } catch (Exception e) {
+      Throwable t = HddsClientUtils.checkForException(e);
+      LOG.warn("3 way commit failed ", e);
+      if (t instanceof GroupMismatchException) {
+        throw e;
+      }
       reply = getClient()
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
           .get(timeout, TimeUnit.MILLISECONDS);
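
The rewritten watch logic above tries an ALL_COMMITTED watch first, unwraps any failure via HddsClientUtils.checkForException, rethrows a GroupMismatchException (the pipeline is gone, so a majority watch is pointless), and otherwise falls back to a MAJORITY_COMMITTED watch. A minimal, self-contained sketch of that fallback pattern follows; the GroupMismatch class and the two futures are hypothetical stand-ins for the Ratis client calls, not the Ratis API:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public final class WatchFallbackSketch {

  // Hypothetical stand-in for org.apache.ratis.protocol.GroupMismatchException.
  static final class GroupMismatch extends RuntimeException { }

  // Wait for full replication; fall back to a majority watch on other failures.
  static String watchForCommit(CompletableFuture<String> allCommitted,
      CompletableFuture<String> majorityCommitted, long timeoutMillis)
      throws Exception {
    try {
      return allCommitted.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
      // Walk the cause chain; a group mismatch must be surfaced to the caller.
      for (Throwable t = e; t != null; t = t.getCause()) {
        if (t instanceof GroupMismatch) {
          throw e;
        }
      }
      // Two-way commit: settle for a majority within the same timeout.
      return majorityCommitted.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }
  }

  public static void main(String[] args) throws Exception {
    CompletableFuture<String> all = new CompletableFuture<>();
    all.completeExceptionally(new RuntimeException("slow follower"));
    CompletableFuture<String> majority =
        CompletableFuture.completedFuture("majority committed");
    System.out.println(watchForCommit(all, majority, 100));
  }
}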

+ 69 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java

@@ -28,8 +28,11 @@ import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
@@ -40,6 +43,10 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.ratis.protocol.AlreadyClosedException;
+import org.apache.ratis.protocol.GroupMismatchException;
+import org.apache.ratis.protocol.NotReplicatedException;
+import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,8 +57,12 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.TimeoutException;
 
 /**
  * Utility methods for Ozone and Container Clients.
@@ -72,6 +83,18 @@ public final class HddsClientUtils {
   private HddsClientUtils() {
   }
 
+  private static final List<Class<? extends Exception>> EXCEPTION_LIST =
+      new ArrayList<Class<? extends Exception>>() {{
+        add(TimeoutException.class);
+        add(ContainerNotOpenException.class);
+        add(RaftRetryFailureException.class);
+        add(AlreadyClosedException.class);
+        add(GroupMismatchException.class);
+        // Not Replicated Exception will be thrown if watch For commit
+        // does not succeed
+        add(NotReplicatedException.class);
+      }};
+
   /**
    * Date format that used in ozone. Here the format is thread safe to use.
    */
@@ -290,4 +313,49 @@ public final class HddsClientUtils {
                 Client.getRpcTimeout(conf)));
     return scmSecurityClient;
   }
+
+  public static Throwable checkForException(Exception e) throws IOException {
+    Throwable t = e;
+    while (t != null) {
+      for (Class<? extends Exception> cls : getExceptionList()) {
+        if (cls.isInstance(t)) {
+          return t;
+        }
+      }
+      t = t.getCause();
+    }
+
+    throw e instanceof IOException ? (IOException)e : new IOException(e);
+  }
+
+  public static RetryPolicy createRetryPolicy(int maxRetryCount,
+      long retryInterval) {
+    // retry with fixed sleep between retries
+    return RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+        maxRetryCount, retryInterval, TimeUnit.MILLISECONDS);
+  }
+
+  public static Map<Class<? extends Throwable>,
+      RetryPolicy> getRetryPolicyByException(int maxRetryCount,
+      long retryInterval) {
+    Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
+    for (Class<? extends Exception> ex : EXCEPTION_LIST) {
+      if (ex == TimeoutException.class
+          || ex == RaftRetryFailureException.class) {
+        // retry without sleep
+        policyMap.put(ex, createRetryPolicy(maxRetryCount, 0));
+      } else {
+        // retry with fixed sleep between retries
+        policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval));
+      }
+    }
+    // Default retry policy
+    policyMap
+        .put(Exception.class, createRetryPolicy(maxRetryCount, retryInterval));
+    return policyMap;
+  }
+
+  public static List<Class<? extends Exception>> getExceptionList() {
+    return EXCEPTION_LIST;
+  }
 }
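
For orientation, this is roughly how the new helpers compose: checkForException walks a failure's cause chain looking for a known exception class, and getRetryPolicyByException assigns each class its own policy (no sleep for timeouts and Raft retry failures, a fixed sleep otherwise) with Exception.class as the default. The hedged sketch below mimics that lookup with a simplified RetryPolicy interface of its own; it is not the Hadoop org.apache.hadoop.io.retry.RetryPolicy API:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public final class RetryPolicyMapSketch {

  // Simplified policy: milliseconds to sleep before the next attempt, -1 to fail.
  interface RetryPolicy {
    long sleepMillis(int attempt);
  }

  static RetryPolicy fixedSleep(int maxRetries, long intervalMillis) {
    return attempt -> attempt < maxRetries ? intervalMillis : -1;
  }

  static Map<Class<? extends Throwable>, RetryPolicy> buildPolicyMap(
      int maxRetryCount, long retryIntervalMillis) {
    Map<Class<? extends Throwable>, RetryPolicy> map = new HashMap<>();
    // Timeouts retry immediately, mirroring the zero-sleep branch above.
    map.put(TimeoutException.class, fixedSleep(maxRetryCount, 0));
    // Everything else waits the configured interval between attempts.
    map.put(Exception.class, fixedSleep(maxRetryCount, retryIntervalMillis));
    return map;
  }

  public static void main(String[] args) {
    Map<Class<? extends Throwable>, RetryPolicy> policies =
        buildPolicyMap(180, 1000);
    Throwable failure = new TimeoutException("watch timed out");
    RetryPolicy policy = policies.getOrDefault(
        failure.getClass(), policies.get(Exception.class));
    System.out.println("sleep before retry 0: " + policy.sleepMillis(0) + " ms");
  }
}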

+ 11 - 2
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java

@@ -80,7 +80,7 @@ public class BlockOutputStream extends OutputStream {
   public static final Logger LOG =
       LoggerFactory.getLogger(BlockOutputStream.class);
 
-  private BlockID blockID;
+  private volatile BlockID blockID;
   private final String key;
   private final String traceID;
   private final BlockData.Builder containerBlockData;
@@ -574,7 +574,7 @@ public class BlockOutputStream extends OutputStream {
    * @throws IOException if stream is closed
    */
   private void checkOpen() throws IOException {
-    if (xceiverClient == null) {
+    if (isClosed()) {
       throw new IOException("BlockOutputStream has been closed.");
     } else if (getIoException() != null) {
       adjustBuffersOnException();
@@ -582,6 +582,10 @@
     }
   }
 
+  public boolean isClosed() {
+    return xceiverClient == null;
+  }
+
   /**
    * Writes buffered data as a new chunk to the container and saves chunk
    * information to be used later in putKey call.
@@ -635,4 +639,9 @@
             + " length " + effectiveChunkSize);
     containerBlockData.addChunks(chunkInfo);
   }
+
+  @VisibleForTesting
+  public void setXceiverClient(XceiverClientSpi xceiverClient) {
+    this.xceiverClient = xceiverClient;
+  }
 }
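
The change above turns the closed check into a reusable query: checkOpen delegates to a public isClosed, so callers such as BlockOutputStreamEntry (further down) can ask whether a stream is still writable instead of finding out from an IOException. A minimal sketch of that guard pattern, with illustrative names rather than the Ozone classes:

import java.io.IOException;
import java.io.OutputStream;

public class GuardedStreamSketch extends OutputStream {

  private Object client = new Object(); // becomes null once the stream closes

  public boolean isClosed() {
    return client == null;
  }

  private void checkOpen() throws IOException {
    if (isClosed()) {
      throw new IOException("stream has been closed");
    }
  }

  @Override
  public void write(int b) throws IOException {
    checkOpen(); // fail fast before touching any buffers
    // ... buffer the byte ...
  }

  @Override
  public void close() {
    client = null;
  }
}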

+ 0 - 1
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java

@@ -188,7 +188,6 @@ public class CommitWatcher {
    */
   public XceiverClientReply watchForCommit(long commitIndex)
       throws IOException {
-    Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
     long index;
     try {
       XceiverClientReply reply =

+ 2 - 2
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java

@@ -121,12 +121,12 @@ public final class ScmConfigKeys {
       TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
   public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
       "dfs.ratis.client.request.max.retries";
-  public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 20;
+  public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180;
   public static final String DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY =
       "dfs.ratis.client.request.retry.interval";
   public static final TimeDuration
       DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT =
-      TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
+      TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS);
   public static final String DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY =
       "dfs.ratis.server.retry-cache.timeout.duration";
   public static final TimeDuration

+ 2 - 2
hadoop-hdds/common/src/main/resources/ozone-default.xml

@@ -237,13 +237,13 @@
   </property>
   <property>
     <name>dfs.ratis.client.request.max.retries</name>
-    <value>20</value>
+    <value>180</value>
     <tag>OZONE, RATIS, MANAGEMENT</tag>
     <description>Number of retries for ratis client request.</description>
   </property>
   <property>
     <name>dfs.ratis.client.request.retry.interval</name>
-    <value>500ms</value>
+    <value>1000ms</value>
     <tag>OZONE, RATIS, MANAGEMENT</tag>
     <description>Interval between successive retries for a ratis client request.
     </description>
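
Taken together, the two new defaults stretch the client's worst-case retry budget from roughly 20 x 500 ms = 10 seconds to 180 x 1000 ms = 3 minutes, giving a write enough time to ride out a pipeline teardown instead of failing. A hedged sketch of overriding the keys in code, assuming a standard Hadoop Configuration object on the classpath:

import org.apache.hadoop.conf.Configuration;

public final class RatisRetryConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Restore the old, faster-failing behaviour for a latency-sensitive client.
    conf.setInt("dfs.ratis.client.request.max.retries", 20);
    conf.set("dfs.ratis.client.request.retry.interval", "500ms");
    System.out.println(conf.getInt("dfs.ratis.client.request.max.retries", 180));
  }
}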

+ 1 - 1
hadoop-hdds/pom.xml

@@ -47,7 +47,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
 
     <!-- Apache Ratis version -->
-    <ratis.version>0.3.0</ratis.version>
+    <ratis.version>0.4.0-fe2b15d-SNAPSHOT</ratis.version>
 
     <bouncycastle.version>1.60</bouncycastle.version>
 

+ 0 - 38
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java

@@ -18,15 +18,11 @@
 package org.apache.hadoop.ozone.client;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -36,23 +32,11 @@ import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
 import org.apache.hadoop.ozone.client.rest.response.KeyLocation;
 import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
 import org.apache.hadoop.ozone.client.rest.response.VolumeOwner;
-import org.apache.ratis.protocol.AlreadyClosedException;
-import org.apache.ratis.protocol.GroupMismatchException;
-import org.apache.ratis.protocol.RaftRetryFailureException;
 
 /** A utility class for OzoneClient. */
 public final class OzoneClientUtils {
 
   private OzoneClientUtils() {}
-
-  private static final List<Class<? extends Exception>> EXCEPTION_LIST =
-      new ArrayList<Class<? extends Exception>>() {{
-        add(TimeoutException.class);
-        add(ContainerNotOpenException.class);
-        add(RaftRetryFailureException.class);
-        add(AlreadyClosedException.class);
-        add(GroupMismatchException.class);
-      }};
   /**
    * Returns a BucketInfo object constructed using fields of the input
    * OzoneBucket object.
@@ -141,26 +125,4 @@ public final class OzoneClientUtils {
         maxRetryCount, retryInterval, TimeUnit.MILLISECONDS);
   }
 
-  public static List<Class<? extends Exception>> getExceptionList() {
-    return EXCEPTION_LIST;
-  }
-
-  public static Map<Class<? extends Throwable>, RetryPolicy>
-      getRetryPolicyByException(int maxRetryCount, long retryInterval) {
-    Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
-    for (Class<? extends Exception> ex : EXCEPTION_LIST) {
-      if (ex == TimeoutException.class ||
-          ex == RaftRetryFailureException.class) {
-        // retry without sleep
-        policyMap.put(ex, createRetryPolicy(maxRetryCount, 0));
-      } else {
-        // retry with fixed sleep between retries
-        policyMap.put(ex, createRetryPolicy(maxRetryCount, retryInterval));
-      }
-    }
-    // Default retry policy
-    policyMap.put(Exception.class, createRetryPolicy(
-        maxRetryCount, retryInterval));
-    return policyMap;
-  }
 }

+ 7 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java

@@ -149,6 +149,13 @@ public final class BlockOutputStreamEntry extends OutputStream {
     }
   }
 
+  boolean isClosed() {
+    if (outputStream != null) {
+      return  ((BlockOutputStream) outputStream).isClosed();
+    }
+    return false;
+  }
+
   long getTotalAckDataLength() {
     if (outputStream != null) {
       BlockOutputStream out = (BlockOutputStream) this.outputStream;

+ 344 - 0
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java

@@ -0,0 +1,344 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.*;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * This class manages the stream entries list and handles block allocation
+ * from OzoneManager.
+ */
+public class BlockOutputStreamEntryPool {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockOutputStreamEntryPool.class);
+
+  private final List<BlockOutputStreamEntry> streamEntries;
+  private int currentStreamIndex;
+  private final OzoneManagerProtocol omClient;
+  private final OmKeyArgs keyArgs;
+  private final XceiverClientManager xceiverClientManager;
+  private final int chunkSize;
+  private final String requestID;
+  private final long streamBufferFlushSize;
+  private final long streamBufferMaxSize;
+  private final long watchTimeout;
+  private final long blockSize;
+  private final int bytesPerChecksum;
+  private final ContainerProtos.ChecksumType checksumType;
+  private final BufferPool bufferPool;
+  private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
+  private final long openID;
+  private ExcludeList excludeList;
+
+  @SuppressWarnings("parameternumber")
+  public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient,
+      int chunkSize, String requestId, HddsProtos.ReplicationFactor factor,
+      HddsProtos.ReplicationType type, long bufferFlushSize, long bufferMaxSize,
+      long size, long watchTimeout, ContainerProtos.ChecksumType checksumType,
+      int bytesPerChecksum, String uploadID, int partNumber,
+      boolean isMultipart, OmKeyInfo info,
+      XceiverClientManager xceiverClientManager, long openID) {
+    streamEntries = new ArrayList<>();
+    currentStreamIndex = 0;
+    this.omClient = omClient;
+    this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
+        .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
+        .setType(type).setFactor(factor).setDataSize(info.getDataSize())
+        .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
+        .setMultipartUploadPartNumber(partNumber).build();
+    this.xceiverClientManager = xceiverClientManager;
+    this.chunkSize = chunkSize;
+    this.requestID = requestId;
+    this.streamBufferFlushSize = bufferFlushSize;
+    this.streamBufferMaxSize = bufferMaxSize;
+    this.blockSize = size;
+    this.watchTimeout = watchTimeout;
+    this.bytesPerChecksum = bytesPerChecksum;
+    this.checksumType = checksumType;
+    this.openID = openID;
+    this.excludeList = new ExcludeList();
+
+    Preconditions.checkState(chunkSize > 0);
+    Preconditions.checkState(streamBufferFlushSize > 0);
+    Preconditions.checkState(streamBufferMaxSize > 0);
+    Preconditions.checkState(blockSize > 0);
+    Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
+    Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
+    Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
+    this.bufferPool =
+        new BufferPool(chunkSize, (int) streamBufferMaxSize / chunkSize);
+  }
+
+  public BlockOutputStreamEntryPool() {
+    streamEntries = new ArrayList<>();
+    omClient = null;
+    keyArgs = null;
+    xceiverClientManager = null;
+    chunkSize = 0;
+    requestID = null;
+    streamBufferFlushSize = 0;
+    streamBufferMaxSize = 0;
+    bufferPool = new BufferPool(chunkSize, 1);
+    watchTimeout = 0;
+    blockSize = 0;
+    this.checksumType = ContainerProtos.ChecksumType.valueOf(
+        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
+    this.bytesPerChecksum = OzoneConfigKeys
+        .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
+    currentStreamIndex = 0;
+    openID = -1;
+  }
+
+  /**
+   * When a key is opened, it is possible that there are some blocks already
+   * allocated to it for this open session. In this case, to make use of these
+   * blocks, we need to add them to stream entries. But a key's version
+   * also includes blocks from previous versions, and we need to avoid adding
+   * those old blocks to stream entries, because they should not be picked
+   * for write. To do this, this method ensures that only those
+   * blocks created in this particular open version are added to stream entries.
+   *
+   * @param version the set of blocks that are pre-allocated.
+   * @param openVersion the version corresponding to the pre-allocation.
+   * @throws IOException
+   */
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
+      long openVersion) throws IOException {
+    // the server may return any number of blocks (0 to any); add only the
+    // blocks allocated in this open session (block createVersion equal to
+    // the open session version)
+    for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) {
+      if (subKeyInfo.getCreateVersion() == openVersion) {
+        addKeyLocationInfo(subKeyInfo);
+      }
+    }
+  }
+
+  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
+      throws IOException {
+    Preconditions.checkNotNull(subKeyInfo.getPipeline());
+    UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken());
+    BlockOutputStreamEntry.Builder builder =
+        new BlockOutputStreamEntry.Builder()
+            .setBlockID(subKeyInfo.getBlockID())
+            .setKey(keyArgs.getKeyName())
+            .setXceiverClientManager(xceiverClientManager)
+            .setPipeline(subKeyInfo.getPipeline())
+            .setRequestId(requestID)
+            .setChunkSize(chunkSize)
+            .setLength(subKeyInfo.getLength())
+            .setStreamBufferFlushSize(streamBufferFlushSize)
+            .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setWatchTimeout(watchTimeout)
+            .setbufferPool(bufferPool)
+            .setChecksumType(checksumType)
+            .setBytesPerChecksum(bytesPerChecksum)
+            .setToken(subKeyInfo.getToken());
+    streamEntries.add(builder.build());
+  }
+
+  public List<OmKeyLocationInfo> getLocationInfoList()  {
+    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+    for (BlockOutputStreamEntry streamEntry : streamEntries) {
+      long length = streamEntry.getCurrentPosition();
+
+      // Commit only those blocks to OzoneManager which are not empty
+      if (length != 0) {
+        OmKeyLocationInfo info =
+            new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
+                .setLength(streamEntry.getCurrentPosition()).setOffset(0)
+                .setToken(streamEntry.getToken())
+                .setPipeline(streamEntry.getPipeline()).build();
+        locationInfoList.add(info);
+      }
+      LOG.debug(
+          "block written " + streamEntry.getBlockID() + ", length " + length
+              + " bcsID " + streamEntry.getBlockID()
+              .getBlockCommitSequenceId());
+    }
+    return locationInfoList;
+  }
+
+  /**
+   * Discards the subsequent pre allocated blocks and removes the streamEntries
+   * from the streamEntries list for the container which is closed.
+   * @param containerID id of the closed container
+   * @param pipelineId id of the associated pipeline
+   */
+  void discardPreallocatedBlocks(long containerID, PipelineID pipelineId) {
+    // currentStreamIndex < streamEntries.size() signifies that there are
+    // still pre-allocated blocks available.
+
+    // This will be called only to discard the next subsequent unused blocks
+    // in the streamEntryList.
+    if (currentStreamIndex + 1 < streamEntries.size()) {
+      ListIterator<BlockOutputStreamEntry> streamEntryIterator =
+          streamEntries.listIterator(currentStreamIndex + 1);
+      while (streamEntryIterator.hasNext()) {
+        BlockOutputStreamEntry streamEntry = streamEntryIterator.next();
+        Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0);
+        if ((pipelineId != null && streamEntry.getPipeline().getId()
+            .equals(pipelineId)) || (containerID != -1
+            && streamEntry.getBlockID().getContainerID() == containerID)) {
+          streamEntryIterator.remove();
+        }
+      }
+    }
+  }
+
+  List<BlockOutputStreamEntry> getStreamEntries() {
+    return streamEntries;
+  }
+
+  XceiverClientManager getXceiverClientManager() {
+    return xceiverClientManager;
+  }
+
+  String getKeyName() {
+    return keyArgs.getKeyName();
+  }
+
+  long getKeyLength() {
+    return streamEntries.stream().mapToLong(e -> e.getCurrentPosition()).sum();
+  }
+  /**
+   * Contact OM to get a new block. Set the new block with the index (e.g.
+   * first block has index = 0, second has index = 1 etc.)
+   *
+   * The returned block is made to new BlockOutputStreamEntry to write.
+   *
+   * @throws IOException
+   */
+  private void allocateNewBlock() throws IOException {
+    OmKeyLocationInfo subKeyInfo =
+        omClient.allocateBlock(keyArgs, openID, excludeList);
+    addKeyLocationInfo(subKeyInfo);
+  }
+
+
+  void commitKey(long offset) throws IOException {
+    if (keyArgs != null) {
+      // in test, this could be null
+      long length = getKeyLength();
+      Preconditions.checkArgument(offset == length);
+      keyArgs.setDataSize(length);
+      keyArgs.setLocationInfoList(getLocationInfoList());
+      // When the key is a multipart upload part, we should not commit the
+      // key, as this is not an actual key; it is just a partial key of a
+      // large file.
+      if (keyArgs.getIsMultipartKey()) {
+        commitUploadPartInfo =
+            omClient.commitMultipartUploadPart(keyArgs, openID);
+      } else {
+        omClient.commitKey(keyArgs, openID);
+      }
+    } else {
+      LOG.warn("Closing KeyOutputStream, but key args is null");
+    }
+  }
+
+  public BlockOutputStreamEntry getCurrentStreamEntry() {
+    if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) {
+      return null;
+    } else {
+      return streamEntries.get(currentStreamIndex);
+    }
+  }
+
+  BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException {
+    BlockOutputStreamEntry streamEntry = getCurrentStreamEntry();
+    if (streamEntry != null && streamEntry.isClosed()) {
+      // a stream entry gets closed either by :
+      // a. If the stream gets full
+      // b. it has encountered an exception
+      currentStreamIndex++;
+    }
+    if (streamEntries.size() <= currentStreamIndex) {
+      Preconditions.checkNotNull(omClient);
+      // allocate a new block; if an exception happens, log an error and
+      // throw exception to the caller directly, and the write fails.
+      int succeededAllocates = 0;
+      try {
+        allocateNewBlock();
+        succeededAllocates += 1;
+      } catch (IOException ioe) {
+        LOG.error("Try to allocate more blocks for write failed, already "
+            + "allocated " + succeededAllocates + " blocks for this write.");
+        throw ioe;
+      }
+    }
+    // in theory, this condition should never be violated due to the check
+    // above; still do a sanity check.
+    Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+    BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
+    return current;
+  }
+
+  long computeBufferData() {
+    return bufferPool.computeBufferData();
+  }
+
+  void cleanup() {
+    if (excludeList != null) {
+      excludeList.clear();
+      excludeList = null;
+    }
+    if (bufferPool != null) {
+      bufferPool.clearBufferPool();
+    }
+
+    if (streamEntries != null) {
+      streamEntries.clear();
+    }
+  }
+
+  public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    return commitUploadPartInfo;
+  }
+
+  public ExcludeList getExcludeList() {
+    return excludeList;
+  }
+
+  public long getStreamBufferMaxSize() {
+    return streamBufferMaxSize;
+  }
+
+  boolean isEmpty() {
+    return streamEntries.isEmpty();
+  }
+}
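
The pool's central contract is allocateBlockIfNeeded: step past an entry that closed (full, or failed with an exception), ask OM for a new block only when the pre-allocated list is exhausted, and always return a writable entry. A stripped-down sketch of that index-advancing logic, using a trivial Entry stand-in instead of BlockOutputStreamEntry:

import java.util.ArrayList;
import java.util.List;

public final class EntryPoolSketch {

  static final class Entry {
    boolean closed;
  }

  private final List<Entry> entries = new ArrayList<>();
  private int currentIndex;

  // Returns the entry to write to, allocating a new one when needed.
  Entry allocateIfNeeded() {
    // Skip an entry that was closed, either full or failed with an exception.
    if (currentIndex < entries.size() && entries.get(currentIndex).closed) {
      currentIndex++;
    }
    // Out of pre-allocated entries: allocate a new block (stubbed here).
    if (currentIndex >= entries.size()) {
      entries.add(new Entry());
    }
    return entries.get(currentIndex);
  }

  public static void main(String[] args) {
    EntryPoolSketch pool = new EntryPoolSketch();
    Entry first = pool.allocateIfNeeded();
    first.closed = true; // simulate the block filling up
    Entry second = pool.allocateIfNeeded();
    System.out.println("advanced to a new entry: " + (first != second));
  }
}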

+ 68 - 306
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java

@@ -23,21 +23,18 @@ import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.ozone.om.helpers.*;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
@@ -47,10 +44,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.io.OutputStream;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Collection;
 import java.util.Collection;
-import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.function.Function;
@@ -77,84 +72,41 @@ public class KeyOutputStream extends OutputStream {
   public static final Logger LOG =
   public static final Logger LOG =
       LoggerFactory.getLogger(KeyOutputStream.class);
       LoggerFactory.getLogger(KeyOutputStream.class);
 
 
-  // array list's get(index) is O(1)
-  private final ArrayList<BlockOutputStreamEntry> streamEntries;
-  private int currentStreamIndex;
-  private final OzoneManagerProtocol omClient;
-  private final OmKeyArgs keyArgs;
-  private final long openID;
-  private final XceiverClientManager xceiverClientManager;
-  private final int chunkSize;
-  private final String requestID;
   private boolean closed;
   private boolean closed;
-  private final long streamBufferFlushSize;
-  private final long streamBufferMaxSize;
-  private final long watchTimeout;
-  private final long blockSize;
-  private final int bytesPerChecksum;
-  private final ChecksumType checksumType;
-  private final BufferPool bufferPool;
-  private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
   private FileEncryptionInfo feInfo;
   private FileEncryptionInfo feInfo;
-  private ExcludeList excludeList;
   private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
   private final Map<Class<? extends Throwable>, RetryPolicy> retryPolicyMap;
   private int retryCount;
   private int retryCount;
   private long offset;
   private long offset;
+  private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
+
   /**
   /**
    * A constructor for testing purpose only.
    * A constructor for testing purpose only.
    */
    */
   @VisibleForTesting
   @VisibleForTesting
-  @SuppressWarnings("parameternumber")
   public KeyOutputStream() {
   public KeyOutputStream() {
-    streamEntries = new ArrayList<>();
-    omClient = null;
-    keyArgs = null;
-    openID = -1;
-    xceiverClientManager = null;
-    chunkSize = 0;
-    requestID = null;
     closed = false;
     closed = false;
-    streamBufferFlushSize = 0;
-    streamBufferMaxSize = 0;
-    bufferPool = new BufferPool(chunkSize, 1);
-    watchTimeout = 0;
-    blockSize = 0;
-    this.checksumType = ChecksumType.valueOf(
-        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
-    this.bytesPerChecksum = OzoneConfigKeys
-        .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
-    this.retryPolicyMap = OzoneClientUtils.getExceptionList()
+    this.retryPolicyMap = HddsClientUtils.getExceptionList()
         .stream()
         .stream()
         .collect(Collectors.toMap(Function.identity(),
         .collect(Collectors.toMap(Function.identity(),
             e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
             e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
     retryCount = 0;
     retryCount = 0;
     offset = 0;
     offset = 0;
+    blockOutputStreamEntryPool = new BlockOutputStreamEntryPool();
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting
   public List<BlockOutputStreamEntry> getStreamEntries() {
   public List<BlockOutputStreamEntry> getStreamEntries() {
-    return streamEntries;
+    return blockOutputStreamEntryPool.getStreamEntries();
   }
   }
+
   @VisibleForTesting
   @VisibleForTesting
   public XceiverClientManager getXceiverClientManager() {
   public XceiverClientManager getXceiverClientManager() {
-    return xceiverClientManager;
+    return blockOutputStreamEntryPool.getXceiverClientManager();
   }
   }
 
 
-  public List<OmKeyLocationInfo> getLocationInfoList() throws IOException {
-    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
-    for (BlockOutputStreamEntry streamEntry : streamEntries) {
-      OmKeyLocationInfo info =
-          new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
-              .setLength(streamEntry.getCurrentPosition()).setOffset(0)
-              .setToken(streamEntry.getToken())
-              .setPipeline(streamEntry.getPipeline())
-              .build();
-      LOG.debug("block written " + streamEntry.getBlockID() + ", length "
-          + streamEntry.getCurrentPosition() + " bcsID "
-          + streamEntry.getBlockID().getBlockCommitSequenceId());
-      locationInfoList.add(info);
-    }
-    return locationInfoList;
+  @VisibleForTesting
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    return blockOutputStreamEntryPool.getLocationInfoList();
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting
@@ -171,41 +123,16 @@ public class KeyOutputStream extends OutputStream {
       ChecksumType checksumType, int bytesPerChecksum,
       ChecksumType checksumType, int bytesPerChecksum,
       String uploadID, int partNumber, boolean isMultipart,
       String uploadID, int partNumber, boolean isMultipart,
       int maxRetryCount, long retryInterval) {
       int maxRetryCount, long retryInterval) {
-    this.streamEntries = new ArrayList<>();
-    this.currentStreamIndex = 0;
-    this.omClient = omClient;
     OmKeyInfo info = handler.getKeyInfo();
     OmKeyInfo info = handler.getKeyInfo();
+    blockOutputStreamEntryPool =
+        new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor,
+            type, bufferFlushSize, bufferMaxSize, size, watchTimeout,
+            checksumType, bytesPerChecksum, uploadID, partNumber, isMultipart,
+            info, xceiverClientManager, handler.getId());
     // Retrieve the file encryption key info, null if file is not in
     // Retrieve the file encryption key info, null if file is not in
     // encrypted bucket.
     // encrypted bucket.
     this.feInfo = info.getFileEncryptionInfo();
     this.feInfo = info.getFileEncryptionInfo();
-    this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
-        .setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
-        .setType(type).setFactor(factor).setDataSize(info.getDataSize())
-        .setIsMultipartKey(isMultipart).setMultipartUploadID(
-            uploadID).setMultipartUploadPartNumber(partNumber)
-        .build();
-    this.openID = handler.getId();
-    this.xceiverClientManager = xceiverClientManager;
-    this.chunkSize = chunkSize;
-    this.requestID = requestId;
-    this.streamBufferFlushSize = bufferFlushSize;
-    this.streamBufferMaxSize = bufferMaxSize;
-    this.blockSize = size;
-    this.watchTimeout = watchTimeout;
-    this.bytesPerChecksum = bytesPerChecksum;
-    this.checksumType = checksumType;
-
-    Preconditions.checkState(chunkSize > 0);
-    Preconditions.checkState(streamBufferFlushSize > 0);
-    Preconditions.checkState(streamBufferMaxSize > 0);
-    Preconditions.checkState(blockSize > 0);
-    Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
-    Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
-    Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
-    this.bufferPool =
-        new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
-    this.excludeList = new ExcludeList();
-    this.retryPolicyMap = OzoneClientUtils.getRetryPolicyByException(
+    this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
         maxRetryCount, retryInterval);
         maxRetryCount, retryInterval);
     this.retryCount = 0;
     this.retryCount = 0;
   }
   }
@@ -225,37 +152,7 @@ public class KeyOutputStream extends OutputStream {
    */
    */
   public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
   public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
       long openVersion) throws IOException {
       long openVersion) throws IOException {
-    // server may return any number of blocks, (0 to any)
-    // only the blocks allocated in this open session (block createVersion
-    // equals to open session version)
-    for (OmKeyLocationInfo subKeyInfo : version.getLocationList()) {
-      if (subKeyInfo.getCreateVersion() == openVersion) {
-        addKeyLocationInfo(subKeyInfo);
-      }
-    }
-  }
-
-  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
-      throws IOException {
-    Preconditions.checkNotNull(subKeyInfo.getPipeline());
-    UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken());
-    BlockOutputStreamEntry.Builder builder =
-        new BlockOutputStreamEntry.Builder()
-            .setBlockID(subKeyInfo.getBlockID())
-            .setKey(keyArgs.getKeyName())
-            .setXceiverClientManager(xceiverClientManager)
-            .setPipeline(subKeyInfo.getPipeline())
-            .setRequestId(requestID)
-            .setChunkSize(chunkSize)
-            .setLength(subKeyInfo.getLength())
-            .setStreamBufferFlushSize(streamBufferFlushSize)
-            .setStreamBufferMaxSize(streamBufferMaxSize)
-            .setWatchTimeout(watchTimeout)
-            .setbufferPool(bufferPool)
-            .setChecksumType(checksumType)
-            .setBytesPerChecksum(bytesPerChecksum)
-            .setToken(subKeyInfo.getToken());
-    streamEntries.add(builder.build());
+    blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
   }
   }
 
 
   @Override
   @Override
@@ -294,34 +191,12 @@ public class KeyOutputStream extends OutputStream {
     handleWrite(b, off, len, false);
     handleWrite(b, off, len, false);
   }
   }
 
 
-  private long computeBufferData() {
-    return bufferPool.computeBufferData();
-  }
-
   private void handleWrite(byte[] b, int off, long len, boolean retry)
   private void handleWrite(byte[] b, int off, long len, boolean retry)
       throws IOException {
       throws IOException {
-    int succeededAllocates = 0;
     while (len > 0) {
     while (len > 0) {
       try {
       try {
-        if (streamEntries.size() <= currentStreamIndex) {
-          Preconditions.checkNotNull(omClient);
-          // allocate a new block, if a exception happens, log an error and
-          // throw exception to the caller directly, and the write fails.
-          try {
-            allocateNewBlock(currentStreamIndex);
-            succeededAllocates += 1;
-          } catch (IOException ioe) {
-            LOG.error("Try to allocate more blocks for write failed, already "
-                + "allocated " + succeededAllocates
-                + " blocks for this write.");
-            throw ioe;
-          }
-        }
-        // in theory, this condition should never violate due the check above
-        // still do a sanity check.
-        Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
-        BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
-
+        BlockOutputStreamEntry current =
+            blockOutputStreamEntryPool.allocateBlockIfNeeded();
         // length(len) will be in int range if the call is happening through
         // length(len) will be in int range if the call is happening through
         // write API of blockOutputStream. Length can be in long range if it
         // write API of blockOutputStream. Length can be in long range if it
         // comes via Exception path.
         // comes via Exception path.
@@ -342,7 +217,8 @@ public class KeyOutputStream extends OutputStream {
           // to or less than the max length of the buffer allocated.
           // to or less than the max length of the buffer allocated.
           // The len specified here is the combined sum of the data length of
           // The len specified here is the combined sum of the data length of
           // the buffers
           // the buffers
-          Preconditions.checkState(!retry || len <= streamBufferMaxSize);
+          Preconditions.checkState(!retry || len <= blockOutputStreamEntryPool
+              .getStreamBufferMaxSize());
           int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
           int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
           writeLen = retry ? (int) len : dataWritten;
           writeLen = retry ? (int) len : dataWritten;
           // In retry path, the data written is already accounted in offset.
           // In retry path, the data written is already accounted in offset.
@@ -350,7 +226,7 @@ public class KeyOutputStream extends OutputStream {
             offset += writeLen;
             offset += writeLen;
           }
           }
           LOG.debug("writeLen {}, total len {}", writeLen, len);
           LOG.debug("writeLen {}, total len {}", writeLen, len);
-          handleException(current, currentStreamIndex, ioe);
+          handleException(current, ioe);
         }
         }
         if (current.getRemaining() <= 0) {
         if (current.getRemaining() <= 0) {
           // since the current block is already written close the stream.
           // since the current block is already written close the stream.
@@ -365,80 +241,19 @@ public class KeyOutputStream extends OutputStream {
     }
     }
   }
   }
 
 
-  /**
-   * Discards the subsequent pre allocated blocks and removes the streamEntries
-   * from the streamEntries list for the container which is closed.
-   * @param containerID id of the closed container
-   * @param pipelineId id of the associated pipeline
-   * @param streamIndex index of the stream
-   */
-  private void discardPreallocatedBlocks(long containerID,
-      PipelineID pipelineId, int streamIndex) {
-    // streamIndex < streamEntries.size() signifies that, there are still
-    // pre allocated blocks available.
-
-    // This will be called only to discard the next subsequent unused blocks
-    // in the streamEntryList.
-    if (streamIndex < streamEntries.size()) {
-      ListIterator<BlockOutputStreamEntry> streamEntryIterator =
-          streamEntries.listIterator(streamIndex);
-      while (streamEntryIterator.hasNext()) {
-        BlockOutputStreamEntry streamEntry = streamEntryIterator.next();
-        Preconditions.checkArgument(streamEntry.getCurrentPosition() == 0);
-        if (((pipelineId != null && streamEntry.getPipeline().getId()
-            .equals(pipelineId)) || (containerID != -1
-            && streamEntry.getBlockID().getContainerID() == containerID))) {
-          streamEntryIterator.remove();
-        }
-      }
-    }
-  }
-
-  /**
-   * It might be possible that the blocks pre allocated might never get written
-   * while the stream gets closed normally. In such cases, it would be a good
-   * idea to trim down the locationInfoList by removing the unused blocks if any
-   * so as only the used block info gets updated on OzoneManager during close.
-   */
-  private void removeEmptyBlocks() {
-    if (currentStreamIndex < streamEntries.size()) {
-      ListIterator<BlockOutputStreamEntry> streamEntryIterator =
-          streamEntries.listIterator(currentStreamIndex);
-      while (streamEntryIterator.hasNext()) {
-        if (streamEntryIterator.next().getCurrentPosition() == 0) {
-          streamEntryIterator.remove();
-        }
-      }
-    }
-  }
-
-  private void cleanup() {
-    if (excludeList != null) {
-      excludeList.clear();
-      excludeList = null;
-    }
-    if (bufferPool != null) {
-      bufferPool.clearBufferPool();
-    }
-
-    if (streamEntries != null) {
-      streamEntries.clear();
-    }
-  }
   /**
   /**
    * It performs following actions :
    * It performs following actions :
    * a. Updates the committed length at datanode for the current stream in
    * a. Updates the committed length at datanode for the current stream in
-   *    datanode.
+   * datanode.
    * b. Reads the data from the underlying buffer and writes it the next stream.
    * b. Reads the data from the underlying buffer and writes it the next stream.
    *
    *
    * @param streamEntry StreamEntry
    * @param streamEntry StreamEntry
-   * @param streamIndex Index of the entry
-   * @param exception actual exception that occurred
+   * @param exception   actual exception that occurred
    * @throws IOException Throws IOException if Write fails
    * @throws IOException Throws IOException if Write fails
    */
    */
   private void handleException(BlockOutputStreamEntry streamEntry,
   private void handleException(BlockOutputStreamEntry streamEntry,
-      int streamIndex, IOException exception) throws IOException {
-    Throwable t = checkForException(exception);
+      IOException exception) throws IOException {
+    Throwable t = HddsClientUtils.checkForException(exception);
     boolean retryFailure = checkForRetryFailure(t);
     boolean retryFailure = checkForRetryFailure(t);
     boolean closedContainerException = false;
     boolean closedContainerException = false;
     if (!retryFailure) {
     if (!retryFailure) {
@@ -448,15 +263,19 @@ public class KeyOutputStream extends OutputStream {
     long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
     long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
     //set the correct length for the current stream
     //set the correct length for the current stream
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
-    long bufferedDataLen = computeBufferData();
-    LOG.warn("Encountered exception {}. The last committed block length is {}, "
+    long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData();
+    LOG.debug(
+        "Encountered exception {}. The last committed block length is {}, "
             + "uncommitted data length is {} retry count {}", exception,
             + "uncommitted data length is {} retry count {}", exception,
         totalSuccessfulFlushedData, bufferedDataLen, retryCount);
         totalSuccessfulFlushedData, bufferedDataLen, retryCount);
-    Preconditions.checkArgument(bufferedDataLen <= streamBufferMaxSize);
-    Preconditions.checkArgument(offset - getKeyLength() == bufferedDataLen);
+    Preconditions.checkArgument(
+        bufferedDataLen <= blockOutputStreamEntryPool.getStreamBufferMaxSize());
+    Preconditions.checkArgument(
+        offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen);
     long containerId = streamEntry.getBlockID().getContainerID();
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
     Preconditions.checkNotNull(failedServers);
+    ExcludeList excludeList = blockOutputStreamEntryPool.getExcludeList();
     if (!failedServers.isEmpty()) {
     if (!failedServers.isEmpty()) {
       excludeList.addDatanodes(failedServers);
       excludeList.addDatanodes(failedServers);
     }
     }
@@ -470,45 +289,42 @@ public class KeyOutputStream extends OutputStream {
     // just clean up the current stream.
     // just clean up the current stream.
     streamEntry.cleanup(retryFailure);
     streamEntry.cleanup(retryFailure);
 
 
-    // discard all sunsequent blocks the containers and pipelines which
+    // discard all subsequent blocks the containers and pipelines which
     // are in the exclude list so that, the very next retry should never
     // are in the exclude list so that, the very next retry should never
     // write data on the  closed container/pipeline
     // write data on the  closed container/pipeline
     if (closedContainerException) {
     if (closedContainerException) {
       // discard subsequent pre allocated blocks from the streamEntries list
       // discard subsequent pre allocated blocks from the streamEntries list
       // from the closed container
       // from the closed container
-      discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null,
-          streamIndex + 1);
+      blockOutputStreamEntryPool
+          .discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
+              null);
     } else {
       // In case there is timeoutException or Watch for commit happening over
       // majority or the client connection failure to the leader in the
-      // pipeline, just discard all the preallocated blocks on this pipeline.
+      // pipeline, just discard all the pre allocated blocks on this pipeline.
       // Next block allocation will happen with excluding this specific pipeline
       // This will ensure if 2 way commit happens , it cannot span over multiple
       // blocks
-      discardPreallocatedBlocks(-1, pipelineId, streamIndex + 1);
+      blockOutputStreamEntryPool
+          .discardPreallocatedBlocks(-1, pipelineId);
     }
     if (bufferedDataLen > 0) {
       // If the data is still cached in the underlying stream, we need to
       // allocate new block and write this data in the datanode.
-      currentStreamIndex += 1;
       handleRetry(exception, bufferedDataLen);
       // reset the retryCount after handling the exception
       retryCount = 0;
     }
-    if (totalSuccessfulFlushedData == 0) {
-      streamEntries.remove(streamIndex);
-      currentStreamIndex -= 1;
-    }
   }
 
   private void markStreamClosed() {
-    cleanup();
+    blockOutputStreamEntryPool.cleanup();
     closed = true;
   }
 
   private void handleRetry(IOException exception, long len) throws IOException {
-    RetryPolicy retryPolicy =
-        retryPolicyMap.get(checkForException(exception).getClass());
+    RetryPolicy retryPolicy = retryPolicyMap
+        .get(HddsClientUtils.checkForException(exception).getClass());
     if (retryPolicy == null) {
       retryPolicy = retryPolicyMap.get(Exception.class);
     }
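Editor's note: handleRetry() resolves a policy by the unwrapped exception's class and falls back to the Exception.class entry when no specific mapping exists. A self-contained sketch of that lookup (RetryPolicy here is a stand-in, not the Ratis/Hadoop interface, and the map contents are illustrative):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    class RetryPolicySketch {
      enum RetryDecision { RETRY, FAIL }

      interface RetryPolicy {
        RetryDecision shouldRetry(int retryCount);
      }

      private final Map<Class<? extends Exception>, RetryPolicy> retryPolicyMap =
          new HashMap<>();

      RetryPolicySketch() {
        // Specific failure classes get a bounded retry; everything else fails.
        retryPolicyMap.put(IOException.class, count -> count < 5
            ? RetryDecision.RETRY : RetryDecision.FAIL);
        retryPolicyMap.put(Exception.class, count -> RetryDecision.FAIL);
      }

      RetryPolicy lookup(Exception e) {
        RetryPolicy policy = retryPolicyMap.get(e.getClass());
        // Fall back to the generic policy when the class is unmapped.
        return policy != null ? policy : retryPolicyMap.get(Exception.class);
      }
    }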
@@ -544,10 +360,11 @@ public class KeyOutputStream extends OutputStream {
       }
     }
     retryCount++;
-    LOG.trace("Retrying Write request. Already tried "
-        + retryCount + " time(s); retry policy is " + retryPolicy);
+    LOG.trace("Retrying Write request. Already tried " + retryCount
+        + " time(s); retry policy is " + retryPolicy);
     handleWrite(null, 0, len, true);
   }
+
   /**
    * Checks if the provided exception signifies retry failure in ratis client.
    * In case of retry failure, ratis client throws RaftRetryFailureException
@@ -562,40 +379,6 @@ public class KeyOutputStream extends OutputStream {
     return t instanceof ContainerNotOpenException;
   }
 
-  public Throwable checkForException(IOException ioe) throws IOException {
-    Throwable t = ioe.getCause();
-    while (t != null) {
-      for (Class<? extends Exception> cls : OzoneClientUtils
-          .getExceptionList()) {
-        if (cls.isInstance(t)) {
-          return t;
-        }
-      }
-      t = t.getCause();
-    }
-    throw ioe;
-  }
-
-  private long getKeyLength() {
-    return streamEntries.stream().mapToLong(e -> e.getCurrentPosition())
-        .sum();
-  }
-
-  /**
-   * Contact OM to get a new block. Set the new block with the index (e.g.
-   * first block has index = 0, second has index = 1 etc.)
-   *
-   * The returned block is made to new BlockOutputStreamEntry to write.
-   *
-   * @param index the index of the block.
-   * @throws IOException
-   */
-  private void allocateNewBlock(int index) throws IOException {
-    OmKeyLocationInfo subKeyInfo =
-        omClient.allocateBlock(keyArgs, openID, excludeList);
-    addKeyLocationInfo(subKeyInfo);
-  }
-
   @Override
   public void flush() throws IOException {
     checkNotClosed();
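Editor's note: the removed checkForException() above moved to HddsClientUtils so KeyOutputStream and the tests share one copy. Its job is to walk an IOException's cause chain and surface the first recognized cause, rethrowing when nothing matches. A sketch under that assumption (the recognized-class list here is illustrative, not the real exception list):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.TimeoutException;

    final class CauseChainSketch {
      private static final List<Class<? extends Exception>> RECOGNIZED =
          Arrays.asList(IllegalStateException.class, TimeoutException.class);

      static Throwable checkForException(IOException ioe) throws IOException {
        // Walk the cause chain from the outermost wrapper inward.
        for (Throwable t = ioe.getCause(); t != null; t = t.getCause()) {
          for (Class<? extends Exception> cls : RECOGNIZED) {
            if (cls.isInstance(t)) {
              return t;  // first recognized cause wins
            }
          }
        }
        throw ioe;  // nothing recognizable in the chain
      }
    }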
@@ -612,20 +395,19 @@ public class KeyOutputStream extends OutputStream {
    * written to new stream , it will be at max half full. In such cases, we
    * should just write the data and not close the stream as the block won't be
    * completely full.
+   *
    * @param op Flag which decides whether to call close or flush on the
-   *              outputStream.
+   *           outputStream.
    * @throws IOException In case, flush or close fails with exception.
    */
   private void handleFlushOrClose(StreamAction op) throws IOException {
-    if (streamEntries.size() == 0) {
+    if (blockOutputStreamEntryPool.isEmpty()) {
       return;
     }
     while (true) {
       try {
-        int size = streamEntries.size();
-        int streamIndex =
-            currentStreamIndex >= size ? size - 1 : currentStreamIndex;
-        BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
+        BlockOutputStreamEntry entry =
+            blockOutputStreamEntryPool.getCurrentStreamEntry();
         if (entry != null) {
           try {
             Collection<DatanodeDetails> failedServers =
@@ -633,7 +415,8 @@ public class KeyOutputStream extends OutputStream {
             // failed servers can be null in case there is no data written in
             // the stream
             if (failedServers != null && !failedServers.isEmpty()) {
-              excludeList.addDatanodes(failedServers);
+              blockOutputStreamEntryPool.getExcludeList()
+                  .addDatanodes(failedServers);
             }
             switch (op) {
             case CLOSE:
@@ -642,7 +425,6 @@ public class KeyOutputStream extends OutputStream {
             case FULL:
               if (entry.getRemaining() == 0) {
                 entry.close();
-                currentStreamIndex++;
               }
               break;
             case FLUSH:
@@ -652,7 +434,7 @@ public class KeyOutputStream extends OutputStream {
               throw new IOException("Invalid Operation");
             }
           } catch (IOException ioe) {
-            handleException(entry, streamIndex, ioe);
+            handleException(entry, ioe);
             continue;
           }
         }
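Editor's note: handleFlushOrClose() is the retry loop these hunks reshape — apply the requested action to the current stream entry, route any IOException through the exception handler, and loop until the action sticks. A compact sketch of that control flow with stand-in types (Entry is not the real BlockOutputStreamEntry):

    import java.io.IOException;

    class FlushOrCloseSketch {
      enum StreamAction { FLUSH, CLOSE, FULL }

      interface Entry {
        long getRemaining();
        void flush() throws IOException;
        void close() throws IOException;
      }

      void handleFlushOrClose(Entry entry, StreamAction op) throws IOException {
        while (true) {
          try {
            switch (op) {
            case CLOSE:
              entry.close();
              break;
            case FULL:
              if (entry.getRemaining() == 0) {
                entry.close();  // block is exactly full, seal it
              }
              break;
            case FLUSH:
              entry.flush();
              break;
            default:
              throw new IOException("Invalid Operation");
            }
            return;  // action applied cleanly
          } catch (IOException ioe) {
            handleException(ioe);  // may rotate to a new block, then retry
          }
        }
      }

      private void handleException(IOException ioe) throws IOException {
        // Placeholder: the real handler excludes failed servers, discards
        // preallocated blocks and rewrites buffered data before returning.
        throw ioe;
      }
    }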
@@ -677,34 +459,16 @@ public class KeyOutputStream extends OutputStream {
     closed = true;
     try {
       handleFlushOrClose(StreamAction.CLOSE);
-      if (keyArgs != null) {
-        // in test, this could be null
-        removeEmptyBlocks();
-        long length = getKeyLength();
-        Preconditions.checkArgument(offset == length);
-        keyArgs.setDataSize(length);
-        keyArgs.setLocationInfoList(getLocationInfoList());
-        // When the key is multipart upload part file upload, we should not
-        // commit the key, as this is not an actual key, this is a just a
-        // partial key of a large file.
-        if (keyArgs.getIsMultipartKey()) {
-          commitUploadPartInfo = omClient.commitMultipartUploadPart(keyArgs,
-              openID);
-        } else {
-          omClient.commitKey(keyArgs, openID);
-        }
-      } else {
-        LOG.warn("Closing KeyOutputStream, but key args is null");
-      }
+      blockOutputStreamEntryPool.commitKey(offset);
     } catch (IOException ioe) {
       throw ioe;
     } finally {
-      cleanup();
+      blockOutputStreamEntryPool.cleanup();
     }
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
-    return commitUploadPartInfo;
+    return blockOutputStreamEntryPool.getCommitUploadPartInfo();
   }
 
   public FileEncryptionInfo getFileEncryptionInfo() {
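Editor's note: the deleted close() body above shows the commit step that now lives in BlockOutputStreamEntryPool.commitKey(offset) — a multipart part must never be committed as a standalone key. A sketch of that branch (the OM client interface below is a stand-in, not the real OzoneManagerProtocol):

    import java.io.IOException;

    class CommitKeySketch {
      interface OmClient {
        void commitKey(String key, long length) throws IOException;
        void commitMultipartUploadPart(String key, long length)
            throws IOException;
      }

      void commit(OmClient om, String key, long length, boolean isMultipartKey)
          throws IOException {
        if (isMultipartKey) {
          // A part of a larger upload: commit only via the multipart path.
          om.commitMultipartUploadPart(key, length);
        } else {
          om.commitKey(key, length);
        }
      }
    }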
@@ -713,7 +477,7 @@
 
   @VisibleForTesting
   public ExcludeList getExcludeList() {
-    return excludeList;
+    return blockOutputStreamEntryPool.getExcludeList();
   }
 
   /**
@@ -739,7 +503,6 @@ public class KeyOutputStream extends OutputStream {
     private int maxRetryCount;
     private int maxRetryCount;
     private long retryInterval;
     private long retryInterval;
 
 
-
     public Builder setMultipartUploadID(String uploadID) {
     public Builder setMultipartUploadID(String uploadID) {
       this.multipartUploadID = uploadID;
       this.multipartUploadID = uploadID;
       return this;
       return this;
@@ -760,8 +523,7 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
-    public Builder setOmClient(
-        OzoneManagerProtocol client) {
+    public Builder setOmClient(OzoneManagerProtocol client) {
       this.omClient = client;
       return this;
     }
@@ -806,12 +568,12 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
-    public Builder setChecksumType(ChecksumType cType){
+    public Builder setChecksumType(ChecksumType cType) {
       this.checksumType = cType;
       return this;
     }
 
-    public Builder setBytesPerChecksum(int bytes){
+    public Builder setBytesPerChecksum(int bytes) {
       this.bytesPerChecksum = bytes;
       return this;
     }
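Editor's note on the Builder hunks above: each setter returns this so calls chain, and build() hands the accumulated fields to a single constructor call; dropping "throws IOException" from build() works because construction no longer performs I/O. A generic sketch of the pattern with illustrative fields (not the real KeyOutputStream.Builder):

    class KeyWriterConfig {
      private final int chunkSize;
      private final long blockSize;

      private KeyWriterConfig(Builder b) {
        this.chunkSize = b.chunkSize;
        this.blockSize = b.blockSize;
      }

      int getChunkSize() { return chunkSize; }
      long getBlockSize() { return blockSize; }

      static class Builder {
        private int chunkSize = 1024;
        private long blockSize = 4L * 1024 * 1024;

        Builder setChunkSize(int size) {
          this.chunkSize = size;
          return this;  // fluent: enables chained configuration
        }

        Builder setBlockSize(long size) {
          this.blockSize = size;
          return this;
        }

        KeyWriterConfig build() {
          return new KeyWriterConfig(this);
        }
      }
    }

Usage is a single chained expression, e.g. new KeyWriterConfig.Builder().setChunkSize(100).setBlockSize(800).build().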
@@ -831,9 +593,9 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
-    public KeyOutputStream build() throws IOException {
-      return new KeyOutputStream(openHandler, xceiverManager,
-          omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
+    public KeyOutputStream build() {
+      return new KeyOutputStream(openHandler, xceiverManager, omClient,
+          chunkSize, requestID, factor, type, streamBufferFlushSize,
           streamBufferMaxSize, blockSize, watchTimeout, checksumType,
           bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
           maxRetryCount, retryInterval);
@@ -848,8 +610,8 @@ public class KeyOutputStream extends OutputStream {
   private void checkNotClosed() throws IOException {
     if (closed) {
       throw new IOException(
-          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: " + keyArgs
-              .getKeyName());
+          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+              + blockOutputStreamEntryPool.getKeyName());
     }
   }
 }

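Editor's note: checkNotClosed() is the guard behind the "BlockOutputStream has been closed" symptom this patch fixes — once the stream is marked closed, every further operation fails fast with the key name in the message. A stand-alone sketch of the pattern (message text approximated, class names hypothetical):

    import java.io.IOException;

    class ClosedGuardSketch {
      private volatile boolean closed;
      private final String keyName;

      ClosedGuardSketch(String keyName) {
        this.keyName = keyName;
      }

      void close() {
        closed = true;
      }

      private void checkNotClosed() throws IOException {
        if (closed) {
          throw new IOException(": Stream is closed! Key: " + keyName);
        }
      }

      void write(byte[] data) throws IOException {
        checkNotClosed();  // fail before touching any buffers
        // ... buffer and forward the bytes ...
      }
    }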
+ 153 - 203
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java

@@ -24,8 +24,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -75,27 +75,23 @@ public class TestBlockOutputStreamWithFailures {
    *
    * @throws IOException
    */
-  @Before
-  public void init() throws Exception {
+  @Before public void init() throws Exception {
     chunkSize = 100;
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
-    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "1s");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(7)
-        .setBlockSize(blockSize)
-        .setChunkSize(chunkSize)
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
+        .setBlockSize(blockSize).setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
         .setStreamBufferMaxSize(maxFlushSize)
-        .setStreamBufferSizeUnit(StorageUnit.BYTES)
-        .build();
+        .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
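Editor's note: the sizes set in init() form a deliberate ladder (chunk 100, flush 200, max-flush 400, block 800 bytes), which is what lets the assertions below predict exact WriteChunk/PutBlock counts per write. A tiny sketch of the arithmetic:

    class BufferGeometrySketch {
      public static void main(String[] args) {
        int chunkSize = 100;
        int flushSize = 2 * chunkSize;     // 200: two chunks per flush
        int maxFlushSize = 2 * flushSize;  // 400: cap on buffered data
        int blockSize = 2 * maxFlushSize;  // 800: one block = eight chunks
        System.out.println(flushSize + " " + maxFlushSize + " " + blockSize);
      }
    }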
@@ -114,25 +110,24 @@ public class TestBlockOutputStreamWithFailures {
   /**
    * Shutdown MiniDFSCluster.
    */
-  @After
-  public void shutdown() {
+  @After public void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
 
-  @Test
-  public void testWatchForCommitWithCloseContainerException() throws Exception {
+  @Test public void testWatchForCommitWithCloseContainerException()
+      throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.PutBlock);
+    long writeChunkCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+    long putBlockCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
@@ -155,15 +150,14 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
 
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
-        .getOutputStream();
+    OutputStream stream =
+        keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
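Editor's note: every test in this file follows one pattern — snapshot the xceiver op counters before acting, drive the stream, then assert exact deltas afterwards, which catches both missing and duplicated RPCs. A sketch of that pattern with a stand-in metrics interface (not the real XceiverClientMetrics):

    class MetricsDeltaSketch {
      interface Metrics {
        long writeChunkCount();
        long totalOpCount();
      }

      static void assertWriteChunkDelta(Metrics m, Runnable action,
          long expectedDelta) {
        long before = m.writeChunkCount();
        long totalBefore = m.totalOpCount();
        action.run();
        // Exact-delta assertions pin down the client's RPC behavior.
        assertEquals(before + expectedDelta, m.writeChunkCount());
        assertTrue(m.totalOpCount() >= totalBefore + expectedDelta);
      }

      private static void assertEquals(long expected, long actual) {
        if (expected != actual) {
          throw new AssertionError(
              "expected " + expected + " but was " + actual);
        }
      }

      private static void assertTrue(boolean cond) {
        if (!cond) {
          throw new AssertionError();
        }
      }
    }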
@@ -199,8 +193,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
 
     // flush is a sync call, all pending operations will complete
     Assert.assertEquals(pendingWriteChunkCount,
@@ -233,9 +226,8 @@ public class TestBlockOutputStreamWithFailures {
     // rewritten plus one partial chunk plus two putBlocks for flushSize
     // and one flush for partial chunk
     key.flush();
-
     Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
-    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
 
     // Make sure the retryCount is reset after the exception is handled
@@ -247,8 +239,7 @@ public class TestBlockOutputStreamWithFailures {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
@@ -259,25 +250,23 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 8,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
 
-  @Test
-  public void testWatchForCommitDatanodeFailure() throws Exception {
+  @Test public void testWatchForCommitDatanodeFailure() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.PutBlock);
+    long writeChunkCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+    long putBlockCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
@@ -299,14 +288,13 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
-        .getOutputStream();
+    OutputStream stream =
+        keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
@@ -344,8 +332,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
 
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -376,8 +363,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
-    Assert
-        .assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(blockSize, blockOutputStream.getTotalAckDataLength());
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // make sure the bufferPool is empty
@@ -396,25 +382,23 @@ public class TestBlockOutputStreamWithFailures {
     // 4 flushes at flushSize boundaries + 2 flush for partial chunks
     Assert.assertEquals(putBlockCount + 6,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 16,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 16, metrics.getTotalOpCount());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
 
-  @Test
-  public void test2DatanodesFailure() throws Exception {
+  @Test public void test2DatanodesFailure() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.PutBlock);
+    long writeChunkCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+    long putBlockCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
@@ -436,14 +420,13 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
-        .getOutputStream();
+    OutputStream stream =
+        keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
@@ -479,8 +462,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
 
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -512,7 +494,7 @@ public class TestBlockOutputStreamWithFailures {
     // rewritten plus one partial chunk plus two putBlocks for flushSize
     // and one flush for partial chunk
     key.flush();
-    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
         .getIoException()) instanceof RaftRetryFailureException);
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
@@ -522,8 +504,7 @@ public class TestBlockOutputStreamWithFailures {
     key.close();
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -533,30 +514,27 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 8,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22,
-        metrics.getTotalOpCount());
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     validateData(keyName, data1);
   }
 
-  @Test
-  public void testFailureWithPrimeSizedData() throws Exception {
+  @Test public void testFailureWithPrimeSizedData() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.PutBlock);
+    long writeChunkCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+    long putBlockCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
@@ -577,24 +555,21 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 1,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
 
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
-        .getOutputStream();
+    OutputStream stream =
+        keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
-
     Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-    Assert.assertEquals(0,
-        blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
 
     Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
 
@@ -613,8 +588,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 1,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 3,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
 
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -641,7 +615,7 @@ public class TestBlockOutputStreamWithFailures {
     key.flush();
 
     Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
-    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
@@ -653,8 +627,7 @@ public class TestBlockOutputStreamWithFailures {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -664,26 +637,24 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 9,
-        metrics.getTotalOpCount());
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
+    Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount());
+    Assert.assertTrue(keyOutputStream.getLocationInfoList().size() == 0);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
 
-  @Test
-  public void testExceptionDuringClose() throws Exception {
+  @Test public void testExceptionDuringClose() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.PutBlock);
+    long writeChunkCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+    long putBlockCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
@@ -704,24 +675,21 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 1,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
 
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
-        .getOutputStream();
+    OutputStream stream =
+        keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
-
     Assert.assertEquals(2, blockOutputStream.getBufferPool().getSize());
     Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-    Assert.assertEquals(0,
-        blockOutputStream.getTotalDataFlushedLength());
+    Assert.assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
 
     Assert.assertTrue(blockOutputStream.getTotalAckDataLength() == 0);
 
@@ -740,8 +708,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 1,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 3,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
 
     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -767,15 +734,14 @@ public class TestBlockOutputStreamWithFailures {
     // now close the stream, It will hit exception
     key.close();
 
-    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -785,26 +751,24 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 9,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 9, metrics.getTotalOpCount());
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
 
-  @Test
-  public void testWatchForCommitWithSingleNodeRatis() throws Exception {
+  @Test public void testWatchForCommitWithSingleNodeRatis() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.PutBlock);
+    long writeChunkCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+    long putBlockCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key =
@@ -828,15 +792,14 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
 
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();
 
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
-        .getOutputStream();
+    OutputStream stream =
+        keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
 
@@ -872,8 +835,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());
 
     // flush is a sync call, all pending operations will complete
     Assert.assertEquals(pendingWriteChunkCount,
@@ -907,7 +869,7 @@ public class TestBlockOutputStreamWithFailures {
     // and one flush for partial chunk
     key.flush();
 
-    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
@@ -919,10 +881,9 @@ public class TestBlockOutputStreamWithFailures {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
     Assert.assertEquals(pendingPutBlockCount,
@@ -931,25 +892,23 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 8,
     Assert.assertEquals(putBlockCount + 8,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
     // Written the same data twice
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
   }
 
 
-  @Test
-  public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
+  @Test public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
     XceiverClientMetrics metrics =
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.PutBlock);
+    long writeChunkCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+    long putBlockCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     String keyName = getKeyName();
     OzoneOutputStream key =
     OzoneOutputStream key =
@@ -972,14 +931,13 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();

     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
-    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
-        .getOutputStream();
+    OutputStream stream =
+        keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;

@@ -1015,8 +973,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());

     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -1044,7 +1001,7 @@ public class TestBlockOutputStreamWithFailures {

     key.flush();

-    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
         .getIoException()) instanceof RaftRetryFailureException);
     Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
     // Make sure the retryCount is reset after the exception is handled
@@ -1052,8 +1009,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
@@ -1073,27 +1029,25 @@ public class TestBlockOutputStreamWithFailures {
     // flush failed + 3 more flushes for the next block
     Assert.assertEquals(putBlockCount + 8,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22,
-        metrics.getTotalOpCount());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
+    Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
     validateData(keyName, dataString.concat(dataString).getBytes());
   }

-  @Test
-  public void testDatanodeFailureWithPreAllocation() throws Exception {
+  @Test public void testDatanodeFailureWithPreAllocation() throws Exception {
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
-    long writeChunkCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long putBlockCount = metrics.getContainerOpCountMetrics(
-        ContainerProtos.Type.PutBlock);
-    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.WriteChunk);
-    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
-        ContainerProtos.Type.PutBlock);
+    long writeChunkCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+    long putBlockCount =
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount =
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock);
     long totalOpCount = metrics.getTotalOpCount();
     String keyName = getKeyName();
     OzoneOutputStream key =
@@ -1117,14 +1071,13 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 2,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 6,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
     Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
-    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+    KeyOutputStream keyOutputStream = (KeyOutputStream) key.getOutputStream();

     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 3);
-    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
-        .getOutputStream();
+    OutputStream stream =
+        keyOutputStream.getStreamEntries().get(0).getOutputStream();
     Assert.assertTrue(stream instanceof BlockOutputStream);
     BlockOutputStream blockOutputStream = (BlockOutputStream) stream;

@@ -1160,8 +1113,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(putBlockCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 8,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 8, metrics.getTotalOpCount());

     // Since the data in the buffer is already flushed, flush here will have
     // no impact on the counters and data structures
@@ -1188,7 +1140,7 @@ public class TestBlockOutputStreamWithFailures {

     key.flush();

-    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
         .getIoException()) instanceof RaftRetryFailureException);

     // Make sure the retryCount is reset after the exception is handled
@@ -1197,13 +1149,12 @@ public class TestBlockOutputStreamWithFailures {

     // now close the stream, It will update the ack length after watchForCommit
     key.close();
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -1219,8 +1170,7 @@ public class TestBlockOutputStreamWithFailures {
     // flush failed + 3 more flushes for the next block
     Assert.assertEquals(putBlockCount + 8,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
-    Assert.assertEquals(totalOpCount + 22,
-        metrics.getTotalOpCount());
+    Assert.assertEquals(totalOpCount + 22, metrics.getTotalOpCount());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);

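The recurring change in the hunks above replaces keyOutputStream.checkForException with HddsClientUtils.checkForException: the exception-unwrapping logic now lives in the shared client utilities (the 69-line addition to HddsClientUtils.java in the file list). A minimal sketch of such a cause-chain check, with an illustrative exception list rather than the exact one in HddsClientUtils:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;

public final class ExceptionCheckSketch {

  // Illustrative list of "known" failure types; the real helper consults the
  // exception classes the Ozone client retries or fails over on.
  private static final List<Class<? extends Exception>> KNOWN_FAILURES =
      Arrays.<Class<? extends Exception>>asList(
          TimeoutException.class, IOException.class);

  private ExceptionCheckSketch() {
  }

  // Walk the cause chain and surface the first recognized failure type.
  public static Throwable checkForException(Exception e) {
    Throwable t = e;
    while (t != null) {
      for (Class<? extends Exception> cls : KNOWN_FAILURES) {
        if (cls.isInstance(t)) {
          return t; // first recognized cause wins
        }
      }
      t = t.getCause();
    }
    return e; // nothing recognized; hand back the original
  }
}
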
+ 1 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java

@@ -291,7 +291,7 @@ public class TestCloseContainerHandlingByClient {
         (KeyOutputStream) key.getOutputStream();
     // With the initial size provided, it should have preallocated 4 blocks
     Assert.assertEquals(4, keyOutputStream.getStreamEntries().size());
-    // write data 3 blocks and one more chunk
+    // write data 4 blocks and one more chunk
     byte[] writtenData =
         ContainerTestHelper.getFixedLengthString(keyString, keyLen)
             .getBytes(UTF_8);

+ 4 - 4
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java

@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
@@ -50,7 +51,6 @@ import java.util.concurrent.TimeUnit;

 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;

 /**
  * Tests failure detection and handling in BlockOutputStream Class.
@@ -85,7 +85,7 @@ public class TestOzoneClientRetriesOnException {
     blockSize = 2 * maxFlushSize;
     conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+   // conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
     conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
     conf.setQuietMode(false);
@@ -150,7 +150,7 @@ public class TestOzoneClientRetriesOnException {
             .getPipeline(container.getPipelineID());
     ContainerTestHelper.waitForPipelineClose(key, cluster, true);
     key.flush();
-    Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
         .getIoException()) instanceof GroupMismatchException);
     Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
         .contains(pipeline.getId()));
@@ -201,7 +201,7 @@ public class TestOzoneClientRetriesOnException {
       key.write(data1);
       Assert.fail("Expected exception not thrown");
     } catch (IOException ioe) {
-      Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
+      Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
           .getIoException()) instanceof ContainerNotOpenException);
       Assert.assertTrue(ioe.getMessage().contains(
           "Retry request failed. retries get failed due to exceeded maximum "

+ 501 - 0
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java

@@ -0,0 +1,501 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.*;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.GroupMismatchException;
+import org.apache.ratis.protocol.RaftRetryFailureException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * This class verifies the watchForCommit handling by XceiverClient.
+ */
+public class TestWatchForCommit {
+
+  private MiniOzoneCluster cluster;
+  private OzoneClient client;
+  private ObjectStore objectStore;
+  private String volumeName;
+  private String bucketName;
+  private String keyString;
+  private int chunkSize;
+  private int flushSize;
+  private int maxFlushSize;
+  private int blockSize;
+  private StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static String containerOwner = "OZONE";
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws IOException
+   */
+  private void startCluster(OzoneConfiguration conf) throws Exception {
+    chunkSize = 100;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
+        1, TimeUnit.SECONDS);
+
+    conf.setQuietMode(false);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    // the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "watchforcommithandlingtest";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+    storageContainerLocationClient = cluster
+        .getStorageContainerLocationClient();
+  }
+
+
+  /**
+   * Shutdown the MiniOzoneCluster.
+   */
+  private void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private String getKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
+  @Test
+  public void testWatchForCommitWithKeyWrite() throws Exception {
+    // in this case, the watch request should fail with
+    // RaftRetryFailureException, which gets captured in keyOutputStream,
+    // and failover will happen to a different block
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+        TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 5);
+    startCluster(conf);
+    XceiverClientMetrics metrics =
+        XceiverClientManager.getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long putBlockCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.PutBlock);
+    long pendingWriteChunkCount =  metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long pendingPutBlockCount = metrics.getContainerOpsMetrics(
+        ContainerProtos.Type.PutBlock);
+    long totalOpCount = metrics.getTotalOpCount();
+    String keyName = getKeyName();
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    int dataLength = maxFlushSize + 50;
+    // write data more than 1 chunk
+    byte[] data1 =
+        ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+            .getBytes(UTF_8);
+    key.write(data1);
+    // since it's hitting the full buffer condition, it will call
+    // watchForCommit and complete at least putBlock for the first flushSize
+    // worth of data
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk)
+            <= pendingWriteChunkCount + 2);
+    Assert.assertTrue(
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock)
+            <= pendingPutBlockCount + 1);
+    Assert.assertEquals(writeChunkCount + 4,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 6,
+        metrics.getTotalOpCount());
+    Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+    KeyOutputStream keyOutputStream = (KeyOutputStream)key.getOutputStream();
+
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 1);
+    OutputStream stream = keyOutputStream.getStreamEntries().get(0)
+        .getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+
+    // we have just written data more than maxFlushSize (4 chunks), at this
+    // time the buffer pool will have 4 buffers allocated worth of chunk size
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    // writtenDataLength as well as flushedDataLength will be updated here
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(maxFlushSize,
+        blockOutputStream.getTotalDataFlushedLength());
+
+    // since data equal to maxBufferSize is written, this will be a blocking
+    // call and hence will wait for at least flushSize worth of data to get
+    // acked by all servers right here
+    Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= flushSize);
+
+    // watchForCommit will clean up at least one entry from the map where each
+    // entry corresponds to flushSize worth of data
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
+
+    // Now do a flush. This will flush the data and update the flush length and
+    // the map.
+    key.flush();
+
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 5,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 8,
+        metrics.getTotalOpCount());
+
+    // Since the data in the buffer is already flushed, flush here will have
+    // no impact on the counters and data structures
+
+    Assert.assertEquals(4, blockOutputStream.getBufferPool().getSize());
+    Assert.assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+    Assert.assertEquals(dataLength,
+        blockOutputStream.getTotalDataFlushedLength());
+    // flush will make sure one more entry gets updated in the map
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
+
+    XceiverClientRatis raftClient =
+        (XceiverClientRatis) blockOutputStream.getXceiverClient();
+    Assert.assertEquals(3, raftClient.getCommitInfoMap().size());
+    Pipeline pipeline = raftClient.getPipeline();
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
+    // again write data with more than max buffer limit. This will call
+    // watchForCommit again. Since the commit will happen 2 way, the
+    // commitInfoMap will get updated for servers which are alive
+
+    // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
+    // once exception is hit
+    key.write(data1);
+
+    // As a part of handling the exception, 4 failed writeChunks will be
+    // rewritten plus one partial chunk plus two putBlocks for flushSize
+    // and one flush for partial chunk
+    key.flush();
+    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
+        .getIoException()) instanceof RaftRetryFailureException);
+    // Make sure the retryCount is reset after the exception is handled
+    Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
+    // now close the stream, It will update the ack length after watchForCommit
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    key.close();
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(pendingWriteChunkCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(pendingPutBlockCount,
+        metrics.getContainerOpsMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(writeChunkCount + 14,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+    Assert.assertEquals(putBlockCount + 8,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
+    Assert.assertEquals(totalOpCount + 22,
+        metrics.getTotalOpCount());
+    Assert
+        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    // make sure the bufferPool is empty
+    Assert
+        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    validateData(keyName, data1);
+    shutdown();
+  }
+
+  @Test
+  public void testWatchForCommitWithSmallerTimeoutValue() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
+        TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    startCluster(conf);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    ContainerWithPipeline container1 = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    XceiverClientSpi xceiverClient = clientManager
+        .acquireClient(container1.getPipeline());
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertEquals(container1.getPipeline(),
+        xceiverClient.getPipeline());
+    Pipeline pipeline = xceiverClient.getPipeline();
+    XceiverClientReply reply = xceiverClient.sendCommandAsync(
+        ContainerTestHelper.getCreateContainerRequest(
+            container1.getContainerInfo().getContainerID(),
+            xceiverClient.getPipeline()));
+    reply.getResponse().get();
+    long index = reply.getLogIndex();
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
+    try {
+      // just watch for a log index which is not updated in the commitInfo map
+      xceiverClient.watchForCommit(index + 1, 3000);
+      Assert.fail("expected exception not thrown");
+    } catch (Exception e) {
+      Assert.assertTrue(
+          HddsClientUtils.checkForException(e) instanceof TimeoutException);
+    }
+    // After releasing the xceiverClient, this connection should be closed
+    // and any container operations should fail
+    clientManager.releaseClient(xceiverClient, false);
+    shutdown();
+  }
+
+  @Test
+  public void testWatchForCommitForRetryfailure() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
+        100, TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    startCluster(conf);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    ContainerWithPipeline container1 = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    XceiverClientSpi xceiverClient = clientManager
+        .acquireClient(container1.getPipeline());
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertEquals(container1.getPipeline(),
+        xceiverClient.getPipeline());
+    Pipeline pipeline = xceiverClient.getPipeline();
+    XceiverClientReply reply = xceiverClient.sendCommandAsync(
+        ContainerTestHelper.getCreateContainerRequest(
+            container1.getContainerInfo().getContainerID(),
+            xceiverClient.getPipeline()));
+    reply.getResponse().get();
+    long index = reply.getLogIndex();
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
+    // with two of the three datanodes down, the watch request cannot succeed
+    try {
+      // just watch for a log index which is not updated in the commitInfo map
+      xceiverClient.watchForCommit(index + 1, 20000);
+      Assert.fail("expected exception not thrown");
+    } catch (Exception e) {
+      Assert.assertTrue(HddsClientUtils
+          .checkForException(e) instanceof RaftRetryFailureException);
+    }
+    clientManager.releaseClient(xceiverClient, false);
+    shutdown();
+  }
+
+  @Test
+  public void test2WayCommitForRetryfailure() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+        TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 8);
+    startCluster(conf);
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+
+    ContainerWithPipeline container1 = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    XceiverClientSpi xceiverClient = clientManager
+        .acquireClient(container1.getPipeline());
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertEquals(container1.getPipeline(),
+        xceiverClient.getPipeline());
+    Pipeline pipeline = xceiverClient.getPipeline();
+    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
+    XceiverClientReply reply = xceiverClient.sendCommandAsync(
+        ContainerTestHelper.getCreateContainerRequest(
+            container1.getContainerInfo().getContainerID(),
+            xceiverClient.getPipeline()));
+    reply.getResponse().get();
+    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    reply = xceiverClient.sendCommandAsync(ContainerTestHelper
+        .getCloseContainer(pipeline,
+            container1.getContainerInfo().getContainerID()));
+    reply.getResponse().get();
+    xceiverClient.watchForCommit(reply.getLogIndex(), 20000);
+
+    // commitInfo Map will be reduced to 2 here
+    Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
+    clientManager.releaseClient(xceiverClient, false);
+    Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
+    Assert
+        .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
+    logCapturer.stopCapturing();
+    shutdown();
+  }
+
+  @Test
+  public void test2WayCommitForTimeoutException() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
+        TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    startCluster(conf);
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+
+    ContainerWithPipeline container1 = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    XceiverClientSpi xceiverClient = clientManager
+        .acquireClient(container1.getPipeline());
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertEquals(container1.getPipeline(),
+        xceiverClient.getPipeline());
+    Pipeline pipeline = xceiverClient.getPipeline();
+    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
+    XceiverClientReply reply = xceiverClient.sendCommandAsync(
+        ContainerTestHelper.getCreateContainerRequest(
+            container1.getContainerInfo().getContainerID(),
+            xceiverClient.getPipeline()));
+    reply.getResponse().get();
+    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    reply = xceiverClient.sendCommandAsync(ContainerTestHelper
+        .getCloseContainer(pipeline,
+            container1.getContainerInfo().getContainerID()));
+    reply.getResponse().get();
+    xceiverClient.watchForCommit(reply.getLogIndex(), 3000);
+
+    // commitInfo Map will be reduced to 2 here
+    Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
+    clientManager.releaseClient(xceiverClient, false);
+    Assert.assertTrue(logCapturer.getOutput().contains("3 way commit failed"));
+    Assert.assertTrue(logCapturer.getOutput().contains("TimeoutException"));
+    Assert
+        .assertTrue(logCapturer.getOutput().contains("Committed by majority"));
+    logCapturer.stopCapturing();
+    shutdown();
+  }
+
+  @Test
+  public void testWatchForCommitForGroupMismatchException() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+        TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
+
+    // mark the node stale early so that the pipeline gets destroyed quickly
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    startCluster(conf);
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+
+    ContainerWithPipeline container1 = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    XceiverClientSpi xceiverClient = clientManager
+        .acquireClient(container1.getPipeline());
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertEquals(container1.getPipeline(),
+        xceiverClient.getPipeline());
+    Pipeline pipeline = xceiverClient.getPipeline();
+    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
+    long containerId = container1.getContainerInfo().getContainerID();
+    XceiverClientReply reply = xceiverClient.sendCommandAsync(
+        ContainerTestHelper.getCreateContainerRequest(containerId,
+            xceiverClient.getPipeline()));
+    reply.getResponse().get();
+    Assert.assertEquals(3, ratisClient.getCommitInfoMap().size());
+    List<Pipeline> pipelineList = new ArrayList<>();
+    pipelineList.add(pipeline);
+    ContainerTestHelper.waitForPipelineClose(pipelineList, cluster);
+    try {
+      // just watch for a log index which is not updated in the commitInfo map
+      xceiverClient.watchForCommit(reply.getLogIndex() + 1, 20000);
+      Assert.fail("Expected exception not thrown");
+    } catch(Exception e) {
+      Assert.assertTrue(HddsClientUtils
+          .checkForException(e) instanceof GroupMismatchException);
+    }
+    clientManager.releaseClient(xceiverClient, false);
+    shutdown();
+  }
+
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    return ContainerTestHelper
+        .createKey(keyName, type, size, objectStore, volumeName, bucketName);
+  }
+
+  private void validateData(String keyName, byte[] data) throws Exception {
+    ContainerTestHelper
+        .validateData(keyName, data, objectStore, volumeName, bucketName);
+  }
+}

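The two-way-commit tests above assert on the "3 way commit failed" and "Committed by majority" log lines from XceiverClientRatis. A hedged sketch of the watch-and-fall-back pattern they exercise; the Watcher interface and the code around the two log messages are illustrative stand-ins, not the real XceiverClientRatis implementation:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class WatchForCommitSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(WatchForCommitSketch.class);

  // Stand-in for a replication-level-specific watch call.
  interface Watcher {
    long watch(long index, long timeoutMs) throws Exception;
  }

  private final Watcher allCommitted;      // 3-way (ALL_COMMITTED) watch
  private final Watcher majorityCommitted; // 2-way (MAJORITY_COMMITTED) watch

  WatchForCommitSketch(Watcher all, Watcher majority) {
    this.allCommitted = all;
    this.majorityCommitted = majority;
  }

  long watchForCommit(long index, long timeoutMs) throws Exception {
    try {
      // First try to see the entry replicated on every follower.
      return allCommitted.watch(index, timeoutMs);
    } catch (Exception e) {
      // First log line the tests look for.
      LOG.warn("3 way commit failed ", e);
      long committed = majorityCommitted.watch(index, timeoutMs);
      // Second asserted log line: majority replication is enough to proceed.
      LOG.info("Committed by majority. logIndex: {}", index);
      return committed;
    }
  }
}
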
+ 18 - 9
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -68,7 +69,6 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.Token;

 import com.google.common.base.Preconditions;
@@ -723,11 +723,11 @@ public final class ContainerTestHelper {
       MiniOzoneCluster cluster) throws Exception {
     KeyOutputStream keyOutputStream =
         (KeyOutputStream) outputStream.getOutputStream();
-    List<OmKeyLocationInfo> locationInfoList =
-        keyOutputStream.getLocationInfoList();
+    List<BlockOutputStreamEntry> streamEntryList =
+        keyOutputStream.getStreamEntries();
     List<Long> containerIdList = new ArrayList<>();
-    for (OmKeyLocationInfo info : locationInfoList) {
-      long id = info.getContainerID();
+    for (BlockOutputStreamEntry entry : streamEntryList) {
+      long id = entry.getBlockID().getContainerID();
       if (!containerIdList.contains(id)) {
         containerIdList.add(id);
       }
@@ -741,11 +741,14 @@ public final class ContainerTestHelper {
       throws Exception {
     KeyOutputStream keyOutputStream =
         (KeyOutputStream) outputStream.getOutputStream();
-    List<OmKeyLocationInfo> locationInfoList =
-        keyOutputStream.getLocationInfoList();
+    List<BlockOutputStreamEntry> streamEntryList =
+        keyOutputStream.getStreamEntries();
     List<Long> containerIdList = new ArrayList<>();
-    for (OmKeyLocationInfo info : locationInfoList) {
-      containerIdList.add(info.getContainerID());
+    for (BlockOutputStreamEntry entry : streamEntryList) {
+      long id = entry.getBlockID().getContainerID();
+      if (!containerIdList.contains(id)) {
+        containerIdList.add(id);
+      }
     }
     Assert.assertTrue(!containerIdList.isEmpty());
     waitForPipelineClose(cluster, waitForContainerCreation,
@@ -784,6 +787,12 @@ public final class ContainerTestHelper {
         }
       }
     }
+    waitForPipelineClose(pipelineList, cluster);
+  }
+
+  public static void waitForPipelineClose(List<Pipeline> pipelineList,
+      MiniOzoneCluster cluster)
+      throws TimeoutException, InterruptedException, IOException {
     for (Pipeline pipeline1 : pipelineList) {
       // issue pipeline destroy command
       cluster.getStorageContainerManager().getPipelineManager()

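Both ContainerTestHelper hunks above switch from OmKeyLocationInfo to BlockOutputStreamEntry and add a contains() guard so each container id is collected once even when several stream entries share a container. A minimal sketch of the same dedup using a set, assuming only the accessors visible in the diff:

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;

final class ContainerIdSketch {
  // Collect the distinct container ids touched by a key's stream entries.
  static Set<Long> distinctContainerIds(List<BlockOutputStreamEntry> entries) {
    Set<Long> ids = new LinkedHashSet<>(); // keeps first-seen order
    for (BlockOutputStreamEntry entry : entries) {
      ids.add(entry.getBlockID().getContainerID()); // the set dedups for us
    }
    return ids;
  }
}
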
+ 1 - 1
hadoop-ozone/pom.xml

@@ -29,7 +29,7 @@
     <hadoop.version>3.2.0</hadoop.version>
     <hdds.version>0.5.0-SNAPSHOT</hdds.version>
     <ozone.version>0.5.0-SNAPSHOT</ozone.version>
-    <ratis.version>0.3.0</ratis.version>
+    <ratis.version>0.4.0-fe2b15d-SNAPSHOT</ratis.version>
     <bouncycastle.version>1.60</bouncycastle.version>
     <ozone.release>Crater Lake</ozone.release>
     <declared.ozone.version>${ozone.version}</declared.ozone.version>