|
@@ -19,24 +19,19 @@
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
|
|
|
|
|
import java.net.InetAddress;
|
|
|
-import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
|
-import java.util.Iterator;
|
|
|
import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
-import java.util.Map.Entry;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeSet;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentSkipListSet;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
|
|
|
|
-import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
@@ -181,19 +176,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
new AppFinishedTransition();
|
|
|
private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
|
|
|
|
|
|
- private final boolean logAggregationEnabled;
|
|
|
- private long logAggregationStartTime = 0;
|
|
|
- private final long logAggregationStatusTimeout;
|
|
|
- private final Map<NodeId, LogAggregationReport> logAggregationStatus =
|
|
|
- new ConcurrentHashMap<NodeId, LogAggregationReport>();
|
|
|
- private volatile LogAggregationStatus logAggregationStatusForAppReport;
|
|
|
- private int logAggregationSucceed = 0;
|
|
|
- private int logAggregationFailed = 0;
|
|
|
- private Map<NodeId, List<String>> logAggregationDiagnosticsForNMs =
|
|
|
- new HashMap<NodeId, List<String>>();
|
|
|
- private Map<NodeId, List<String>> logAggregationFailureMessagesForNMs =
|
|
|
- new HashMap<NodeId, List<String>>();
|
|
|
- private final int maxLogAggregationDiagnosticsInMemory;
|
|
|
+ private final RMAppLogAggregation logAggregation;
|
|
|
private Map<ApplicationTimeoutType, Long> applicationTimeouts =
|
|
|
new HashMap<ApplicationTimeoutType, Long>();
|
|
|
|
|
@@ -510,26 +493,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
applicationSchedulingEnvs
|
|
|
.putAll(submissionContext.getApplicationSchedulingPropertiesMap());
|
|
|
|
|
|
- long localLogAggregationStatusTimeout =
|
|
|
- conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
|
|
|
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
|
|
|
- if (localLogAggregationStatusTimeout <= 0) {
|
|
|
- this.logAggregationStatusTimeout =
|
|
|
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
|
|
|
- } else {
|
|
|
- this.logAggregationStatusTimeout = localLogAggregationStatusTimeout;
|
|
|
- }
|
|
|
- this.logAggregationEnabled =
|
|
|
- conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
|
|
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
|
|
|
- if (this.logAggregationEnabled) {
|
|
|
- this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START;
|
|
|
- } else {
|
|
|
- this.logAggregationStatusForAppReport = LogAggregationStatus.DISABLED;
|
|
|
- }
|
|
|
- maxLogAggregationDiagnosticsInMemory = conf.getInt(
|
|
|
- YarnConfiguration.RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY,
|
|
|
- YarnConfiguration.DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY);
|
|
|
+ this.logAggregation = new RMAppLogAggregation(conf, readLock, writeLock);
|
|
|
|
|
|
// amBlacklistingEnabled can be configured globally
|
|
|
// Just use the global values
|
|
@@ -1087,13 +1051,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
// otherwise, add it to ranNodes for further process
|
|
|
app.ranNodes.add(nodeAddedEvent.getNodeId());
|
|
|
|
|
|
- if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
|
|
|
- app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
|
|
|
- LogAggregationReport.newInstance(app.applicationId,
|
|
|
- app.logAggregationEnabled ? LogAggregationStatus.NOT_START
|
|
|
- : LogAggregationStatus.DISABLED, ""));
|
|
|
- }
|
|
|
- };
|
|
|
+ app.logAggregation.addReportIfNecessary(
|
|
|
+ nodeAddedEvent.getNodeId(), app.getApplicationId());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// synchronously recover attempt to ensure any incoming external events
|
|
@@ -1507,7 +1467,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
}
|
|
|
|
|
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
|
|
- app.logAggregationStartTime = System.currentTimeMillis();
|
|
|
+ app.logAggregation
|
|
|
+ .recordLogAggregationStartTime(app.systemClock.getTime());
|
|
|
for (NodeId nodeId : app.getRanNodes()) {
|
|
|
app.handler.handle(
|
|
|
new RMNodeCleanAppEvent(nodeId, app.applicationId));
|
|
@@ -1765,257 +1726,21 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
|
|
|
@Override
|
|
|
public Map<NodeId, LogAggregationReport> getLogAggregationReportsForApp() {
|
|
|
- try {
|
|
|
- this.readLock.lock();
|
|
|
- if (!isLogAggregationFinished() && isAppInFinalState(this) &&
|
|
|
- System.currentTimeMillis() > this.logAggregationStartTime
|
|
|
- + this.logAggregationStatusTimeout) {
|
|
|
- for (Entry<NodeId, LogAggregationReport> output :
|
|
|
- logAggregationStatus.entrySet()) {
|
|
|
- if (!output.getValue().getLogAggregationStatus()
|
|
|
- .equals(LogAggregationStatus.TIME_OUT)
|
|
|
- && !output.getValue().getLogAggregationStatus()
|
|
|
- .equals(LogAggregationStatus.SUCCEEDED)
|
|
|
- && !output.getValue().getLogAggregationStatus()
|
|
|
- .equals(LogAggregationStatus.FAILED)) {
|
|
|
- output.getValue().setLogAggregationStatus(
|
|
|
- LogAggregationStatus.TIME_OUT);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return Collections.unmodifiableMap(logAggregationStatus);
|
|
|
- } finally {
|
|
|
- this.readLock.unlock();
|
|
|
- }
|
|
|
+ return logAggregation.getLogAggregationReportsForApp(this);
|
|
|
}
|
|
|
|
|
|
public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
|
|
|
- try {
|
|
|
- this.writeLock.lock();
|
|
|
- if (this.logAggregationEnabled && !isLogAggregationFinished()) {
|
|
|
- LogAggregationReport curReport = this.logAggregationStatus.get(nodeId);
|
|
|
- boolean stateChangedToFinal = false;
|
|
|
- if (curReport == null) {
|
|
|
- this.logAggregationStatus.put(nodeId, report);
|
|
|
- if (isLogAggregationFinishedForNM(report)) {
|
|
|
- stateChangedToFinal = true;
|
|
|
- }
|
|
|
- } else {
|
|
|
- if (isLogAggregationFinishedForNM(report)) {
|
|
|
- if (!isLogAggregationFinishedForNM(curReport)) {
|
|
|
- stateChangedToFinal = true;
|
|
|
- }
|
|
|
- }
|
|
|
- if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
|
|
|
- || curReport.getLogAggregationStatus() !=
|
|
|
- LogAggregationStatus.RUNNING_WITH_FAILURE) {
|
|
|
- if (curReport.getLogAggregationStatus()
|
|
|
- == LogAggregationStatus.TIME_OUT
|
|
|
- && report.getLogAggregationStatus()
|
|
|
- == LogAggregationStatus.RUNNING) {
|
|
|
- // If the log aggregation status got from latest nm heartbeat
|
|
|
- // is Running, and current log aggregation status is TimeOut,
|
|
|
- // based on whether there are any failure messages for this NM,
|
|
|
- // we will reset the log aggregation status as RUNNING or
|
|
|
- // RUNNING_WITH_FAILURE
|
|
|
- if (logAggregationFailureMessagesForNMs.get(nodeId) != null &&
|
|
|
- !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) {
|
|
|
- report.setLogAggregationStatus(
|
|
|
- LogAggregationStatus.RUNNING_WITH_FAILURE);
|
|
|
- }
|
|
|
- }
|
|
|
- curReport.setLogAggregationStatus(report
|
|
|
- .getLogAggregationStatus());
|
|
|
- }
|
|
|
- }
|
|
|
- updateLogAggregationDiagnosticMessages(nodeId, report);
|
|
|
- if (isAppInFinalState(this) && stateChangedToFinal) {
|
|
|
- updateLogAggregationStatus(nodeId);
|
|
|
- }
|
|
|
- }
|
|
|
- } finally {
|
|
|
- this.writeLock.unlock();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public LogAggregationStatus getLogAggregationStatusForAppReport() {
|
|
|
- try {
|
|
|
- this.readLock.lock();
|
|
|
- if (! logAggregationEnabled) {
|
|
|
- return LogAggregationStatus.DISABLED;
|
|
|
- }
|
|
|
- if (isLogAggregationFinished()) {
|
|
|
- return this.logAggregationStatusForAppReport;
|
|
|
- }
|
|
|
- Map<NodeId, LogAggregationReport> reports =
|
|
|
- getLogAggregationReportsForApp();
|
|
|
- if (reports.size() == 0) {
|
|
|
- return this.logAggregationStatusForAppReport;
|
|
|
- }
|
|
|
- int logNotStartCount = 0;
|
|
|
- int logCompletedCount = 0;
|
|
|
- int logTimeOutCount = 0;
|
|
|
- int logFailedCount = 0;
|
|
|
- int logRunningWithFailure = 0;
|
|
|
- for (Entry<NodeId, LogAggregationReport> report : reports.entrySet()) {
|
|
|
- switch (report.getValue().getLogAggregationStatus()) {
|
|
|
- case NOT_START:
|
|
|
- logNotStartCount++;
|
|
|
- break;
|
|
|
- case RUNNING_WITH_FAILURE:
|
|
|
- logRunningWithFailure ++;
|
|
|
- break;
|
|
|
- case SUCCEEDED:
|
|
|
- logCompletedCount++;
|
|
|
- break;
|
|
|
- case FAILED:
|
|
|
- logFailedCount++;
|
|
|
- logCompletedCount++;
|
|
|
- break;
|
|
|
- case TIME_OUT:
|
|
|
- logTimeOutCount++;
|
|
|
- logCompletedCount++;
|
|
|
- break;
|
|
|
- default:
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (logNotStartCount == reports.size()) {
|
|
|
- return LogAggregationStatus.NOT_START;
|
|
|
- } else if (logCompletedCount == reports.size()) {
|
|
|
- // We should satisfy two condition in order to return SUCCEEDED or FAILED
|
|
|
- // 1) make sure the application is in final state
|
|
|
- // 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT
|
|
|
- // The SUCCEEDED/FAILED status is the final status which means
|
|
|
- // the log aggregation is finished. And the log aggregation status will
|
|
|
- // not be updated anymore.
|
|
|
- if (logFailedCount > 0 && isAppInFinalState(this)) {
|
|
|
- this.logAggregationStatusForAppReport =
|
|
|
- LogAggregationStatus.FAILED;
|
|
|
- return LogAggregationStatus.FAILED;
|
|
|
- } else if (logTimeOutCount > 0) {
|
|
|
- this.logAggregationStatusForAppReport =
|
|
|
- LogAggregationStatus.TIME_OUT;
|
|
|
- return LogAggregationStatus.TIME_OUT;
|
|
|
- }
|
|
|
- if (isAppInFinalState(this)) {
|
|
|
- this.logAggregationStatusForAppReport =
|
|
|
- LogAggregationStatus.SUCCEEDED;
|
|
|
- return LogAggregationStatus.SUCCEEDED;
|
|
|
- }
|
|
|
- } else if (logRunningWithFailure > 0) {
|
|
|
- return LogAggregationStatus.RUNNING_WITH_FAILURE;
|
|
|
- }
|
|
|
- return LogAggregationStatus.RUNNING;
|
|
|
- } finally {
|
|
|
- this.readLock.unlock();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private boolean isLogAggregationFinished() {
|
|
|
- return this.logAggregationStatusForAppReport
|
|
|
- .equals(LogAggregationStatus.SUCCEEDED)
|
|
|
- || this.logAggregationStatusForAppReport
|
|
|
- .equals(LogAggregationStatus.FAILED)
|
|
|
- || this.logAggregationStatusForAppReport
|
|
|
- .equals(LogAggregationStatus.TIME_OUT);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- private boolean isLogAggregationFinishedForNM(LogAggregationReport report) {
|
|
|
- return report.getLogAggregationStatus() == LogAggregationStatus.SUCCEEDED
|
|
|
- || report.getLogAggregationStatus() == LogAggregationStatus.FAILED;
|
|
|
- }
|
|
|
-
|
|
|
- private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
|
|
|
- LogAggregationReport report) {
|
|
|
- if (report.getDiagnosticMessage() != null
|
|
|
- && !report.getDiagnosticMessage().isEmpty()) {
|
|
|
- if (report.getLogAggregationStatus()
|
|
|
- == LogAggregationStatus.RUNNING ) {
|
|
|
- List<String> diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
|
|
|
- if (diagnostics == null) {
|
|
|
- diagnostics = new ArrayList<String>();
|
|
|
- logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
|
|
|
- } else {
|
|
|
- if (diagnostics.size()
|
|
|
- == maxLogAggregationDiagnosticsInMemory) {
|
|
|
- diagnostics.remove(0);
|
|
|
- }
|
|
|
- }
|
|
|
- diagnostics.add(report.getDiagnosticMessage());
|
|
|
- this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
|
|
|
- StringUtils.join(diagnostics, "\n"));
|
|
|
- } else if (report.getLogAggregationStatus()
|
|
|
- == LogAggregationStatus.RUNNING_WITH_FAILURE) {
|
|
|
- List<String> failureMessages =
|
|
|
- logAggregationFailureMessagesForNMs.get(nodeId);
|
|
|
- if (failureMessages == null) {
|
|
|
- failureMessages = new ArrayList<String>();
|
|
|
- logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
|
|
|
- } else {
|
|
|
- if (failureMessages.size()
|
|
|
- == maxLogAggregationDiagnosticsInMemory) {
|
|
|
- failureMessages.remove(0);
|
|
|
- }
|
|
|
- }
|
|
|
- failureMessages.add(report.getDiagnosticMessage());
|
|
|
- }
|
|
|
- }
|
|
|
+ logAggregation.aggregateLogReport(nodeId, report, this);
|
|
|
}
|
|
|
|
|
|
- private void updateLogAggregationStatus(NodeId nodeId) {
|
|
|
- LogAggregationStatus status =
|
|
|
- this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
|
|
|
- if (status.equals(LogAggregationStatus.SUCCEEDED)) {
|
|
|
- this.logAggregationSucceed++;
|
|
|
- } else if (status.equals(LogAggregationStatus.FAILED)) {
|
|
|
- this.logAggregationFailed++;
|
|
|
- }
|
|
|
- if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
|
|
|
- this.logAggregationStatusForAppReport =
|
|
|
- LogAggregationStatus.SUCCEEDED;
|
|
|
- // Since the log aggregation status for this application for all NMs
|
|
|
- // is SUCCEEDED, it means all logs are aggregated successfully.
|
|
|
- // We could remove all the cached log aggregation reports
|
|
|
- this.logAggregationStatus.clear();
|
|
|
- this.logAggregationDiagnosticsForNMs.clear();
|
|
|
- this.logAggregationFailureMessagesForNMs.clear();
|
|
|
- } else if (this.logAggregationSucceed + this.logAggregationFailed
|
|
|
- == this.logAggregationStatus.size()) {
|
|
|
- this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
|
|
|
- // We have collected the log aggregation status for all NMs.
|
|
|
- // The log aggregation status is FAILED which means the log
|
|
|
- // aggregation fails in some NMs. We are only interested in the
|
|
|
- // nodes where the log aggregation is failed. So we could remove
|
|
|
- // the log aggregation details for those succeeded NMs
|
|
|
- for (Iterator<Map.Entry<NodeId, LogAggregationReport>> it =
|
|
|
- this.logAggregationStatus.entrySet().iterator(); it.hasNext();) {
|
|
|
- Map.Entry<NodeId, LogAggregationReport> entry = it.next();
|
|
|
- if (entry.getValue().getLogAggregationStatus()
|
|
|
- .equals(LogAggregationStatus.SUCCEEDED)) {
|
|
|
- it.remove();
|
|
|
- }
|
|
|
- }
|
|
|
- // the log aggregation has finished/failed.
|
|
|
- // and the status will not be updated anymore.
|
|
|
- this.logAggregationDiagnosticsForNMs.clear();
|
|
|
- }
|
|
|
+ public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
|
|
|
+ return logAggregation.getLogAggregationFailureMessagesForNM(nodeId);
|
|
|
}
|
|
|
|
|
|
- public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
|
|
|
- try {
|
|
|
- this.readLock.lock();
|
|
|
- List<String> failureMessages =
|
|
|
- this.logAggregationFailureMessagesForNMs.get(nodeId);
|
|
|
- if (failureMessages == null || failureMessages.isEmpty()) {
|
|
|
- return StringUtils.EMPTY;
|
|
|
- }
|
|
|
- return StringUtils.join(failureMessages, "\n");
|
|
|
- } finally {
|
|
|
- this.readLock.unlock();
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public LogAggregationStatus getLogAggregationStatusForAppReport() {
|
|
|
+ return logAggregation
|
|
|
+ .getLogAggregationStatusForAppReport(this);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -2132,4 +1857,13 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
RMAppState state){
|
|
|
/* TODO fail the application on the failed transition */
|
|
|
}
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ long getLogAggregationStartTime() {
|
|
|
+ return logAggregation.getLogAggregationStartTime();
|
|
|
+ }
|
|
|
+
|
|
|
+ Clock getSystemClock() {
|
|
|
+ return systemClock;
|
|
|
+ }
|
|
|
}
|