|
@@ -57,6 +57,7 @@ public class ActivitiesManager extends AbstractService {
|
|
|
private Thread cleanUpThread;
|
|
|
private int timeThreshold = 600 * 1000;
|
|
|
private final RMContext rmContext;
|
|
|
+ private volatile boolean stopped;
|
|
|
|
|
|
public ActivitiesManager(RMContext rmContext) {
|
|
|
super(ActivitiesManager.class.getName());
|
|
@@ -113,7 +114,7 @@ public class ActivitiesManager extends AbstractService {
|
|
|
cleanUpThread = new Thread(new Runnable() {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
- while (true) {
|
|
|
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
|
|
|
Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite =
|
|
|
completedNodeAllocations.entrySet().iterator();
|
|
|
while (ite.hasNext()) {
|
|
@@ -140,20 +141,29 @@ public class ActivitiesManager extends AbstractService {
|
|
|
|
|
|
try {
|
|
|
Thread.sleep(5000);
|
|
|
- } catch (Exception e) {
|
|
|
- // ignore
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.info(getName() + " thread interrupted");
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
-
|
|
|
+ cleanUpThread.setName("ActivitiesManager thread.");
|
|
|
cleanUpThread.start();
|
|
|
super.serviceStart();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void serviceStop() throws Exception {
|
|
|
- cleanUpThread.interrupt();
|
|
|
+ stopped = true;
|
|
|
+ if (cleanUpThread != null) {
|
|
|
+ cleanUpThread.interrupt();
|
|
|
+ try {
|
|
|
+ cleanUpThread.join();
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.warn("Interrupted Exception while stopping", ie);
|
|
|
+ }
|
|
|
+ }
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
|