|
@@ -19,6 +19,13 @@
|
|
|
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
|
|
|
|
|
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -32,9 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
|
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
|
|
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.Map;
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
/**
|
|
|
* Class that manages adding and removing collectors and their lifecycle. It
|
|
@@ -47,7 +52,10 @@ public abstract class TimelineCollectorManager extends AbstractService {
|
|
|
private static final Log LOG =
|
|
|
LogFactory.getLog(TimelineCollectorManager.class);
|
|
|
|
|
|
- protected TimelineWriter writer;
|
|
|
+ private TimelineWriter writer;
|
|
|
+ private ScheduledExecutorService writerFlusher;
|
|
|
+ private int flushInterval;
|
|
|
+ private boolean writerFlusherRunning;
|
|
|
|
|
|
@Override
|
|
|
public void serviceInit(Configuration conf) throws Exception {
|
|
@@ -56,6 +64,12 @@ public abstract class TimelineCollectorManager extends AbstractService {
|
|
|
FileSystemTimelineWriterImpl.class,
|
|
|
TimelineWriter.class), conf);
|
|
|
writer.init(conf);
|
|
|
+ // create a single dedicated thread for flushing the writer on a periodic
|
|
|
+ // basis
|
|
|
+ writerFlusher = Executors.newSingleThreadScheduledExecutor();
|
|
|
+ flushInterval = conf.getInt(
|
|
|
+ YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
|
|
|
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
@@ -65,6 +79,10 @@ public abstract class TimelineCollectorManager extends AbstractService {
|
|
|
if (writer != null) {
|
|
|
writer.start();
|
|
|
}
|
|
|
+ // schedule the flush task
|
|
|
+ writerFlusher.scheduleAtFixedRate(new WriterFlushTask(writer),
|
|
|
+ flushInterval, flushInterval, TimeUnit.SECONDS);
|
|
|
+ writerFlusherRunning = true;
|
|
|
}
|
|
|
|
|
|
// access to this map is synchronized with the map itself
|
|
@@ -161,9 +179,48 @@ public abstract class TimelineCollectorManager extends AbstractService {
|
|
|
c.serviceStop();
|
|
|
}
|
|
|
}
|
|
|
+ // stop the flusher first
|
|
|
+ if (writerFlusher != null) {
|
|
|
+ writerFlusher.shutdown();
|
|
|
+ writerFlusherRunning = false;
|
|
|
+ if (!writerFlusher.awaitTermination(30, TimeUnit.SECONDS)) {
|
|
|
+ // in reality it should be ample time for the flusher task to finish
|
|
|
+ // even if it times out, writers may be able to handle closing in this
|
|
|
+ // situation fine
|
|
|
+ // proceed to close the writer
|
|
|
+ LOG.warn("failed to stop the flusher task in time. " +
|
|
|
+ "will still proceed to close the writer.");
|
|
|
+ }
|
|
|
+ }
|
|
|
if (writer != null) {
|
|
|
writer.close();
|
|
|
}
|
|
|
super.serviceStop();
|
|
|
}
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ boolean writerFlusherRunning() {
|
|
|
+ return writerFlusherRunning;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Task that invokes the flush operation on the timeline writer.
|
|
|
+ */
|
|
|
+ private static class WriterFlushTask implements Runnable {
|
|
|
+ private final TimelineWriter writer;
|
|
|
+
|
|
|
+ public WriterFlushTask(TimelineWriter writer) {
|
|
|
+ this.writer = writer;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ writer.flush();
|
|
|
+ } catch (Throwable th) {
|
|
|
+ // we need to handle all exceptions or subsequent execution may be
|
|
|
+ // suppressed
|
|
|
+ LOG.error("exception during timeline writer flush!", th);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|