瀏覽代碼

HDFS-15211. EC: File write hangs during close in case of Exception during updatePipeline. Contributed by Ayush Saxena.

Surendra Singh Lilhore 5 年之前
父節點
當前提交
e9916052b1

+ 32 - 22
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java

@@ -393,6 +393,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
       LOG.debug("newly failed streamers: " + newFailed);
     }
     if (failCount > (numAllBlocks - numDataBlocks)) {
+      closeAllStreamers();
       throw new IOException("Failed: the number of failed blocks = "
           + failCount + " > the number of parity blocks = "
           + (numAllBlocks - numDataBlocks));
@@ -400,6 +401,13 @@ public class DFSStripedOutputStream extends DFSOutputStream
     return newFailed;
   }
 
+  private void closeAllStreamers() {
+    // The write has failed, Close all the streamers.
+    for (StripedDataStreamer streamer : streamers) {
+      streamer.close(true);
+    }
+  }
+
   private void handleCurrentStreamerFailure(String err, Exception e)
       throws IOException {
     currentPacket = null;
@@ -654,6 +662,8 @@ public class DFSStripedOutputStream extends DFSOutputStream
       newFailed = waitCreatingStreamers(healthySet);
       if (newFailed.size() + failedStreamers.size() >
           numAllBlocks - numDataBlocks) {
+        // The write has failed, Close all the streamers.
+        closeAllStreamers();
         throw new IOException(
             "Data streamers failed while creating new block streams: "
                 + newFailed + ". There are not enough healthy streamers.");
@@ -1153,32 +1163,32 @@ public class DFSStripedOutputStream extends DFSOutputStream
 
   @Override
   protected synchronized void closeImpl() throws IOException {
-    if (isClosed()) {
-      exceptionLastSeen.check(true);
-
-      // Writing to at least {dataUnits} replicas can be considered as success,
-      // and the rest of data can be recovered.
-      final int minReplication = ecPolicy.getNumDataUnits();
-      int goodStreamers = 0;
-      final MultipleIOException.Builder b = new MultipleIOException.Builder();
-      for (final StripedDataStreamer si : streamers) {
-        try {
-          si.getLastException().check(true);
-          goodStreamers++;
-        } catch (IOException e) {
-          b.add(e);
+    try {
+      if (isClosed()) {
+        exceptionLastSeen.check(true);
+
+        // Writing to at least {dataUnits} replicas can be considered as
+        //  success, and the rest of data can be recovered.
+        final int minReplication = ecPolicy.getNumDataUnits();
+        int goodStreamers = 0;
+        final MultipleIOException.Builder b = new MultipleIOException.Builder();
+        for (final StripedDataStreamer si : streamers) {
+          try {
+            si.getLastException().check(true);
+            goodStreamers++;
+          } catch (IOException e) {
+            b.add(e);
+          }
         }
-      }
-      if (goodStreamers < minReplication) {
-        final IOException ioe = b.build();
-        if (ioe != null) {
-          throw ioe;
+        if (goodStreamers < minReplication) {
+          final IOException ioe = b.build();
+          if (ioe != null) {
+            throw ioe;
+          }
         }
+        return;
       }
-      return;
-    }
 
-    try {
       try {
         // flush from all upper layers
         flushBuffer();