|
@@ -730,7 +730,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private String verifyContainerLogs(LogAggregationService logAggregationService,
|
|
|
+ private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService logAggregationService,
|
|
|
ApplicationId appId, ContainerId[] expectedContainerIds,
|
|
|
String[] logFiles, int numOfContainerLogs, boolean multiLogs)
|
|
|
throws IOException {
|
|
@@ -775,7 +775,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|
|
new AggregatedLogFormat.LogReader(this.conf, targetNodeFile.getPath());
|
|
|
Assert.assertEquals(this.user, reader.getApplicationOwner());
|
|
|
verifyAcls(reader.getApplicationAcls());
|
|
|
-
|
|
|
+
|
|
|
+ List<String> fileTypes = new ArrayList<String>();
|
|
|
+
|
|
|
try {
|
|
|
Map<String, Map<String, String>> logMap =
|
|
|
new HashMap<String, Map<String, String>>();
|
|
@@ -801,6 +803,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|
|
|
|
|
Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
|
|
|
String fileType = writtenLines[0].substring(8);
|
|
|
+ fileTypes.add(fileType);
|
|
|
|
|
|
Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
|
|
|
String fileLengthStr = writtenLines[1].substring(10);
|
|
@@ -843,7 +846,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|
|
Assert.assertEquals(0, thisContainerMap.size());
|
|
|
}
|
|
|
Assert.assertEquals(0, logMap.size());
|
|
|
- return targetNodeFile.getPath().getName();
|
|
|
+ return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), fileTypes);
|
|
|
} finally {
|
|
|
reader.close();
|
|
|
}
|
|
@@ -1326,6 +1329,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|
|
throws Exception {
|
|
|
LogAggregationContext logAggregationContextWithInterval =
|
|
|
Records.newRecord(LogAggregationContext.class);
|
|
|
+ // set IncludePattern/excludePattern in rolling fashion
|
|
|
+ // we expect all the logs except std_final will be uploaded
|
|
|
+ // when app is running. The std_final will be uploaded when
|
|
|
+ // the app finishes.
|
|
|
+ logAggregationContextWithInterval.setRolledLogsIncludePattern(".*");
|
|
|
+ logAggregationContextWithInterval.setRolledLogsExcludePattern("std_final");
|
|
|
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
|
|
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
|
|
this.remoteRootLogDir.getAbsolutePath());
|
|
@@ -1379,9 +1388,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|
|
this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
|
|
|
logAggregationContextWithInterval));
|
|
|
|
|
|
+ LogFileStatusInLastCycle logFileStatusInLastCycle = null;
|
|
|
// Simulate log-file creation
|
|
|
- String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" };
|
|
|
- writeContainerLogs(appLogDir, container, logFiles1);
|
|
|
+ // create std_final in log directory which will not be aggregated
|
|
|
+ // until the app finishes.
|
|
|
+ String[] logFiles1WithFinalLog =
|
|
|
+ new String[] { "stdout", "stderr", "syslog", "std_final" };
|
|
|
+ String[] logFiles1 = new String[] { "stdout", "stderr", "syslog"};
|
|
|
+ writeContainerLogs(appLogDir, container, logFiles1WithFinalLog);
|
|
|
|
|
|
// Do log aggregation
|
|
|
AppLogAggregatorImpl aggregator =
|
|
@@ -1396,10 +1410,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
|
|
50, 1, false, null));
|
|
|
}
|
|
|
- String logFileInLastCycle = null;
|
|
|
// Container logs should be uploaded
|
|
|
- logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
|
|
|
+ logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
|
|
|
new ContainerId[] { container }, logFiles1, 3, true);
|
|
|
+ for(String logFile : logFiles1) {
|
|
|
+ Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
|
|
|
+ .contains(logFile));
|
|
|
+ }
|
|
|
+ // Make sure the std_final is not uploaded.
|
|
|
+ Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
|
|
|
+ .contains("std_final"));
|
|
|
|
|
|
Thread.sleep(2000);
|
|
|
|
|
@@ -1421,15 +1441,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|
|
|
|
|
if (retentionSizeLimitation) {
|
|
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
|
|
- 50, 1, true, logFileInLastCycle));
|
|
|
+ 50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
|
|
|
} else {
|
|
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
|
|
50, 2, false, null));
|
|
|
}
|
|
|
// Container logs should be uploaded
|
|
|
- logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
|
|
|
+ logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
|
|
|
new ContainerId[] { container }, logFiles2, 3, true);
|
|
|
|
|
|
+ for(String logFile : logFiles2) {
|
|
|
+ Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
|
|
|
+ .contains(logFile));
|
|
|
+ }
|
|
|
+ // Make sure the std_final is not uploaded.
|
|
|
+ Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
|
|
|
+ .contains("std_final"));
|
|
|
+
|
|
|
Thread.sleep(2000);
|
|
|
|
|
|
// create another logs
|
|
@@ -1443,13 +1471,17 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|
|
logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
|
|
|
if (retentionSizeLimitation) {
|
|
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
|
|
- 50, 1, true, logFileInLastCycle));
|
|
|
+ 50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
|
|
|
} else {
|
|
|
Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
|
|
|
50, 3, false, null));
|
|
|
}
|
|
|
+
|
|
|
+ // the app is finished. The log "std_final" should be aggregated this time.
|
|
|
+ String[] logFiles3WithFinalLog =
|
|
|
+ new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" };
|
|
|
verifyContainerLogs(logAggregationService, application,
|
|
|
- new ContainerId[] { container }, logFiles3, 3, true);
|
|
|
+ new ContainerId[] { container }, logFiles3WithFinalLog, 4, true);
|
|
|
logAggregationService.stop();
|
|
|
assertEquals(0, logAggregationService.getNumAggregators());
|
|
|
dispatcher.stop();
|
|
@@ -1557,4 +1589,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|
|
return numOfLogsAvailable(logAggregationService, application, sizeLimited,
|
|
|
lastLogFile) == expectNum;
|
|
|
}
|
|
|
+
|
|
|
+ private static class LogFileStatusInLastCycle {
|
|
|
+ private String logFilePathInLastCycle;
|
|
|
+ private List<String> logFileTypesInLastCycle;
|
|
|
+
|
|
|
+ public LogFileStatusInLastCycle(String logFilePathInLastCycle,
|
|
|
+ List<String> logFileTypesInLastCycle) {
|
|
|
+ this.logFilePathInLastCycle = logFilePathInLastCycle;
|
|
|
+ this.logFileTypesInLastCycle = logFileTypesInLastCycle;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getLogFilePathInLastCycle() {
|
|
|
+ return this.logFilePathInLastCycle;
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<String> getLogFileTypesInLastCycle() {
|
|
|
+ return this.logFileTypesInLastCycle;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|