|
@@ -18,13 +18,21 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.logaggregation;
|
|
|
|
|
|
+import static org.mockito.Mockito.spy;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
+
|
|
|
+import java.io.BufferedReader;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.File;
|
|
|
+import java.io.FileNotFoundException;
|
|
|
import java.io.FileOutputStream;
|
|
|
+import java.io.FileReader;
|
|
|
import java.io.IOException;
|
|
|
import java.io.OutputStreamWriter;
|
|
|
import java.io.StringWriter;
|
|
|
+import java.io.UnsupportedEncodingException;
|
|
|
import java.io.Writer;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.Collections;
|
|
|
|
|
|
import junit.framework.Assert;
|
|
@@ -32,11 +40,14 @@ import junit.framework.Assert;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
-import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.io.nativeio.NativeIO;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
|
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
|
@@ -44,6 +55,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
|
|
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
|
|
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
import org.junit.After;
|
|
|
+import org.junit.Assume;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
@@ -97,7 +109,7 @@ public class TestAggregatedLogFormat {
|
|
|
LogKey logKey = new LogKey(testContainerId);
|
|
|
LogValue logValue =
|
|
|
new LogValue(Collections.singletonList(srcFileRoot.toString()),
|
|
|
- testContainerId);
|
|
|
+ testContainerId, ugi.getShortUserName());
|
|
|
|
|
|
logWriter.append(logKey, logValue);
|
|
|
logWriter.closeWriter();
|
|
@@ -131,9 +143,115 @@ public class TestAggregatedLogFormat {
|
|
|
Assert.assertEquals(expectedLength, s.length());
|
|
|
}
|
|
|
|
|
|
+ @Test(timeout=10000)
|
|
|
+ public void testContainerLogsFileAccess() throws IOException {
|
|
|
+ // This test will run only if NativeIO is enabled as SecureIOUtils
|
|
|
+ // require it to be enabled.
|
|
|
+ Assume.assumeTrue(NativeIO.isAvailable());
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
|
|
+ "kerberos");
|
|
|
+ UserGroupInformation.setConfiguration(conf);
|
|
|
+ File workDir = new File(testWorkDir, "testContainerLogsFileAccess1");
|
|
|
+ Path remoteAppLogFile =
|
|
|
+ new Path(workDir.getAbsolutePath(), "aggregatedLogFile");
|
|
|
+ Path srcFileRoot = new Path(workDir.getAbsolutePath(), "srcFiles");
|
|
|
+
|
|
|
+ String data = "Log File content for container : ";
|
|
|
+ // Creating files for container1. Log aggregator will try to read log files
|
|
|
+ // with illegal user.
|
|
|
+ ContainerId testContainerId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
|
|
|
+ Path appDir =
|
|
|
+ new Path(srcFileRoot, testContainerId1.getApplicationAttemptId()
|
|
|
+ .getApplicationId().toString());
|
|
|
+ Path srcFilePath1 = new Path(appDir, testContainerId1.toString());
|
|
|
+ String stdout = "stdout";
|
|
|
+ String stderr = "stderr";
|
|
|
+ writeSrcFile(srcFilePath1, stdout, data + testContainerId1.toString()
|
|
|
+ + stdout);
|
|
|
+ writeSrcFile(srcFilePath1, stderr, data + testContainerId1.toString()
|
|
|
+ + stderr);
|
|
|
+
|
|
|
+ UserGroupInformation ugi =
|
|
|
+ UserGroupInformation.getCurrentUser();
|
|
|
+ LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
|
|
|
+
|
|
|
+ LogKey logKey = new LogKey(testContainerId1);
|
|
|
+ String randomUser = "randomUser";
|
|
|
+ LogValue logValue =
|
|
|
+ spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
|
|
|
+ testContainerId1, randomUser));
|
|
|
+
|
|
|
+ // It is trying simulate a situation where first log file is owned by
|
|
|
+ // different user (probably symlink) and second one by the user itself.
|
|
|
+ when(logValue.getUser()).thenReturn(randomUser).thenReturn(
|
|
|
+ ugi.getShortUserName());
|
|
|
+ logWriter.append(logKey, logValue);
|
|
|
+
|
|
|
+ logWriter.closeWriter();
|
|
|
+
|
|
|
+ BufferedReader in =
|
|
|
+ new BufferedReader(new FileReader(new File(remoteAppLogFile
|
|
|
+ .toUri().getRawPath())));
|
|
|
+ String line;
|
|
|
+ StringBuffer sb = new StringBuffer("");
|
|
|
+ while ((line = in.readLine()) != null) {
|
|
|
+ LOG.info(line);
|
|
|
+ sb.append(line);
|
|
|
+ }
|
|
|
+ line = sb.toString();
|
|
|
+
|
|
|
+ String stdoutFile1 =
|
|
|
+ StringUtils.join(
|
|
|
+ Path.SEPARATOR,
|
|
|
+ Arrays.asList(new String[] {
|
|
|
+ srcFileRoot.toUri().toString(),
|
|
|
+ testContainerId1.getApplicationAttemptId().getApplicationId()
|
|
|
+ .toString(), testContainerId1.toString(), stderr }));
|
|
|
+ String message1 =
|
|
|
+ "Owner '" + ugi.getShortUserName() + "' for path " + stdoutFile1
|
|
|
+ + " did not match expected owner '" + randomUser + "'";
|
|
|
+
|
|
|
+ String stdoutFile2 =
|
|
|
+ StringUtils.join(
|
|
|
+ Path.SEPARATOR,
|
|
|
+ Arrays.asList(new String[] {
|
|
|
+ srcFileRoot.toUri().toString(),
|
|
|
+ testContainerId1.getApplicationAttemptId().getApplicationId()
|
|
|
+ .toString(), testContainerId1.toString(), stdout }));
|
|
|
+ String message2 =
|
|
|
+ "Owner '" + ugi.getShortUserName() + "' for path "
|
|
|
+ + stdoutFile2 + " did not match expected owner '"
|
|
|
+ + ugi.getShortUserName() + "'";
|
|
|
+
|
|
|
+ Assert.assertTrue(line.contains(message1));
|
|
|
+ Assert.assertFalse(line.contains(message2));
|
|
|
+ Assert.assertFalse(line.contains(data + testContainerId1.toString()
|
|
|
+ + stderr));
|
|
|
+ Assert.assertTrue(line.contains(data + testContainerId1.toString()
|
|
|
+ + stdout));
|
|
|
+ }
|
|
|
|
|
|
private void writeSrcFile(Path srcFilePath, String fileName, long length)
|
|
|
throws IOException {
|
|
|
+ OutputStreamWriter osw = getOutputStreamWriter(srcFilePath, fileName);
|
|
|
+ int ch = filler;
|
|
|
+ for (int i = 0; i < length; i++) {
|
|
|
+ osw.write(ch);
|
|
|
+ }
|
|
|
+ osw.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void writeSrcFile(Path srcFilePath, String fileName, String data)
|
|
|
+ throws IOException {
|
|
|
+ OutputStreamWriter osw = getOutputStreamWriter(srcFilePath, fileName);
|
|
|
+ osw.write(data);
|
|
|
+ osw.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ private OutputStreamWriter getOutputStreamWriter(Path srcFilePath,
|
|
|
+ String fileName) throws IOException, FileNotFoundException,
|
|
|
+ UnsupportedEncodingException {
|
|
|
File dir = new File(srcFilePath.toString());
|
|
|
if (!dir.exists()) {
|
|
|
if (!dir.mkdirs()) {
|
|
@@ -143,10 +261,6 @@ public class TestAggregatedLogFormat {
|
|
|
File outputFile = new File(new File(srcFilePath.toString()), fileName);
|
|
|
FileOutputStream os = new FileOutputStream(outputFile);
|
|
|
OutputStreamWriter osw = new OutputStreamWriter(os, "UTF8");
|
|
|
- int ch = filler;
|
|
|
- for (int i = 0; i < length; i++) {
|
|
|
- osw.write(ch);
|
|
|
- }
|
|
|
- osw.close();
|
|
|
+ return osw;
|
|
|
}
|
|
|
}
|