|
@@ -19,6 +19,9 @@
|
|
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
|
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -54,6 +57,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
|
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
|
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
|
|
|
|
|
private final NodeTimelineCollectorManager collectorManager;
|
|
private final NodeTimelineCollectorManager collectorManager;
|
|
|
|
+ private long collectorLingerPeriod;
|
|
|
|
+ private ScheduledExecutorService scheduler;
|
|
|
|
|
|
public PerNodeTimelineCollectorsAuxService() {
|
|
public PerNodeTimelineCollectorsAuxService() {
|
|
this(new NodeTimelineCollectorManager());
|
|
this(new NodeTimelineCollectorManager());
|
|
@@ -70,6 +75,10 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
|
if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
|
|
if (!YarnConfiguration.timelineServiceV2Enabled(conf)) {
|
|
throw new YarnException("Timeline service v2 is not enabled");
|
|
throw new YarnException("Timeline service v2 is not enabled");
|
|
}
|
|
}
|
|
|
|
+ collectorLingerPeriod =
|
|
|
|
+ conf.getLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
|
|
|
|
+ YarnConfiguration.DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS);
|
|
|
|
+ scheduler = Executors.newSingleThreadScheduledExecutor();
|
|
collectorManager.init(conf);
|
|
collectorManager.init(conf);
|
|
super.serviceInit(conf);
|
|
super.serviceInit(conf);
|
|
}
|
|
}
|
|
@@ -82,6 +91,12 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
protected void serviceStop() throws Exception {
|
|
protected void serviceStop() throws Exception {
|
|
|
|
+ scheduler.shutdown();
|
|
|
|
+ if (!scheduler.awaitTermination(collectorLingerPeriod,
|
|
|
|
+ TimeUnit.MILLISECONDS)) {
|
|
|
|
+ LOG.warn(
|
|
|
|
+ "Scheduler terminated before removing the application collectors");
|
|
|
|
+ }
|
|
collectorManager.stop();
|
|
collectorManager.stop();
|
|
super.serviceStop();
|
|
super.serviceStop();
|
|
}
|
|
}
|
|
@@ -141,17 +156,11 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
|
if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
|
|
if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
|
|
final ApplicationId appId =
|
|
final ApplicationId appId =
|
|
context.getContainerId().getApplicationAttemptId().getApplicationId();
|
|
context.getContainerId().getApplicationAttemptId().getApplicationId();
|
|
- new Thread(new Runnable() {
|
|
|
|
|
|
+ scheduler.schedule(new Runnable() {
|
|
public void run() {
|
|
public void run() {
|
|
- try {
|
|
|
|
- // TODO Temporary Fix until solution for YARN-3995 is finalized.
|
|
|
|
- Thread.sleep(1000l);
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- e.printStackTrace();
|
|
|
|
- }
|
|
|
|
removeApplication(appId);
|
|
removeApplication(appId);
|
|
}
|
|
}
|
|
- }).start();
|
|
|
|
|
|
+ }, collectorLingerPeriod, TimeUnit.MILLISECONDS);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|