|
@@ -69,8 +69,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
public class LogAggregationService extends AbstractService implements
|
|
|
LogHandler {
|
|
|
|
|
|
- private static final Log LOG = LogFactory
|
|
|
- .getLog(LogAggregationService.class);
|
|
|
+ private static final Log LOG = LogFactory.getLog(LogAggregationService.class);
|
|
|
+ private static final long MIN_LOG_ROLLING_INTERVAL = 3600;
|
|
|
+ // This configuration is for debug and test purpose. By setting
|
|
|
+ // this configuration as true. We can break the lower bound of
|
|
|
+ // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
|
|
|
+ private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
|
|
|
+ = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
|
|
|
+ private long rollingMonitorInterval;
|
|
|
|
|
|
/*
|
|
|
* Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
|
|
@@ -101,6 +107,7 @@ public class LogAggregationService extends AbstractService implements
|
|
|
private NodeId nodeId;
|
|
|
|
|
|
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
|
|
|
+ private boolean logPermError = true;
|
|
|
|
|
|
@VisibleForTesting
|
|
|
ExecutorService threadPool;
|
|
@@ -128,6 +135,35 @@ public class LogAggregationService extends AbstractService implements
|
|
|
new ThreadFactoryBuilder()
|
|
|
.setNameFormat("LogAggregationService #%d")
|
|
|
.build());
|
|
|
+
|
|
|
+ rollingMonitorInterval = conf.getLong(
|
|
|
+ YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
|
|
|
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS);
|
|
|
+
|
|
|
+ boolean logAggregationDebugMode =
|
|
|
+ conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, false);
|
|
|
+
|
|
|
+ if (rollingMonitorInterval > 0
|
|
|
+ && rollingMonitorInterval < MIN_LOG_ROLLING_INTERVAL) {
|
|
|
+ if (logAggregationDebugMode) {
|
|
|
+ LOG.info("Log aggregation debug mode enabled. rollingMonitorInterval = "
|
|
|
+ + rollingMonitorInterval);
|
|
|
+ } else {
|
|
|
+ LOG.warn("rollingMonitorIntervall should be more than or equal to "
|
|
|
+ + MIN_LOG_ROLLING_INTERVAL + " seconds. Using "
|
|
|
+ + MIN_LOG_ROLLING_INTERVAL + " seconds instead.");
|
|
|
+ this.rollingMonitorInterval = MIN_LOG_ROLLING_INTERVAL;
|
|
|
+ }
|
|
|
+ } else if (rollingMonitorInterval <= 0) {
|
|
|
+ LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval
|
|
|
+ + ". The log rolling monitoring interval is disabled. "
|
|
|
+ + "The logs will be aggregated after this application is finished.");
|
|
|
+ } else {
|
|
|
+ LOG.info("rollingMonitorInterval is set as " + rollingMonitorInterval
|
|
|
+ + ". The logs will be aggregated every " + rollingMonitorInterval
|
|
|
+ + " seconds");
|
|
|
+ }
|
|
|
+
|
|
|
super.serviceInit(conf);
|
|
|
}
|
|
|
|
|
@@ -197,11 +233,14 @@ public class LogAggregationService extends AbstractService implements
|
|
|
try {
|
|
|
FsPermission perms =
|
|
|
remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
|
|
|
- if (!perms.equals(TLDIR_PERMISSIONS)) {
|
|
|
+ if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
|
|
|
LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
|
|
|
+ "] already exist, but with incorrect permissions. "
|
|
|
+ "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
|
|
|
+ "]." + " The cluster may have problems with multiple users.");
|
|
|
+ logPermError = false;
|
|
|
+ } else {
|
|
|
+ logPermError = true;
|
|
|
}
|
|
|
} catch (FileNotFoundException e) {
|
|
|
remoteExists = false;
|
|
@@ -360,7 +399,7 @@ public class LogAggregationService extends AbstractService implements
|
|
|
getConfig(), appId, userUgi, this.nodeId, dirsHandler,
|
|
|
getRemoteNodeLogFileForApp(appId, user),
|
|
|
appAcls, logAggregationContext, this.context,
|
|
|
- getLocalFileContext(getConfig()));
|
|
|
+ getLocalFileContext(getConfig()), this.rollingMonitorInterval);
|
|
|
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
|
|
|
throw new YarnRuntimeException("Duplicate initApp for " + appId);
|
|
|
}
|