
HDDS-1373. KeyOutputStream, close after write request fails after retries, runs into IllegalArgumentException. (#729)

Shashikant Banerjee, 6 years ago
parent commit d608be660f
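
For context, a minimal caller-side sketch of the failure this commit addresses (the key and data names are illustrative, not code from this patch): a write that exhausts its retry budget used to leave KeyOutputStream with a stale stream state, so the subsequent close() re-entered the write path and tripped a Preconditions.checkArgument, surfacing as IllegalArgumentException. With this change, the failing write calls markStreamClosed() first, so close() becomes a safe no-op.

    // Hypothetical reproduction, assuming an OzoneBucket 'bucket' and a client
    // configured with OZONE_CLIENT_MAX_RETRIES = 3 (names are illustrative):
    OzoneOutputStream key = bucket.createKey("key1", data.length);
    try {
      key.write(data);   // fails once the maximum allowed retries are exceeded
    } catch (IOException ioe) {
      // expected: retry budget exhausted, stream is now marked closed
    }
    key.close();         // previously threw IllegalArgumentException; now a no-op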

+ 6 - 0
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java

@@ -102,4 +102,10 @@ public class ExcludeList {
     });
     return excludeList;
   }
+
+  public void clear() {
+    datanodes.clear();
+    containerIds.clear();
+    pipelineIds.clear();
+  }
 }
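
The new clear() drops every accumulated exclusion in one call; KeyOutputStream#cleanup() below relies on it when tearing the stream down. A minimal usage sketch, assuming ExcludeList's existing no-arg constructor and the addDatanodes accessor used elsewhere in this patch:

    // Exclusions gathered while handling write failures are discarded
    // wholesale once the stream is being cleaned up.
    ExcludeList excludeList = new ExcludeList();
    excludeList.addDatanodes(failedServers);  // populated during retry handling
    excludeList.clear();                      // datanodes, containerIds and
                                              // pipelineIds are all emptied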

+ 114 - 84
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java

@@ -295,60 +295,66 @@ public class KeyOutputStream extends OutputStream {
       throws IOException {
     int succeededAllocates = 0;
     while (len > 0) {
-      if (streamEntries.size() <= currentStreamIndex) {
-        Preconditions.checkNotNull(omClient);
-        // allocate a new block, if a exception happens, log an error and
-        // throw exception to the caller directly, and the write fails.
+      try {
+        if (streamEntries.size() <= currentStreamIndex) {
+          Preconditions.checkNotNull(omClient);
+          // allocate a new block; if an exception happens, log an error and
+          // throw it to the caller directly, and the write fails.
+          try {
+            allocateNewBlock(currentStreamIndex);
+            succeededAllocates += 1;
+          } catch (IOException ioe) {
+            LOG.error("Try to allocate more blocks for write failed, already "
+                + "allocated " + succeededAllocates
+                + " blocks for this write.");
+            throw ioe;
+          }
+        }
+        // in theory, this condition should never be violated due to the
+        // check above; still, do a sanity check.
+        Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+        BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
+
+        // length (len) will be in int range if the call happens through the
+        // write API of blockOutputStream. Length can be in long range if it
+        // comes via the exception path.
+        int writeLen = Math.min((int) len, (int) current.getRemaining());
+        long currentPos = current.getWrittenDataLength();
         try {
-          allocateNewBlock(currentStreamIndex);
-          succeededAllocates += 1;
+          if (retry) {
+            current.writeOnRetry(len);
+          } else {
+            current.write(b, off, writeLen);
+            offset += writeLen;
+          }
         } catch (IOException ioe) {
-          LOG.error("Try to allocate more blocks for write failed, already "
-              + "allocated " + succeededAllocates + " blocks for this write.");
-          throw ioe;
-        }
-      }
-      // in theory, this condition should never violate due the check above
-      // still do a sanity check.
-      Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
-      BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex);
-
-      // length(len) will be in int range if the call is happening through
-      // write API of blockOutputStream. Length can be in long range if it comes
-      // via Exception path.
-      int writeLen = Math.min((int)len, (int) current.getRemaining());
-      long currentPos = current.getWrittenDataLength();
-      try {
-        if (retry) {
-          current.writeOnRetry(len);
-        } else {
-          current.write(b, off, writeLen);
-          offset += writeLen;
+          // for the current iteration, totalDataWritten - currentPos gives the
+          // amount of data already written to the buffer
+
+          // In the retry path, the total data to be written will always be
+          // less than or equal to the max length of the buffer allocated.
+          // The len specified here is the combined sum of the data lengths
+          // of the buffers.
+          Preconditions.checkState(!retry || len <= streamBufferMaxSize);
+          int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
+          writeLen = retry ? (int) len : dataWritten;
+          // In the retry path, the data written is already accounted for in offset.
+          if (!retry) {
+            offset += writeLen;
+          }
+          LOG.debug("writeLen {}, total len {}", writeLen, len);
+          handleException(current, currentStreamIndex, ioe);
         }
-      } catch (IOException ioe) {
-        // for the current iteration, totalDataWritten - currentPos gives the
-        // amount of data already written to the buffer
-
-        // In the retryPath, the total data to be written will always be equal
-        // to or less than the max length of the buffer allocated.
-        // The len specified here is the combined sum of the data length of
-        // the buffers
-        Preconditions.checkState(!retry || len <= streamBufferMaxSize);
-        int dataWritten  = (int) (current.getWrittenDataLength() - currentPos);
-        writeLen = retry ? (int) len : dataWritten;
-        // In retry path, the data written is already accounted in offset.
-        if (!retry) {
-          offset += writeLen;
+        if (current.getRemaining() <= 0) {
+          // since the current block is fully written, close the stream.
+          handleFlushOrClose(StreamAction.FULL);
         }
-        LOG.debug("writeLen {}, total len {}", writeLen, len);
-        handleException(current, currentStreamIndex, ioe);
-      }
-      if (current.getRemaining() <= 0) {
-        // since the current block is already written close the stream.
-        handleFlushOrClose(StreamAction.FULL);
+        len -= writeLen;
+        off += writeLen;
+      } catch (Exception e) {
+        markStreamClosed();
+        throw e;
       }
-      len -= writeLen;
-      off += writeLen;
     }
   }
 
@@ -365,7 +371,7 @@ public class KeyOutputStream extends OutputStream {
     // pre allocated blocks available.
 
     // This will be called only to discard the next subsequent unused blocks
-    // in the sreamEntryList.
+    // in the streamEntryList.
     if (streamIndex < streamEntries.size()) {
       ListIterator<BlockOutputStreamEntry> streamEntryIterator =
           streamEntries.listIterator(streamIndex);
@@ -398,6 +404,20 @@ public class KeyOutputStream extends OutputStream {
       }
     }
   }
+
+  private void cleanup() {
+    if (excludeList != null) {
+      excludeList.clear();
+      excludeList = null;
+    }
+    if (bufferPool != null) {
+      bufferPool.clearBufferPool();
+    }
+
+    if (streamEntries != null) {
+      streamEntries.clear();
+    }
+  }
   /**
    * It performs following actions :
    * a. Updates the committed length at datanode for the current stream in
@@ -418,8 +438,7 @@ public class KeyOutputStream extends OutputStream {
       closedContainerException = checkIfContainerIsClosed(t);
     }
     PipelineID pipelineId = null;
-    long totalSuccessfulFlushedData =
-        streamEntry.getTotalAckDataLength();
+    long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
     //set the correct length for the current stream
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     long bufferedDataLen = computeBufferData();
@@ -450,8 +469,8 @@ public class KeyOutputStream extends OutputStream {
     if (closedContainerException) {
       // discard subsequent pre allocated blocks from the streamEntries list
       // from the closed container
-      discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(),
-          null, streamIndex + 1);
+      discardPreallocatedBlocks(streamEntry.getBlockID().getContainerID(), null,
+          streamIndex + 1);
     } else {
       // In case there is timeoutException or Watch for commit happening over
       // majority or the client connection failure to the leader in the
@@ -475,6 +494,11 @@ public class KeyOutputStream extends OutputStream {
     }
   }
 
+  private void markStreamClosed() {
+    cleanup();
+    closed = true;
+  }
+
   private void handleRetry(IOException exception, long len) throws IOException {
     RetryPolicy.RetryAction action;
     try {
@@ -586,40 +610,46 @@ public class KeyOutputStream extends OutputStream {
       return;
     }
     while (true) {
-      int size = streamEntries.size();
-      int streamIndex =
-          currentStreamIndex >= size ? size - 1 : currentStreamIndex;
-      BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
-      if (entry != null) {
-        try {
-          Collection<DatanodeDetails> failedServers = entry.getFailedServers();
-          // failed servers can be null in case there is no data written in the
-          // stream
-          if (failedServers != null && !failedServers.isEmpty()) {
-            excludeList.addDatanodes(failedServers);
-          }
-          switch (op) {
-          case CLOSE:
-            entry.close();
-            break;
-          case FULL:
-            if (entry.getRemaining() == 0) {
+      try {
+        int size = streamEntries.size();
+        int streamIndex =
+            currentStreamIndex >= size ? size - 1 : currentStreamIndex;
+        BlockOutputStreamEntry entry = streamEntries.get(streamIndex);
+        if (entry != null) {
+          try {
+            Collection<DatanodeDetails> failedServers =
+                entry.getFailedServers();
+            // failed servers can be null in case there is no data written in the
+            // stream
+            if (failedServers != null && !failedServers.isEmpty()) {
+              excludeList.addDatanodes(failedServers);
+            }
+            switch (op) {
+            case CLOSE:
               entry.close();
-              currentStreamIndex++;
+              break;
+            case FULL:
+              if (entry.getRemaining() == 0) {
+                entry.close();
+                currentStreamIndex++;
+              }
+              break;
+            case FLUSH:
+              entry.flush();
+              break;
+            default:
+              throw new IOException("Invalid Operation");
             }
-            break;
-          case FLUSH:
-            entry.flush();
-            break;
-          default:
-            throw new IOException("Invalid Operation");
+          } catch (IOException ioe) {
+            handleException(entry, streamIndex, ioe);
+            continue;
           }
-        } catch (IOException ioe) {
-          handleException(entry, streamIndex, ioe);
-          continue;
         }
+        break;
+      } catch (Exception e) {
+        markStreamClosed();
+        throw e;
       }
-      break;
     }
   }
 
@@ -658,7 +688,7 @@ public class KeyOutputStream extends OutputStream {
     } catch (IOException ioe) {
       throw ioe;
     } finally {
-      bufferPool.clearBufferPool();
+      cleanup();
     }
   }
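
The caller-visible contract after these changes, mirrored by the new assertions in TestOzoneClientRetriesOnException below: once a write fails after retries, the stream is marked closed, flush() fails fast, and close() completes without throwing. A condensed sketch:

    try {
      key.write(data1);              // exhausts retries, marks the stream closed
    } catch (IOException ioe) {
      // "Retry request failed ... exceeded maximum allowed retries number: 3"
    }
    try {
      key.flush();                   // now throws IOException("Stream is closed")
    } catch (IOException expected) {
    }
    key.close();                     // safe: cleanup() already released buffers,
                                     // exclusions and stream entries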
 

+ 11 - 5
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java

@@ -189,6 +189,7 @@ public class TestBlockOutputStream {
     // flush ensures watchForCommit updates the total length acknowledged
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
 
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
 
@@ -208,7 +209,7 @@ public class TestBlockOutputStream {
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -263,6 +264,7 @@ public class TestBlockOutputStream {
     // Now do a flush. This will flush the data and update the flush length and
     // the map.
     key.flush();
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     // flush is a sync call, all pending operations will complete
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -302,7 +304,7 @@ public class TestBlockOutputStream {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 3,
         metrics.getTotalOpCount());
-    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -397,6 +399,7 @@ public class TestBlockOutputStream {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 3,
         metrics.getTotalOpCount());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -454,6 +457,7 @@ public class TestBlockOutputStream {
         blockOutputStream.getCommitIndex2flushedDataMap().size());
 
     Assert.assertEquals(0, blockOutputStream.getTotalAckDataLength());
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     key.close();
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
@@ -471,7 +475,7 @@ public class TestBlockOutputStream {
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -536,6 +540,7 @@ public class TestBlockOutputStream {
     // Now do a flush. This will flush the data and update the flush length and
     // the map.
     key.flush();
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -570,7 +575,7 @@ public class TestBlockOutputStream {
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -638,6 +643,7 @@ public class TestBlockOutputStream {
     // Now do a flush. This will flush the data and update the flush length and
     // the map.
     key.flush();
+    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -673,7 +679,7 @@ public class TestBlockOutputStream {
         metrics.getTotalOpCount());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
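
The assertion flips in this file all follow one pattern: stream entries are observable while the key is open, and cleanup() empties the list when close() runs. Schematically:

    Assert.assertEquals(1, keyOutputStream.getStreamEntries().size()); // open key, one block
    key.close();                                                       // triggers cleanup()
    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); // entries cleared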
 

+ 16 - 11
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java

@@ -234,6 +234,7 @@ public class TestBlockOutputStreamWithFailures {
     // and one flush for partial chunk
     key.flush();
 
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
 
@@ -249,7 +250,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -372,6 +373,7 @@ public class TestBlockOutputStreamWithFailures {
     key.flush();
     Assert.assertEquals(2, raftClient.getCommitInfoMap().size());
 
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
     Assert
@@ -382,7 +384,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -515,13 +517,14 @@ public class TestBlockOutputStreamWithFailures {
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // now close the stream, It will update the ack length after watchForCommit
+
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     key.close();
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -538,7 +541,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
 
@@ -637,6 +640,7 @@ public class TestBlockOutputStreamWithFailures {
     // and one flush for partial chunk
     key.flush();
 
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
         .getIoException()) instanceof ContainerNotOpenException);
     // Make sure the retryCount is reset after the exception is handled
@@ -652,7 +656,6 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -663,7 +666,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 9,
         metrics.getTotalOpCount());
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
@@ -774,7 +777,6 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -785,7 +787,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 9,
         metrics.getTotalOpCount());
-    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     validateData(keyName, dataString.concat(dataString).getBytes());
@@ -911,6 +913,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // commitInfoMap will remain intact as there is no server failure
     Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
     // make sure the bufferPool is empty
@@ -919,7 +922,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -1046,6 +1049,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert.assertEquals(1, raftClient.getCommitInfoMap().size());
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
+    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
     Assert
@@ -1054,7 +1058,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,
@@ -1071,6 +1075,7 @@ public class TestBlockOutputStreamWithFailures {
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock));
     Assert.assertEquals(totalOpCount + 22,
         metrics.getTotalOpCount());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
     cluster.restartHddsDatanode(pipeline.getNodes().get(0), true);
@@ -1198,7 +1203,7 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
-    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size());
+    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(pendingWriteChunkCount,
         metrics.getContainerOpsMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount,

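In these failure tests the expected count before close is 2 rather than 1, because handleException allocates a fresh block on a new container or pipeline after the first one fails; cleanup() still drains the list to zero on close:

    Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // original + re-allocated block
    key.close();
    Assert.assertEquals(0, keyOutputStream.getStreamEntries().size()); // cleanup() drained the list
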
+ 33 - 13
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java

@@ -19,10 +19,12 @@ package org.apache.hadoop.ozone.client.rpc;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -66,6 +68,7 @@ public class TestOzoneClientRetriesOnException {
   private String volumeName;
   private String bucketName;
   private String keyString;
+  private XceiverClientManager xceiverClientManager;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -84,8 +87,6 @@ public class TestOzoneClientRetriesOnException {
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 2);
-    conf.set(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY, "1s");
     conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
@@ -100,6 +101,7 @@ public class TestOzoneClientRetriesOnException {
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
     objectStore = client.getObjectStore();
+    xceiverClientManager = new XceiverClientManager(conf);
     keyString = UUID.randomUUID().toString();
     volumeName = "testblockoutputstreamwithretries";
     bucketName = volumeName;
@@ -152,8 +154,9 @@ public class TestOzoneClientRetriesOnException {
         .getIoException()) instanceof GroupMismatchException);
     Assert.assertTrue(keyOutputStream.getExcludeList().getPipelineIds()
         .contains(pipeline.getId()));
-    key.close();
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 2);
+    key.close();
+    Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
     validateData(keyName, data1);
   }
 
@@ -171,13 +174,8 @@ public class TestOzoneClientRetriesOnException {
     byte[] data1 =
         ContainerTestHelper.getFixedLengthString(keyString, dataLength)
             .getBytes(UTF_8);
-    key.write(data1);
-
-    OutputStream stream = entries.get(0).getOutputStream();
-    Assert.assertTrue(stream instanceof BlockOutputStream);
-    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
-    List<PipelineID> pipelineList = new ArrayList<>();
     long containerID;
+    List<Long> containerList = new ArrayList<>();
     for (BlockOutputStreamEntry entry : entries) {
       containerID = entry.getBlockID().getContainerID();
       ContainerInfo container =
@@ -186,18 +184,40 @@ public class TestOzoneClientRetriesOnException {
       Pipeline pipeline =
           cluster.getStorageContainerManager().getPipelineManager()
               .getPipeline(container.getPipelineID());
-      pipelineList.add(pipeline.getId());
+      XceiverClientSpi xceiverClient =
+          xceiverClientManager.acquireClient(pipeline);
+      if (!containerList.contains(containerID)) {
+        xceiverClient.sendCommand(ContainerTestHelper
+            .getCreateContainerRequest(containerID, pipeline));
+      }
+      xceiverClientManager.releaseClient(xceiverClient, false);
     }
-    ContainerTestHelper.waitForPipelineClose(key, cluster, false);
+    key.write(data1);
+    OutputStream stream = entries.get(0).getOutputStream();
+    Assert.assertTrue(stream instanceof BlockOutputStream);
+    BlockOutputStream blockOutputStream = (BlockOutputStream) stream;
+    ContainerTestHelper.waitForContainerClose(key, cluster);
     try {
       key.write(data1);
+      Assert.fail("Expected exception not thrown");
     } catch (IOException ioe) {
       Assert.assertTrue(keyOutputStream.checkForException(blockOutputStream
-          .getIoException()) instanceof GroupMismatchException);
+          .getIoException()) instanceof ContainerNotOpenException);
       Assert.assertTrue(ioe.getMessage().contains(
           "Retry request failed. retries get failed due to exceeded maximum "
               + "allowed retries number: 3"));
     }
+    try {
+      key.flush();
+      Assert.fail("Expected exception not thrown");
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe.getMessage().contains("Stream is closed"));
+    }
+    try {
+      key.close();
+    } catch (IOException ioe) {
+      Assert.fail("Expected should not be thrown");
+    }
   }
 
   private OzoneOutputStream createKey(String keyName, ReplicationType type,

+ 3 - 1
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java

@@ -727,7 +727,10 @@ public final class ContainerTestHelper {
         keyOutputStream.getLocationInfoList();
     List<Long> containerIdList = new ArrayList<>();
     for (OmKeyLocationInfo info : locationInfoList) {
-      containerIdList.add(info.getContainerID());
+      long id = info.getContainerID();
+      if (!containerIdList.contains(id)) {
+        containerIdList.add(id);
+      }
     }
     Assert.assertTrue(!containerIdList.isEmpty());
     waitForContainerClose(cluster, containerIdList.toArray(new Long[0]));
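
Design note: the contains() check keeps the List-based API but costs O(n) per lookup; a LinkedHashSet would express the deduplication directly while preserving first-occurrence order. A sketch of that alternative, not what the commit does:

    // Alternative dedup using a Set; assumes only first-occurrence order
    // matters, which LinkedHashSet preserves.
    Set<Long> containerIds = new LinkedHashSet<>();
    for (OmKeyLocationInfo info : locationInfoList) {
      containerIds.add(info.getContainerID());  // duplicates dropped implicitly
    }
    Assert.assertTrue(!containerIds.isEmpty());
    waitForContainerClose(cluster, containerIds.toArray(new Long[0]));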