|
@@ -289,7 +289,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
|
|
* Time period for which the timelineclient will wait for draining after
|
|
|
* stop.
|
|
|
*/
|
|
|
- private static final long DRAIN_TIME_PERIOD = 2000L;
|
|
|
+ private final long drainTimeoutPeriod;
|
|
|
|
|
|
private int numberOfAsyncsToMerge;
|
|
|
private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
|
|
@@ -300,6 +300,9 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
|
|
numberOfAsyncsToMerge =
|
|
|
conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
|
|
|
YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
|
|
|
+ drainTimeoutPeriod = conf.getLong(
|
|
|
+ YarnConfiguration.TIMELINE_V2_CLIENT_DRAIN_TIME_MILLIS,
|
|
|
+ YarnConfiguration.DEFAULT_TIMELINE_V2_CLIENT_DRAIN_TIME_MILLIS);
|
|
|
}
|
|
|
|
|
|
Runnable createRunnable() {
|
|
@@ -330,7 +333,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
|
|
// Try to drain the remaining entities to be published @ the max for
|
|
|
// 2 seconds
|
|
|
long timeTillweDrain =
|
|
|
- System.currentTimeMillis() + DRAIN_TIME_PERIOD;
|
|
|
+ System.currentTimeMillis() + drainTimeoutPeriod;
|
|
|
while (!timelineEntityQueue.isEmpty()) {
|
|
|
publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
|
|
|
if (System.currentTimeMillis() > timeTillweDrain) {
|
|
@@ -449,7 +452,7 @@ public class TimelineV2ClientImpl extends TimelineV2Client {
|
|
|
LOG.info("Stopping TimelineClient.");
|
|
|
executor.shutdownNow();
|
|
|
try {
|
|
|
- executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
|
|
|
+ executor.awaitTermination(drainTimeoutPeriod, TimeUnit.MILLISECONDS);
|
|
|
} catch (InterruptedException e) {
|
|
|
Thread.currentThread().interrupt();
|
|
|
e.printStackTrace();
|