Browse source

MAPREDUCE-5075. DistCp leaks input file handles since ThrottledInputStream does not close the wrapped InputStream. Contributed by Chris Nauroth

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1458741 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 12 years ago
committed
718f0f92a9

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -250,6 +250,9 @@ Release 2.0.4-alpha - UNRELEASED
 
   BUG FIXES
 
+    MAPREDUCE-5075. DistCp leaks input file handles since ThrottledInputStream
+    does not close the wrapped InputStream.  (Chris Nauroth via szetszwo)
+
 Release 2.0.3-alpha - 2013-02-06
 
   INCOMPATIBLE CHANGES

+ 4 - 5
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java

@@ -124,7 +124,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
             tmpTargetPath, true, BUFFER_SIZE,
             getReplicationFactor(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath),
             getBlockSize(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath), context));
-    return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, true, context);
+    return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
   }
 
   private void compareFileLengths(FileStatus sourceFileStatus, Path target,
@@ -170,8 +170,8 @@ public class RetriableFileCopyCommand extends RetriableCommand {
   }
 
   private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
-                         int bufferSize, boolean mustCloseStream,
-                         Mapper.Context context) throws IOException {
+                         int bufferSize, Mapper.Context context)
+      throws IOException {
     Path source = sourceFileStatus.getPath();
     byte buf[] = new byte[bufferSize];
     ThrottledInputStream inStream = null;
@@ -187,8 +187,7 @@ public class RetriableFileCopyCommand extends RetriableCommand {
         bytesRead = inStream.read(buf);
       }
     } finally {
-      if (mustCloseStream)
-        IOUtils.cleanup(LOG, outStream, inStream);
+      IOUtils.cleanup(LOG, outStream, inStream);
     }
 
     return totalBytesRead;

+ 5 - 0
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java

@@ -52,6 +52,11 @@ public class ThrottledInputStream extends InputStream {
     this.maxBytesPerSec = maxBytesPerSec;
   }
 
+  @Override
+  public void close() throws IOException {
+    rawStream.close();
+  }
+
   /** @inheritDoc */
   @Override
   public int read() throws IOException {

+ 1 - 1
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java

@@ -101,7 +101,7 @@ public class TestIntegration {
 
     try {
       addEntries(listFile, "singlefile1/file1");
-      createFiles("singlefile1/file1", target.toString());
+      createFiles("singlefile1/file1", "target");
 
       runTest(listFile, target, sync);