
HADOOP-14376. Memory leak when reading a compressed file using the native library. Contributed by Eli Acherkan

(cherry picked from commit 7bc217224891b7f7f0a2e35e37e46b36d8c5309d)
Jason Lowe 8 years ago
parent
commit
955e8316d5

+ 10 - 10
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java

@@ -336,15 +336,11 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
     }
 
     public void close() throws IOException {
-      if (needsReset) {
-        // In the case that nothing is written to this stream, we still need to
-        // write out the header before closing, otherwise the stream won't be
-        // recognized by BZip2CompressionInputStream.
-        internalReset();
+      try {
+        super.close();
+      } finally {
+        output.close();
       }
-      this.output.flush();
-      this.output.close();
-      needsReset = true;
     }
 
   }// end of class BZip2CompressionOutputStream
@@ -454,8 +450,12 @@
 
     public void close() throws IOException {
       if (!needsReset) {
-        input.close();
-        needsReset = true;
+        try {
+          input.close();
+          needsReset = true;
+        } finally {
+          super.close();
+        }
       }
     }
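
In both hunks the leak had the same shape: BZip2Codec's stream classes overrode close() without calling super.close(), so the superclass never returned the native compressor or decompressor to CodecPool and its off-heap memory was lost. The fix funnels every close through super.close() inside try/finally, so the second cleanup step runs even when the first throws. A minimal sketch of the idea, with hypothetical names rather than the Hadoop classes:

    import java.io.Closeable;
    import java.io.IOException;

    // Hypothetical sketch, not Hadoop code: subclass cleanup plus
    // guaranteed superclass/pool cleanup.
    class PooledStream implements Closeable {
      private final Closeable underlying;
      private final Runnable returnToPool;

      PooledStream(Closeable underlying, Runnable returnToPool) {
        this.underlying = underlying;
        this.returnToPool = returnToPool;
      }

      @Override
      public void close() throws IOException {
        try {
          underlying.close();   // may throw
        } finally {
          returnToPool.run();   // still runs, so nothing leaks
        }
      }
    }

The finally block is what turns "usually returned" into "always returned": without it, an IOException from the underlying stream would skip the pool return and reproduce the leak.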

+ 8 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java

@@ -157,7 +157,10 @@
         LOG.debug("Got recycled compressor");
       }
     }
-    updateLeaseCount(compressorCounts, compressor, 1);
+    if (compressor != null &&
+        !compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      updateLeaseCount(compressorCounts, compressor, 1);
+    }
     return compressor;
   }
@@ -184,7 +187,10 @@
         LOG.debug("Got recycled decompressor");
       }
     }
-    updateLeaseCount(decompressorCounts, decompressor, 1);
+    if (decompressor != null &&
+        !decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      updateLeaseCount(decompressorCounts, decompressor, 1);
+    }
     return decompressor;
   }
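
The two guards above keep the lease counters honest: classes annotated with @DoNotPool are never cached by the pool, so counting a lease for such an instance would leave the counter permanently inflated once the instance is discarded instead of returned. A hedged sketch of that bookkeeping rule, with a stand-in annotation and tracker rather than CodecPool's internals:

    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    // Stand-ins for illustration only; Hadoop's real annotation lives in
    // org.apache.hadoop.io.compress and the real counting in CodecPool.
    @Retention(RetentionPolicy.RUNTIME)
    @interface DoNotPool {}

    class LeaseTracker {
      private final Map<Class<?>, AtomicInteger> counts =
          new ConcurrentHashMap<>();

      // Count a lease only for classes the pool will actually take back.
      void lease(Object instance) {
        if (instance != null
            && !instance.getClass().isAnnotationPresent(DoNotPool.class)) {
          counts.computeIfAbsent(instance.getClass(),
              c -> new AtomicInteger()).incrementAndGet();
        }
      }
    }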

+ 7 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java

@@ -59,10 +59,13 @@ public abstract class CompressionInputStream extends InputStream implements Seek
 
   @Override
   public void close() throws IOException {
-    in.close();
-    if (trackedDecompressor != null) {
-      CodecPool.returnDecompressor(trackedDecompressor);
-      trackedDecompressor = null;
+    try {
+      in.close();
+    } finally {
+      if (trackedDecompressor != null) {
+        CodecPool.returnDecompressor(trackedDecompressor);
+        trackedDecompressor = null;
+      }
     }
   }

+ 11 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java

@@ -56,11 +56,17 @@ public abstract class CompressionOutputStream extends OutputStream {
 
   @Override
   public void close() throws IOException {
-    finish();
-    out.close();
-    if (trackedCompressor != null) {
-      CodecPool.returnCompressor(trackedCompressor);
-      trackedCompressor = null;
+    try {
+      finish();
+    } finally {
+      try {
+        out.close();
+      } finally {
+        if (trackedCompressor != null) {
+          CodecPool.returnCompressor(trackedCompressor);
+          trackedCompressor = null;
+        }
+      }
     }
   }
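
The nested try/finally above guarantees that all three steps are attempted in order: finish() flushes the codec, out.close() closes the underlying stream, and the tracked compressor is returned to the pool regardless of what the first two steps did. One caveat of plain finally chaining, unlike try-with-resources, is that if two steps throw, the later exception replaces the earlier one rather than being attached as suppressed. A sketch of the structure in isolation, with hypothetical names:

    import java.io.IOException;

    // Hypothetical illustration of the three-step cleanup chain.
    final class CleanupChain {
      interface Step {
        void run() throws IOException;
      }

      static void closeAll(Step finish, Step closeOut, Step returnToPool)
          throws IOException {
        try {
          finish.run();
        } finally {
          try {
            closeOut.run();
          } finally {
            returnToPool.run();   // runs even if both earlier steps threw
          }
        }
      }
    }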

+ 1 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java

@@ -103,10 +103,9 @@ public class CompressorStream extends CompressionOutputStream {
   public void close() throws IOException {
     if (!closed) {
       try {
-        finish();
+        super.close();
       }
       finally {
-        out.close();
         closed = true;
       }
     }
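
With the cleanup logic now centralized in CompressionOutputStream.close(), CompressorStream keeps only its closed flag, which makes close() idempotent: wrapped streams are routinely closed twice, once through an outer wrapper and once defensively, and without the guard the second call would re-run finish() on an already-finished codec. A hypothetical demonstration of that double-close scenario:

    import java.io.ByteArrayOutputStream;
    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    // Hypothetical demo: the guard ensures the real cleanup in
    // super.close() runs at most once.
    class GuardedStream extends FilterOutputStream {
      private boolean closed;

      GuardedStream(OutputStream out) {
        super(out);
      }

      @Override
      public void close() throws IOException {
        if (!closed) {
          try {
            super.close();
          } finally {
            closed = true;      // marked closed even if close() threw
          }
        }
      }
    }

    class Demo {
      public static void main(String[] args) throws IOException {
        GuardedStream s = new GuardedStream(new ByteArrayOutputStream());
        try (OutputStream wrapper = new java.io.BufferedOutputStream(s)) {
          wrapper.write(42);
        }          // first close(), propagated by the wrapper
        s.close(); // second close() is a no-op thanks to the flag
      }
    }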

+ 5 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java

@@ -221,8 +221,11 @@ public class DecompressorStream extends CompressionInputStream {
   @Override
   public void close() throws IOException {
     if (!closed) {
-      in.close();
-      closed = true;
+      try {
+        super.close();
+      } finally {
+        closed = true;
+      }
     }
   }

+ 60 - 43
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java

@@ -196,66 +196,83 @@ public class TestCodec {
 
     // Compress data
     DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
-    CompressionOutputStream deflateFilter = 
+    int leasedCompressorsBefore = codec.getCompressorType() == null ? -1
+        : CodecPool.getLeasedCompressorsCount(codec);
+    try (CompressionOutputStream deflateFilter =
       codec.createOutputStream(compressedDataBuffer);
-    DataOutputStream deflateOut = 
-      new DataOutputStream(new BufferedOutputStream(deflateFilter));
-    deflateOut.write(data.getData(), 0, data.getLength());
-    deflateOut.flush();
-    deflateFilter.finish();
+      DataOutputStream deflateOut =
+        new DataOutputStream(new BufferedOutputStream(deflateFilter))) {
+      deflateOut.write(data.getData(), 0, data.getLength());
+      deflateOut.flush();
+      deflateFilter.finish();
+    }
+    if (leasedCompressorsBefore > -1) {
+      assertEquals("leased compressor not returned to the codec pool",
+          leasedCompressorsBefore, CodecPool.getLeasedCompressorsCount(codec));
+    }
     LOG.info("Finished compressing data");
 
     // De-compress data
     DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
     deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, 
                                  compressedDataBuffer.getLength());
-    CompressionInputStream inflateFilter = 
-      codec.createInputStream(deCompressedDataBuffer);
-    DataInputStream inflateIn = 
-      new DataInputStream(new BufferedInputStream(inflateFilter));
-
-    // Check
     DataInputBuffer originalData = new DataInputBuffer();
-    originalData.reset(data.getData(), 0, data.getLength());
-    DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData));
-    for(int i=0; i < count; ++i) {
-      RandomDatum k1 = new RandomDatum();
-      RandomDatum v1 = new RandomDatum();
-      k1.readFields(originalIn);
-      v1.readFields(originalIn);
+    int leasedDecompressorsBefore =
+        CodecPool.getLeasedDecompressorsCount(codec);
+    try (CompressionInputStream inflateFilter =
+      codec.createInputStream(deCompressedDataBuffer);
+      DataInputStream inflateIn =
+        new DataInputStream(new BufferedInputStream(inflateFilter))) {
+
+      // Check
+      originalData.reset(data.getData(), 0, data.getLength());
+      DataInputStream originalIn =
+          new DataInputStream(new BufferedInputStream(originalData));
+      for(int i=0; i < count; ++i) {
+        RandomDatum k1 = new RandomDatum();
+        RandomDatum v1 = new RandomDatum();
+        k1.readFields(originalIn);
+        v1.readFields(originalIn);
 
-      RandomDatum k2 = new RandomDatum();
-      RandomDatum v2 = new RandomDatum();
-      k2.readFields(inflateIn);
-      v2.readFields(inflateIn);
-      assertTrue("original and compressed-then-decompressed-output not equal",
-                 k1.equals(k2) && v1.equals(v2));
+        RandomDatum k2 = new RandomDatum();
+        RandomDatum v2 = new RandomDatum();
+        k2.readFields(inflateIn);
+        v2.readFields(inflateIn);
+        assertTrue("original and compressed-then-decompressed-output not equal",
+                   k1.equals(k2) && v1.equals(v2));
 
-      // original and compressed-then-decompressed-output have the same hashCode
-      Map<RandomDatum, String> m = new HashMap<RandomDatum, String>();
-      m.put(k1, k1.toString());
-      m.put(v1, v1.toString());
-      String result = m.get(k2);
-      assertEquals("k1 and k2 hashcode not equal", result, k1.toString());
-      result = m.get(v2);
-      assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
+        // original and compressed-then-decompressed-output have the same
+        // hashCode
+        Map<RandomDatum, String> m = new HashMap<RandomDatum, String>();
+        m.put(k1, k1.toString());
+        m.put(v1, v1.toString());
+        String result = m.get(k2);
+        assertEquals("k1 and k2 hashcode not equal", result, k1.toString());
+        result = m.get(v2);
+        assertEquals("v1 and v2 hashcode not equal", result, v1.toString());
+      }
     }
+    assertEquals("leased decompressor not returned to the codec pool",
+        leasedDecompressorsBefore,
+        CodecPool.getLeasedDecompressorsCount(codec));
 
     // De-compress data byte-at-a-time
     originalData.reset(data.getData(), 0, data.getLength());
     deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, 
                                  compressedDataBuffer.getLength());
-    inflateFilter = 
+    try (CompressionInputStream inflateFilter =
       codec.createInputStream(deCompressedDataBuffer);
-
-    // Check
-    originalIn = new DataInputStream(new BufferedInputStream(originalData));
-    int expected;
-    do {
-      expected = originalIn.read();
-      assertEquals("Inflated stream read by byte does not match",
-        expected, inflateFilter.read());
-    } while (expected != -1);
+      DataInputStream originalIn =
+        new DataInputStream(new BufferedInputStream(originalData))) {
+
+      // Check
+      int expected;
+      do {
+        expected = originalIn.read();
+        assertEquals("Inflated stream read by byte does not match",
+            expected, inflateFilter.read());
+      } while (expected != -1);
+    }
 
     LOG.info("SUCCESS! Completed checking " + count + " records");
   }
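
The reworked test encodes exactly the invariant the close() fixes establish: every codec stream is managed by try-with-resources, and afterwards the pool's leased-instance count must be back at its starting value, otherwise a compressor or decompressor leaked. The same leak check reduced to a hedged, self-contained sketch with a fake pool counter in place of CodecPool:

    import java.io.ByteArrayInputStream;
    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical reduction of the test's leak check.
    public class LeaseCheckDemo {
      static final AtomicInteger leased = new AtomicInteger();

      // "Lease" a decompressor on open; "return" it on close.
      static InputStream open(byte[] data) {
        leased.incrementAndGet();
        return new FilterInputStream(new ByteArrayInputStream(data)) {
          @Override
          public void close() throws IOException {
            try {
              super.close();
            } finally {
              leased.decrementAndGet();   // even if close() threw
            }
          }
        };
      }

      public static void main(String[] args) throws IOException {
        int before = leased.get();
        try (InputStream in = open(new byte[] {1, 2, 3})) {
          while (in.read() != -1) {
            // drain the stream
          }
        }
        if (leased.get() != before) {
          throw new AssertionError("pooled decompressor leaked");
        }
      }
    }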