Browse Source

HADOOP-3565. Merge -r 668788:668789 from trunk to branch 0.17.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.17@668793 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley 17 years ago
parent
commit
cfb783f3f8

+ 8 - 0
CHANGES.txt

@@ -18,6 +18,14 @@ Release 0.17.1
     distributions.  (Adam Heath via cutting)
 
 
+Release 0.17.1 - Unreleased
+
+  INCOMPATIBLE CHANGES
+
+    HADOOP-3565. Fix the Java serialization, which is not enabled by
+    default, to clear the state of the serializer between objects.
+    (tomwhite via omalley)
+
 Release 0.17.0 - 2008-05-18
 
   INCOMPATIBLE CHANGES

+ 1 - 0
src/java/org/apache/hadoop/io/serializer/JavaSerialization.java

@@ -76,6 +76,7 @@ public class JavaSerialization implements Serialization<Serializable> {
     }
 
     public void serialize(Serializable object) throws IOException {
+      oos.reset(); // clear (class) back-references
       oos.writeObject(object);
     }
 

+ 32 - 67
src/test/org/apache/hadoop/mapred/TestJavaSerialization.java

@@ -18,83 +18,58 @@
 package org.apache.hadoop.mapred;
 
 import java.io.BufferedReader;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
+import java.util.Iterator;
+import java.util.StringTokenizer;
 
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.serializer.JavaSerializationComparator;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.util.Progressable;
 
 public class TestJavaSerialization extends ClusterMapReduceTestCase {
   
-  static class TypeConverterMapper extends MapReduceBase implements
-      Mapper<LongWritable, Text, Long, String> {
+  static class WordCountMapper extends MapReduceBase implements
+      Mapper<LongWritable, Text, String, Long> {
 
     public void map(LongWritable key, Text value,
-        OutputCollector<Long, String> output, Reporter reporter)
+        OutputCollector<String, Long> output, Reporter reporter)
         throws IOException {
-      output.collect(key.get(), value.toString());
+      StringTokenizer st = new StringTokenizer(value.toString());
+      while (st.hasMoreTokens()) {
+        output.collect(st.nextToken(), 1L);
+      }
     }
 
   }
   
-  static class StringOutputFormat<K, V> extends FileOutputFormat<K, V> {
+  static class SumReducer<K> extends MapReduceBase implements
+      Reducer<K, Long, K, Long> {
     
-    static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
-      
-      private DataOutputStream out;
-      
-      public LineRecordWriter(DataOutputStream out) {
-        this.out = out;
-      }
-
-      public void close(Reporter reporter) throws IOException {
-        out.close();
-      }
+    public void reduce(K key, Iterator<Long> values,
+        OutputCollector<K, Long> output, Reporter reporter)
+      throws IOException {
 
-      public void write(K key, V value) throws IOException {
-        print(key);
-        print("\t");
-        print(value);
-        print("\n");
-      }
-      
-      private void print(Object o) throws IOException {
-        out.write(o.toString().getBytes("UTF-8"));
+      long sum = 0;
+      while (values.hasNext()) {
+        sum += values.next();
       }
-      
-    }
-
-    @Override
-    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
-        String name, Progressable progress) throws IOException {
-
-      Path dir = getWorkOutputPath(job);
-      FileSystem fs = dir.getFileSystem(job);
-      FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
-      return new LineRecordWriter<K, V>(fileOut);
+      output.collect(key, sum);
     }
     
   }
   
   public void testMapReduceJob() throws Exception {
-    OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
+    OutputStream os = getFileSystem().create(new Path(getInputDir(),
+        "text.txt"));
     Writer wr = new OutputStreamWriter(os);
-    wr.write("hello1\n");
-    wr.write("hello2\n");
-    wr.write("hello3\n");
-    wr.write("hello4\n");
+    wr.write("b a\n");
     wr.close();
 
     JobConf conf = createJobConf();
@@ -106,16 +81,12 @@ public class TestJavaSerialization extends ClusterMapReduceTestCase {
 
     conf.setInputFormat(TextInputFormat.class);
 
-    conf.setMapOutputKeyClass(Long.class);
-    conf.setMapOutputValueClass(String.class);
-
-    conf.setOutputFormat(StringOutputFormat.class);
-    conf.setOutputKeyClass(Long.class);
-    conf.setOutputValueClass(String.class);
+    conf.setOutputKeyClass(String.class);
+    conf.setOutputValueClass(Long.class);
     conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
 
-    conf.setMapperClass(TypeConverterMapper.class);
-    conf.setReducerClass(IdentityReducer.class);
+    conf.setMapperClass(WordCountMapper.class);
+    conf.setReducerClass(SumReducer.class);
 
     FileInputFormat.setInputPaths(conf, getInputDir());
 
@@ -126,19 +97,13 @@ public class TestJavaSerialization extends ClusterMapReduceTestCase {
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
                            new OutputLogFilter()));
-    if (outputFiles.length > 0) {
-      InputStream is = getFileSystem().open(outputFiles[0]);
-      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-      String line = reader.readLine();
-      int counter = 0;
-      while (line != null) {
-        counter++;
-        assertTrue(line.contains("hello"));
-        line = reader.readLine();
-      }
-      reader.close();
-      assertEquals(4, counter);
-    }
+    assertEquals(1, outputFiles.length);
+    InputStream is = getFileSystem().open(outputFiles[0]);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+    assertEquals("a\t1", reader.readLine());
+    assertEquals("b\t1", reader.readLine());
+    assertNull(reader.readLine());
+    reader.close();
   }
 
 }