Forráskód Böngészése

HADOOP-637. Fix a memory leak in IPC server. Contributed by Raghu.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@473062 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 éve
szülő
commit
c2b57e6519

+ 4 - 0
CHANGES.txt

@@ -34,6 +34,10 @@ Trunk (unreleased changes)
 10. HADOOP-694.  Fix a NullPointerException in jobtracker.
     (Mahadev Konar via cutting)
 
+11. HADOOP-637.  Fix a memory leak in the IPC server.  Direct buffers
+    are not collected like normal buffers, and provided little
+    advantage.  (Raghu Angadi via cutting)
+
 
 Release 0.8.0 - 2006-11-03
 

+ 12 - 16
src/java/org/apache/hadoop/ipc/Server.java

@@ -366,11 +366,11 @@ public abstract class Server {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
-      this.dataLengthBuffer = null;
+      this.dataLengthBuffer = ByteBuffer.allocate(4);
       this.socket = channel.socket();
       this.out = new DataOutputStream
         (new BufferedOutputStream(
-         this.channelOut = new SocketChannelOutputStream(channel, 4096)));
+         this.channelOut = new SocketChannelOutputStream( channel )));
       InetAddress addr = socket.getInetAddress();
       if (addr == null) {
         this.hostAddress = "*Unknown*";
@@ -410,31 +410,27 @@ public abstract class Server {
 
     public int readAndProcess() throws IOException, InterruptedException {
       int count = -1;
-      if (dataLengthBuffer == null)
-        dataLengthBuffer = ByteBuffer.allocateDirect(4);
       if (dataLengthBuffer.remaining() > 0) {
-        count = channel.read(dataLengthBuffer);
-        if (count < 0) return count;
-        if (dataLengthBuffer.remaining() == 0) {
-          dataLengthBuffer.flip(); 
-          dataLength = dataLengthBuffer.getInt();
-          data = ByteBuffer.allocateDirect(dataLength);
-        }
-        //return count;
+        count = channel.read(dataLengthBuffer);       
+        if ( count < 0 || dataLengthBuffer.remaining() > 0 ) 
+          return count;        
+        dataLengthBuffer.flip(); 
+        dataLength = dataLengthBuffer.getInt();
+        data = ByteBuffer.allocate(dataLength);
       }
       count = channel.read(data);
       if (data.remaining() == 0) {
         data.flip();
         processData();
-        data = dataLengthBuffer = null; 
+        dataLengthBuffer.flip();
+        data = null; 
       }
       return count;
     }
 
     private void processData() throws  IOException, InterruptedException {
-      byte[] bytes = new byte[dataLength];
-      data.get(bytes);
-      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes));
+      DataInputStream dis =
+          new DataInputStream(new ByteArrayInputStream( data.array() ));
       int id = dis.readInt();                    // try to read an id
         
       if (LOG.isDebugEnabled())

+ 7 - 22
src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java

@@ -42,10 +42,10 @@ class SocketChannelOutputStream extends OutputStream {
     /** Constructor.
      * 
      */
-    public SocketChannelOutputStream(SocketChannel channel, int bufferSize)
+    public SocketChannelOutputStream(SocketChannel channel)
     {
         this.channel = channel;
-        buffer = ByteBuffer.allocateDirect(bufferSize);
+        buffer = ByteBuffer.allocate(8); // only for small writes
     }
 
     /* ------------------------------------------------------------------------------- */
@@ -85,16 +85,8 @@ class SocketChannelOutputStream extends OutputStream {
      */
     public void write(byte[] buf, int offset, int length) throws IOException
     {
-        if (length > buffer.capacity())
-            flush = ByteBuffer.wrap(buf,offset,length);
-        else
-         {
-             buffer.clear();
-             buffer.put(buf,offset,length);
-             buffer.flip();
-             flush = buffer;
-         }
-         flushBuffer();
+        flush = ByteBuffer.wrap(buf,offset,length);
+        flushBuffer();
     }
 
     /* ------------------------------------------------------------------------------- */
@@ -103,16 +95,8 @@ class SocketChannelOutputStream extends OutputStream {
      */
     public void write(byte[] buf) throws IOException
     {
-        if (buf.length > buffer.capacity())
-            flush = ByteBuffer.wrap(buf);
-        else
-         {
-             buffer.clear();
-             buffer.put(buf);
-             buffer.flip();
-             flush = buffer;
-         }
-         flushBuffer();
+        flush = ByteBuffer.wrap(buf);
+        flushBuffer();
     }
 
 
@@ -144,6 +128,7 @@ class SocketChannelOutputStream extends OutputStream {
                 }
             }
         }
+        flush = null;
     }
 
     /* ------------------------------------------------------------------------------- */