Browse Source

HADOOP-1001. Check the type of keys and values generated by the mapper against the types specified in JobConf. Contributed by Tahir Hashmi.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@527465 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 18 years ago
parent
commit
c3d55c06e1

+ 4 - 0
CHANGES.txt

@@ -155,6 +155,10 @@ Trunk (unreleased changes)
     previously started to create the file to the description of 
     previously started to create the file to the description of 
     AlreadyBeingCreatedException.  (Konstantin Shvachko via tomwhite)
     AlreadyBeingCreatedException.  (Konstantin Shvachko via tomwhite)
 
 
+48. HADOOP-1001.  Check the type of keys and values generated by the 
+    mapper against the types specified in JobConf.  
+    (Tahir Hashmi via tomwhite)
+
 
 
 Release 0.12.3 - 2007-04-06
 Release 0.12.3 - 2007-04-06
 
 

+ 12 - 0
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -312,6 +312,18 @@ class MapTask extends Task {
     
     
     public void collect(WritableComparable key,
     public void collect(WritableComparable key,
               Writable value) throws IOException {
               Writable value) throws IOException {
+      
+      if (key.getClass() != keyClass) {
+        throw new IOException("Type mismatch in key from map: expected "
+                              + keyClass.getName() + ", recieved "
+                              + key.getClass().getName());
+      }
+      if (value.getClass() != valClass) {
+        throw new IOException("Type mismatch in value from map: expected "
+                              + valClass.getName() + ", recieved "
+                              + value.getClass().getName());
+      }
+      
       synchronized (this) {
       synchronized (this) {
         //dump the key/value to buffer
         //dump the key/value to buffer
         int keyOffset = keyValBuffer.getLength(); 
         int keyOffset = keyValBuffer.getLength(); 

+ 158 - 0
src/test/org/apache/hadoop/mapred/TestMapOutputType.java

@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.lib.*;
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+
+/** 
+ * TestMapOutputType checks whether the Map task handles type mismatch
+ * between mapper output and the type specified in
+ * JobConf.MapOutputKeyType and JobConf.MapOutputValueType.
+ */
+public class TestMapOutputType extends TestCase 
+{
+  JobConf conf = new JobConf(TestMapOutputType.class);
+  JobClient jc;
+  /** 
+   * TextGen is a Mapper that generates a Text key-value pair. The
+   * type specified in conf will be anything but.
+   */
+   
+  static class TextGen implements Mapper {
+    public void configure(JobConf job) {
+    }
+    
+    public void map(WritableComparable key, Writable val, OutputCollector out,
+                    Reporter reporter) throws IOException {
+      key = new Text("Hello");
+      val = new Text("World");
+      
+      out.collect(key, val);
+    }
+    
+    public void close() {
+    }
+  }
+  
+  /** A do-nothing reducer class. We won't get this far, really.
+   *
+   */
+  static class TextReduce implements Reducer {
+    
+    public void configure(JobConf job) {
+    }
+
+    public void reduce(WritableComparable key,
+                       Iterator values,
+                       OutputCollector out,
+                       Reporter reporter) throws IOException {
+      out.collect(new Text("Test"), new Text("Me"));
+    }
+
+    public void close() {
+    }
+  }
+
+
+  public void configure() throws Exception {
+    Path testdir = new Path("build/test/test.mapred.spill");
+    Path inDir = new Path(testdir, "in");
+    Path outDir = new Path(testdir, "out");
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(testdir);
+    conf.setInt("io.sort.mb", 1);
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setMapperClass(TextGen.class);
+    conf.setReducerClass(TextReduce.class);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class); 
+    
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    if (!fs.mkdirs(testdir)) {
+      throw new IOException("Mkdirs failed to create " + testdir.toString());
+    }
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    Path inFile = new Path(inDir, "part0");
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile, 
+        Text.class, Text.class);
+    writer.append(new Text("rec: 1"), new Text("Hello"));
+    writer.close();
+    
+    jc = new JobClient(conf);
+  }
+  
+  public void testKeyMismatch() throws Exception {
+    configure();
+    
+//  Set bad MapOutputKeyClass and MapOutputValueClass
+    conf.setMapOutputKeyClass(IntWritable.class);
+    conf.setMapOutputValueClass(IntWritable.class);
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+    
+    if (r_job.isSuccessful()) {
+      fail("Oops! The job was supposed to break due to an exception");
+    }
+  }
+  
+  public void testValueMismatch() throws Exception {
+    configure();
+  
+// Set good MapOutputKeyClass, bad MapOutputValueClass    
+    conf.setMapOutputKeyClass(Text.class);
+    conf.setMapOutputValueClass(IntWritable.class);
+    
+    RunningJob r_job = jc.submitJob(conf);
+    while (!r_job.isComplete()) {
+      Thread.sleep(1000);
+    }
+    
+    if (r_job.isSuccessful()) {
+      fail("Oops! The job was supposed to break due to an exception");
+    }
+  }
+  
+  public void testNoMismatch() throws Exception{ 
+    configure();
+    
+//  Set good MapOutputKeyClass and MapOutputValueClass    
+     conf.setMapOutputKeyClass(Text.class);
+     conf.setMapOutputValueClass(Text.class);
+     
+     RunningJob r_job = jc.submitJob(conf);
+     while (!r_job.isComplete()) {
+       Thread.sleep(1000);
+     }
+     
+     if (!r_job.isSuccessful()) {
+       fail("Oops! The job broke due to an unexpected error");
+     }
+   }
+}