@@ -37,7 +37,8 @@ import org.apache.hadoop.util.Progress;
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionReporter {
+public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>,
+    ExceptionReporter {
   private static final int PROGRESS_FREQUENCY = 2000;
   private static final int MAX_EVENTS_TO_FETCH = 10000;
   private static final int MIN_EVENTS_TO_FETCH = 100;
@@ -51,7 +52,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
   private ShuffleClientMetrics metrics;
   private TaskUmbilicalProtocol umbilical;
 
-  private ShuffleSchedulerImpl<K,V> scheduler;
+  private ShuffleSchedulerImpl<K, V> scheduler;
   private MergeManager<K, V> merger;
   private Throwable throwable = null;
   private String throwingThreadName = null;
@@ -68,7 +69,8 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
     this.jobConf = context.getJobConf();
     this.umbilical = context.getUmbilical();
     this.reporter = context.getReporter();
-    this.metrics = ShuffleClientMetrics.create();
+    this.metrics = ShuffleClientMetrics.create(context.getReduceId(),
+        this.jobConf);
     this.copyPhase = context.getCopyPhase();
     this.taskStatus = context.getStatus();
     this.reduceTask = context.getReduceTask();
@@ -101,16 +103,16 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
     int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
 
     // Start the map-completion events fetcher thread
-    final EventFetcher<K,V> eventFetcher =
-      new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
-                            maxEventsToFetch);
+    final EventFetcher<K, V> eventFetcher =
+        new EventFetcher<K, V>(reduceId, umbilical, scheduler, this,
+            maxEventsToFetch);
     eventFetcher.start();
 
     // Start the map-output fetcher threads
     boolean isLocal = localMapFiles != null;
     final int numFetchers = isLocal ? 1 :
-      jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
-    Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
+        jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
+    Fetcher<K, V>[] fetchers = new Fetcher[numFetchers];
     if (isLocal) {
       fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
           merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
@@ -118,7 +120,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
       fetchers[0].start();
     } else {
       for (int i=0; i < numFetchers; ++i) {
-        fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
+        fetchers[i] = new Fetcher<K, V>(jobConf, reduceId, scheduler, merger,
                                        reporter, metrics, this,
                                        reduceTask.getShuffleSecret());
         fetchers[i].start();
@@ -141,7 +143,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
     eventFetcher.shutDown();
 
     // Stop the map-output fetcher threads
-    for (Fetcher<K,V> fetcher : fetchers) {
+    for (Fetcher<K, V> fetcher : fetchers) {
       fetcher.shutDown();
     }
 
@@ -157,7 +159,7 @@ public class Shuffle<K, V> implements ShuffleConsumerPlugin<K, V>, ExceptionRepo
     try {
       kvIter = merger.close();
     } catch (Throwable e) {
-      throw new ShuffleError("Error while doing final merge " , e);
+      throw new ShuffleError("Error while doing final merge ", e);
     }
 
     // Sanity check