
MAPREDUCE-2243. Close streams properly in a finally-block to avoid leakage in CompletedJobStatusStore, TaskLog, EventWriter and TotalOrderPartitioner. Contributed by Devaraj K

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1152787 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze · 14 years ago · commit 7e18c90d39
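
Each source file below applies the same idiom: close the stream explicitly on the success path (so close() failures still propagate), null the reference, and rely on IOUtils.cleanup in the finally block to quietly close whatever is still open when an exception skipped the normal close. A minimal, self-contained sketch of that idiom follows; StreamCleanupExample, writeStatus and the payload argument are illustrative names, not part of the patch.

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

class StreamCleanupExample {
  private static final Log LOG = LogFactory.getLog(StreamCleanupExample.class);

  static void writeStatus(FileSystem fs, Path file, byte[] payload)
      throws IOException {
    FSDataOutputStream out = null;
    try {
      out = fs.create(file);
      out.write(payload);
      out.close();   // propagate close() failures on the success path
      out = null;    // prevent the finally block from closing the stream twice
    } finally {
      IOUtils.cleanup(LOG, out);  // no-op on success; quiet close on the error path
    }
  }
}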

+ 4 - 0
mapreduce/CHANGES.txt

@@ -362,6 +362,10 @@ Trunk (unreleased changes)
     MAPREDUCE-2463. Job history files are not moved to done folder when job
     history location is hdfs.  (Devaraj K via szetszwo)
 
+    MAPREDUCE-2243. Close streams properly in a finally-block to avoid leakage
+    in CompletedJobStatusStore, TaskLog, EventWriter and TotalOrderPartitioner.
+    (Devaraj K via szetszwo)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 6 - 1
mapreduce/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /**
@@ -172,8 +173,9 @@ class CompletedJobStatusStore implements Runnable {
     if (active && retainTime > 0) {
       JobID jobId = job.getStatus().getJobID();
       Path jobStatusFile = getInfoFilePath(jobId);
+      FSDataOutputStream dataOut = null;
       try {
-        FSDataOutputStream dataOut = fs.create(jobStatusFile);
+        dataOut = fs.create(jobStatusFile);
 
         job.getStatus().write(dataOut);
 
@@ -189,6 +191,7 @@ class CompletedJobStatusStore implements Runnable {
         }
 
         dataOut.close();
+        dataOut = null; // set dataOut to null explicitly so that close in finally will not be executed again.
       } catch (IOException ex) {
         LOG.warn("Could not store [" + jobId + "] job info : " +
                  ex.getMessage(), ex);
@@ -198,6 +201,8 @@ class CompletedJobStatusStore implements Runnable {
         catch (IOException ex1) {
           //ignore
         }
+      } finally {
+        IOUtils.cleanup(LOG, dataOut);
       }
     }
   }

+ 55 - 41
mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.util.ProcessTree;
@@ -111,34 +112,42 @@ public class TaskLog {
     //stderr:<start-offset in the stderr file> <length>
     //syslog:<start-offset in the syslog file> <length>
     LogFileDetail l = new LogFileDetail();
-    String str = fis.readLine();
-    if (str == null) { //the file doesn't have anything
-      throw new IOException ("Index file for the log of " + taskid+" doesn't exist.");
-    }
-    l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)+
-        LogFileDetail.LOCATION.length());
-    //special cases are the debugout and profile.out files. They are guaranteed
-    //to be associated with each task attempt since jvm reuse is disabled
-    //when profiling/debugging is enabled
-    if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
-      l.length = new File(l.location, filter.toString()).length();
-      l.start = 0;
-      fis.close();
-      return l;
-    }
-    str = fis.readLine();
-    while (str != null) {
-      //look for the exact line containing the logname
-      if (str.contains(filter.toString())) {
-        str = str.substring(filter.toString().length()+1);
-        String[] startAndLen = str.split(" ");
-        l.start = Long.parseLong(startAndLen[0]);
-        l.length = Long.parseLong(startAndLen[1]);
-        break;
+    String str = null;
+    try {
+      str = fis.readLine();
+      if (str == null) { // the file doesn't have anything
+        throw new IOException("Index file for the log of " + taskid
+            + " doesn't exist.");
+      }
+      l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)
+          + LogFileDetail.LOCATION.length());
+      // special cases are the debugout and profile.out files. They are
+      // guaranteed
+      // to be associated with each task attempt since jvm reuse is disabled
+      // when profiling/debugging is enabled
+      if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
+        l.length = new File(l.location, filter.toString()).length();
+        l.start = 0;
+        fis.close();
+        return l;
       }
       str = fis.readLine();
+      while (str != null) {
+        // look for the exact line containing the logname
+        if (str.contains(filter.toString())) {
+          str = str.substring(filter.toString().length() + 1);
+          String[] startAndLen = str.split(" ");
+          l.start = Long.parseLong(startAndLen[0]);
+          l.length = Long.parseLong(startAndLen[1]);
+          break;
+        }
+        str = fis.readLine();
+      }
+      fis.close();
+      fis = null;
+    } finally {
+      IOUtils.cleanup(LOG, fis);
     }
-    fis.close();
     return l;
   }
   
@@ -189,22 +198,27 @@ public class TaskLog {
     //LOG_DIR: <the dir where the task logs are really stored>
     //STDOUT: <start-offset in the stdout file> <length>
     //STDERR: <start-offset in the stderr file> <length>
-    //SYSLOG: <start-offset in the syslog file> <length>    
-    dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
-        + LogName.STDOUT.toString() + ":");
-    dos.writeBytes(Long.toString(prevOutLength) + " ");
-    dos.writeBytes(Long.toString(new File(logLocation, LogName.STDOUT
-        .toString()).length() - prevOutLength)
-        + "\n" + LogName.STDERR + ":");
-    dos.writeBytes(Long.toString(prevErrLength) + " ");
-    dos.writeBytes(Long.toString(new File(logLocation, LogName.STDERR
-        .toString()).length() - prevErrLength)
-        + "\n" + LogName.SYSLOG.toString() + ":");
-    dos.writeBytes(Long.toString(prevLogLength) + " ");
-    dos.writeBytes(Long.toString(new File(logLocation, LogName.SYSLOG
-        .toString()).length() - prevLogLength)
-        + "\n");
-    dos.close();
+    //SYSLOG: <start-offset in the syslog file> <length>   
+    try{
+      dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
+          + LogName.STDOUT.toString() + ":");
+      dos.writeBytes(Long.toString(prevOutLength) + " ");
+      dos.writeBytes(Long.toString(new File(logLocation, LogName.STDOUT
+          .toString()).length() - prevOutLength)
+          + "\n" + LogName.STDERR + ":");
+      dos.writeBytes(Long.toString(prevErrLength) + " ");
+      dos.writeBytes(Long.toString(new File(logLocation, LogName.STDERR
+          .toString()).length() - prevErrLength)
+          + "\n" + LogName.SYSLOG.toString() + ":");
+      dos.writeBytes(Long.toString(prevLogLength) + " ");
+      dos.writeBytes(Long.toString(new File(logLocation, LogName.SYSLOG
+          .toString()).length() - prevLogLength)
+          + "\n");
+      dos.close();
+      dos = null;
+    } finally {
+      IOUtils.cleanup(LOG, dos);
+    }
 
     File indexFile = getIndexFile(currentTaskid, isCleanup);
     Path indexFilePath = new Path(indexFile.getAbsolutePath());

+ 11 - 2
mapreduce/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
@@ -33,6 +34,8 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Event Writer is an utility class used to write events to the underlying
@@ -47,6 +50,7 @@ class EventWriter {
   private DatumWriter<Event> writer =
     new SpecificDatumWriter<Event>(Event.class);
   private Encoder encoder;
+  private static final Log LOG = LogFactory.getLog(EventWriter.class);
   
   EventWriter(FSDataOutputStream out) throws IOException {
     this.out = out;
@@ -72,8 +76,13 @@ class EventWriter {
   }
 
   void close() throws IOException {
-    encoder.flush();
-    out.close();
+    try {
+      encoder.flush();
+      out.close();
+      out = null;
+    } finally {
+      IOUtils.cleanup(LOG, out);
+    }
   }
 
   private static final Schema GROUPS =

+ 13 - 4
mapreduce/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java

@@ -23,6 +23,8 @@ import java.lang.reflect.Array;
 import java.util.ArrayList;
 import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
@@ -30,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.RawComparator;
@@ -56,6 +59,7 @@ public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
   public static final String NATURAL_ORDER = 
     "mapreduce.totalorderpartitioner.naturalorder";
   Configuration conf;
+  private static final Log LOG = LogFactory.getLog(TotalOrderPartitioner.class);
 
   public TotalOrderPartitioner() { }
 
@@ -298,11 +302,16 @@ public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
     ArrayList<K> parts = new ArrayList<K>();
     K key = ReflectionUtils.newInstance(keyClass, conf);
     NullWritable value = NullWritable.get();
-    while (reader.next(key, value)) {
-      parts.add(key);
-      key = ReflectionUtils.newInstance(keyClass, conf);
+    try {
+      while (reader.next(key, value)) {
+        parts.add(key);
+        key = ReflectionUtils.newInstance(keyClass, conf);
+      }
+      reader.close();
+      reader = null;
+    } finally {
+      IOUtils.cleanup(LOG, reader);
     }
-    reader.close();
     return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
   }