|
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.FileOutputStream;
|
|
|
+import java.io.InputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.util.HashMap;
|
|
|
|
|
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.TaskID;
|
|
|
import org.apache.hadoop.mapreduce.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.TypeConverter;
|
|
|
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
|
|
|
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
|
|
@@ -372,6 +374,74 @@ public class TestJobHistoryEventHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testPropertyRedactionForJHS() throws Exception {
|
|
|
+ final Configuration conf = new Configuration();
|
|
|
+
|
|
|
+ String sensitivePropertyName = "aws.fake.credentials.name";
|
|
|
+ String sensitivePropertyValue = "aws.fake.credentials.val";
|
|
|
+ conf.set(sensitivePropertyName, sensitivePropertyValue);
|
|
|
+ conf.set(MRJobConfig.MR_JOB_REDACTED_PROPERTIES,
|
|
|
+ sensitivePropertyName);
|
|
|
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
|
|
|
+ dfsCluster.getURI().toString());
|
|
|
+ final TestParams params = new TestParams();
|
|
|
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, params.dfsWorkDir);
|
|
|
+
|
|
|
+ final JHEvenHandlerForTest jheh =
|
|
|
+ new JHEvenHandlerForTest(params.mockAppContext, 0, false);
|
|
|
+
|
|
|
+ try {
|
|
|
+ jheh.init(conf);
|
|
|
+ jheh.start();
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(params.jobId,
|
|
|
+ new AMStartedEvent(params.appAttemptId, 200, params.containerId,
|
|
|
+ "nmhost", 3000, 4000, -1)));
|
|
|
+ handleEvent(jheh, new JobHistoryEvent(params.jobId,
|
|
|
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(
|
|
|
+ params.jobId), 0, 0, 0, JobStateInternal.FAILED.toString())));
|
|
|
+
|
|
|
+ // verify the value of the sensitive property in job.xml is restored.
|
|
|
+ Assert.assertEquals(sensitivePropertyName + " is modified.",
|
|
|
+ conf.get(sensitivePropertyName), sensitivePropertyValue);
|
|
|
+
|
|
|
+ // load the job_conf.xml in JHS directory and verify property redaction.
|
|
|
+ Path jhsJobConfFile = getJobConfInIntermediateDoneDir(conf, params.jobId);
|
|
|
+ Assert.assertTrue("The job_conf.xml file is not in the JHS directory",
|
|
|
+ FileContext.getFileContext(conf).util().exists(jhsJobConfFile));
|
|
|
+ Configuration jhsJobConf = new Configuration();
|
|
|
+
|
|
|
+ try (InputStream input = FileSystem.get(conf).open(jhsJobConfFile)) {
|
|
|
+ jhsJobConf.addResource(input);
|
|
|
+ Assert.assertEquals(
|
|
|
+ sensitivePropertyName + " is not redacted in HDFS.",
|
|
|
+ MRJobConfUtil.REDACTION_REPLACEMENT_VAL,
|
|
|
+ jhsJobConf.get(sensitivePropertyName));
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ jheh.stop();
|
|
|
+ purgeHdfsHistoryIntermediateDoneDirectory(conf);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Path getJobConfInIntermediateDoneDir(Configuration conf,
|
|
|
+ JobId jobId) throws IOException {
|
|
|
+ Path userDoneDir = new Path(
|
|
|
+ JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf));
|
|
|
+ Path doneDirPrefix =
|
|
|
+ FileContext.getFileContext(conf).makeQualified(userDoneDir);
|
|
|
+ return new Path(
|
|
|
+ doneDirPrefix, JobHistoryUtils.getIntermediateConfFileName(jobId));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void purgeHdfsHistoryIntermediateDoneDirectory(Configuration conf)
|
|
|
+ throws IOException {
|
|
|
+ FileSystem fs = FileSystem.get(dfsCluster.getConfiguration(0));
|
|
|
+ String intermDoneDirPrefix =
|
|
|
+ JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
|
|
|
+ fs.delete(new Path(intermDoneDirPrefix), true);
|
|
|
+ }
|
|
|
+
|
|
|
@Test (timeout=50000)
|
|
|
public void testDefaultFsIsUsedForHistory() throws Exception {
|
|
|
// Create default configuration pointing to the minicluster
|
|
@@ -413,6 +483,7 @@ public class TestJobHistoryEventHandler {
|
|
|
localFileSystem.exists(new Path(t.dfsWorkDir)));
|
|
|
} finally {
|
|
|
jheh.stop();
|
|
|
+ purgeHdfsHistoryIntermediateDoneDirectory(conf);
|
|
|
}
|
|
|
}
|
|
|
|