Kaynağa Gözat

HADOOP-3008. SocketIOWithTimeout throws InterruptedIOException if the
thread is interrupted while it is waiting. (rangadi)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@637985 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi 17 yıl önce
ebeveyn
işleme
64d84d8a7c

+ 3 - 0
CHANGES.txt

@@ -257,6 +257,9 @@ Trunk (unreleased changes)
     hodlib/Hod/hod.py for Python < 2.5.1.
     (Vinod Kumar Vavilapalli via ddas)
 
+    HADOOP-3008. SocketIOWithTimeout throws InterruptedIOException if the
+    thread is interrupted while it is waiting. (rangadi)
+    
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

+ 8 - 5
src/java/org/apache/hadoop/net/SocketIOWithTimeout.java

@@ -19,16 +19,12 @@
 package org.apache.hadoop.net;
 
 import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketAddress;
+import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.Pipe;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -253,6 +249,13 @@ abstract class SocketIOWithTimeout {
               return 0;
             }
           }
+          
+          if (Thread.currentThread().isInterrupted()) {
+            throw new InterruptedIOException("Interruped while waiting for " +
+                                             "IO on channel " + channel +
+                                             ". " + timeout + 
+                                             " millis timeout left.");
+          }
         }
       } finally {
         if (key != null) {

+ 44 - 2
src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java

@@ -38,7 +38,7 @@ import junit.framework.TestCase;
  */
 public class TestSocketIOWithTimeout extends TestCase {
 
-  Log LOG = LogFactory.getLog(TestSocketIOWithTimeout.class);
+  static Log LOG = LogFactory.getLog(TestSocketIOWithTimeout.class);
   
   private static int TIMEOUT = 1*1000; 
   private static String TEST_STRING = "1234567890";
@@ -61,12 +61,33 @@ public class TestSocketIOWithTimeout extends TestCase {
         long diff = System.currentTimeMillis() - start;
         LOG.info("Got SocketTimeoutException as expected after " + 
                  diff + " millis : " + e.getMessage());
-        assertTrue(Math.abs(TIMEOUT - diff) <= 100);
+        assertTrue(Math.abs(TIMEOUT - diff) <= 200);
         break;
       }
     }
   }
   
+  /**
+   * Just reads one byte from the input stream.
+   */
+  static class ReadRunnable implements Runnable {
+    private InputStream in;
+
+    public ReadRunnable(InputStream in) {
+      this.in = in;
+    }
+    public void run() {
+      try {
+        in.read();
+      } catch (IOException e) {
+        LOG.info("Got expection while reading as expected : " + 
+                 e.getMessage());
+        return;
+      }
+      assertTrue(false);
+    }
+  }
+  
   public void testSocketIOWithTimeout() throws IOException {
     
     // first open pipe:
@@ -87,6 +108,27 @@ public class TestSocketIOWithTimeout extends TestCase {
       in.read(readBytes);
       assertTrue(Arrays.equals(writeBytes, readBytes));
       doIO(in, null);
+      
+      /*
+       * Verify that it handles interrupted threads properly.
+       * Use a large timeout and expect the thread to return quickly.
+       */
+      in = new SocketInputStream(source, 0);
+      Thread thread = new Thread(new ReadRunnable(in));
+      thread.start();
+      
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ignored) {}
+      
+      thread.interrupt();
+      
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        throw new IOException("Unexpected InterruptedException : " + e);
+      }
+      
     } finally {
       if (source != null) {
         source.close();