Browse Source

HADOOP-3073. close() on SocketInputStream or SocketOutputStream should close the underlying channel. (rangadi)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@641706 13f79535-47bb-0310-9956-ffa450edef68
Raghu Angadi 17 years ago
parent
commit
c2fcb90a7e

+ 3 - 0
CHANGES.txt

@@ -380,6 +380,9 @@ Trunk (unreleased changes)
     HADOOP-3067. DFSInputStream's position read does not close the sockets.
     (rangadi)
 
+    HADOOP-3073. close() on SocketInputStream or SocketOutputStream should
+    close the underlying channel. (rangadi)
+
 Release 0.16.2 - Unreleased
 
   BUG FIXES

+ 7 - 11
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -934,17 +934,13 @@ class DFSClient implements FSConstants {
       DataOutputStream out = new DataOutputStream(
         new BufferedOutputStream(NetUtils.getOutputStream(sock,WRITE_TIMEOUT)));
 
-      try {
-        //write the header.
-        out.writeShort( DATA_TRANSFER_VERSION );
-        out.write( OP_READ_BLOCK );
-        out.writeLong( blockId );
-        out.writeLong( startOffset );
-        out.writeLong( len );
-        out.flush();
-      } finally {
-        IOUtils.closeStream(out);
-      }
+      //write the header.
+      out.writeShort( DATA_TRANSFER_VERSION );
+      out.write( OP_READ_BLOCK );
+      out.writeLong( blockId );
+      out.writeLong( startOffset );
+      out.writeLong( len );
+      out.flush();
       
       //
       // Get bytes in block, set streams

+ 15 - 2
src/java/org/apache/hadoop/net/SocketInputStream.java

@@ -39,7 +39,7 @@ import java.nio.channels.SelectionKey;
 public class SocketInputStream extends InputStream
                                implements ReadableByteChannel {
 
-  private SocketIOWithTimeout reader;
+  private Reader reader;
 
   private static class Reader extends SocketIOWithTimeout {
     ReadableByteChannel channel;
@@ -121,10 +121,23 @@ public class SocketInputStream extends InputStream
     return read(ByteBuffer.wrap(b, off, len));
   }
 
-  public void close() throws IOException {
+  public synchronized void close() throws IOException {
+    /* close the channel since Socket.getInputStream().close()
+     * closes the socket.
+     */
+    reader.channel.close();
     reader.close();
   }
 
+  /**
+   * Returns underlying channel used by inputstream.
+   * This is useful in certain cases like channel for 
+   * {@link FileChannel#transferFrom(ReadableByteChannel, long, long)}.
+   */
+  public ReadableByteChannel getChannel() {
+    return reader.channel; 
+  }
+  
   //ReadableByteChannel interface
     
   public boolean isOpen() {

+ 15 - 2
src/java/org/apache/hadoop/net/SocketOutputStream.java

@@ -38,7 +38,7 @@ import java.nio.channels.WritableByteChannel;
 public class SocketOutputStream extends OutputStream 
                                 implements WritableByteChannel {                                
   
-  private SocketIOWithTimeout writer;
+  private Writer writer;
   
   private static class Writer extends SocketIOWithTimeout {
     WritableByteChannel channel;
@@ -116,10 +116,23 @@ public class SocketOutputStream extends OutputStream
     }
   }
 
-  public void close() throws IOException {
+  public synchronized void close() throws IOException {
+    /* close the channel since Socket.getOuputStream().close() 
+     * closes the socket.
+     */
+    writer.channel.close();
     writer.close();
   }
 
+  /**
+   * Returns underlying channel used by this stream.
+   * This is useful in certain cases like channel for 
+   * {@link FileChannel#transferTo(long, long, WritableByteChannel)}
+   */
+  public WritableByteChannel getChannel() {
+    return writer.channel; 
+  }
+
   //WritableByteChannle interface 
   
   public boolean isOpen() {

+ 10 - 0
src/test/org/apache/hadoop/net/TestSocketIOWithTimeout.java

@@ -129,6 +129,16 @@ public class TestSocketIOWithTimeout extends TestCase {
         throw new IOException("Unexpected InterruptedException : " + e);
       }
       
+      //make sure the channels are still open
+      assertTrue(source.isOpen());
+      assertTrue(sink.isOpen());
+      
+      // make sure close() closes the underlying channel.
+      in.close();
+      assertFalse(source.isOpen());
+      out.close();
+      assertFalse(sink.isOpen());
+      
     } finally {
       if (source != null) {
         source.close();