|
@@ -18,11 +18,6 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
|
|
|
|
|
|
-import static org.assertj.core.api.Assertions.assertThat;
|
|
|
-import static org.junit.Assert.assertTrue;
|
|
|
-import static org.junit.Assert.assertNotNull;
|
|
|
-import static org.mockito.Mockito.mock;
|
|
|
-import static org.mockito.Mockito.when;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.File;
|
|
|
import java.io.FileWriter;
|
|
@@ -39,6 +34,7 @@ import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.conf.Configured;
|
|
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
@@ -64,15 +60,23 @@ import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileCo
|
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
|
|
import org.junit.After;
|
|
|
-import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import static org.assertj.core.api.Assertions.assertThat;
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
+import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.assertNotNull;
|
|
|
+import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
+
|
|
|
/**
|
|
|
- * Function test for {@link LogAggregationIndexFileController}.
|
|
|
+ * Function test for {@link LogAggregationIndexedFileController}.
|
|
|
*
|
|
|
*/
|
|
|
-public class TestLogAggregationIndexFileController {
|
|
|
+public class TestLogAggregationIndexedFileController
|
|
|
+ extends Configured {
|
|
|
|
|
|
private final String rootLocalLogDir = "target/LocalLogs";
|
|
|
private final Path rootLocalLogDirPath = new Path(rootLocalLogDir);
|
|
@@ -82,37 +86,37 @@ public class TestLogAggregationIndexFileController {
|
|
|
private static final UserGroupInformation USER_UGI = UserGroupInformation
|
|
|
.createRemoteUser("testUser");
|
|
|
private FileSystem fs;
|
|
|
- private Configuration conf;
|
|
|
private ApplicationId appId;
|
|
|
private ContainerId containerId;
|
|
|
private NodeId nodeId;
|
|
|
|
|
|
private ByteArrayOutputStream sysOutStream;
|
|
|
- private PrintStream sysOut;
|
|
|
|
|
|
- private ByteArrayOutputStream sysErrStream;
|
|
|
- private PrintStream sysErr;
|
|
|
+ private Configuration getTestConf() {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir",
|
|
|
+ remoteLogDir);
|
|
|
+ conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir-suffix",
|
|
|
+ "logs");
|
|
|
+ conf.set(YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, "gz");
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
|
|
|
@Before
|
|
|
public void setUp() throws IOException {
|
|
|
+ setConf(getTestConf());
|
|
|
appId = ApplicationId.newInstance(123456, 1);
|
|
|
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
|
|
|
appId, 1);
|
|
|
containerId = ContainerId.newContainerId(attemptId, 1);
|
|
|
nodeId = NodeId.newInstance("localhost", 9999);
|
|
|
- conf = new Configuration();
|
|
|
- conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir",
|
|
|
- remoteLogDir);
|
|
|
- conf.set("yarn.log-aggregation.Indexed.remote-app-log-dir-suffix",
|
|
|
- "logs");
|
|
|
- conf.set(YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, "gz");
|
|
|
- fs = FileSystem.get(conf);
|
|
|
+ fs = FileSystem.get(getConf());
|
|
|
sysOutStream = new ByteArrayOutputStream();
|
|
|
- sysOut = new PrintStream(sysOutStream);
|
|
|
+ PrintStream sysOut = new PrintStream(sysOutStream);
|
|
|
System.setOut(sysOut);
|
|
|
|
|
|
- sysErrStream = new ByteArrayOutputStream();
|
|
|
- sysErr = new PrintStream(sysErrStream);
|
|
|
+ ByteArrayOutputStream sysErrStream = new ByteArrayOutputStream();
|
|
|
+ PrintStream sysErr = new PrintStream(sysErrStream);
|
|
|
System.setErr(sysErr);
|
|
|
}
|
|
|
|
|
@@ -173,7 +177,7 @@ public class TestLogAggregationIndexFileController {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- fileFormat.initialize(conf, "Indexed");
|
|
|
+ fileFormat.initialize(getConf(), "Indexed");
|
|
|
|
|
|
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
|
|
|
Path appDir = fileFormat.getRemoteAppLogDir(appId,
|
|
@@ -203,28 +207,28 @@ public class TestLogAggregationIndexFileController {
|
|
|
logRequest.setBytes(Long.MAX_VALUE);
|
|
|
List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
|
|
|
logRequest);
|
|
|
- Assert.assertTrue(meta.size() == 1);
|
|
|
+ assertEquals(1, meta.size());
|
|
|
List<String> fileNames = new ArrayList<>();
|
|
|
for (ContainerLogMeta log : meta) {
|
|
|
- Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
|
|
|
- Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
|
|
|
- Assert.assertTrue(log.getContainerLogMeta().size() == 3);
|
|
|
+ assertEquals(containerId.toString(), log.getContainerId());
|
|
|
+ assertEquals(nodeId.toString(), log.getNodeId());
|
|
|
+ assertEquals(3, log.getContainerLogMeta().size());
|
|
|
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
|
|
|
fileNames.add(file.getFileName());
|
|
|
}
|
|
|
}
|
|
|
fileNames.removeAll(logTypes);
|
|
|
- Assert.assertTrue(fileNames.isEmpty());
|
|
|
+ assertTrue(fileNames.isEmpty());
|
|
|
|
|
|
boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
|
|
|
- Assert.assertTrue(foundLogs);
|
|
|
+ assertTrue(foundLogs);
|
|
|
for (String logType : logTypes) {
|
|
|
- Assert.assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
+ assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
containerId, logType)));
|
|
|
}
|
|
|
sysOutStream.reset();
|
|
|
|
|
|
- Configuration factoryConf = new Configuration(conf);
|
|
|
+ Configuration factoryConf = new Configuration(getConf());
|
|
|
factoryConf.set("yarn.log-aggregation.file-formats", "Indexed");
|
|
|
factoryConf.set("yarn.log-aggregation.file-controller.Indexed.class",
|
|
|
"org.apache.hadoop.yarn.logaggregation.filecontroller.ifile"
|
|
@@ -233,12 +237,12 @@ public class TestLogAggregationIndexFileController {
|
|
|
new LogAggregationFileControllerFactory(factoryConf);
|
|
|
LogAggregationFileController fileController = factory
|
|
|
.getFileControllerForRead(appId, USER_UGI.getShortUserName());
|
|
|
- Assert.assertTrue(fileController instanceof
|
|
|
+ assertTrue(fileController instanceof
|
|
|
LogAggregationIndexedFileController);
|
|
|
foundLogs = fileController.readAggregatedLogs(logRequest, System.out);
|
|
|
- Assert.assertTrue(foundLogs);
|
|
|
+ assertTrue(foundLogs);
|
|
|
for (String logType : logTypes) {
|
|
|
- Assert.assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
+ assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
containerId, logType)));
|
|
|
}
|
|
|
sysOutStream.reset();
|
|
@@ -261,12 +265,12 @@ public class TestLogAggregationIndexFileController {
|
|
|
}
|
|
|
meta = fileFormat.readAggregatedLogsMeta(
|
|
|
logRequest);
|
|
|
- Assert.assertTrue(meta.size() == 0);
|
|
|
+ assertTrue(meta.isEmpty());
|
|
|
foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
|
|
|
- Assert.assertFalse(foundLogs);
|
|
|
+ assertFalse(foundLogs);
|
|
|
sysOutStream.reset();
|
|
|
fs.delete(checksumFile, false);
|
|
|
- Assert.assertFalse(fs.exists(checksumFile));
|
|
|
+ assertFalse(fs.exists(checksumFile));
|
|
|
|
|
|
List<String> newLogTypes = new ArrayList<>(logTypes);
|
|
|
files.clear();
|
|
@@ -291,24 +295,24 @@ public class TestLogAggregationIndexFileController {
|
|
|
logRequest);
|
|
|
assertThat(meta.size()).isEqualTo(1);
|
|
|
for (ContainerLogMeta log : meta) {
|
|
|
- Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
|
|
|
- Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
|
|
|
- Assert.assertTrue(log.getContainerLogMeta().size() == 3);
|
|
|
+ assertEquals(containerId.toString(), log.getContainerId());
|
|
|
+ assertEquals(nodeId.toString(), log.getNodeId());
|
|
|
+ assertEquals(3, log.getContainerLogMeta().size());
|
|
|
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
|
|
|
fileNames.add(file.getFileName());
|
|
|
}
|
|
|
}
|
|
|
fileNames.removeAll(logTypes);
|
|
|
- Assert.assertTrue(fileNames.isEmpty());
|
|
|
+ assertTrue(fileNames.isEmpty());
|
|
|
foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
|
|
|
- Assert.assertTrue(foundLogs);
|
|
|
+ assertTrue(foundLogs);
|
|
|
for (String logType : logTypes) {
|
|
|
- Assert.assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
+ assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
containerId, logType)));
|
|
|
}
|
|
|
- Assert.assertFalse(sysOutStream.toString().contains(logMessage(
|
|
|
+ assertFalse(sysOutStream.toString().contains(logMessage(
|
|
|
containerId, "test1")));
|
|
|
- Assert.assertFalse(sysOutStream.toString().contains(logMessage(
|
|
|
+ assertFalse(sysOutStream.toString().contains(logMessage(
|
|
|
containerId, "test2")));
|
|
|
sysOutStream.reset();
|
|
|
|
|
@@ -322,18 +326,18 @@ public class TestLogAggregationIndexFileController {
|
|
|
logRequest);
|
|
|
assertThat(meta.size()).isEqualTo(2);
|
|
|
for (ContainerLogMeta log : meta) {
|
|
|
- Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
|
|
|
- Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
|
|
|
+ assertEquals(containerId.toString(), log.getContainerId());
|
|
|
+ assertEquals(nodeId.toString(), log.getNodeId());
|
|
|
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
|
|
|
fileNames.add(file.getFileName());
|
|
|
}
|
|
|
}
|
|
|
fileNames.removeAll(newLogTypes);
|
|
|
- Assert.assertTrue(fileNames.isEmpty());
|
|
|
+ assertTrue(fileNames.isEmpty());
|
|
|
foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
|
|
|
- Assert.assertTrue(foundLogs);
|
|
|
+ assertTrue(foundLogs);
|
|
|
for (String logType : newLogTypes) {
|
|
|
- Assert.assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
+ assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
containerId, logType)));
|
|
|
}
|
|
|
sysOutStream.reset();
|
|
@@ -345,23 +349,23 @@ public class TestLogAggregationIndexFileController {
|
|
|
fileFormat.postWrite(context);
|
|
|
fileFormat.closeWriter();
|
|
|
FileStatus[] status = fs.listStatus(logPath.getParent());
|
|
|
- Assert.assertTrue(status.length == 2);
|
|
|
+ assertEquals(2, status.length);
|
|
|
meta = fileFormat.readAggregatedLogsMeta(
|
|
|
logRequest);
|
|
|
assertThat(meta.size()).isEqualTo(3);
|
|
|
for (ContainerLogMeta log : meta) {
|
|
|
- Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
|
|
|
- Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
|
|
|
+ assertEquals(containerId.toString(), log.getContainerId());
|
|
|
+ assertEquals(nodeId.toString(), log.getNodeId());
|
|
|
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
|
|
|
fileNames.add(file.getFileName());
|
|
|
}
|
|
|
}
|
|
|
fileNames.removeAll(newLogTypes);
|
|
|
- Assert.assertTrue(fileNames.isEmpty());
|
|
|
+ assertTrue(fileNames.isEmpty());
|
|
|
foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
|
|
|
- Assert.assertTrue(foundLogs);
|
|
|
+ assertTrue(foundLogs);
|
|
|
for (String logType : newLogTypes) {
|
|
|
- Assert.assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
+ assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
containerId, logType)));
|
|
|
}
|
|
|
sysOutStream.reset();
|
|
@@ -390,7 +394,7 @@ public class TestLogAggregationIndexFileController {
|
|
|
assertTrue(fs.exists(harPath));
|
|
|
LogAggregationIndexedFileController fileFormat
|
|
|
= new LogAggregationIndexedFileController();
|
|
|
- fileFormat.initialize(conf, "Indexed");
|
|
|
+ fileFormat.initialize(getConf(), "Indexed");
|
|
|
ContainerLogsRequest logRequest = new ContainerLogsRequest();
|
|
|
logRequest.setAppId(appId);
|
|
|
logRequest.setNodeId(nodeId.toString());
|
|
@@ -399,21 +403,21 @@ public class TestLogAggregationIndexFileController {
|
|
|
logRequest.setBytes(Long.MAX_VALUE);
|
|
|
List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
|
|
|
logRequest);
|
|
|
- Assert.assertEquals(meta.size(), 3);
|
|
|
+ assertEquals(3, meta.size());
|
|
|
List<String> fileNames = new ArrayList<>();
|
|
|
for (ContainerLogMeta log : meta) {
|
|
|
- Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
|
|
|
- Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
|
|
|
+ assertEquals(containerId.toString(), log.getContainerId());
|
|
|
+ assertEquals(nodeId.toString(), log.getNodeId());
|
|
|
for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
|
|
|
fileNames.add(file.getFileName());
|
|
|
}
|
|
|
}
|
|
|
fileNames.removeAll(newLogTypes);
|
|
|
- Assert.assertTrue(fileNames.isEmpty());
|
|
|
+ assertTrue(fileNames.isEmpty());
|
|
|
boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
|
|
|
- Assert.assertTrue(foundLogs);
|
|
|
+ assertTrue(foundLogs);
|
|
|
for (String logType : newLogTypes) {
|
|
|
- Assert.assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
+ assertTrue(sysOutStream.toString().contains(logMessage(
|
|
|
containerId, logType)));
|
|
|
}
|
|
|
sysOutStream.reset();
|
|
@@ -438,8 +442,6 @@ public class TestLogAggregationIndexFileController {
|
|
|
}
|
|
|
|
|
|
private String logMessage(ContainerId containerId, String logType) {
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- sb.append("Hello " + containerId + " in " + logType + "!");
|
|
|
- return sb.toString();
|
|
|
+ return "Hello " + containerId + " in " + logType + "!";
|
|
|
}
|
|
|
}
|