
HADOOP-3049. Fixes a problem in MultithreadedMapRunner with catching RuntimeExceptions thrown from Mapper.map. Contributed by Alejandro Abdelnur.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@640714 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das committed 17 years ago
commit 928779724d

+ 3 - 0
CHANGES.txt

@@ -383,6 +383,9 @@ Release 0.16.2 - Unreleased
     HADOOP-2944. Fixes a "Run on Hadoop" wizard NPE when creating a
     Location from the wizard. (taton)
 
+    HADOOP-3049. Fixes a problem in MultithreadedMapRunner with
+    catching RuntimeExceptions. (Alejandro Abdelnur via ddas)
+
 Release 0.16.1 - 2008-03-13
 
   INCOMPATIBLE CHANGES

+ 57 - 20
src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java

@@ -45,12 +45,15 @@ import java.util.concurrent.TimeUnit;
  * Map implementations using this MapRunnable must be thread-safe.
  * <p>
  * The Map-Reduce job has to be configured to use this MapRunnable class (using
- * the <b>mapred.map.runner.class</b> property) and
+ * the JobConf.setMapRunnerClass method) and
 * the number of threads the thread-pool can use (using the
  * <b>mapred.map.multithreadedrunner.threads</b> property).
  * <p>
  */
-public class MultithreadedMapRunner<K1, V1, K2, V2>
+public class MultithreadedMapRunner<K1 extends WritableComparable,
+                                    V1 extends Writable,
+                                    K2 extends WritableComparable,
+                                    V2 extends Writable>
     implements MapRunnable<K1, V1, K2, V2> {
 
   private static final Log LOG =
@@ -60,19 +63,20 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
   private Mapper<K1, V1, K2, V2> mapper;
   private ExecutorService executorService;
   private volatile IOException ioException;
+  private volatile RuntimeException runtimeException;
 
   @SuppressWarnings("unchecked")
-  public void configure(JobConf job) {
+  public void configure(JobConf jobConf) {
     int numberOfThreads =
-      job.getInt("mapred.map.multithreadedrunner.threads", 10);
+      jobConf.getInt("mapred.map.multithreadedrunner.threads", 10);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Configuring job " + job.getJobName() +
+      LOG.debug("Configuring jobConf " + jobConf.getJobName() +
                 " to use " + numberOfThreads + " threads");
     }
 
-    this.job = job;
-    this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
-                                                      job);
+    this.job = jobConf;
+    this.mapper = (Mapper)ReflectionUtils.newInstance(jobConf.getMapperClass(),
+        jobConf);
 
     // Creating a threadpool of the configured size to execute the Mapper
     // map method in parallel.
@@ -94,7 +98,8 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
         // If threads are not available from the thread-pool this method
         // will block until there is a thread available.
         executorService.execute(
-                                new MapperInvokeRunable(key, value, output, reporter));
+                                new MapperInvokeRunable(key, value, output,
+                                    reporter));
 
         // Checking if a Mapper.map within a Runnable has generated an
         // IOException. If so we rethrow it to force an abort of the Map
@@ -104,6 +109,14 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
           throw ioException;
         }
 
+        // Checking if a Mapper.map within a Runnable has generated a
+        // RuntimeException. If so we rethrow it to force an abort of the Map
+        // operation thus keeping the semantics of the default
+        // implementation.
+        if (runtimeException != null) {
+          throw runtimeException;
+        }
+
         // Allocate new key & value instances as mapper is running in parallel
         key = input.createKey();
         value = input.createValue();
@@ -127,35 +140,51 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
                       + job.getJobName());
           }
 
+          // NOTE: while Mapper.map dispatching has concluded there are still
+          // map calls in progress.
+
           // Checking if a Mapper.map within a Runnable has generated an
           // IOException. If so we rethrow it to force an abort of the Map
           // operation thus keeping the semantics of the default
           // implementation.
-          // NOTE: while Mapper.map dispatching has concluded there are still
-          // map calls in progress.
           if (ioException != null) {
             throw ioException;
           }
+
+          // Checking if a Mapper.map within a Runnable has generated a
+          // RuntimeException. If so we rethrow it to force an abort of the Map
+          // operation thus keeping the semantics of the default
+          // implementation.
+          if (runtimeException != null) {
+            throw runtimeException;
+          }
         }
 
+        // NOTE: it could be that a map call has had an exception after the
+        // call to awaitTermination() returned true. An edge case, but it
+        // could happen.
+
         // Checking if a Mapper.map within a Runnable has generated an
         // IOException. If so we rethrow it to force an abort of the Map
         // operation thus keeping the semantics of the default
         // implementation.
-        // NOTE: it could be that a map call has had an exception after the
-        // call for awaitTermination() returing true. And edge case but it
-        // could happen.
         if (ioException != null) {
           throw ioException;
         }
-      }
-      catch (IOException ioEx) {
+
+        // Checking if a Mapper.map within a Runnable has generated a
+        // RuntimeException. If so we rethrow it to force an abort of the Map
+        // operation thus keeping the semantics of the default
+        // implementation.
+        if (runtimeException != null) {
+          throw runtimeException;
+        }
+      } catch (IOException ioEx) {
         // Forcing a shutdown of all thread of the threadpool and rethrowing
         // the IOException
         executorService.shutdownNow();
         throw ioEx;
-      }
-      catch (InterruptedException iEx) {
+      } catch (InterruptedException iEx) {
         throw new IOException(iEx.getMessage());
       }
 
@@ -202,8 +231,7 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
       try {
         // map pair to output
         MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
-      }
-      catch (IOException ex) {
+      } catch (IOException ex) {
         // If there is an IOException during the call it is set in an instance
         // variable of the MultithreadedMapRunner from where it will be
         // rethrown.
@@ -212,6 +240,15 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
             MultithreadedMapRunner.this.ioException = ex;
           }
         }
+      } catch (RuntimeException ex) {
+        // If there is a RuntimeException during the call it is set in an
+        // instance variable of the MultithreadedMapRunner from where it will be
+        // rethrown.
+        synchronized (MultithreadedMapRunner.this) {
+          if (MultithreadedMapRunner.this.runtimeException == null) {
+            MultithreadedMapRunner.this.runtimeException = ex;
+          }
+        }
       }
     }
   }

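The heart of the fix is a hand-off pattern: each pooled worker records the first RuntimeException raised by Mapper.map in a volatile field, and the dispatching thread checks that field at every opportunity and rethrows it, so the job aborts with the same semantics as the default single-threaded runner. Below is a minimal standalone sketch of that hand-off; it is not Hadoop code, and the class name, runAll method, and plain Runnable tasks are illustrative only.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExceptionHandoffSketch {

  // First failure seen by any worker; volatile so the dispatching thread observes it.
  private volatile RuntimeException runtimeException;

  public void runAll(List<Runnable> tasks) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      for (final Runnable task : tasks) {
        pool.execute(new Runnable() {
          public void run() {
            try {
              task.run();
            } catch (RuntimeException ex) {
              // Record only the first failure, as the patch does.
              synchronized (ExceptionHandoffSketch.this) {
                if (runtimeException == null) {
                  runtimeException = ex;
                }
              }
            }
          }
        });
        // Rethrow in the dispatching thread so the whole run aborts.
        if (runtimeException != null) {
          throw runtimeException;
        }
      }
      pool.shutdown();
      while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
        if (runtimeException != null) {
          throw runtimeException;
        }
      }
      // A worker may still have failed just before awaitTermination() returned true.
      if (runtimeException != null) {
        throw runtimeException;
      }
    } catch (RuntimeException ex) {
      // Mirror the patch's IOException path: stop outstanding work, then rethrow.
      pool.shutdownNow();
      throw ex;
    }
  }
}

The same field check appears three times in the patch because the dispatcher has three chances to notice a failure: while submitting work, while waiting for the pool to drain, and once more after awaitTermination() reports completion.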
+ 165 - 0
src/test/org/apache/hadoop/mapred/lib/TestMultithreadedMapRunner.java

@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+public class TestMultithreadedMapRunner extends HadoopTestCase {
+
+  public TestMultithreadedMapRunner() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  public void testOKRun() throws Exception {
+    run(false, false);
+  }
+
+  public void testIOExRun() throws Exception {
+    run(true, false);
+  }
+  public void testRuntimeExRun() throws Exception {
+    run(false, true);
+  }
+
+  private void run(boolean ioEx, boolean rtEx) throws Exception {
+    Path inDir = new Path("testing/mt/input");
+    Path outDir = new Path("testing/mt/output");
+
+    // Hack for local FS that does not have the concept of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+              .replace(' ', '+');
+      inDir = new Path(localPathRoot, inDir);
+      outDir = new Path(localPathRoot, outDir);
+    }
+
+
+    JobConf conf = createJobConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    fs.delete(outDir, true);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    {
+      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+      file.writeBytes("a\nb\n\nc\nd\ne");
+      file.close();
+    }
+
+    conf.setJobName("mt");
+    conf.setInputFormat(TextInputFormat.class);
+
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapOutputKeyClass(LongWritable.class);
+    conf.setMapOutputValueClass(Text.class);
+
+    conf.setOutputFormat(TextOutputFormat.class);
+    conf.setOutputKeyClass(LongWritable.class);
+    conf.setOutputValueClass(Text.class);
+
+    conf.setMapperClass(IDMap.class);
+    conf.setReducerClass(IDReduce.class);
+
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+
+    conf.setMapRunnerClass(MultithreadedMapRunner.class);
+    
+    conf.setInt("mapred.map.multithreadedrunner.threads", 2);
+
+    if (ioEx) {
+      conf.setBoolean("multithreaded.ioException", true);
+    }
+    if (rtEx) {
+      conf.setBoolean("multithreaded.runtimeException", true);
+    }
+    JobClient jc = new JobClient(conf);
+    RunningJob job = jc.submitJob(conf);
+    while (!job.isComplete()) {
+      Thread.sleep(100);
+    }
+
+    if (job.isSuccessful()) {
+      assertFalse(ioEx || rtEx);
+    }
+    else {
+      assertTrue(ioEx || rtEx);
+    }
+
+  }
+
+  public static class IDMap implements Mapper<LongWritable, Text,
+                                              LongWritable, Text> {
+    private boolean ioEx = false;
+    private boolean rtEx = false;
+
+    public void configure(JobConf job) {
+      ioEx = job.getBoolean("multithreaded.ioException", false);
+      rtEx = job.getBoolean("multithreaded.runtimeException", false);
+    }
+
+    public void map(LongWritable key, Text value,
+                    OutputCollector<LongWritable, Text> output,
+                    Reporter reporter)
+            throws IOException {
+      if (ioEx) {
+        throw new IOException();
+      }
+      if (rtEx) {
+        throw new RuntimeException();
+      }
+      output.collect(key, value);
+    }
+
+
+    public void close() throws IOException {
+    }
+  }
+
+  public static class IDReduce implements Reducer<LongWritable, Text,
+                                                  LongWritable, Text> {
+
+    public void configure(JobConf job) {
+    }
+
+    public void reduce(LongWritable key, Iterator<Text> values,
+                       OutputCollector<LongWritable, Text> output,
+                       Reporter reporter)
+            throws IOException {
+      while (values.hasNext()) {
+        output.collect(key, values.next());
+      }
+    }
+
+    public void close() throws IOException {
+    }
+  }
+}
+
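For context on how a job would opt into this runner outside the test harness, the configuration mirrors what run() does above: select MultithreadedMapRunner with JobConf.setMapRunnerClass and size the pool with the mapred.map.multithreadedrunner.threads property. The following is a hedged driver sketch against the old mapred API of this era; the IdentityMap class, job name, thread count, and argument handling are placeholders, and the mapper must be thread-safe because its map() runs on several threads at once.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultithreadedMapRunner;

public class MultithreadedJobDriver {

  // Identity mapper, analogous to the test's IDMap; it keeps no mutable
  // state, which is what makes it safe to call from several threads.
  public static class IdentityMap
      implements Mapper<LongWritable, Text, LongWritable, Text> {
    public void configure(JobConf job) {
    }
    public void map(LongWritable key, Text value,
                    OutputCollector<LongWritable, Text> output,
                    Reporter reporter) throws IOException {
      output.collect(key, value);
    }
    public void close() throws IOException {
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MultithreadedJobDriver.class);
    conf.setJobName("multithreaded-example");

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
    conf.setMapperClass(IdentityMap.class);

    // Run map() calls through a pool of 4 threads instead of serially.
    conf.setMapRunnerClass(MultithreadedMapRunner.class);
    conf.setInt("mapred.map.multithreadedrunner.threads", 4);

    conf.setInputPath(new Path(args[0]));
    conf.setOutputPath(new Path(args[1]));

    JobClient.runJob(conf);
  }
}

With the fix in place, a RuntimeException thrown by map() on any of those pool threads now surfaces in the runner and fails the job, which is what testRuntimeExRun() above asserts.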