
Merge -r 539131:539135 from trunk to 0.13 branch. Fixes: HADOOP-1369 and HADOOP-1361.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/branches/branch-0.13@539136 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent commit 56f96061ab

+ 6 - 0
CHANGES.txt

@@ -398,6 +398,12 @@ Branch 0.13 (unreleased changes)
 119. HADOOP-1368.  Fix inconsistent synchronization in JobInProgress.
      (omalley via cutting)
 
+120. HADOOP-1369.  Fix inconsistent synchronization in TaskTracker.
+     (omalley via cutting)
+
+121. HADOOP-1361.  Fix various calls to skipBytes() to check return
+     value. (Hairong Kuang via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

+ 1 - 1
src/java/org/apache/hadoop/io/Text.java

@@ -230,7 +230,7 @@ public class Text implements WritableComparable {
   /** Skips over one Text in the input. */
   public static void skip(DataInput in) throws IOException {
     int length = WritableUtils.readVInt(in);
-    in.skipBytes(length);
+    WritableUtils.skipFully(in, length);
   }
 
   /** serialize

+ 1 - 1
src/java/org/apache/hadoop/io/UTF8.java

@@ -110,7 +110,7 @@ public class UTF8 implements WritableComparable {
   /** Skips over one UTF8 in the input. */
   public static void skip(DataInput in) throws IOException {
     int length = in.readUnsignedShort();
-    in.skipBytes(length);
+    WritableUtils.skipFully(in, length);
   }
 
   public void write(DataOutput out) throws IOException {

+ 22 - 1
src/java/org/apache/hadoop/io/WritableUtils.java

@@ -48,7 +48,9 @@ public final class WritableUtils  {
 
   public static void skipCompressedByteArray(DataInput in) throws IOException {
     int length = in.readInt();
-    if (length != -1) in.skipBytes(length);
+    if (length != -1) {
+      skipFully(in, length);
+    }
   }
 
   public static int  writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException {
@@ -382,4 +384,23 @@ public final class WritableUtils  {
     throws IOException{
     Text.writeString(out, enumVal.name()); 
   }
+  /**
+   * Skips <i>len</i> bytes in the input stream <i>in</i>.
+   * @param in input stream
+   * @param len number of bytes to skip
+   * @throws IOException if fewer than <i>len</i> bytes could be skipped
+   */
+  public static void skipFully(DataInput in, int len) throws IOException {
+    int total = 0;
+    int cur = 0;
+
+    while ((total<len) && ((cur = (int) in.skipBytes(len-total)) > 0)) {
+        total += cur;
+    }
+
+    if (total<len) {
+      throw new IOException("Not able to skip " + len + " bytes, possibly " +
+                            "due to end of input.");
+    }
+  }
 }
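
For context on HADOOP-1361: java.io.DataInput.skipBytes(n) is allowed to skip fewer than n bytes (possibly zero) without throwing, so a caller that ignores its return value can silently end up mis-positioned in the stream. The following is a minimal, self-contained sketch of the loop-and-check pattern used above; it is illustrative only (class and method names are invented) and not code from this commit.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class SkipFullyDemo {

  // Skip exactly len bytes or fail loudly; same loop-and-check idea as above.
  static void skipFully(DataInput in, int len) throws IOException {
    int total = 0;
    while (total < len) {
      int cur = in.skipBytes(len - total);   // contract: may skip fewer bytes than asked
      if (cur <= 0) {
        throw new EOFException("Could only skip " + total + " of " + len + " bytes");
      }
      total += cur;
    }
  }

  public static void main(String[] args) throws IOException {
    // Write two length-prefixed fields: [len][bytes][len][bytes].
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    byte[] first = "skip me".getBytes("UTF-8");
    byte[] second = "read me".getBytes("UTF-8");
    out.writeInt(first.length);
    out.write(first);
    out.writeInt(second.length);
    out.write(second);

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));

    // Skip the first field. If skipBytes() returned short and the result were
    // ignored, the following readInt() would read from inside the skipped field.
    skipFully(in, in.readInt());

    byte[] payload = new byte[in.readInt()];
    in.readFully(payload);
    System.out.println(new String(payload, "UTF-8"));   // prints "read me"
  }
}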

+ 15 - 17
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -24,7 +24,6 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.BindException;
 import java.net.InetSocketAddress;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -37,8 +36,6 @@ import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
-import java.util.Collections;
-import java.util.Collection;
 
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
@@ -803,17 +800,17 @@ public class TaskTracker
     // else resend the previous status information.
     //
     if (status == null) {
-      List<TaskStatus> taskReports = 
-        new ArrayList<TaskStatus>(runningTasks.size());
       synchronized (this) {
+        List<TaskStatus> taskReports = 
+          new ArrayList<TaskStatus>(runningTasks.size());
         for (TaskInProgress tip: runningTasks.values()) {
           taskReports.add(tip.createStatus());
         }
+        status = 
+          new TaskTrackerStatus(taskTrackerName, localHostname, 
+                                httpPort, taskReports, 
+                                failures); 
       }
-      status = 
-        new TaskTrackerStatus(taskTrackerName, localHostname, 
-                              httpPort, taskReports, 
-                              failures); 
     } else {
       LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
                "' with reponseId '" + heartbeatResponseId);
@@ -822,14 +819,15 @@ public class TaskTracker
     //
     // Check if we should ask for a new Task
     //
-    boolean askForNewTask = false; 
-    if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
-        acceptNewTasks) {
+    boolean askForNewTask;
+    synchronized (this) {
+      askForNewTask = (mapTotal < maxCurrentTasks || 
+                       reduceTotal < maxCurrentTasks) &&
+                      acceptNewTasks; 
+    }
+    if (askForNewTask) {
       checkLocalDirs(fConf.getLocalDirs());
-        
-      if (enoughFreeSpace(minSpaceStart)) {
-        askForNewTask = true;
-      }
+      askForNewTask = enoughFreeSpace(minSpaceStart);
     }
       
     //
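
The same concern applies to the decision to ask for a new task: mapTotal, reduceTotal and acceptNewTasks are shared fields that the rest of TaskTracker updates under the object lock, so reading them unsynchronized can combine values from different moments. A minimal sketch of the pattern used here, reading the shared fields under one lock while keeping the slower free-space check outside it (invented names, not the real code):

// Sketch of "decide under the lock, do slow checks outside it".
class SlotCheckSketch {
  private int mapTotal;
  private int reduceTotal;
  private boolean acceptNewTasks = true;
  private final int maxCurrentTasks = 2;

  boolean shouldAskForNewTask() {
    boolean wantsTask;
    synchronized (this) {            // read all shared fields in one consistent view
      wantsTask = (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks)
                  && acceptNewTasks;
    }
    // Slow or blocking work (e.g. checking free disk space) stays outside the lock.
    return wantsTask && enoughFreeSpace();
  }

  private boolean enoughFreeSpace() { return true; }   // stand-in for the real check

  synchronized void taskLaunched(boolean isMap) {
    if (isMap) { mapTotal++; } else { reduceTotal++; }
  }
}
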
@@ -1453,7 +1451,7 @@ public class TaskTracker
     /**
      * The map output has been lost.
      */
-    public synchronized void mapOutputLost(String failure
+    private synchronized void mapOutputLost(String failure
                                            ) throws IOException {
       if (runstate == TaskStatus.State.SUCCEEDED) {
         LOG.info("Reporting output lost:"+task.getTaskId());