|
@@ -380,16 +380,35 @@ public class MapTask extends Task {
|
|
|
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
|
|
|
createSortingCollector(JobConf job, TaskReporter reporter)
|
|
|
throws IOException, ClassNotFoundException {
|
|
|
- MapOutputCollector<KEY, VALUE> collector
|
|
|
- = (MapOutputCollector<KEY, VALUE>)
|
|
|
- ReflectionUtils.newInstance(
|
|
|
- job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
|
|
|
- MapOutputBuffer.class, MapOutputCollector.class), job);
|
|
|
- LOG.info("Map output collector class = " + collector.getClass().getName());
|
|
|
MapOutputCollector.Context context =
|
|
|
- new MapOutputCollector.Context(this, job, reporter);
|
|
|
- collector.init(context);
|
|
|
- return collector;
|
|
|
+ new MapOutputCollector.Context(this, job, reporter);
|
|
|
+
|
|
|
+ Class<?>[] collectorClasses = job.getClasses(
|
|
|
+ JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
|
|
|
+ int remainingCollectors = collectorClasses.length;
|
|
|
+ for (Class clazz : collectorClasses) {
|
|
|
+ try {
|
|
|
+ if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
|
|
|
+ throw new IOException("Invalid output collector class: " + clazz.getName() +
|
|
|
+ " (does not implement MapOutputCollector)");
|
|
|
+ }
|
|
|
+ Class<? extends MapOutputCollector> subclazz =
|
|
|
+ clazz.asSubclass(MapOutputCollector.class);
|
|
|
+ LOG.debug("Trying map output collector class: " + subclazz.getName());
|
|
|
+ MapOutputCollector<KEY, VALUE> collector =
|
|
|
+ ReflectionUtils.newInstance(subclazz, job);
|
|
|
+ collector.init(context);
|
|
|
+ LOG.info("Map output collector class = " + collector.getClass().getName());
|
|
|
+ return collector;
|
|
|
+ } catch (Exception e) {
|
|
|
+ String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
|
|
|
+ if (--remainingCollectors > 0) {
|
|
|
+ msg += " (" + remainingCollectors + " more collector(s) to try)";
|
|
|
+ }
|
|
|
+ LOG.warn(msg, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ throw new IOException("Unable to initialize any output collector");
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|