Просмотр исходного кода

HADOOP-2519 Performance improvements: Customized RPC serialization

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@608738 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 17 лет назад
Родитель
Сommit
e66ea85c6c

+ 1 - 0
src/contrib/hbase/CHANGES.txt

@@ -7,6 +7,7 @@ Trunk (unreleased changes)
    HADOOP-2079 Fix generated HLog, HRegion names
    HADOOP-2495 Minor performance improvements: Slim-down BatchOperation, etc. 
    HADOOP-2506 Remove the algebra package
+   HADOOP-2519 Performance improvements: Customized RPC serialization
 
   NEW FEATURES
     HADOOP-2061 Add new Base64 dialects

+ 5 - 3
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java

@@ -32,9 +32,11 @@ import java.io.IOException;
 public interface HMasterInterface extends VersionedProtocol {
   /**
    * Interface version.
-   * Version was incremented to 2 when we brought the hadoop RPC local to hbase.
+   * Version was incremented to 2 when we brought the hadoop RPC local to hbase
+   * -- HADOOP-2495 and then to 3 when we changed the RPC to send codes instead
+   * of actual class names (HADOOP-2519).
    */
-  public static final long versionID = 2L;
+  public static final long versionID = 3L;
 
   /** @return true if master is available */
   public boolean isMasterRunning();
@@ -109,4 +111,4 @@ public interface HMasterInterface extends VersionedProtocol {
    * @return address of server that serves the root region
    */
   public HServerAddress findRootRegion();
-}
+}

+ 155 - 83
src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java

@@ -18,42 +18,116 @@
 
 package org.apache.hadoop.hbase.io;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.lang.reflect.Array;
+import java.util.HashMap;
+import java.util.Map;
 
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.conf.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 
-/** A polymorphic Writable that writes an instance with it's class name.
- * Handles arrays, strings and primitive types without a Writable wrapper.
- * 
- * This is a copy of the hadoop version renamed.  Removes UTF8 (HADOOP-414).
- * Using Text intead of UTF-8 saves ~2% CPU between reading and writing objects
- * running a short sequentialWrite Performance Evaluation test just in
+/** 
+ * This is a customized version of the polymorphic hadoop
+ * {@link ObjectWritable}.  It removes UTF8 (HADOOP-414).
+ * Using {@link Text} intead of UTF-8 saves ~2% CPU between reading and writing
+ * objects running a short sequentialWrite Performance Evaluation test just in
  * ObjectWritable alone; more when we're doing randomRead-ing.  Other
  * optimizations include our passing codes for classes instead of the
- * actual class names themselves.
- * 
- * <p>Has other optimizations passing codes instead of class names.
+ * actual class names themselves.  This makes it so this class needs amendment
+ * if non-Writable classes are introduced -- if passed a Writable for which we
+ * have no code, we just do the old-school passing of the class name, etc. --
+ * but passing codes the  savings are large particularly when cell
+ * data is small (If < a couple of kilobytes, the encoding/decoding of class
+ * name and reflection to instantiate class was costing in excess of the cell
+ * handling).
  */
 public class HbaseObjectWritable implements Writable, Configurable {
-
-  private Class declaredClass;
+  protected final static Log LOG = LogFactory.getLog(HbaseObjectWritable.class);
+  
+  // Here we maintain two static maps of classes to code and vice versa.
+  // Add new classes+codes as wanted or figure way to auto-generate these
+  // maps from the HMasterInterface.
+  static final Map<Byte, Class<?>> CODE_TO_CLASS =
+    new HashMap<Byte, Class<?>>();
+  static final Map<Class<?>, Byte> CLASS_TO_CODE =
+    new HashMap<Class<?>, Byte>();
+  // Special code that means 'not-encoded'; in this case we do old school
+  // sending of the class name using reflection, etc.
+  private static final byte NOT_ENCODED = 0;
+  static {
+    byte code = NOT_ENCODED + 1;
+    // Primitive types.
+    addToMap(Boolean.TYPE, code++);
+    addToMap(Byte.TYPE, code++);
+    addToMap(Character.TYPE, code++);
+    addToMap(Short.TYPE, code++);
+    addToMap(Integer.TYPE, code++);
+    addToMap(Long.TYPE, code++);
+    addToMap(Float.TYPE, code++);
+    addToMap(Double.TYPE, code++);
+    addToMap(Void.TYPE, code++);
+    // Other java types
+    addToMap(String.class, code++);
+    addToMap(byte [].class, code++);
+    // Hadoop types
+    addToMap(Text.class, code++);
+    addToMap(Writable.class, code++);
+    addToMap(MapWritable.class, code++);
+    addToMap(NullInstance.class, code++);
+    try {
+      addToMap(Class.forName("[Lorg.apache.hadoop.io.Text;"), code++);
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
+    // Hbase types
+    addToMap(HServerInfo.class, code++);
+    addToMap(HMsg.class, code++);
+    addToMap(HTableDescriptor.class, code++);
+    addToMap(HColumnDescriptor.class, code++);
+    addToMap(RowFilterInterface.class, code++);
+    addToMap(RowFilterSet.class, code++);
+    addToMap(HRegionInfo.class, code++);
+    addToMap(BatchUpdate.class, code++);
+    addToMap(HServerAddress.class, code++);
+    addToMap(HRegionInfo.class, code++);
+    try {
+      addToMap(Class.forName("[Lorg.apache.hadoop.hbase.HMsg;"), code++);
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
+  }
+  
+  private Class<?> declaredClass;
   private Object instance;
   private Configuration conf;
 
-  public HbaseObjectWritable() {}
+  public HbaseObjectWritable() {
+    super();
+  }
   
   public HbaseObjectWritable(Object instance) {
     set(instance);
   }
 
-  public HbaseObjectWritable(Class declaredClass, Object instance) {
+  public HbaseObjectWritable(Class<?> declaredClass, Object instance) {
     this.declaredClass = declaredClass;
     this.instance = instance;
   }
@@ -62,7 +136,7 @@ public class HbaseObjectWritable implements Writable, Configurable {
   public Object get() { return instance; }
   
   /** Return the class this is meant to be. */
-  public Class getDeclaredClass() { return declaredClass; }
+  public Class<?> getDeclaredClass() { return declaredClass; }
   
   /** Reset the instance. */
   public void set(Object instance) {
@@ -83,55 +157,54 @@ public class HbaseObjectWritable implements Writable, Configurable {
     writeObject(out, instance, declaredClass, conf);
   }
 
-  private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();
-  static {
-    PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
-    PRIMITIVE_NAMES.put("byte", Byte.TYPE);
-    PRIMITIVE_NAMES.put("char", Character.TYPE);
-    PRIMITIVE_NAMES.put("short", Short.TYPE);
-    PRIMITIVE_NAMES.put("int", Integer.TYPE);
-    PRIMITIVE_NAMES.put("long", Long.TYPE);
-    PRIMITIVE_NAMES.put("float", Float.TYPE);
-    PRIMITIVE_NAMES.put("double", Double.TYPE);
-    PRIMITIVE_NAMES.put("void", Void.TYPE);
-  }
-
   private static class NullInstance extends Configured implements Writable {
-    private Class<?> declaredClass;
+    Class<?> declaredClass;
     public NullInstance() { super(null); }
-    public NullInstance(Class declaredClass, Configuration conf) {
+    
+    public NullInstance(Class<?> declaredClass, Configuration conf) {
       super(conf);
       this.declaredClass = declaredClass;
     }
+    
+    @SuppressWarnings("boxing")
     public void readFields(DataInput in) throws IOException {
-      String className = Text.readString(in);
-      declaredClass = PRIMITIVE_NAMES.get(className);
-      if (declaredClass == null) {
-        try {
-          declaredClass = getConf().getClassByName(className);
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException(e.toString());
-        }
-      }
+      this.declaredClass = CODE_TO_CLASS.get(in.readByte());
     }
+    
     public void write(DataOutput out) throws IOException {
-      Text.writeString(out, declaredClass.getName());
+      writeClassCode(out, this.declaredClass);
+    }
+  }
+  
+  /**
+   * Write out the code byte for passed Class.
+   * @param out
+   * @param c
+   * @throws IOException
+   */
+  @SuppressWarnings("boxing")
+  static void writeClassCode(final DataOutput out, final Class<?> c)
+  throws IOException {
+    Byte code = CLASS_TO_CODE.get(c);
+    if (code == null) {
+      LOG.error("Unsupported type " + c);
+      throw new UnsupportedOperationException("No code for unexpected " + c);
     }
+    out.writeByte(code);
   }
 
   /** Write a {@link Writable}, {@link String}, primitive type, or an array of
    * the preceding. */
+  @SuppressWarnings({ "boxing", "unchecked" })
   public static void writeObject(DataOutput out, Object instance,
                                  Class declaredClass, 
-                                 Configuration conf) throws IOException {
-
+                                 Configuration conf)
+  throws IOException {
     if (instance == null) {                       // null
       instance = new NullInstance(declaredClass, conf);
       declaredClass = Writable.class;
     }
-
-    Text.writeString(out, declaredClass.getName()); // always write declared
-
+    writeClassCode(out, declaredClass);
     if (declaredClass.isArray()) {                // array
       int length = Array.getLength(instance);
       out.writeInt(length);
@@ -139,12 +212,9 @@ public class HbaseObjectWritable implements Writable, Configurable {
         writeObject(out, Array.get(instance, i),
                     declaredClass.getComponentType(), conf);
       }
-      
     } else if (declaredClass == String.class) {   // String
       Text.writeString(out, (String)instance);
-      
     } else if (declaredClass.isPrimitive()) {     // primitive type
-
       if (declaredClass == Boolean.TYPE) {        // boolean
         out.writeBoolean(((Boolean)instance).booleanValue());
       } else if (declaredClass == Character.TYPE) { // char
@@ -168,9 +238,15 @@ public class HbaseObjectWritable implements Writable, Configurable {
     } else if (declaredClass.isEnum()) {         // enum
       Text.writeString(out, ((Enum)instance).name());
     } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
-      Text.writeString(out, instance.getClass().getName());
+      Class <?> c = instance.getClass();
+      Byte code = CLASS_TO_CODE.get(c);
+      if (code == null) {
+        out.writeByte(NOT_ENCODED);
+        Text.writeString(out, c.getName());
+      } else {
+        writeClassCode(out, c);
+      }
       ((Writable)instance).write(out);
-
     } else {
       throw new IOException("Can't write: "+instance+" as "+declaredClass);
     }
@@ -186,23 +262,13 @@ public class HbaseObjectWritable implements Writable, Configurable {
     
   /** Read a {@link Writable}, {@link String}, primitive type, or an array of
    * the preceding. */
-  @SuppressWarnings("unchecked")
-  public static Object readObject(DataInput in, HbaseObjectWritable objectWritable, Configuration conf)
-    throws IOException {
-    String className = Text.readString(in);
-    Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
-    if (declaredClass == null) {
-      try {
-        declaredClass = conf.getClassByName(className);
-      } catch (ClassNotFoundException e) {
-        throw new RuntimeException("readObject can't find class", e);
-      }
-    }    
-
+  @SuppressWarnings({ "unchecked", "boxing" })
+  public static Object readObject(DataInput in,
+      HbaseObjectWritable objectWritable, Configuration conf)
+  throws IOException {
+    Class<?> declaredClass = CODE_TO_CLASS.get(in.readByte());
     Object instance;
-    
     if (declaredClass.isPrimitive()) {            // primitive types
-
       if (declaredClass == Boolean.TYPE) {             // boolean
         instance = Boolean.valueOf(in.readBoolean());
       } else if (declaredClass == Character.TYPE) {    // char
@@ -224,43 +290,49 @@ public class HbaseObjectWritable implements Writable, Configurable {
       } else {
         throw new IllegalArgumentException("Not a primitive: "+declaredClass);
       }
-
     } else if (declaredClass.isArray()) {              // array
       int length = in.readInt();
       instance = Array.newInstance(declaredClass.getComponentType(), length);
       for (int i = 0; i < length; i++) {
         Array.set(instance, i, readObject(in, conf));
       }
-      
     } else if (declaredClass == String.class) {        // String
       instance = Text.readString(in);
     } else if (declaredClass.isEnum()) {         // enum
-      instance = Enum.valueOf((Class<? extends Enum>) declaredClass, Text.readString(in));
+      instance = Enum.valueOf((Class<? extends Enum>) declaredClass,
+        Text.readString(in));
     } else {                                      // Writable
-      Class instanceClass = null;
-      try {
-        instanceClass = conf.getClassByName(Text.readString(in));
-      } catch (ClassNotFoundException e) {
-        throw new RuntimeException("readObject can't find class", e);
+      Class<?> instanceClass = null;
+      Byte b = in.readByte();
+      if (b.byteValue() == NOT_ENCODED) {
+        String className = Text.readString(in);
+        try {
+          instanceClass = conf.getClassByName(className);
+        } catch (ClassNotFoundException e) {
+          throw new RuntimeException("Can't find class " + className);
+        }
+      } else {
+        instanceClass = CODE_TO_CLASS.get(b);
       }
-      
       Writable writable = WritableFactories.newInstance(instanceClass, conf);
       writable.readFields(in);
       instance = writable;
-
       if (instanceClass == NullInstance.class) {  // null
         declaredClass = ((NullInstance)instance).declaredClass;
         instance = null;
       }
     }
-
     if (objectWritable != null) {                 // store values
       objectWritable.declaredClass = declaredClass;
       objectWritable.instance = instance;
     }
-
     return instance;
-      
+  }
+
+  @SuppressWarnings("boxing")
+  private static void addToMap(final Class<?> clazz, final byte code) {
+    CLASS_TO_CODE.put(clazz, code);
+    CODE_TO_CLASS.put(code, clazz);
   }
 
   public void setConf(Configuration conf) {
@@ -271,4 +343,4 @@ public class HbaseObjectWritable implements Writable, Configurable {
     return this.conf;
   }
   
-}
+}

+ 89 - 0
src/contrib/hbase/src/test/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java

@@ -0,0 +1,89 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hadoop.io.Text;
+
+public class TestHbaseObjectWritable extends TestCase {
+
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  protected void tearDown() throws Exception {
+    super.tearDown();
+  }
+
+  @SuppressWarnings("boxing")
+  public void testReadObjectDataInputConfiguration() throws IOException {
+    HBaseConfiguration conf = new HBaseConfiguration();
+    // Do primitive type
+    final int COUNT = 101;
+    assertTrue(doType(conf, COUNT, int.class).equals(COUNT));
+    // Do unsupported type.
+    boolean exception = false;
+    try {
+      doType(conf, new File("a"), File.class);
+    } catch (UnsupportedOperationException uoe) {
+      exception = true;
+    }
+    assertTrue(exception);
+    // Try odd types
+    final byte A = 'A';
+    byte [] bytes = new byte[1];
+    bytes[0] = A;
+    Object obj = doType(conf, bytes, byte [].class);
+    assertTrue(((byte [])obj)[0] == A);
+    // Do 'known' Writable type.
+    obj = doType(conf, new Text(""), Text.class);
+    assertTrue(obj instanceof Text);
+    // Try type that should get transferred old fashion way.
+    obj = doType(conf, new StopRowFilter(new Text("")),
+        RowFilterInterface.class);
+    assertTrue(obj instanceof StopRowFilter);
+  }
+  
+  private Object doType(final HBaseConfiguration conf, final Object value,
+      final Class<?> clazz)
+  throws IOException {
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(byteStream);
+    HbaseObjectWritable.writeObject(out, value, clazz, conf);
+    out.close();
+    ByteArrayInputStream bais =
+      new ByteArrayInputStream(byteStream.toByteArray());
+    DataInputStream dis = new DataInputStream(bais);
+    Object product = HbaseObjectWritable.readObject(dis, conf);
+    dis.close();
+    return product;
+  }
+}