|
@@ -18,6 +18,8 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.logaggregation;
|
|
|
|
|
|
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
|
|
|
+
|
|
|
import java.io.IOException;
|
|
|
import java.net.URI;
|
|
|
import java.util.Arrays;
|
|
@@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
import org.junit.Assert;
|
|
@@ -59,13 +62,17 @@ public class TestAggregatedLogDeletionService {
|
|
|
|
|
|
String root = "mockfs://foo/";
|
|
|
String remoteRootLogDir = root+"tmp/logs";
|
|
|
- String suffix = "logs";
|
|
|
+ String configuredSuffix = "logs";
|
|
|
+ String actualSuffix = "logs-tfile";
|
|
|
final Configuration conf = new Configuration();
|
|
|
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
|
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
|
|
|
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
|
|
|
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
|
|
|
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
|
|
|
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, configuredSuffix);
|
|
|
+ conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
|
|
+ conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
|
|
|
+ LogAggregationTFileController.class.getName());
|
|
|
|
|
|
Path rootPath = new Path(root);
|
|
|
FileSystem rootFs = rootPath.getFileSystem(conf);
|
|
@@ -81,7 +88,7 @@ public class TestAggregatedLogDeletionService {
|
|
|
|
|
|
ApplicationId appId1 =
|
|
|
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
|
|
- Path userLogDir = new Path(userDir, suffix);
|
|
|
+ Path userLogDir = new Path(userDir, actualSuffix);
|
|
|
Path app1Dir = new Path(userLogDir, appId1.toString());
|
|
|
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
|
|
|
|
|
@@ -197,7 +204,8 @@ public class TestAggregatedLogDeletionService {
|
|
|
long before50Secs = now - (50 * 1000);
|
|
|
String root = "mockfs://foo/";
|
|
|
String remoteRootLogDir = root + "tmp/logs";
|
|
|
- String suffix = "logs";
|
|
|
+ String configuredSuffix = "logs";
|
|
|
+ String actualSuffix = "logs-tfile";
|
|
|
final Configuration conf = new Configuration();
|
|
|
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
|
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
|
|
@@ -205,7 +213,10 @@ public class TestAggregatedLogDeletionService {
|
|
|
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
|
|
"1");
|
|
|
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
|
|
|
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
|
|
|
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, configuredSuffix);
|
|
|
+ conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
|
|
+ conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
|
|
|
+ LogAggregationTFileController.class.getName());
|
|
|
|
|
|
Path rootPath = new Path(root);
|
|
|
FileSystem rootFs = rootPath.getFileSystem(conf);
|
|
@@ -220,7 +231,7 @@ public class TestAggregatedLogDeletionService {
|
|
|
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
|
|
new FileStatus[] { userDirStatus });
|
|
|
|
|
|
- Path userLogDir = new Path(userDir, suffix);
|
|
|
+ Path userLogDir = new Path(userDir, actualSuffix);
|
|
|
|
|
|
ApplicationId appId1 =
|
|
|
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
|
@@ -309,14 +320,18 @@ public class TestAggregatedLogDeletionService {
|
|
|
|
|
|
String root = "mockfs://foo/";
|
|
|
String remoteRootLogDir = root+"tmp/logs";
|
|
|
- String suffix = "logs";
|
|
|
+ String configuredSuffix = "logs";
|
|
|
+ String actualSuffix = "logs-tfile";
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
|
|
|
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
|
|
|
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
|
|
|
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1");
|
|
|
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
|
|
|
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
|
|
|
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, configuredSuffix);
|
|
|
+ conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
|
|
+ conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
|
|
|
+ LogAggregationTFileController.class.getName());
|
|
|
|
|
|
// prevent us from picking up the same mockfs instance from another test
|
|
|
FileSystem.closeAll();
|
|
@@ -334,7 +349,7 @@ public class TestAggregatedLogDeletionService {
|
|
|
|
|
|
ApplicationId appId1 =
|
|
|
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
|
|
- Path userLogDir = new Path(userDir, suffix);
|
|
|
+ Path userLogDir = new Path(userDir, actualSuffix);
|
|
|
Path app1Dir = new Path(userLogDir, appId1.toString());
|
|
|
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
|
|
|
|
|
@@ -391,7 +406,8 @@ public class TestAggregatedLogDeletionService {
|
|
|
|
|
|
String root = "mockfs://foo/";
|
|
|
String remoteRootLogDir = root+"tmp/logs";
|
|
|
- String suffix = "logs";
|
|
|
+ String configuredSuffix = "logs";
|
|
|
+ String actualSuffix = "logs-tfile";
|
|
|
Configuration conf = new Configuration();
|
|
|
conf.setClass("fs.mockfs.impl", MockFileSystem.class,
|
|
|
FileSystem.class);
|
|
@@ -400,7 +416,10 @@ public class TestAggregatedLogDeletionService {
|
|
|
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
|
|
|
"1");
|
|
|
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
|
|
|
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
|
|
|
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, configuredSuffix);
|
|
|
+ conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
|
|
|
+ conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
|
|
|
+ LogAggregationTFileController.class.getName());
|
|
|
|
|
|
// prevent us from picking up the same mockfs instance from another test
|
|
|
FileSystem.closeAll();
|
|
@@ -416,7 +435,7 @@ public class TestAggregatedLogDeletionService {
|
|
|
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
|
|
|
new FileStatus[]{userDirStatus});
|
|
|
|
|
|
- Path userLogDir = new Path(userDir, suffix);
|
|
|
+ Path userLogDir = new Path(userDir, actualSuffix);
|
|
|
ApplicationId appId1 =
|
|
|
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
|
|
Path app1Dir = new Path(userLogDir, appId1.toString());
|