|
@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -75,9 +74,6 @@ public class DefaultSpeculator extends AbstractService implements
|
|
|
private final ConcurrentMap<TaskId, Boolean> runningTasks
|
|
|
= new ConcurrentHashMap<TaskId, Boolean>();
|
|
|
|
|
|
- private final Map<Task, AtomicBoolean> pendingSpeculations
|
|
|
- = new ConcurrentHashMap<Task, AtomicBoolean>();
|
|
|
-
|
|
|
// Used to track any TaskAttempts that aren't heart-beating for a while, so
|
|
|
// that we can aggressively speculate instead of waiting for task-timeout.
|
|
|
private final ConcurrentMap<TaskAttemptId, TaskAttemptHistoryStatistics>
|
|
@@ -328,13 +324,6 @@ public class DefaultSpeculator extends AbstractService implements
|
|
|
|
|
|
estimator.updateAttempt(reportedStatus, timestamp);
|
|
|
|
|
|
- // If the task is already known to be speculation-bait, don't do anything
|
|
|
- if (pendingSpeculations.get(task) != null) {
|
|
|
- if (pendingSpeculations.get(task).get()) {
|
|
|
- return;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
if (stateString.equals(TaskAttemptState.RUNNING.name())) {
|
|
|
runningTasks.putIfAbsent(taskID, Boolean.TRUE);
|
|
|
} else {
|