Преглед на файлове

Fix incomplete commit of HADOOP-1096.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@519280 13f79535-47bb-0310-9956-ffa450edef68
Thomas White преди 18 години
родител
ревизия
ab2d508eeb

+ 11 - 16
src/java/org/apache/hadoop/record/BinaryInputArchive.java → src/java/org/apache/hadoop/record/BinaryRecordInput.java

@@ -27,14 +27,10 @@ import java.io.InputStream;
  *
  * @author Milind Bhandarkar
  */
-public class BinaryInputArchive implements InputArchive {
+public class BinaryRecordInput implements RecordInput {
     
     final private DataInput in;
     
-    static BinaryInputArchive getArchive(InputStream strm) {
-        return new BinaryInputArchive(new DataInputStream(strm));
-    }
-    
     static private class BinaryIndex implements Index {
         private int nelems;
         private BinaryIndex(int nelems) {
@@ -47,9 +43,15 @@ public class BinaryInputArchive implements InputArchive {
             nelems--;
         }
     }
-    /** Creates a new instance of BinaryInputArchive */
-    public BinaryInputArchive(DataInput in) {
-        this.in = in;
+    
+    /** Creates a new instance of BinaryRecordInput */
+    public BinaryRecordInput(InputStream strm) {
+        this.in = new DataInputStream(strm);
+    }
+    
+    /** Creates a new instance of BinaryRecordInput */
+    public BinaryRecordInput(DataInput din) {
+        this.in = din;
     }
     
     public byte readByte(final String tag) throws IOException {
@@ -77,10 +79,7 @@ public class BinaryInputArchive implements InputArchive {
     }
     
     public String readString(final String tag) throws IOException {
-      final int length = Utils.readVInt(in);
-      final byte[] bytes = new byte[length];
-      in.readFully(bytes);
-      return new String(bytes, "UTF-8");
+      return Utils.fromBinaryString(in);
     }
     
     public Buffer readBuffer(final String tag) throws IOException {
@@ -90,10 +89,6 @@ public class BinaryInputArchive implements InputArchive {
       return new Buffer(barr);
     }
     
-    public void readRecord(final Record record, final String tag) throws IOException {
-        record.deserialize(this, tag);
-    }
-    
     public void startRecord(final String tag) throws IOException {
       // no-op
     }

+ 8 - 13
src/java/org/apache/hadoop/record/BinaryOutputArchive.java → src/java/org/apache/hadoop/record/BinaryRecordOutput.java

@@ -29,19 +29,21 @@ import java.io.OutputStream;
  *
  * @author Milind Bhandarkar
  */
-public class BinaryOutputArchive implements OutputArchive {
+public class BinaryRecordOutput implements RecordOutput {
     
     private DataOutput out;
     
-    static BinaryOutputArchive getArchive(OutputStream strm) {
-        return new BinaryOutputArchive(new DataOutputStream(strm));
+    /** Creates a new instance of BinaryRecordOutput */
+    public BinaryRecordOutput(OutputStream out) {
+        this.out = new DataOutputStream(out);
     }
     
-    /** Creates a new instance of BinaryOutputArchive */
-    public BinaryOutputArchive(DataOutput out) {
+    /** Creates a new instance of BinaryRecordOutput */
+    public BinaryRecordOutput(DataOutput out) {
         this.out = out;
     }
     
+    
     public void writeByte(byte b, String tag) throws IOException {
         out.writeByte(b);
     }
@@ -67,10 +69,7 @@ public class BinaryOutputArchive implements OutputArchive {
     }
     
     public void writeString(String s, String tag) throws IOException {
-      byte[] bytes = s.getBytes("UTF-8");
-      int length = bytes.length;
-      Utils.writeVInt(out, length);
-      out.write(bytes);
+      Utils.toBinaryString(out, s);
     }
     
     public void writeBuffer(Buffer buf, String tag)
@@ -81,10 +80,6 @@ public class BinaryOutputArchive implements OutputArchive {
       out.write(barr, 0, len);
     }
     
-    public void writeRecord(Record r, String tag) throws IOException {
-        r.serialize(this, tag);
-    }
-    
     public void startRecord(Record r, String tag) throws IOException {}
     
     public void endRecord(Record r, String tag) throws IOException {}

+ 8 - 14
src/java/org/apache/hadoop/record/CsvInputArchive.java → src/java/org/apache/hadoop/record/CsvRecordInput.java

@@ -28,7 +28,7 @@ import java.io.UnsupportedEncodingException;
  *
  * @author Milind Bhandarkar
  */
-class CsvInputArchive implements InputArchive {
+public class CsvRecordInput implements RecordInput {
     
     private PushbackReader stream;
     
@@ -71,15 +71,13 @@ class CsvInputArchive implements InputArchive {
         }
     }
     
-    static CsvInputArchive getArchive(InputStream strm)
-    throws UnsupportedEncodingException {
-        return new CsvInputArchive(strm);
-    }
-    
-    /** Creates a new instance of CsvInputArchive */
-    public CsvInputArchive(InputStream in)
-    throws UnsupportedEncodingException {
-        stream = new PushbackReader(new InputStreamReader(in, "UTF-8"));
+    /** Creates a new instance of CsvRecordInput */
+    public CsvRecordInput(InputStream in) {
+      try {
+      stream = new PushbackReader(new InputStreamReader(in, "UTF-8"));
+      } catch (UnsupportedEncodingException ex) {
+        throw new RuntimeException(ex);
+      }
     }
     
     public byte readByte(String tag) throws IOException {
@@ -129,10 +127,6 @@ class CsvInputArchive implements InputArchive {
         return Utils.fromCSVBuffer(sval);
     }
     
-    public void readRecord(Record r, String tag) throws IOException {
-        r.deserialize(this, tag);
-    }
-    
     public void startRecord(String tag) throws IOException {
         if (tag != null && !"".equals(tag)) {
             char c1 = (char) stream.read();

+ 7 - 13
src/java/org/apache/hadoop/record/CsvOutputArchive.java → src/java/org/apache/hadoop/record/CsvRecordOutput.java

@@ -29,16 +29,11 @@ import java.io.UnsupportedEncodingException;
  *
  * @author Milind Bhandarkar
  */
-public class CsvOutputArchive implements OutputArchive {
+public class CsvRecordOutput implements RecordOutput {
 
     private PrintStream stream;
     private boolean isFirst = true;
     
-    static CsvOutputArchive getArchive(OutputStream strm)
-    throws UnsupportedEncodingException {
-        return new CsvOutputArchive(strm);
-    }
-    
     private void throwExceptionOnError(String tag) throws IOException {
         if (stream.checkError()) {
             throw new IOException("Error serializing "+tag);
@@ -52,10 +47,13 @@ public class CsvOutputArchive implements OutputArchive {
         isFirst = false;
     }
     
-    /** Creates a new instance of CsvOutputArchive */
-    public CsvOutputArchive(OutputStream out)
-    throws UnsupportedEncodingException {
+    /** Creates a new instance of CsvRecordOutput */
+    public CsvRecordOutput(OutputStream out) {
+      try {
         stream = new PrintStream(out, true, "UTF-8");
+      } catch (UnsupportedEncodingException ex) {
+        throw new RuntimeException(ex);
+      }
     }
     
     public void writeByte(byte b, String tag) throws IOException {
@@ -102,10 +100,6 @@ public class CsvOutputArchive implements OutputArchive {
         throwExceptionOnError(tag);
     }
     
-    public void writeRecord(Record r, String tag) throws IOException {
-        r.serialize(this, tag);
-    }
-    
     public void startRecord(Record r, String tag) throws IOException {
         if (tag != null && !"".equals(tag)) {
             printCommaUnlessFirst();

+ 122 - 0
src/java/org/apache/hadoop/record/RecordInput.java

@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.record;
+
+import java.io.IOException;
+
+/**
+ * Interface that all the Deserializers have to implement.
+ *
+ * @author Milind Bhandarkar
+ */
+public interface RecordInput {
+  /**
+   * Read a byte from serialized record.
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @return value read from serialized record.
+   */
+  byte readByte(String tag) throws IOException;
+  
+  /**
+   * Read a boolean from serialized record.
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @return value read from serialized record.
+   */
+  boolean readBool(String tag) throws IOException;
+  
+  /**
+   * Read an integer from serialized record.
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @return value read from serialized record.
+   */
+  int readInt(String tag) throws IOException;
+  
+  /**
+   * Read a long integer from serialized record.
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @return value read from serialized record.
+   */
+  long readLong(String tag) throws IOException;
+  
+  /**
+   * Read a single-precision float from serialized record.
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @return value read from serialized record.
+   */
+  float readFloat(String tag) throws IOException;
+  
+  /**
+   * Read a double-precision number from serialized record.
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @return value read from serialized record.
+   */
+  double readDouble(String tag) throws IOException;
+  
+  /**
+   * Read a UTF-8 encoded string from serialized record.
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @return value read from serialized record.
+   */
+  String readString(String tag) throws IOException;
+  
+  /**
+   * Read byte array from serialized record.
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @return value read from serialized record.
+   */
+  Buffer readBuffer(String tag) throws IOException;
+  
+  /**
+   * Check the mark for start of the serialized record.
+   * @param tag Used by tagged serialization formats (such as XML)
+   */
+  void startRecord(String tag) throws IOException;
+  
+  /**
+   * Check the mark for end of the serialized record.
+   * @param tag Used by tagged serialization formats (such as XML)
+   */
+  void endRecord(String tag) throws IOException;
+  
+  /**
+   * Check the mark for start of the serialized vector.
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @return Index that is used to count the number of elements.
+   */
+  Index startVector(String tag) throws IOException;
+  
+  /**
+   * Check the mark for end of the serialized vector.
+   * @param tag Used by tagged serialization formats (such as XML)
+   */
+  void endVector(String tag) throws IOException;
+  
+  /**
+   * Check the mark for start of the serialized map.
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @return Index that is used to count the number of map entries.
+   */
+  Index startMap(String tag) throws IOException;
+  
+  /**
+   * Check the mark for end of the serialized map.
+   * @param tag Used by tagged serialization formats (such as XML)
+   */
+  void endMap(String tag) throws IOException;
+}

+ 143 - 0
src/java/org/apache/hadoop/record/RecordOutput.java

@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.record;
+
+import java.io.IOException;
+import java.util.TreeMap;
+import java.util.ArrayList;
+
+/**
+ * Interface that alll the serializers have to implement.
+ *
+ * @author Milind Bhandarkar
+ */
+public interface RecordOutput {
+  /**
+   * Write a byte to serialized record.
+   * @param b Byte to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void writeByte(byte b, String tag) throws IOException;
+  
+  /**
+   * Write a boolean to serialized record.
+   * @param b Boolean to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void writeBool(boolean b, String tag) throws IOException;
+  
+  /**
+   * Write an integer to serialized record.
+   * @param i Integer to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void writeInt(int i, String tag) throws IOException;
+  
+  /**
+   * Write a long integer to serialized record.
+   * @param l Long to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void writeLong(long l, String tag) throws IOException;
+  
+  /**
+   * Write a single-precision float to serialized record.
+   * @param f Float to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void writeFloat(float f, String tag) throws IOException;
+  
+  /**
+   * Write a double precision floating point number to serialized record.
+   * @param d Double to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void writeDouble(double d, String tag) throws IOException;
+  
+  /**
+   * Write a unicode string to serialized record.
+   * @param s String to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void writeString(String s, String tag) throws IOException;
+  
+  /**
+   * Write a buffer to serialized record.
+   * @param buf Buffer to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void writeBuffer(Buffer buf, String tag)
+  throws IOException;
+  
+  /**
+   * Mark the start of a record to be serialized.
+   * @param r Record to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void startRecord(Record r, String tag) throws IOException;
+  
+  /**
+   * Mark the end of a serialized record.
+   * @param r Record to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void endRecord(Record r, String tag) throws IOException;
+  
+  /**
+   * Mark the start of a vector to be serialized.
+   * @param v Vector to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void startVector(ArrayList v, String tag) throws IOException;
+  
+  /**
+   * Mark the end of a serialized vector.
+   * @param v Vector to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void endVector(ArrayList v, String tag) throws IOException;
+  
+  /**
+   * Mark the start of a map to be serialized.
+   * @param m Map to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void startMap(TreeMap m, String tag) throws IOException;
+  
+  /**
+   * Mark the end of a serialized map.
+   * @param m Map to be serialized
+   * @param tag Used by tagged serialization formats (such as XML)
+   * @throws IOException Indicates error in serialization
+   */
+  public void endMap(TreeMap m, String tag) throws IOException;
+}

+ 0 - 94
src/java/org/apache/hadoop/record/RecordReader.java

@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.record;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-
-/**
- * Front-end interface to deserializers. Also acts as a factory
- * for deserializers.
- *
- * @author Milind Bhandarkar
- */
-public class RecordReader {
-    
-    private InputArchive archive;
-
-    static private HashMap archiveFactory;
-    
-    static {
-        archiveFactory = new HashMap();
-        Class[] params = { InputStream.class };
-        try {
-            archiveFactory.put("binary",
-                    BinaryInputArchive.class.getDeclaredMethod(
-                        "getArchive", params));
-            archiveFactory.put("csv",
-                    CsvInputArchive.class.getDeclaredMethod(
-                        "getArchive", params));
-            archiveFactory.put("xml",
-                    XmlInputArchive.class.getDeclaredMethod(
-                        "getArchive", params));
-        } catch (SecurityException ex) {
-            ex.printStackTrace();
-        } catch (NoSuchMethodException ex) {
-            ex.printStackTrace();
-        }
-    }
-    
-    static private InputArchive createArchive(InputStream in, String format)
-    throws IOException {
-        Method factory = (Method) archiveFactory.get(format);
-        if (factory != null) {
-            Object[] params = { in };
-            try {
-                return (InputArchive) factory.invoke(null, params);
-            } catch (IllegalArgumentException ex) {
-                ex.printStackTrace();
-            } catch (InvocationTargetException ex) {
-                ex.printStackTrace();
-            } catch (IllegalAccessException ex) {
-                ex.printStackTrace();
-            }
-        }
-        return null;
-    }
-    /**
-     * Creates a new instance of RecordReader.
-     * @param in Stream from which to deserialize a record
-     * @param format Deserialization format ("binary", "xml", or "csv")
-     */
-    public RecordReader(InputStream in, String format)
-    throws IOException {
-        archive = createArchive(in, format);
-    }
-    
-    /**
-     * Deserialize a record
-     * @param r Record to be deserialized
-     */
-    public void read(Record r) throws IOException {
-        r.deserialize(archive, "");
-    }
-    
-}

+ 0 - 114
src/java/org/apache/hadoop/record/RecordWriter.java

@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.record;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.DataOutputStream;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-
-/**
- * Front-end for serializers. Also serves as a factory for serializers.
- *
- * @author Milind Bhandarkar
- */
-public class RecordWriter {
-    
-    private OutputArchive archive;
-    
-    static private OutputArchive getBinaryArchive(OutputStream out) {
-        return new BinaryOutputArchive(new DataOutputStream(out));
-    }
-    
-    static private OutputArchive getCsvArchive(OutputStream out)
-    throws IOException {
-        try {
-            return new CsvOutputArchive(out);
-        } catch (UnsupportedEncodingException ex) {
-            throw new IOException("Unsupported encoding UTF-8");
-        }
-    }
-    
-    static private OutputArchive getXmlArchive(OutputStream out)
-    throws IOException {
-        return new XmlOutputArchive(out);
-    }
-
-    static HashMap constructFactory() {
-        HashMap factory = new HashMap();
-        Class[] params = { OutputStream.class };
-        try {
-            factory.put("binary",
-                    BinaryOutputArchive.class.getDeclaredMethod(
-                        "getArchive", params));
-            factory.put("csv",
-                    CsvOutputArchive.class.getDeclaredMethod(
-                        "getArchive", params));
-            factory.put("xml",
-                    XmlOutputArchive.class.getDeclaredMethod(
-                        "getArchive", params));
-        } catch (SecurityException ex) {
-            ex.printStackTrace();
-        } catch (NoSuchMethodException ex) {
-            ex.printStackTrace();
-        }
-        return factory;
-    }
-    
-    static private HashMap archiveFactory = constructFactory();
-    
-    static private OutputArchive createArchive(OutputStream out,
-            String format)
-            throws IOException {
-        Method factory = (Method) archiveFactory.get(format);
-        if (factory != null) {
-            Object[] params = { out };
-            try {
-                return (OutputArchive) factory.invoke(null, params);
-            } catch (IllegalArgumentException ex) {
-                ex.printStackTrace();
-            } catch (InvocationTargetException ex) {
-                ex.printStackTrace();
-            } catch (IllegalAccessException ex) {
-                ex.printStackTrace();
-            }
-        }
-        return null;
-    }
-    /**
-     * Creates a new instance of RecordWriter
-     * @param out Output stream where the records will be serialized
-     * @param format Serialization format ("binary", "xml", or "csv")
-     */
-    public RecordWriter(OutputStream out, String format)
-    throws IOException {
-        archive = createArchive(out, format);
-    }
-    
-    /**
-     * Serialize a record
-     * @param r record to be serialized
-     */
-    public void write(Record r) throws IOException {
-        r.serialize(archive, "");
-    }
-}

+ 7 - 13
src/java/org/apache/hadoop/record/XmlInputArchive.java → src/java/org/apache/hadoop/record/XmlRecordInput.java

@@ -32,7 +32,7 @@ import javax.xml.parsers.SAXParser;
  *
  * @author Milind Bhandarkar
  */
-class XmlInputArchive implements InputArchive {
+class XmlRecordInput implements RecordInput {
     
     static private class Value {
         private String type;
@@ -132,14 +132,9 @@ class XmlInputArchive implements InputArchive {
         }
     }
     
-    static XmlInputArchive getArchive(InputStream strm)
-    throws ParserConfigurationException, SAXException, IOException {
-        return new XmlInputArchive(strm);
-    }
-    
-    /** Creates a new instance of BinaryInputArchive */
-    public XmlInputArchive(InputStream in)
-    throws ParserConfigurationException, SAXException, IOException {
+    /** Creates a new instance of XmlRecordInput */
+    public XmlRecordInput(InputStream in) {
+      try{
         valList = new ArrayList();
         DefaultHandler handler = new XMLParser(valList);
         SAXParserFactory factory = SAXParserFactory.newInstance();
@@ -147,6 +142,9 @@ class XmlInputArchive implements InputArchive {
         parser.parse(in, handler);
         vLen = valList.size();
         vIdx = 0;
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
     }
     
     public byte readByte(String tag) throws IOException {
@@ -214,10 +212,6 @@ class XmlInputArchive implements InputArchive {
         return Utils.fromXMLBuffer(v.getValue());
     }
     
-    public void readRecord(Record r, String tag) throws IOException {
-        r.deserialize(this, tag);
-    }
-    
     public void startRecord(String tag) throws IOException {
         Value v = next();
         if (!"struct".equals(v.getType())) {

+ 9 - 12
src/java/org/apache/hadoop/record/XmlOutputArchive.java → src/java/org/apache/hadoop/record/XmlRecordOutput.java

@@ -23,13 +23,14 @@ import java.util.TreeMap;
 import java.util.ArrayList;
 import java.io.PrintStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.util.Stack;
 
 /**
  *
  * @author Milind Bhandarkar
  */
-class XmlOutputArchive implements OutputArchive {
+class XmlRecordOutput implements RecordOutput {
 
     private PrintStream stream;
     
@@ -131,14 +132,14 @@ class XmlOutputArchive implements OutputArchive {
         printEndEnvelope(tag);
     }
     
-    static XmlOutputArchive getArchive(OutputStream strm) {
-        return new XmlOutputArchive(strm);
-    }
-    
-    /** Creates a new instance of XmlOutputArchive */
-    public XmlOutputArchive(OutputStream out) {
-        stream = new PrintStream(out);
+    /** Creates a new instance of XmlRecordOutput */
+    public XmlRecordOutput(OutputStream out) {
+      try {
+        stream = new PrintStream(out, true, "UTF-8");
         compoundStack = new Stack();
+      } catch (UnsupportedEncodingException ex) {
+        throw new RuntimeException(ex);
+      }
     }
     
     public void writeByte(byte b, String tag) throws IOException {
@@ -206,10 +207,6 @@ class XmlOutputArchive implements OutputArchive {
         printEndEnvelope(tag);
     }
     
-    public void writeRecord(Record r, String tag) throws IOException {
-        r.serialize(this, tag);
-    }
-    
     public void startRecord(Record r, String tag) throws IOException {
         insideRecord(tag);
         stream.print("<struct>\n");

+ 314 - 0
src/test/org/apache/hadoop/record/RecordBench.java

@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.record;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Random;
+
+/**
+ * Benchmark for various types of serializations
+ * @author milindb
+ */
+public class RecordBench {
+  
+  private static class Times {
+    long init;
+    long serialize;
+    long deserialize;
+    long write;
+    long readFields;
+  };
+  
+  private static final long SEED = 0xDEADBEEFL;
+  private static final Random rand = new Random();
+  
+  /** Do not allow to create a new instance of RecordBench */
+  private RecordBench() {}
+  
+  private static void initBuffers(Record[] buffers) {
+    final int BUFLEN = 32;
+    for (int idx = 0; idx < buffers.length; idx++) {
+      buffers[idx] = new RecBuffer();
+      int buflen = rand.nextInt(BUFLEN);
+      byte[] bytes = new byte[buflen];
+      rand.nextBytes(bytes);
+      ((RecBuffer)buffers[idx]).setData(new Buffer(bytes));
+    }
+  }
+  
+  private static void initStrings(Record[] strings) {
+    final int STRLEN = 32;
+    for (int idx = 0; idx < strings.length; idx++) {
+      strings[idx] = new RecString();
+      int strlen = rand.nextInt(STRLEN);
+      StringBuilder sb = new StringBuilder(strlen);
+      for (int ich = 0; ich < strlen; ich++) {
+        int cpt = 0;
+        while (true) {
+          cpt = rand.nextInt(0x10FFFF+1);
+          if (Utils.isValidCodePoint(cpt)) {
+            break;
+          }
+        }
+        sb.appendCodePoint(cpt);
+      }
+      ((RecString)strings[idx]).setData(sb.toString());
+    }
+  }
+  
+  private static void initInts(Record[] ints) {
+    for (int idx = 0; idx < ints.length; idx++) {
+      ints[idx] = new RecInt();
+      ((RecInt)ints[idx]).setData(rand.nextInt());
+    }
+  }
+  
+  private static Record[] makeArray(String type, int numRecords, Times times) {
+    Method init = null;
+    try {
+      init = RecordBench.class.getDeclaredMethod("init"+
+          toCamelCase(type) + "s",
+          new Class[] {Record[].class});
+    } catch (NoSuchMethodException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    Record[] records = new Record[numRecords];
+    times.init = System.nanoTime();
+    try {
+      init.invoke(null, new Object[]{records});
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+    times.init = System.nanoTime() - times.init;
+    return records;
+  }
+  
+  private static void runBinaryBench(String type, int numRecords, Times times)
+  throws IOException {
+    Record[] records = makeArray(type, numRecords, times);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    BinaryRecordOutput rout = new BinaryRecordOutput(bout);
+    DataOutputStream dout = new DataOutputStream(bout);
+    
+    for(int idx = 0; idx < numRecords; idx++) {
+      records[idx].serialize(rout);
+    }
+    bout.reset();
+    
+    times.serialize = System.nanoTime();
+    for(int idx = 0; idx < numRecords; idx++) {
+      records[idx].serialize(rout);
+    }
+    times.serialize = System.nanoTime() - times.serialize;
+    
+    byte[] serialized = bout.toByteArray();
+    ByteArrayInputStream bin = new ByteArrayInputStream(serialized);
+    BinaryRecordInput rin = new BinaryRecordInput(bin);
+    
+    times.deserialize = System.nanoTime();
+    for(int idx = 0; idx < numRecords; idx++) {
+      records[idx].deserialize(rin);
+    }
+    times.deserialize = System.nanoTime() - times.deserialize;
+    
+    bout.reset();
+    
+    times.write = System.nanoTime();
+    for(int idx = 0; idx < numRecords; idx++) {
+      records[idx].write(dout);
+    }
+    times.write = System.nanoTime() - times.write;
+    
+    bin.reset();
+    DataInputStream din = new DataInputStream(bin);
+    
+    times.readFields = System.nanoTime();
+    for(int idx = 0; idx < numRecords; idx++) {
+      records[idx].readFields(din);
+    }
+    times.readFields = System.nanoTime() - times.readFields;
+  }
+  
+  private static void runCsvBench(String type, int numRecords, Times times)
+  throws IOException {
+    Record[] records = makeArray(type, numRecords, times);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    CsvRecordOutput rout = new CsvRecordOutput(bout);
+    
+    for(int idx = 0; idx < numRecords; idx++) {
+      records[idx].serialize(rout);
+    }
+    bout.reset();
+    
+    times.serialize = System.nanoTime();
+    for(int idx = 0; idx < numRecords; idx++) {
+      records[idx].serialize(rout);
+    }
+    times.serialize = System.nanoTime() - times.serialize;
+    
+    byte[] serialized = bout.toByteArray();
+    ByteArrayInputStream bin = new ByteArrayInputStream(serialized);
+    CsvRecordInput rin = new CsvRecordInput(bin);
+    
+    times.deserialize = System.nanoTime();
+    for(int idx = 0; idx < numRecords; idx++) {
+      records[idx].deserialize(rin);
+    }
+    times.deserialize = System.nanoTime() - times.deserialize;
+  }
+  
+  private static void runXmlBench(String type, int numRecords, Times times)
+  throws IOException {
+    Record[] records = makeArray(type, numRecords, times);
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    XmlRecordOutput rout = new XmlRecordOutput(bout);
+    
+    for(int idx = 0; idx < numRecords; idx++) {
+      records[idx].serialize(rout);
+    }
+    bout.reset();
+    
+    bout.write("<records>\n".getBytes());
+    
+    times.serialize = System.nanoTime();
+    for(int idx = 0; idx < numRecords; idx++) {
+      records[idx].serialize(rout);
+    }
+    times.serialize = System.nanoTime() - times.serialize;
+    
+    bout.write("</records>\n".getBytes());
+    
+    byte[] serialized = bout.toByteArray();
+    ByteArrayInputStream bin = new ByteArrayInputStream(serialized);
+        
+    times.deserialize = System.nanoTime();
+    XmlRecordInput rin = new XmlRecordInput(bin);
+    for(int idx = 0; idx < numRecords; idx++) {
+      records[idx].deserialize(rin);
+    }
+    times.deserialize = System.nanoTime() - times.deserialize;
+  }
+
+  private static void printTimes(String type,
+      String format,
+      int numRecords,
+      Times times) {
+    System.out.println("Type: " + type + " Format: " + format +
+        " #Records: "+numRecords);
+    if (times.init != 0) {
+      System.out.println("Initialization Time (Per record) : "+
+          times.init/numRecords + " Nanoseconds");
+    }
+    
+    if (times.serialize != 0) {
+      System.out.println("Serialization Time (Per Record) : "+
+          times.serialize/numRecords + " Nanoseconds");
+    }
+    
+    if (times.deserialize != 0) {
+      System.out.println("Deserialization Time (Per Record) : "+
+          times.deserialize/numRecords + " Nanoseconds");
+    }
+    
+    if (times.write != 0) {
+      System.out.println("Write Time (Per Record) : "+
+          times.write/numRecords + " Nanoseconds");
+    }
+    
+    if (times.readFields != 0) {
+      System.out.println("ReadFields Time (Per Record) : "+
+          times.readFields/numRecords + " Nanoseconds");
+    }
+    
+    System.out.println();
+  }
+  
+  private static String toCamelCase(String inp) {
+    char firstChar = inp.charAt(0);
+    if (Character.isLowerCase(firstChar)) {
+      return ""+Character.toUpperCase(firstChar) + inp.substring(1);
+    }
+    return inp;
+  }
+  
+  private static void exitOnError() {
+    String usage = "RecordBench {buffer|string|int}"+
+        " {binary|csv|xml} <numRecords>";
+    System.out.println(usage);
+    System.exit(1);
+  }
+  
+  /**
+   * @param args the command line arguments
+   */
+  public static void main(String[] args) throws IOException {
+    String version = "RecordBench v0.1";
+    System.out.println(version+"\n");
+    
+    if (args.length != 3) {
+      exitOnError();
+    }
+    
+    String typeName = args[0];
+    String format = args[1];
+    int numRecords = Integer.decode(args[2]).intValue();
+    
+    Method bench = null;
+    try {
+    bench = RecordBench.class.getDeclaredMethod("run"+
+        toCamelCase(format) + "Bench",
+        new Class[] {String.class, Integer.TYPE, Times.class});
+    } catch (NoSuchMethodException ex) {
+      ex.printStackTrace();
+      exitOnError();
+    }
+    
+    if (numRecords < 0) {
+      exitOnError();
+    }
+    
+    // dry run
+    rand.setSeed(SEED);
+    Times times = new Times();
+    try {
+      bench.invoke(null, new Object[] {typeName, numRecords, times});
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      System.exit(1);
+    }
+    
+    // timed run
+    rand.setSeed(SEED);
+    try {
+      bench.invoke(null, new Object[] {typeName, numRecords, times});
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      System.exit(1);
+    }
+    printTimes(typeName, format, numRecords, times);
+  }
+}