Ver código fonte

HADOOP-1191. Fix a problem with the earlier patch, where interrupts received under an RPC were not causing the thread to exit.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@524929 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 anos atrás
pai
commit
89fecd5cb4

+ 2 - 4
src/java/org/apache/hadoop/ipc/Client.java

@@ -454,16 +454,14 @@ public class Client {
    * <code>address</code>, returning the value.  Throws exceptions if there are
    * network problems or if the remote code threw an exception. */
   public Writable call(Writable param, InetSocketAddress address)
-    throws IOException {
+    throws InterruptedException, IOException {
     Connection connection = getConnection(address);
     Call call = new Call(param);
     synchronized (call) {
       connection.sendParam(call);                 // send the parameter
       long wait = timeout;
       do {
-        try {
-          call.wait(wait);                       // wait for the result
-        } catch (InterruptedException e) {}
+        call.wait(wait);                       // wait for the result
         wait = timeout - (System.currentTimeMillis() - call.lastActivity);
       } while (!call.done && wait > 0);
 

+ 9 - 3
src/java/org/apache/hadoop/mapred/Task.java

@@ -208,6 +208,8 @@ abstract class Task implements Writable, Configurable {
         String status = taskProgress.toString();
         try {
           umbilical.progress(getTaskId(), progress, status, phase, counters);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();     // interrupt ourself
         } catch (IOException ie) {
           LOG.warn(StringUtils.stringifyException(ie));
         }
@@ -222,9 +224,13 @@ abstract class Task implements Writable, Configurable {
       try {
         if (needProgress) {
           // send a final status report
-          umbilical.progress(getTaskId(), taskProgress.get(), 
-                             taskProgress.toString(), phase, counters);
-          needProgress = false;
+          try {
+            umbilical.progress(getTaskId(), taskProgress.get(), 
+                               taskProgress.toString(), phase, counters);
+            needProgress = false;
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();       // interrupt ourself
+          }
         }
         umbilical.done(getTaskId());
         return;

+ 2 - 1
src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.lang.InterruptedException;
 
 import org.apache.hadoop.ipc.VersionedProtocol;
 
@@ -42,7 +43,7 @@ interface TaskUmbilicalProtocol extends VersionedProtocol {
    */
   void progress(String taskid, float progress, String state, 
                 TaskStatus.Phase phase, Counters counters)
-    throws IOException;
+    throws IOException, InterruptedException;
 
   /** Report error messages back to parent.  Calls should be sparing, since all
    *  such messages are held in the job tracker.