|
@@ -31,9 +31,7 @@ import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
|
-import java.util.concurrent.Executors;
|
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
+import java.util.concurrent.*;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Multithreaded implementation for {@link org.apache.hadoop.mapred.MapRunnable}.
|
|
* Multithreaded implementation for {@link org.apache.hadoop.mapred.MapRunnable}.
|
|
@@ -46,8 +44,9 @@ import java.util.concurrent.TimeUnit;
|
|
* <p>
|
|
* <p>
|
|
* The Map-Reduce job has to be configured to use this MapRunnable class (using
|
|
* The Map-Reduce job has to be configured to use this MapRunnable class (using
|
|
* the JobConf.setMapRunnerClass method) and
|
|
* the JobConf.setMapRunnerClass method) and
|
|
- * the number of thread the thread-pool can use (using the
|
|
|
|
- * <b>mapred.map.multithreadedrunner.threads</b> property).
|
|
|
|
|
|
+ * the number of threads the thread-pool can use with the
|
|
|
|
+ * <code>mapred.map.multithreadedrunner.threads</code> property, its default
|
|
|
|
+ * value is 10 threads.
|
|
* <p>
|
|
* <p>
|
|
*/
|
|
*/
|
|
public class MultithreadedMapRunner<K1 extends WritableComparable,
|
|
public class MultithreadedMapRunner<K1 extends WritableComparable,
|
|
@@ -80,7 +79,50 @@ public class MultithreadedMapRunner<K1 extends WritableComparable,
|
|
|
|
|
|
// Creating a threadpool of the configured size to execute the Mapper
|
|
// Creating a threadpool of the configured size to execute the Mapper
|
|
// map method in parallel.
|
|
// map method in parallel.
|
|
- executorService = Executors.newFixedThreadPool(numberOfThreads);
|
|
|
|
|
|
+ executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
|
|
|
|
+ 0L, TimeUnit.MILLISECONDS,
|
|
|
|
+ new BlockingArrayQueue
|
|
|
|
+ (numberOfThreads));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * A blocking array queue that replaces offer and add, which throws on a full
|
|
|
|
+ * queue, to a put, which waits on a full queue.
|
|
|
|
+ */
|
|
|
|
+ private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {
|
|
|
|
+ public BlockingArrayQueue(int capacity) {
|
|
|
|
+ super(capacity);
|
|
|
|
+ }
|
|
|
|
+ public boolean offer(Runnable r) {
|
|
|
|
+ return add(r);
|
|
|
|
+ }
|
|
|
|
+ public boolean add(Runnable r) {
|
|
|
|
+ try {
|
|
|
|
+ put(r);
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
+ }
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void checkForExceptionsFromProcessingThreads()
|
|
|
|
+ throws IOException, RuntimeException {
|
|
|
|
+ // Checking if a Mapper.map within a Runnable has generated an
|
|
|
|
+ // IOException. If so we rethrow it to force an abort of the Map
|
|
|
|
+ // operation thus keeping the semantics of the default
|
|
|
|
+ // implementation.
|
|
|
|
+ if (ioException != null) {
|
|
|
|
+ throw ioException;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Checking if a Mapper.map within a Runnable has generated a
|
|
|
|
+ // RuntimeException. If so we rethrow it to force an abort of the Map
|
|
|
|
+ // operation thus keeping the semantics of the default
|
|
|
|
+ // implementation.
|
|
|
|
+ if (runtimeException != null) {
|
|
|
|
+ throw runtimeException;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
|
|
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
|
|
@@ -94,28 +136,10 @@ public class MultithreadedMapRunner<K1 extends WritableComparable,
|
|
|
|
|
|
while (input.next(key, value)) {
|
|
while (input.next(key, value)) {
|
|
|
|
|
|
- // Run Mapper.map execution asynchronously in a separate thread.
|
|
|
|
- // If threads are not available from the thread-pool this method
|
|
|
|
- // will block until there is a thread available.
|
|
|
|
- executorService.execute(
|
|
|
|
- new MapperInvokeRunable(key, value, output,
|
|
|
|
- reporter));
|
|
|
|
-
|
|
|
|
- // Checking if a Mapper.map within a Runnable has generated an
|
|
|
|
- // IOException. If so we rethrow it to force an abort of the Map
|
|
|
|
- // operation thus keeping the semantics of the default
|
|
|
|
- // implementation.
|
|
|
|
- if (ioException != null) {
|
|
|
|
- throw ioException;
|
|
|
|
- }
|
|
|
|
|
|
+ executorService.execute(new MapperInvokeRunable(key, value, output,
|
|
|
|
+ reporter));
|
|
|
|
|
|
- // Checking if a Mapper.map within a Runnable has generated a
|
|
|
|
- // RuntimeException. If so we rethrow it to force an abort of the Map
|
|
|
|
- // operation thus keeping the semantics of the default
|
|
|
|
- // implementation.
|
|
|
|
- if (runtimeException != null) {
|
|
|
|
- throw runtimeException;
|
|
|
|
- }
|
|
|
|
|
|
+ checkForExceptionsFromProcessingThreads();
|
|
|
|
|
|
// Allocate new key & value instances as mapper is running in parallel
|
|
// Allocate new key & value instances as mapper is running in parallel
|
|
key = input.createKey();
|
|
key = input.createKey();
|
|
@@ -141,51 +165,23 @@ public class MultithreadedMapRunner<K1 extends WritableComparable,
|
|
}
|
|
}
|
|
|
|
|
|
// NOTE: while Mapper.map dispatching has concluded there are still
|
|
// NOTE: while Mapper.map dispatching has concluded there are still
|
|
- // map calls in progress.
|
|
|
|
|
|
+ // map calls in progress, and they may still throw exceptions.
|
|
|
|
+ checkForExceptionsFromProcessingThreads();
|
|
|
|
|
|
- // Checking if a Mapper.map within a Runnable has generated an
|
|
|
|
- // IOException. If so we rethrow it to force an abort of the Map
|
|
|
|
- // operation thus keeping the semantics of the default
|
|
|
|
- // implementation.
|
|
|
|
- if (ioException != null) {
|
|
|
|
- throw ioException;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Checking if a Mapper.map within a Runnable has generated a
|
|
|
|
- // RuntimeException. If so we rethrow it to force an abort of the Map
|
|
|
|
- // operation thus keeping the semantics of the default
|
|
|
|
- // implementation.
|
|
|
|
- if (runtimeException != null) {
|
|
|
|
- throw runtimeException;
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
// NOTE: it could be that a map call has had an exception after the
|
|
// NOTE: it could be that a map call has had an exception after the
|
|
// call for awaitTermination() returning true. An edge case, but it
|
|
// call for awaitTermination() returning true. An edge case, but it
|
|
// could happen.
|
|
// could happen.
|
|
|
|
+ checkForExceptionsFromProcessingThreads();
|
|
|
|
|
|
- // Checking if a Mapper.map within a Runnable has generated an
|
|
|
|
- // IOException. If so we rethrow it to force an abort of the Map
|
|
|
|
- // operation thus keeping the semantics of the default
|
|
|
|
- // implementation.
|
|
|
|
- if (ioException != null) {
|
|
|
|
- throw ioException;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Checking if a Mapper.map within a Runnable has generated a
|
|
|
|
- // RuntimeException. If so we rethrow it to force an abort of the Map
|
|
|
|
- // operation thus keeping the semantics of the default
|
|
|
|
- // implementation.
|
|
|
|
- if (runtimeException != null) {
|
|
|
|
- throw runtimeException;
|
|
|
|
- }
|
|
|
|
} catch (IOException ioEx) {
|
|
} catch (IOException ioEx) {
|
|
// Forcing a shutdown of all thread of the threadpool and rethrowing
|
|
// Forcing a shutdown of all thread of the threadpool and rethrowing
|
|
// the IOException
|
|
// the IOException
|
|
executorService.shutdownNow();
|
|
executorService.shutdownNow();
|
|
throw ioEx;
|
|
throw ioEx;
|
|
} catch (InterruptedException iEx) {
|
|
} catch (InterruptedException iEx) {
|
|
- throw new IOException(iEx.getMessage());
|
|
|
|
|
|
+ throw new RuntimeException(iEx);
|
|
}
|
|
}
|
|
|
|
|
|
} finally {
|
|
} finally {
|