Browse Source

HADOOP-3295. Allow TextOutputFormat to use configurable spearators.
Contributed by Zheng Shao.



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@651693 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 17 years ago
parent
commit
4ad98da605

+ 3 - 0
CHANGES.txt

@@ -38,6 +38,9 @@ Trunk (unreleased changes)
     server slowdown. Clients retry connection for up to 15 minutes
     server slowdown. Clients retry connection for up to 15 minutes
     when socket connection times out. (hairong)
     when socket connection times out. (hairong)
 
 
+    HADOOP-3295. Allow TextOutputFormat to use configurable spearators.
+    (Zheng Shao via cdouglas).
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

+ 22 - 12
src/java/org/apache/hadoop/mapred/TextOutputFormat.java

@@ -38,23 +38,31 @@ public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
   protected static class LineRecordWriter<K, V>
   protected static class LineRecordWriter<K, V>
     implements RecordWriter<K, V> {
     implements RecordWriter<K, V> {
     private static final String utf8 = "UTF-8";
     private static final String utf8 = "UTF-8";
-    private static final byte[] tab;
     private static final byte[] newline;
     private static final byte[] newline;
     static {
     static {
       try {
       try {
-        tab = "\t".getBytes(utf8);
         newline = "\n".getBytes(utf8);
         newline = "\n".getBytes(utf8);
       } catch (UnsupportedEncodingException uee) {
       } catch (UnsupportedEncodingException uee) {
         throw new IllegalArgumentException("can't find " + utf8 + " encoding");
         throw new IllegalArgumentException("can't find " + utf8 + " encoding");
       }
       }
     }
     }
-    
+
     private DataOutputStream out;
     private DataOutputStream out;
-    
-    public LineRecordWriter(DataOutputStream out) {
+    private final byte[] keyValueSeparator;
+
+    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
       this.out = out;
       this.out = out;
+      try {
+        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
+      } catch (UnsupportedEncodingException uee) {
+        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
+      }
     }
     }
-    
+
+    public LineRecordWriter(DataOutputStream out) {
+      this(out, "\t");
+    }
+
     /**
     /**
      * Write the object to the byte stream, handling Text as a special
      * Write the object to the byte stream, handling Text as a special
      * case.
      * case.
@@ -82,7 +90,7 @@ public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
         writeObject(key);
         writeObject(key);
       }
       }
       if (!(nullKey || nullValue)) {
       if (!(nullKey || nullValue)) {
-        out.write(tab);
+        out.write(keyValueSeparator);
       }
       }
       if (!nullValue) {
       if (!nullValue) {
         writeObject(value);
         writeObject(value);
@@ -94,13 +102,14 @@ public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
       out.close();
       out.close();
     }
     }
   }
   }
-  
+
   public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
   public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
                                                   JobConf job,
                                                   JobConf job,
                                                   String name,
                                                   String name,
                                                   Progressable progress)
                                                   Progressable progress)
     throws IOException {
     throws IOException {
 
 
+    String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t");
     Path dir = getWorkOutputPath(job);
     Path dir = getWorkOutputPath(job);
     FileSystem fs = dir.getFileSystem(job);
     FileSystem fs = dir.getFileSystem(job);
     if (!fs.exists(dir)) {
     if (!fs.exists(dir)) {
@@ -109,9 +118,9 @@ public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
     boolean isCompressed = getCompressOutput(job);
     boolean isCompressed = getCompressOutput(job);
     if (!isCompressed) {
     if (!isCompressed) {
       FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
       FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
-      return new LineRecordWriter<K, V>(fileOut);
+      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
     } else {
     } else {
-      Class<? extends CompressionCodec> codecClass = 
+      Class<? extends CompressionCodec> codecClass =
         getOutputCompressorClass(job, GzipCodec.class);
         getOutputCompressorClass(job, GzipCodec.class);
       // create the named codec
       // create the named codec
       CompressionCodec codec = (CompressionCodec)
       CompressionCodec codec = (CompressionCodec)
@@ -120,8 +129,9 @@ public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
       Path filename = new Path(dir, name + codec.getDefaultExtension());
       Path filename = new Path(dir, name + codec.getDefaultExtension());
       FSDataOutputStream fileOut = fs.create(filename, progress);
       FSDataOutputStream fileOut = fs.create(filename, progress);
       return new LineRecordWriter<K, V>(new DataOutputStream
       return new LineRecordWriter<K, V>(new DataOutputStream
-                                        (codec.createOutputStream(fileOut)));
+                                        (codec.createOutputStream(fileOut)),
+                                        keyValueSeparator);
     }
     }
-  }      
+  }
 }
 }
 
 

+ 56 - 5
src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java

@@ -36,8 +36,8 @@ public class TestTextOutputFormat extends TestCase {
     }
     }
   }
   }
 
 
-  private static Path workDir = 
-    new Path(new Path(System.getProperty("test.build.data", "."), "data"), 
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
              "TestTextOutputFormat");
              "TestTextOutputFormat");
 
 
   @SuppressWarnings("unchecked")
   @SuppressWarnings("unchecked")
@@ -49,7 +49,7 @@ public class TestTextOutputFormat extends TestCase {
       fail("Failed to create output directory");
       fail("Failed to create output directory");
     }
     }
     String file = "test.txt";
     String file = "test.txt";
-    
+
     // A reporter that does nothing
     // A reporter that does nothing
     Reporter reporter = Reporter.NULL;
     Reporter reporter = Reporter.NULL;
 
 
@@ -76,7 +76,7 @@ public class TestTextOutputFormat extends TestCase {
     } finally {
     } finally {
       theRecordWriter.close(reporter);
       theRecordWriter.close(reporter);
     }
     }
-    File expectedFile = new File(new Path(workDir, file).toString()); 
+    File expectedFile = new File(new Path(workDir, file).toString());
     StringBuffer expectedOutput = new StringBuffer();
     StringBuffer expectedOutput = new StringBuffer();
     expectedOutput.append(key1).append('\t').append(val1).append("\n");
     expectedOutput.append(key1).append('\t').append(val1).append("\n");
     expectedOutput.append(val1).append("\n");
     expectedOutput.append(val1).append("\n");
@@ -86,7 +86,58 @@ public class TestTextOutputFormat extends TestCase {
     expectedOutput.append(key2).append('\t').append(val2).append("\n");
     expectedOutput.append(key2).append('\t').append(val2).append("\n");
     String output = UtilsForTests.slurp(expectedFile);
     String output = UtilsForTests.slurp(expectedFile);
     assertEquals(output, expectedOutput.toString());
     assertEquals(output, expectedOutput.toString());
-    
+
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testFormatWithCustomSeparator() throws Exception {
+    JobConf job = new JobConf();
+    String separator = "\u0001";
+    job.set("mapred.textoutputformat.separator", separator);
+    FileOutputFormat.setWorkOutputPath(job, workDir);
+    FileSystem fs = workDir.getFileSystem(job);
+    if (!fs.mkdirs(workDir)) {
+      fail("Failed to create output directory");
+    }
+    String file = "test.txt";
+
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
+
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
+      theOutputFormat.getRecordWriter(localFs, job, file, reporter);
+
+    Text key1 = new Text("key1");
+    Text key2 = new Text("key2");
+    Text val1 = new Text("val1");
+    Text val2 = new Text("val2");
+    NullWritable nullWritable = NullWritable.get();
+
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+
+    } finally {
+      theRecordWriter.close(reporter);
+    }
+    File expectedFile = new File(new Path(workDir, file).toString());
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append(key1).append(separator).append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append(separator).append(val2).append("\n");
+    String output = UtilsForTests.slurp(expectedFile);
+    assertEquals(output, expectedOutput.toString());
+
   }
   }
 
 
   public static void main(String[] args) throws Exception {
   public static void main(String[] args) throws Exception {