浏览代码

HADOOP-2425. Special-case TextOutputFormat to handle Text and
NullWritable.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@607010 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 年之前
父节点
当前提交
7a7c80253c

+ 4 - 0
CHANGES.txt

@@ -140,6 +140,10 @@ Trunk (unreleased changes)
     non-static method to allow sub-classes to provide alternate
     non-static method to allow sub-classes to provide alternate
     implementations. (Alejandro Abdelnur via acmurthy) 
     implementations. (Alejandro Abdelnur via acmurthy) 
 
 
+    HADOOP-2425. Change TextOutputFormat to handle Text specifically for better
+    performance. Make NullWritable implement Comparable. Make TextOutputFormat
+    treat NullWritable like null. (omalley)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack
     HADOOP-1898.  Release the lock protecting the last time of the last stack

+ 10 - 1
src/java/org/apache/hadoop/io/NullWritable.java

@@ -21,7 +21,7 @@ package org.apache.hadoop.io;
 import java.io.*;
 import java.io.*;
 
 
 /** Singleton Writable with no data. */
 /** Singleton Writable with no data. */
-public class NullWritable implements Writable {
+public class NullWritable implements WritableComparable {
 
 
   private static final NullWritable THIS = new NullWritable();
   private static final NullWritable THIS = new NullWritable();
 
 
@@ -34,6 +34,15 @@ public class NullWritable implements Writable {
     return "(null)";
     return "(null)";
   }
   }
 
 
+  public int hashCode() { return 0; }
+  public int compareTo(Object other) {
+    if (!(other instanceof NullWritable)) {
+      throw new ClassCastException("can't compare " + other.getClass().getName() 
+                                   + " to NullWritable");
+    }
+    return 0;
+  }
+  public boolean equals(Object other) { return other instanceof NullWritable; }
   public void readFields(DataInput in) throws IOException {}
   public void readFields(DataInput in) throws IOException {}
   public void write(DataOutput out) throws IOException {}
   public void write(DataOutput out) throws IOException {}
 }
 }

+ 41 - 9
src/java/org/apache/hadoop/mapred/TextOutputFormat.java

@@ -20,11 +20,14 @@ package org.apache.hadoop.mapred;
 
 
 import java.io.DataOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 
 
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -39,6 +42,17 @@ public class TextOutputFormat<K extends WritableComparable,
   protected static class LineRecordWriter<K extends WritableComparable,
   protected static class LineRecordWriter<K extends WritableComparable,
                                           V extends Writable>
                                           V extends Writable>
     implements RecordWriter<K, V> {
     implements RecordWriter<K, V> {
+    private static final String utf8 = "UTF-8";
+    private static final byte[] tab;
+    private static final byte[] newline;
+    static {
+      try {
+        tab = "\t".getBytes(utf8);
+        newline = "\n".getBytes(utf8);
+      } catch (UnsupportedEncodingException uee) {
+        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
+      }
+    }
     
     
     private DataOutputStream out;
     private DataOutputStream out;
     
     
@@ -46,22 +60,39 @@ public class TextOutputFormat<K extends WritableComparable,
       this.out = out;
       this.out = out;
     }
     }
     
     
+    /**
+     * Write the object to the byte stream, handling Text as a special
+     * case.
+     * @param o the object to print
+     * @throws IOException if the write throws, we pass it on
+     */
+    private void writeObject(Object o) throws IOException {
+      if (o instanceof Text) {
+        Text to = (Text) o;
+        out.write(to.getBytes(), 0, to.getLength());
+      } else {
+        out.write(o.toString().getBytes(utf8));
+      }
+    }
+
     public synchronized void write(K key, V value)
     public synchronized void write(K key, V value)
       throws IOException {
       throws IOException {
 
 
-      if (key == null && value == null) {
+      boolean nullKey = key == null || key instanceof NullWritable;
+      boolean nullValue = value == null || value instanceof NullWritable;
+      if (nullKey && nullValue) {
         return;
         return;
       }
       }
-      if (key != null) {
-        out.write(key.toString().getBytes("UTF-8"));
+      if (!nullKey) {
+        writeObject(key);
       }
       }
-      if (key != null && value != null) {
-        out.write("\t".getBytes("UTF-8"));
+      if (!(nullKey || nullValue)) {
+        out.write(tab);
       }
       }
-      if (value != null) {
-        out.write(value.toString().getBytes("UTF-8"));
+      if (!nullValue) {
+        writeObject(value);
       }
       }
-      out.writeByte('\n');
+      out.write(newline);
     }
     }
 
 
     public synchronized void close(Reporter reporter) throws IOException {
     public synchronized void close(Reporter reporter) throws IOException {
@@ -82,7 +113,8 @@ public class TextOutputFormat<K extends WritableComparable,
       FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
       FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
       return new LineRecordWriter<K, V>(fileOut);
       return new LineRecordWriter<K, V>(fileOut);
     } else {
     } else {
-      Class codecClass = getOutputCompressorClass(job, GzipCodec.class);
+      Class<? extends CompressionCodec> codecClass = 
+        getOutputCompressorClass(job, GzipCodec.class);
       // create the named codec
       // create the named codec
       CompressionCodec codec = (CompressionCodec)
       CompressionCodec codec = (CompressionCodec)
         ReflectionUtils.newInstance(codecClass, job);
         ReflectionUtils.newInstance(codecClass, job);

+ 12 - 12
src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java

@@ -19,19 +19,12 @@
 package org.apache.hadoop.mapred;
 package org.apache.hadoop.mapred;
 
 
 import java.io.*;
 import java.io.*;
-import java.util.*;
 import junit.framework.TestCase;
 import junit.framework.TestCase;
 
 
-import org.apache.commons.logging.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.util.ReflectionUtils;
 
 
 public class TestTextOutputFormat extends TestCase {
 public class TestTextOutputFormat extends TestCase {
-  private static final Log LOG = LogFactory.getLog(TestTextOutputFormat.class
-                                                   .getName());
-
   private static JobConf defaultConf = new JobConf();
   private static JobConf defaultConf = new JobConf();
 
 
   private static FileSystem localFs = null;
   private static FileSystem localFs = null;
@@ -43,9 +36,11 @@ public class TestTextOutputFormat extends TestCase {
     }
     }
   }
   }
 
 
-  private static Path workDir = new Path(new Path(System.getProperty(
-                                                                     "test.build.data", "."), "data"), "TestTextOutputFormat");
+  private static Path workDir = 
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"), 
+             "TestTextOutputFormat");
 
 
+  @SuppressWarnings("unchecked")
   public void testFormat() throws Exception {
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
     JobConf job = new JobConf();
     job.setOutputPath(workDir);
     job.setOutputPath(workDir);
@@ -54,19 +49,22 @@ public class TestTextOutputFormat extends TestCase {
     // A reporter that does nothing
     // A reporter that does nothing
     Reporter reporter = Reporter.NULL;
     Reporter reporter = Reporter.NULL;
 
 
-    TextOutputFormat<Text, Text> theOutputFormat =
-      new TextOutputFormat<Text, Text>();
-    RecordWriter<Text, Text> theRecordWriter =
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
       theOutputFormat.getRecordWriter(localFs, job, file, reporter);
       theOutputFormat.getRecordWriter(localFs, job, file, reporter);
 
 
     Text key1 = new Text("key1");
     Text key1 = new Text("key1");
     Text key2 = new Text("key2");
     Text key2 = new Text("key2");
     Text val1 = new Text("val1");
     Text val1 = new Text("val1");
     Text val2 = new Text("val2");
     Text val2 = new Text("val2");
+    NullWritable nullWritable = NullWritable.get();
 
 
     try {
     try {
       theRecordWriter.write(key1, val1);
       theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
       theRecordWriter.write(null, val1);
       theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
       theRecordWriter.write(key1, null);
       theRecordWriter.write(key1, null);
       theRecordWriter.write(null, null);
       theRecordWriter.write(null, null);
       theRecordWriter.write(key2, val2);
       theRecordWriter.write(key2, val2);
@@ -78,6 +76,8 @@ public class TestTextOutputFormat extends TestCase {
     StringBuffer expectedOutput = new StringBuffer();
     StringBuffer expectedOutput = new StringBuffer();
     expectedOutput.append(key1).append('\t').append(val1).append("\n");
     expectedOutput.append(key1).append('\t').append(val1).append("\n");
     expectedOutput.append(val1).append("\n");
     expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
     expectedOutput.append(key1).append("\n");
     expectedOutput.append(key1).append("\n");
     expectedOutput.append(key2).append('\t').append(val2).append("\n");
     expectedOutput.append(key2).append('\t').append(val2).append("\n");
     String output = UtilsForTests.slurp(expectedFile);
     String output = UtilsForTests.slurp(expectedFile);