Browse Source

HDFS-17080. fix ec connection leak. (#5807)

yangy 3 months ago
parent
commit
815ca41c69

+ 26 - 14
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java

@@ -253,6 +253,9 @@ abstract class StripeReader {
       strategy.getReadBuffer().clear();
       // we want to remember which block replicas we have tried
       corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
+      if (blockReader != null) {
+        blockReader.close();
+      }
       throw ce;
     } catch (IOException e) {
       DFSClient.LOG.warn("Exception while reading from "
@@ -260,6 +263,9 @@ abstract class StripeReader {
           + currentNode, e);
       //Clear buffer to make next decode success
       strategy.getReadBuffer().clear();
+      if (blockReader != null) {
+        blockReader.close();
+      }
       throw e;
     }
   }
@@ -329,21 +335,26 @@ abstract class StripeReader {
    * read the whole stripe. do decoding if necessary
    */
   void readStripe() throws IOException {
-    for (int i = 0; i < dataBlkNum; i++) {
-      if (alignedStripe.chunks[i] != null &&
-          alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
-        if (!readChunk(targetBlocks[i], i)) {
-          alignedStripe.missingChunksNum++;
+    try {
+      for (int i = 0; i < dataBlkNum; i++) {
+        if (alignedStripe.chunks[i] != null &&
+                alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+          if (!readChunk(targetBlocks[i], i)) {
+            alignedStripe.missingChunksNum++;
+          }
         }
       }
-    }
-    // There are missing block locations at this stage. Thus we need to read
-    // the full stripe and one more parity block.
-    if (alignedStripe.missingChunksNum > 0) {
-      checkMissingBlocks();
-      readDataForDecoding();
-      // read parity chunks
-      readParityChunks(alignedStripe.missingChunksNum);
+      // There are missing block locations at this stage. Thus we need to read
+      // the full stripe and one more parity block.
+      if (alignedStripe.missingChunksNum > 0) {
+        checkMissingBlocks();
+        readDataForDecoding();
+        // read parity chunks
+        readParityChunks(alignedStripe.missingChunksNum);
+      }
+    } catch (IOException e) {
+      dfsStripedInputStream.close();
+      throw e;
     }
     // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
 
@@ -385,7 +396,8 @@ abstract class StripeReader {
         }
       } catch (InterruptedException ie) {
         String err = "Read request interrupted";
-        DFSClient.LOG.error(err);
+        DFSClient.LOG.error(err, ie);
+        dfsStripedInputStream.close();
         clearFutures();
         // Don't decode if read interrupted
         throw new InterruptedIOException(err);

+ 6 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java

@@ -153,7 +153,12 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     // because we always (except the last split) read one extra line in
     // next() method.
     if (start != 0) {
-      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
+      try {
+        start += in.readLine(new Text(), 0, maxBytesToConsume(start));
+      } catch (Exception e) {
+        close();
+        throw e;
+      }
     }
     this.pos = start;
   }