|
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
public class DefaultSpeculator extends AbstractService implements
|
|
public class DefaultSpeculator extends AbstractService implements
|
|
Speculator {
|
|
Speculator {
|
|
@@ -62,12 +63,11 @@ public class DefaultSpeculator extends AbstractService implements
|
|
private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
|
|
private static final long NOT_RUNNING = Long.MIN_VALUE + 4;
|
|
private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
|
|
private static final long TOO_LATE_TO_SPECULATE = Long.MIN_VALUE + 5;
|
|
|
|
|
|
- private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L * 1L;
|
|
|
|
- private static final long SOONEST_RETRY_AFTER_SPECULATE = 1000L * 15L;
|
|
|
|
-
|
|
|
|
- private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
|
|
|
|
- private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
|
|
|
|
- private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
|
|
|
|
|
|
+ private long soonestRetryAfterNoSpeculate;
|
|
|
|
+ private long soonestRetryAfterSpeculate;
|
|
|
|
+ private double proportionRunningTasksSpeculatable;
|
|
|
|
+ private double proportionTotalTasksSpeculatable;
|
|
|
|
+ private int minimumAllowedSpeculativeTasks;
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
|
|
private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
|
|
|
|
|
|
@@ -163,6 +163,21 @@ public class DefaultSpeculator extends AbstractService implements
|
|
this.estimator = estimator;
|
|
this.estimator = estimator;
|
|
this.clock = clock;
|
|
this.clock = clock;
|
|
this.eventHandler = context.getEventHandler();
|
|
this.eventHandler = context.getEventHandler();
|
|
|
|
+ this.soonestRetryAfterNoSpeculate =
|
|
|
|
+ conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE,
|
|
|
|
+ MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE);
|
|
|
|
+ this.soonestRetryAfterSpeculate =
|
|
|
|
+ conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE,
|
|
|
|
+ MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE);
|
|
|
|
+ this.proportionRunningTasksSpeculatable =
|
|
|
|
+ conf.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS,
|
|
|
|
+ MRJobConfig.DEFAULT_SPECULATIVECAP_RUNNING_TASKS);
|
|
|
|
+ this.proportionTotalTasksSpeculatable =
|
|
|
|
+ conf.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS,
|
|
|
|
+ MRJobConfig.DEFAULT_SPECULATIVECAP_TOTAL_TASKS);
|
|
|
|
+ this.minimumAllowedSpeculativeTasks =
|
|
|
|
+ conf.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS,
|
|
|
|
+ MRJobConfig.DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS);
|
|
}
|
|
}
|
|
|
|
|
|
/* ************************************************************* */
|
|
/* ************************************************************* */
|
|
@@ -182,8 +197,8 @@ public class DefaultSpeculator extends AbstractService implements
|
|
try {
|
|
try {
|
|
int speculations = computeSpeculations();
|
|
int speculations = computeSpeculations();
|
|
long mininumRecomp
|
|
long mininumRecomp
|
|
- = speculations > 0 ? SOONEST_RETRY_AFTER_SPECULATE
|
|
|
|
- : SOONEST_RETRY_AFTER_NO_SPECULATE;
|
|
|
|
|
|
+ = speculations > 0 ? soonestRetryAfterSpeculate
|
|
|
|
+ : soonestRetryAfterNoSpeculate;
|
|
|
|
|
|
long wait = Math.max(mininumRecomp,
|
|
long wait = Math.max(mininumRecomp,
|
|
clock.getTime() - backgroundRunStartTime);
|
|
clock.getTime() - backgroundRunStartTime);
|
|
@@ -497,8 +512,8 @@ public class DefaultSpeculator extends AbstractService implements
|
|
Map<TaskId, Task> tasks = job.getTasks(type);
|
|
Map<TaskId, Task> tasks = job.getTasks(type);
|
|
|
|
|
|
int numberAllowedSpeculativeTasks
|
|
int numberAllowedSpeculativeTasks
|
|
- = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
|
|
|
|
- PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());
|
|
|
|
|
|
+ = (int) Math.max(minimumAllowedSpeculativeTasks,
|
|
|
|
+ proportionTotalTasksSpeculatable * tasks.size());
|
|
|
|
|
|
TaskId bestTaskID = null;
|
|
TaskId bestTaskID = null;
|
|
long bestSpeculationValue = -1L;
|
|
long bestSpeculationValue = -1L;
|
|
@@ -523,7 +538,7 @@ public class DefaultSpeculator extends AbstractService implements
|
|
}
|
|
}
|
|
numberAllowedSpeculativeTasks
|
|
numberAllowedSpeculativeTasks
|
|
= (int) Math.max(numberAllowedSpeculativeTasks,
|
|
= (int) Math.max(numberAllowedSpeculativeTasks,
|
|
- PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);
|
|
|
|
|
|
+ proportionRunningTasksSpeculatable * numberRunningTasks);
|
|
|
|
|
|
// If we found a speculation target, fire it off
|
|
// If we found a speculation target, fire it off
|
|
if (bestTaskID != null
|
|
if (bestTaskID != null
|
|
@@ -583,4 +598,29 @@ public class DefaultSpeculator extends AbstractService implements
|
|
this.lastHeartBeatTime = lastHeartBeatTime;
|
|
this.lastHeartBeatTime = lastHeartBeatTime;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public long getSoonestRetryAfterNoSpeculate() {
|
|
|
|
+ return soonestRetryAfterNoSpeculate;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public long getSoonestRetryAfterSpeculate() {
|
|
|
|
+ return soonestRetryAfterSpeculate;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public double getProportionRunningTasksSpeculatable() {
|
|
|
|
+ return proportionRunningTasksSpeculatable;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public double getProportionTotalTasksSpeculatable() {
|
|
|
|
+ return proportionTotalTasksSpeculatable;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public int getMinimumAllowedSpeculativeTasks() {
|
|
|
|
+ return minimumAllowedSpeculativeTasks;
|
|
|
|
+ }
|
|
}
|
|
}
|