Переглянути джерело

HDDS-1509. TestBlockOutputStreamWithFailures#test2DatanodesFailure fails intermittently. Contributed by Shashikant Banerjee (#805).

Shashikant Banerjee 6 років тому
батько
коміт
83549dbbea

+ 18 - 7
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -37,6 +38,7 @@ import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.GroupMismatchException;
+import org.apache.ratis.protocol.NotReplicatedException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
@@ -259,15 +261,24 @@ public class KeyOutputStream extends OutputStream {
     if (!retryFailure) {
     if (!retryFailure) {
       closedContainerException = checkIfContainerIsClosed(t);
       closedContainerException = checkIfContainerIsClosed(t);
     }
     }
-    PipelineID pipelineId = null;
+    Pipeline pipeline = streamEntry.getPipeline();
+    PipelineID pipelineId = pipeline.getId();
     long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
     long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
     //set the correct length for the current stream
     //set the correct length for the current stream
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
     long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData();
     long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData();
-    LOG.debug(
-        "Encountered exception {}. The last committed block length is {}, "
-            + "uncommitted data length is {} retry count {}", exception,
-        totalSuccessfulFlushedData, bufferedDataLen, retryCount);
+    if (closedContainerException) {
+      LOG.debug(
+          "Encountered exception {}. The last committed block length is {}, "
+              + "uncommitted data length is {} retry count {}", exception,
+          totalSuccessfulFlushedData, bufferedDataLen, retryCount);
+    } else {
+      LOG.warn(
+          "Encountered exception {} on the pipeline {}. "
+              + "The last committed block length is {}, "
+              + "uncommitted data length is {} retry count {}", exception,
+          pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount);
+    }
     Preconditions.checkArgument(
     Preconditions.checkArgument(
         bufferedDataLen <= blockOutputStreamEntryPool.getStreamBufferMaxSize());
         bufferedDataLen <= blockOutputStreamEntryPool.getStreamBufferMaxSize());
     Preconditions.checkArgument(
     Preconditions.checkArgument(
@@ -282,8 +293,8 @@ public class KeyOutputStream extends OutputStream {
     if (closedContainerException) {
     if (closedContainerException) {
       excludeList.addConatinerId(ContainerID.valueof(containerId));
       excludeList.addConatinerId(ContainerID.valueof(containerId));
     } else if (retryFailure || t instanceof TimeoutException
     } else if (retryFailure || t instanceof TimeoutException
-        || t instanceof GroupMismatchException) {
-      pipelineId = streamEntry.getPipeline().getId();
+        || t instanceof GroupMismatchException
+        || t instanceof NotReplicatedException) {
       excludeList.addPipeline(pipelineId);
       excludeList.addPipeline(pipelineId);
     }
     }
     // just clean up the current stream.
     // just clean up the current stream.

+ 30 - 12
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.ratis.protocol.GroupMismatchException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.apache.ratis.protocol.RaftRetryFailureException;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assert;
@@ -75,7 +76,8 @@ public class TestBlockOutputStreamWithFailures {
    *
    *
    * @throws IOException
    * @throws IOException
    */
    */
-  @Before public void init() throws Exception {
+  @Before
+  public void init() throws Exception {
     chunkSize = 100;
     chunkSize = 100;
     flushSize = 2 * chunkSize;
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     maxFlushSize = 2 * flushSize;
@@ -110,13 +112,15 @@ public class TestBlockOutputStreamWithFailures {
   /**
   /**
    * Shutdown MiniDFSCluster.
    * Shutdown MiniDFSCluster.
    */
    */
-  @After public void shutdown() {
+  @After
+  public void shutdown() {
     if (cluster != null) {
     if (cluster != null) {
       cluster.shutdown();
       cluster.shutdown();
     }
     }
   }
   }
 
 
-  @Test public void testWatchForCommitWithCloseContainerException()
+  @Test
+  public void testWatchForCommitWithCloseContainerException()
       throws Exception {
       throws Exception {
     XceiverClientMetrics metrics =
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
         XceiverClientManager.getXceiverClientMetrics();
@@ -256,7 +260,8 @@ public class TestBlockOutputStreamWithFailures {
     validateData(keyName, dataString.concat(dataString).getBytes());
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
   }
 
 
-  @Test public void testWatchForCommitDatanodeFailure() throws Exception {
+  @Test
+  public void testWatchForCommitDatanodeFailure() throws Exception {
     XceiverClientMetrics metrics =
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
         XceiverClientManager.getXceiverClientMetrics();
     long writeChunkCount =
     long writeChunkCount =
@@ -388,7 +393,8 @@ public class TestBlockOutputStreamWithFailures {
     validateData(keyName, dataString.concat(dataString).getBytes());
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
   }
 
 
-  @Test public void test2DatanodesFailure() throws Exception {
+  @Test
+  public void test2DatanodesFailure() throws Exception {
     XceiverClientMetrics metrics =
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
         XceiverClientManager.getXceiverClientMetrics();
     long writeChunkCount =
     long writeChunkCount =
@@ -494,8 +500,15 @@ public class TestBlockOutputStreamWithFailures {
     // rewritten plus one partial chunk plus two putBlocks for flushSize
     // rewritten plus one partial chunk plus two putBlocks for flushSize
     // and one flush for partial chunk
     // and one flush for partial chunk
     key.flush();
     key.flush();
-    Assert.assertTrue(HddsClientUtils.checkForException(blockOutputStream
-        .getIoException()) instanceof RaftRetryFailureException);
+
+    // Since 2 datanodes went down, if the pipeline gets destroyed quickly,
+    // it will hit GroupMismatchException; otherwise, it will fail with
+    // RaftRetryFailureException.
+    Assert.assertTrue((HddsClientUtils.
+        checkForException(blockOutputStream
+            .getIoException()) instanceof RaftRetryFailureException)
+        || HddsClientUtils.checkForException(
+        blockOutputStream.getIoException()) instanceof GroupMismatchException);
     // Make sure the retryCount is reset after the exception is handled
     // Make sure the retryCount is reset after the exception is handled
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // now close the stream, It will update the ack length after watchForCommit
     // now close the stream, It will update the ack length after watchForCommit
@@ -524,7 +537,8 @@ public class TestBlockOutputStreamWithFailures {
     validateData(keyName, data1);
     validateData(keyName, data1);
   }
   }
 
 
-  @Test public void testFailureWithPrimeSizedData() throws Exception {
+  @Test
+  public void testFailureWithPrimeSizedData() throws Exception {
     XceiverClientMetrics metrics =
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
         XceiverClientManager.getXceiverClientMetrics();
     long writeChunkCount =
     long writeChunkCount =
@@ -644,7 +658,8 @@ public class TestBlockOutputStreamWithFailures {
     validateData(keyName, dataString.concat(dataString).getBytes());
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
   }
 
 
-  @Test public void testExceptionDuringClose() throws Exception {
+  @Test
+  public void testExceptionDuringClose() throws Exception {
     XceiverClientMetrics metrics =
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
         XceiverClientManager.getXceiverClientMetrics();
     long writeChunkCount =
     long writeChunkCount =
@@ -758,7 +773,8 @@ public class TestBlockOutputStreamWithFailures {
     validateData(keyName, dataString.concat(dataString).getBytes());
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
   }
 
 
-  @Test public void testWatchForCommitWithSingleNodeRatis() throws Exception {
+  @Test
+  public void testWatchForCommitWithSingleNodeRatis() throws Exception {
     XceiverClientMetrics metrics =
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
         XceiverClientManager.getXceiverClientMetrics();
     long writeChunkCount =
     long writeChunkCount =
@@ -898,7 +914,8 @@ public class TestBlockOutputStreamWithFailures {
     validateData(keyName, dataString.concat(dataString).getBytes());
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
   }
 
 
-  @Test public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
+  @Test
+  public void testDatanodeFailureWithSingleNodeRatis() throws Exception {
     XceiverClientMetrics metrics =
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
         XceiverClientManager.getXceiverClientMetrics();
     long writeChunkCount =
     long writeChunkCount =
@@ -1037,7 +1054,8 @@ public class TestBlockOutputStreamWithFailures {
     validateData(keyName, dataString.concat(dataString).getBytes());
     validateData(keyName, dataString.concat(dataString).getBytes());
   }
   }
 
 
-  @Test public void testDatanodeFailureWithPreAllocation() throws Exception {
+  @Test
+  public void testDatanodeFailureWithPreAllocation() throws Exception {
     XceiverClientMetrics metrics =
     XceiverClientMetrics metrics =
         XceiverClientManager.getXceiverClientMetrics();
         XceiverClientManager.getXceiverClientMetrics();
     long writeChunkCount =
     long writeChunkCount =