|
@@ -71,7 +71,6 @@ public class LogAggregationService extends AbstractService implements
|
|
|
|
|
|
private static final Logger LOG =
|
|
|
LoggerFactory.getLogger(LogAggregationService.class);
|
|
|
- private static final long MIN_LOG_ROLLING_INTERVAL = 3600;
|
|
|
// This configuration is for debug and test purpose. By setting
|
|
|
// this configuration as true. We can break the lower bound of
|
|
|
// NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
|
|
@@ -106,6 +105,49 @@ public class LogAggregationService extends AbstractService implements
|
|
|
this.invalidTokenApps = ConcurrentHashMap.newKeySet();
|
|
|
}
|
|
|
|
|
|
+ private static long calculateRollingMonitorInterval(Configuration conf) {
|
|
|
+ long interval = conf.getLong(
|
|
|
+ YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
|
|
|
+ YarnConfiguration.
|
|
|
+ DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);
|
|
|
+
|
|
|
+ if (interval <= 0) {
|
|
|
+ LOG.info("rollingMonitorInterval is set as " + interval
|
|
|
+ + ". The log rolling monitoring interval is disabled. "
|
|
|
+ + "The logs will be aggregated after this application is finished.");
|
|
|
+ } else {
|
|
|
+ boolean logAggregationDebugMode =
|
|
|
+ conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, false);
|
|
|
+ long minRollingMonitorInterval = conf.getLong(
|
|
|
+ YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS,
|
|
|
+ YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT);
|
|
|
+
|
|
|
+ boolean warnHardMinLimitLowerThanDefault = minRollingMonitorInterval <
|
|
|
+ YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT &&
|
|
|
+ !logAggregationDebugMode;
|
|
|
+ if (warnHardMinLimitLowerThanDefault) {
|
|
|
+ LOG.warn("{} has been set to {}, which is less than the default "
|
|
|
+ + "minimum value {}. This may impact NodeManager's performance.",
|
|
|
+ YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS,
|
|
|
+ minRollingMonitorInterval,
|
|
|
+ YarnConfiguration.MIN_LOG_ROLLING_INTERVAL_SECONDS_DEFAULT);
|
|
|
+ }
|
|
|
+ boolean lowerThanHardLimit = interval < minRollingMonitorInterval;
|
|
|
+ if (lowerThanHardLimit) {
|
|
|
+ if (logAggregationDebugMode) {
|
|
|
+ LOG.info("Log aggregation debug mode enabled. " +
|
|
|
+ "Skipped checking minimum limit.");
|
|
|
+ } else {
|
|
|
+ LOG.warn("rollingMonitorInterval should be more than " +
|
|
|
+ "or equal to {} seconds. Using {} seconds instead.",
|
|
|
+ minRollingMonitorInterval, minRollingMonitorInterval);
|
|
|
+ interval = minRollingMonitorInterval;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return interval;
|
|
|
+ }
|
|
|
+
|
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
|
int threadPoolSize = getAggregatorThreadPoolSize(conf);
|
|
|
this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
|
|
@@ -113,33 +155,10 @@ public class LogAggregationService extends AbstractService implements
|
|
|
.setNameFormat("LogAggregationService #%d")
|
|
|
.build());
|
|
|
|
|
|
- rollingMonitorInterval = conf.getLong(
|
|
|
- YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
|
|
|
- YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);
|
|
|
-
|
|
|
- boolean logAggregationDebugMode =
|
|
|
- conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, false);
|
|
|
-
|
|
|
- if (rollingMonitorInterval > 0
|
|
|
- && rollingMonitorInterval < MIN_LOG_ROLLING_INTERVAL) {
|
|
|
- if (logAggregationDebugMode) {
|
|
|
- LOG.info("Log aggregation debug mode enabled. rollingMonitorInterval = "
|
|
|
- + rollingMonitorInterval);
|
|
|
- } else {
|
|
|
- LOG.warn("rollingMonitorInterval should be more than or equal to {} " +
|
|
|
- "seconds. Using {} seconds instead.",
|
|
|
- MIN_LOG_ROLLING_INTERVAL, MIN_LOG_ROLLING_INTERVAL);
|
|
|
- this.rollingMonitorInterval = MIN_LOG_ROLLING_INTERVAL;
|
|
|
- }
|
|
|
- } else if (rollingMonitorInterval <= 0) {
|
|
|
- LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval
|
|
|
- + ". The log rolling monitoring interval is disabled. "
|
|
|
- + "The logs will be aggregated after this application is finished.");
|
|
|
- } else {
|
|
|
- LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval
|
|
|
- + ". The logs will be aggregated every " + rollingMonitorInterval
|
|
|
- + " seconds");
|
|
|
- }
|
|
|
+ rollingMonitorInterval = calculateRollingMonitorInterval(conf);
|
|
|
+ LOG.info("rollingMonitorInterval is set as {}. The logs will be " +
|
|
|
+ "aggregated every {} seconds", rollingMonitorInterval,
|
|
|
+ rollingMonitorInterval);
|
|
|
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
@@ -413,6 +432,10 @@ public class LogAggregationService extends AbstractService implements
|
|
|
return this.nodeId;
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ public long getRollingMonitorInterval() {
|
|
|
+ return rollingMonitorInterval;
|
|
|
+ }
|
|
|
|
|
|
private int getAggregatorThreadPoolSize(Configuration conf) {
|
|
|
int threadPoolSize;
|