Pārlūkot izejas kodu

HADOOP-6170. Add facility to tunnel Avro RPCs through Hadoop RPCs.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@816727 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 15 gadi atpakaļ
vecāks
revīzija
ea50f15407

+ 4 - 0
CHANGES.txt

@@ -196,6 +196,10 @@ Trunk (unreleased changes)
     HADOOP-4952. Add new improved file system interface FileContext for the
     application writer (Sanjay Radia via suresh)
 
+    HADOOP-6170. Add facility to tunnel Avro RPCs through Hadoop RPCs.
+    This permits one to take advantage of both Avro's RPC versioning
+    features and Hadoop's proven RPC scalability.  (cutting)
+
   IMPROVEMENTS
 
     HADOOP-4565. Added CombineFileInputFormat to use data locality information

+ 8 - 0
build.xml

@@ -460,6 +460,14 @@
       <classpath refid="test.core.classpath"/>
      </javac>
 
+    <taskdef
+       name="paranamer" 
+       classname="com.thoughtworks.paranamer.ant.ParanamerGeneratorTask">
+      <classpath refid="classpath" />
+    </taskdef>
+    <paranamer sourceDirectory="${test.src.dir}/core"
+	       outputDirectory="${test.core.build.classes}"/>
+
     <delete dir="${test.cache.data}"/>
     <mkdir dir="${test.cache.data}"/>
     <copy file="${test.src.dir}/core/org/apache/hadoop/cli/testConf.xml" todir="${test.cache.data}"/>

+ 7 - 3
ivy.xml

@@ -271,15 +271,19 @@
     </dependency>
     <dependency org="org.apache.hadoop"
       name="avro"
-      rev="1.0.0"
+      rev="${avro.version}"
       conf="common->default"/>
     <dependency org="org.codehaus.jackson"
       name="jackson-mapper-asl"
-      rev="1.0.1"
+      rev="${jackson.version}"
       conf="common->default"/>
     <dependency org="com.thoughtworks.paranamer"
       name="paranamer"
-      rev="1.5"
+      rev="${paranamer.version}"
+      conf="common->default"/>
+    <dependency org="com.thoughtworks.paranamer"
+      name="paranamer-ant"
+      rev="${paranamer.version}"
       conf="common->default"/>
     </dependencies>
   

+ 6 - 0
ivy/libraries.properties

@@ -16,6 +16,8 @@
 #These are the versions of our dependencies (in alphabetical order)
 apacheant.version=1.7.0
 
+avro.version=1.1.0
+
 checkstyle.version=4.2
 
 commons-cli.version=1.2
@@ -42,6 +44,8 @@ hsqldb.version=1.8.0.10
 #ivy.version=2.0.0-beta2
 ivy.version=2.0.0-rc2
 
+jackson.version=1.0.1
+
 jasper.version=5.5.12
 jsp.version=2.1
 jsp-api.version=5.5.12
@@ -61,6 +65,8 @@ mina-core.version=2.0.0-M5
 
 oro.version=2.0.8
 
+paranamer.version=1.5
+
 rats-lib.version=0.6
 
 servlet.version=4.0.6

+ 1 - 1
src/java/org/apache/hadoop/io/serializer/avro/AvroGenericSerialization.java

@@ -52,7 +52,7 @@ public class AvroGenericSerialization extends AvroSerialization<Object> {
   @Override
   protected Schema getSchema(Object t, Map<String, String> metadata) {
     String jsonSchema = metadata.get(AVRO_SCHEMA_KEY);
-    return jsonSchema != null ? Schema.parse(jsonSchema) : GenericData.induce(t);
+    return jsonSchema != null ? Schema.parse(jsonSchema) : GenericData.get().induce(t);
   }
 
   @Override

+ 2 - 2
src/java/org/apache/hadoop/io/serializer/avro/AvroReflectSerialization.java

@@ -85,7 +85,7 @@ public class AvroReflectSerialization extends AvroSerialization<Object>{
             || "null".equals(clazz.getEnclosingClass().getName())) ? 
               clazz.getPackage().getName() + "." 
               : (clazz.getEnclosingClass().getName() + "$"));
-      return new ReflectDatumReader(ReflectData.getSchema(clazz), prefix);
+      return new ReflectDatumReader(ReflectData.get().getSchema(clazz), prefix);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -93,7 +93,7 @@ public class AvroReflectSerialization extends AvroSerialization<Object>{
 
   @Override
   protected Schema getSchema(Object t, Map<String, String> metadata) {
-    return ReflectData.getSchema(t.getClass());
+    return ReflectData.get().getSchema(t.getClass());
   }
 
   @Override

+ 2 - 2
src/java/org/apache/hadoop/io/serializer/avro/AvroSpecificSerialization.java

@@ -50,7 +50,7 @@ public class AvroSpecificSerialization
     try {
       Class<SpecificRecord> clazz = (Class<SpecificRecord>)
         getClassFromMetadata(metadata);
-      return new SpecificDatumReader(clazz.newInstance().schema());
+      return new SpecificDatumReader(clazz.newInstance().getSchema());
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -58,7 +58,7 @@ public class AvroSpecificSerialization
 
   @Override
   protected Schema getSchema(SpecificRecord t, Map<String, String> metadata) {
-    return t.schema();
+    return t.getSchema();
   }
 
   @Override

+ 223 - 0
src/java/org/apache/hadoop/ipc/AvroRpc.java

@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.*;
+import java.util.*;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import javax.net.SocketFactory;
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.net.NetUtils;
+
+import org.apache.avro.*;
+import org.apache.avro.ipc.*;
+import org.apache.avro.reflect.*;
+
+/** Tunnel Avro-format RPC requests over a Hadoop {@link RPC} connection.  This
+ * does not give cross-language wire compatibility, since the Hadoop RPC wire
+ * format is non-standard, but it does permit use of Avro's protocol versioning
+ * features for inter-Java RPCs. */
+public class AvroRpc {
+  private static final int VERSION = 0;          // tunnel protocol version
+
+  /** Tunnel an Avro RPC request and response through Hadoop's RPC. */
+  private static interface TunnelProtocol extends VersionedProtocol {
+    /** All Avro methods and responses go through this. */
+    BufferListWritable call(BufferListWritable request) throws IOException;
+  }
+
+  /** A Writable holding a {@code List<ByteBuffer>}, the Avro RPC Transceiver's
+   * basic unit of data transfer.  Wire form: count, then (length, bytes)*. */
+  private static class BufferListWritable implements Writable {
+    private List<ByteBuffer> buffers;
+
+    public BufferListWritable() {}                // required for RPC Writables
+
+    public BufferListWritable(List<ByteBuffer> buffers) {
+      this.buffers = buffers;
+    }
+
+    /** Replace the held list with newly allocated buffers read from in. */
+    public void readFields(DataInput in) throws IOException {
+      int size = in.readInt();
+      buffers = new ArrayList<ByteBuffer>(size);
+      for (int i = 0; i < size; i++) {
+        int length = in.readInt();
+        ByteBuffer buffer = ByteBuffer.allocate(length);
+        in.readFully(buffer.array(), 0, length);
+        buffers.add(buffer);
+      }
+    }
+
+    /** Write each buffer's remaining bytes; buffers must be array-backed. */
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(buffers.size());
+      for (ByteBuffer b : buffers) {       // arrayOffset() honors sliced buffers
+        out.writeInt(b.remaining());
+        out.write(b.array(), b.arrayOffset() + b.position(), b.remaining());
+      }
+    }
+  }
+
+  /** An Avro RPC Transceiver that tunnels client requests through Hadoop
+   * RPC.  readBuffers() and writeBuffers() are unsupported. */
+  private static class ClientTransceiver extends Transceiver {
+    private final TunnelProtocol tunnel;
+    private final InetSocketAddress remote;
+
+    public ClientTransceiver(InetSocketAddress addr,
+                             UserGroupInformation ticket,
+                             Configuration conf, SocketFactory factory)
+      throws IOException {
+      this.tunnel = (TunnelProtocol)RPC.getProxy(TunnelProtocol.class, VERSION,
+                                                 addr, ticket, conf, factory);
+      this.remote = addr;
+    }
+
+    public String getRemoteName() { return remote.toString(); }
+
+    /** Send the request as one tunnelled call and return the reply buffers. */
+    public List<ByteBuffer> transceive(List<ByteBuffer> request)
+      throws IOException {
+      return tunnel.call(new BufferListWritable(request)).buffers;
+    }
+
+    public List<ByteBuffer> readBuffers() throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void close() throws IOException {}
+  }
+
+  /** Avro reflect-based requestor that the client proxy delegates to. */
+  private static class Invoker extends ReflectRequestor {
+    public Invoker(Protocol protocol, Transceiver transceiver)
+      throws IOException {
+      super(protocol, transceiver);
+    }
+  }
+
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address as the user returned by
+   * UserGroupInformation.login(conf); a failed login is rethrown unchecked. */
+  public static Object getProxy(Class<?> protocol, InetSocketAddress addr,
+                                Configuration conf) throws IOException {
+    UserGroupInformation ugi;
+    try {
+      ugi = UserGroupInformation.login(conf);
+    } catch (LoginException le) {
+      throw new RuntimeException("Couldn't login!", le);   // keep the cause
+    }
+    return getProxy(protocol, addr, ugi, conf,
+                    NetUtils.getDefaultSocketFactory(conf));
+  }
+
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address.  Every call on the proxy makes
+   * a new Invoker and ClientTransceiver; this class does not reuse them. */
+  public static Object getProxy
+    (final Class<?> protocol, final InetSocketAddress addr,
+     final UserGroupInformation ticket, final Configuration conf,
+     final SocketFactory factory) throws IOException {
+    return Proxy.newProxyInstance
+      (protocol.getClassLoader(), new Class<?>[] { protocol },
+       new InvocationHandler() {
+         public Object invoke(Object proxy, Method method, Object[] args)
+           throws Throwable {
+           return new Invoker
+             (ReflectData.get().getProtocol(protocol),
+              new ClientTransceiver(addr, ticket, conf, factory))
+             .invoke(proxy, method, args);
+         }
+       });
+  }
+
+  /** An Avro RPC Transceiver that provides a request passed through Hadoop RPC
+   * to the Avro RPC Responder for processing. */
+  private static class ServerTransceiver extends Transceiver {
+    private final List<ByteBuffer> request;
+
+    public ServerTransceiver(List<ByteBuffer> request) {
+      this.request = request;
+    }
+
+    public String getRemoteName() { return "remote"; }
+
+    public List<ByteBuffer> readBuffers() throws IOException {
+      return request;
+    }
+
+    public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    public void close() throws IOException {}
+  }
+
+  /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
+  private static class TunnelResponder extends ReflectResponder
+    implements TunnelProtocol {
+
+    public TunnelResponder(Class<?> iface, Object impl) {
+      super(iface, impl);
+    }
+
+    public long getProtocolVersion(String protocol, long version)
+      throws IOException {
+      return VERSION;
+    }
+
+    public BufferListWritable call(final BufferListWritable request)
+      throws IOException {
+      return new BufferListWritable
+        (respond(new ServerTransceiver(request.buffers)));
+    }
+  }
+
+  /** Construct a server for a protocol implementation instance listening on a
+   * port and address.  impl.getClass() serves as the protocol interface. */
+  public static Server getServer(Object impl, String bindAddress, int port,
+                                 Configuration conf)
+    throws IOException {
+    return RPC.getServer(new TunnelResponder(impl.getClass(), impl),
+                         bindAddress, port, conf);
+  }
+
+  /** Construct a server for a protocol implementation instance listening on a
+   * port and address, with an explicit handler count and verbosity. */
+  public static RPC.Server getServer(Object impl, String bindAddress, int port,
+                                     int numHandlers, boolean verbose,
+                                     Configuration conf)
+    throws IOException {
+    return RPC.getServer(new TunnelResponder(impl.getClass(), impl),
+                         bindAddress, port, numHandlers, verbose, conf);
+  }
+}

+ 33 - 0
src/test/core/org/apache/hadoop/ipc/AvroTestProtocol.java

@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.util.Utf8;
+
+@SuppressWarnings("serial")  // for Problem, which declares no serialVersionUID
+public interface AvroTestProtocol {  // protocol exercised by TestAvroRpc
+  public static class Problem extends AvroRemoteException {  // thrown by error()
+    public Problem() {}
+  }
+  void ping();                 // no arguments, no result
+  Utf8 echo(Utf8 value);       // TestAvroRpc.TestImpl returns value unchanged
+  int add(int v1, int v2);     // TestAvroRpc.TestImpl returns v1 + v2
+  int error() throws Problem;  // TestAvroRpc.TestImpl always throws Problem
+}

+ 94 - 0
src/test/core/org/apache/hadoop/ipc/TestAvroRpc.java

@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.net.NetUtils;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.util.Utf8;
+
+/** Unit tests for {@link AvroRpc}. */
+public class TestAvroRpc extends TestCase {
+  private static final String ADDRESS = "0.0.0.0";
+
+  public static final Log LOG = LogFactory.getLog(TestAvroRpc.class);
+
+  // The three fields below are not read by any test in this class.
+  private static Configuration conf = new Configuration();
+  int datasize = 1024 * 100;
+  int numThreads = 50;
+
+  public TestAvroRpc(String name) { super(name); }
+
+  /** Server-side implementation of the protocol under test. */
+  public static class TestImpl implements AvroTestProtocol {
+
+    public void ping() {}
+
+    public Utf8 echo(Utf8 value) { return value; }
+
+    public int add(int v1, int v2) { return v1 + v2; }
+
+    public int error() throws Problem {
+      throw new Problem();
+    }
+  }
+
+  /** Calls every AvroTestProtocol method through a server started here. */
+  public void testCalls() throws Exception {
+    final Configuration conf = new Configuration();
+    final Server rpcServer =
+      AvroRpc.getServer(new TestImpl(), ADDRESS, 0, conf);
+    try {
+      rpcServer.start();
+
+      final InetSocketAddress target = NetUtils.getConnectAddress(rpcServer);
+      final AvroTestProtocol client = (AvroTestProtocol)
+        AvroRpc.getProxy(AvroTestProtocol.class, target, conf);
+
+      client.ping();
+
+      assertEquals(new Utf8("hello world"),
+                   client.echo(new Utf8("hello world")));
+
+      assertEquals(3, client.add(1, 2));
+
+      boolean remoteErrorSeen;
+      try {
+        client.error();
+        remoteErrorSeen = false;
+      } catch (AvroRemoteException e) {
+        LOG.debug("Caught " + e);
+        remoteErrorSeen = true;
+      }
+      assertTrue(remoteErrorSeen);
+    } finally {
+      rpcServer.stop();
+    }
+  }
+}