浏览代码

HADOOP-351. Make IPC code independent of Jetty. Contributed by Devaraj Das.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@423443 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 年之前
父节点
当前提交
e08cfe5d66
共有 3 个文件被更改,包括 161 次插入2 次删除
  1. 2 0
      CHANGES.txt
  2. 1 2
      src/java/org/apache/hadoop/ipc/Server.java
  3. 158 0
      src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java

+ 2 - 0
CHANGES.txt

@@ -55,6 +55,8 @@ Trunk (unreleased changes)
     redirected to datanodes where the data is local when possible.
     redirected to datanodes where the data is local when possible.
     (Devaraj Das via cutting)
     (Devaraj Das via cutting)
 
 
+16. HADOOP-351.  Make Hadoop IPC kernel independent of Jetty.
+    (Devaraj Das via cutting)
 
 
 Release 0.4.0 - 2006-06-28
 Release 0.4.0 - 2006-06-28
 
 

+ 1 - 2
src/java/org/apache/hadoop/ipc/Server.java

@@ -47,8 +47,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.WritableUtils;
-
-import org.mortbay.http.nio.SocketChannelOutputStream;
+import org.apache.hadoop.ipc.SocketChannelOutputStream;
 
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * parameter, and return a {@link Writable} as their value.  A service runs on

+ 158 - 0
src/java/org/apache/hadoop/ipc/SocketChannelOutputStream.java

@@ -0,0 +1,158 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+
+/* ------------------------------------------------------------------------------- */
+/** 
+ * Blocking output stream on non-blocking SocketChannel.  Makes the 
+ * assumption that writes will rarely need to block.
+ * All writes flush to the channel, and no additional buffering is done.
+ */
+class SocketChannelOutputStream extends OutputStream {    
+    
+    ByteBuffer buffer;
+    ByteBuffer flush;
+    SocketChannel channel;
+    Selector selector;
+    
+    /* ------------------------------------------------------------------------------- */
+    /** Constructor.
+     * 
+     */
+    public SocketChannelOutputStream(SocketChannel channel, int bufferSize)
+    {
+        this.channel = channel;
+        buffer = ByteBuffer.allocateDirect(bufferSize);
+    }
+
+    /* ------------------------------------------------------------------------------- */
+    /*
+     * @see java.io.OutputStream#write(int)
+     */
+    public void write(int b) throws IOException
+    {
+        buffer.clear();
+        buffer.put((byte)b);
+        buffer.flip();
+        flush = buffer;
+        flushBuffer();
+    }
+
+    
+    /* ------------------------------------------------------------------------------- */
+    /*
+     * @see java.io.OutputStream#close()
+     */
+    public void close() throws IOException
+    {
+        channel.close();
+    }
+
+    /* ------------------------------------------------------------------------------- */
+    /*
+     * @see java.io.OutputStream#flush()
+     */
+    public void flush() throws IOException
+    {
+    }
+
+    /* ------------------------------------------------------------------------------- */
+    /*
+     * @see java.io.OutputStream#write(byte[], int, int)
+     */
+    public void write(byte[] buf, int offset, int length) throws IOException
+    {
+        if (length > buffer.capacity())
+            flush = ByteBuffer.wrap(buf,offset,length);
+        else
+         {
+             buffer.clear();
+             buffer.put(buf,offset,length);
+             buffer.flip();
+             flush = buffer;
+         }
+         flushBuffer();
+    }
+
+    /* ------------------------------------------------------------------------------- */
+    /*
+     * @see java.io.OutputStream#write(byte[])
+     */
+    public void write(byte[] buf) throws IOException
+    {
+        if (buf.length > buffer.capacity())
+            flush = ByteBuffer.wrap(buf);
+        else
+         {
+             buffer.clear();
+             buffer.put(buf);
+             buffer.flip();
+             flush = buffer;
+         }
+         flushBuffer();
+    }
+
+
+    /* ------------------------------------------------------------------------------- */
+    private void flushBuffer() throws IOException
+    {
+        while (flush.hasRemaining())
+        {
+            int len = channel.write(flush);
+            if (len < 0)
+                throw new IOException("EOF");
+            if (len == 0)
+            {
+                // write channel full.  Try letting other threads have a go.
+                Thread.yield();
+                len = channel.write(flush);
+                if (len < 0)
+                    throw new IOException("EOF");
+                if (len == 0)
+                {
+                    // still full.  need to  block until it is writable.
+                    if (selector==null)
+                     {
+                            selector = Selector.open();
+                            channel.register(selector, SelectionKey.OP_WRITE);
+                     }
+
+                     selector.select();
+                }
+            }
+        }
+    }
+
+    /* ------------------------------------------------------------------------------- */
+    public void destroy()
+    {
+        if (selector != null)
+        {
+            try{ selector.close();}
+            catch(IOException e){}
+            selector = null;
+            buffer = null;
+            flush = null;
+            channel = null;
+        }
+    }
+}