|
@@ -112,7 +112,8 @@ public class JobInProgress {
|
|
|
int finishedReduceTasks = 0;
|
|
|
int failedMapTasks = 0;
|
|
|
int failedReduceTasks = 0;
|
|
|
-
|
|
|
+ private static long DEFAULT_REDUCE_INPUT_LIMIT = -1L;
|
|
|
+ long reduce_input_limit = -1L;
|
|
|
private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
|
|
|
int completedMapsForReduceSlowstart = 0;
|
|
|
|
|
@@ -417,6 +418,12 @@ public class JobInProgress {
|
|
|
this.jobMetrics.setTag("jobId", jobId.toString());
|
|
|
hasSpeculativeMaps = conf.getMapSpeculativeExecution();
|
|
|
hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
|
|
|
+ // a limit on the input size of the reduce.
|
|
|
+ // we check to see if the estimated input size of
|
|
|
+ // each reduce is less than this value. If not
|
|
|
+ // we fail the job. A value of -1 just means there is no
|
|
|
+ // limit set.
|
|
|
+ reduce_input_limit = -1L;
|
|
|
this.maxLevel = jobtracker.getNumTaskCacheLevels();
|
|
|
this.anyCacheLevel = this.maxLevel+1;
|
|
|
this.nonLocalMaps = new LinkedList<TaskInProgress>();
|
|
@@ -425,7 +432,8 @@ public class JobInProgress {
|
|
|
this.nonRunningReduces = new LinkedList<TaskInProgress>();
|
|
|
this.runningReduces = new LinkedHashSet<TaskInProgress>();
|
|
|
this.resourceEstimator = new ResourceEstimator(this);
|
|
|
-
|
|
|
+ this.reduce_input_limit = conf.getLong("mapreduce.reduce.input.limit",
|
|
|
+ DEFAULT_REDUCE_INPUT_LIMIT);
|
|
|
// register job's tokens for renewal
|
|
|
DelegationTokenRenewal.registerDelegationTokensForRenewal(
|
|
|
jobInfo.getJobID(), ts, jobtracker.getConf());
|
|
@@ -1539,6 +1547,25 @@ public class JobInProgress {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
+ /** check to see if we have any misbehaving reducers. If the expected output
|
|
|
+ * for reducers is huge then we just fail the job and error out. The estimated
|
|
|
+ * size is divided by 2 since the resource estimator returns the amount of disk
|
|
|
+ * space that the reduce will use (which is 2 times the input, space for merge + reduce
|
|
|
+ * input). **/
|
|
|
+ long estimatedReduceInputSize = resourceEstimator.getEstimatedReduceInputSize()/2;
|
|
|
+ if (((estimatedReduceInputSize) >
|
|
|
+ reduce_input_limit) && (reduce_input_limit > 0L)) {
|
|
|
+ // make sure jobtracker lock is held
|
|
|
+ LOG.info("Exceeded limit for reduce input size: Estimated:" +
|
|
|
+ estimatedReduceInputSize + " Limit: " +
|
|
|
+ reduce_input_limit + " Failing Job " + jobId);
|
|
|
+ status.setFailureInfo("Job Exceeded Reduce Input limit "
|
|
|
+ + " Limit: " + reduce_input_limit +
|
|
|
+ " Estimated: " + estimatedReduceInputSize);
|
|
|
+ jobtracker.failJob(this);
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
// Ensure we have sufficient map outputs ready to shuffle before
|
|
|
// scheduling reduces
|
|
|
if (!scheduleReduces()) {
|