|
@@ -91,6 +91,7 @@ public class DefaultSpeculator extends AbstractService implements
|
|
|
private final Configuration conf;
|
|
|
private AppContext context;
|
|
|
private Thread speculationBackgroundThread = null;
|
|
|
+ private volatile boolean stopped = false;
|
|
|
private BlockingQueue<SpeculatorEvent> eventQueue
|
|
|
= new LinkedBlockingQueue<SpeculatorEvent>();
|
|
|
private TaskRuntimeEstimator estimator;
|
|
@@ -170,7 +171,7 @@ public class DefaultSpeculator extends AbstractService implements
|
|
|
= new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
- while (!Thread.currentThread().isInterrupted()) {
|
|
|
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
|
|
|
long backgroundRunStartTime = clock.getTime();
|
|
|
try {
|
|
|
int speculations = computeSpeculations();
|
|
@@ -189,8 +190,9 @@ public class DefaultSpeculator extends AbstractService implements
|
|
|
Object pollResult
|
|
|
= scanControl.poll(wait, TimeUnit.MILLISECONDS);
|
|
|
} catch (InterruptedException e) {
|
|
|
- LOG.error("Background thread returning, interrupted : " + e);
|
|
|
- e.printStackTrace(System.out);
|
|
|
+ if (!stopped) {
|
|
|
+ LOG.error("Background thread returning, interrupted", e);
|
|
|
+ }
|
|
|
return;
|
|
|
}
|
|
|
}
|
|
@@ -205,6 +207,7 @@ public class DefaultSpeculator extends AbstractService implements
|
|
|
|
|
|
@Override
|
|
|
public void stop() {
|
|
|
+ stopped = true;
|
|
|
// this could be called before background thread is established
|
|
|
if (speculationBackgroundThread != null) {
|
|
|
speculationBackgroundThread.interrupt();
|