|
@@ -44,6 +44,7 @@ import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileChecksum;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
@@ -75,9 +76,20 @@ import org.apache.hadoop.util.ToolRunner;
|
|
|
* mbs-per-map specifies the amount of data (in MBs) to generate per map.
|
|
|
* By default, this is twice the value of <code>mapreduce.task.io.sort.mb</code>
|
|
|
* <code>map-tasks</code> specifies the number of map tasks to run.
|
|
|
+ * Steps of the unit test:
|
|
|
+ * 1- Generating random input text.
|
|
|
+ * 2- Run a job with encryption disabled. Get the checksum of the output file
|
|
|
+ * and store it in <code>checkSumReference</code>.
|
|
|
+ * 3- Run the job with encryption enabled.
|
|
|
+ * 4- Compare <code>checkSumReference</code> to the checksum of the job output.
|
|
|
+ * 5- If the job has multiple reducers, the test launches one final job to
|
|
|
+ * combine the output files into a single one.
|
|
|
+ * 6- Verify that the maps spilled files.
|
|
|
*/
|
|
|
@RunWith(Parameterized.class)
|
|
|
public class TestMRIntermediateDataEncryption {
|
|
|
+ public static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
|
|
|
/**
|
|
|
* The number of bytes generated by the input generator.
|
|
|
*/
|
|
@@ -86,8 +98,6 @@ public class TestMRIntermediateDataEncryption {
|
|
|
public static final int INPUT_GEN_NUM_THREADS = 16;
|
|
|
public static final long TASK_SORT_IO_MB_DEFAULT = 128L;
|
|
|
public static final String JOB_DIR_PATH = "jobs-data-path";
|
|
|
- private static final Logger LOG =
|
|
|
- LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
|
|
|
/**
|
|
|
* Directory of the test data.
|
|
|
*/
|
|
@@ -97,6 +107,7 @@ public class TestMRIntermediateDataEncryption {
|
|
|
private static MiniDFSCluster dfsCluster;
|
|
|
private static MiniMRClientCluster mrCluster;
|
|
|
private static FileSystem fs;
|
|
|
+ private static FileChecksum checkSumReference;
|
|
|
private static Path jobInputDirPath;
|
|
|
private static long inputFileSize;
|
|
|
/**
|
|
@@ -136,11 +147,7 @@ public class TestMRIntermediateDataEncryption {
|
|
|
{"testSingleReducer", 3, 1, false},
|
|
|
{"testUberMode", 3, 1, true},
|
|
|
{"testMultipleMapsPerNode", 8, 1, false},
|
|
|
- // TODO: The following configuration is commented out until
|
|
|
- // MAPREDUCE-7325 is fixed.
|
|
|
- // Setting multiple reducers breaks LocalJobRunner causing the
|
|
|
- // unit test to fail.
|
|
|
- // {"testMultipleReducers", 2, 4, false}
|
|
|
+ {"testMultipleReducers", 2, 4, false}
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -171,6 +178,8 @@ public class TestMRIntermediateDataEncryption {
|
|
|
// run the input generator job.
|
|
|
Assert.assertEquals("Generating input should succeed", 0,
|
|
|
generateInputTextFile());
|
|
|
+ // run the reference job
|
|
|
+ runReferenceJob();
|
|
|
}
|
|
|
|
|
|
@AfterClass
|
|
@@ -185,7 +194,7 @@ public class TestMRIntermediateDataEncryption {
|
|
|
// make sure that generated input file is deleted
|
|
|
final File textInputFile = new File(testRootDir, "input.txt");
|
|
|
if (textInputFile.exists()) {
|
|
|
- textInputFile.delete();
|
|
|
+ Assert.assertTrue(textInputFile.delete());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -198,7 +207,7 @@ public class TestMRIntermediateDataEncryption {
|
|
|
// Set the jvm arguments to enable intermediate encryption.
|
|
|
Configuration conf =
|
|
|
MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
|
|
|
- // Set the temp directories a subdir of the test directory.
|
|
|
+ // Set the temp directories to a subDir of the test directory.
|
|
|
conf = MRJobConfUtil.setLocalDirectoriesConfigForTesting(conf, testRootDir);
|
|
|
conf.setLong("dfs.blocksize", BLOCK_SIZE_DEFAULT);
|
|
|
return conf;
|
|
@@ -207,7 +216,7 @@ public class TestMRIntermediateDataEncryption {
|
|
|
/**
|
|
|
* Creates a thread safe BufferedWriter to be used among the task generators.
|
|
|
* @return A synchronized <code>BufferedWriter</code> to the input file.
|
|
|
- * @throws IOException
|
|
|
+ * @throws IOException if opening a new {@link FileWriter} fails.
|
|
|
*/
|
|
|
private static synchronized BufferedWriter getTextInputWriter()
|
|
|
throws IOException {
|
|
@@ -223,7 +232,7 @@ public class TestMRIntermediateDataEncryption {
|
|
|
* It creates a total <code>INPUT_GEN_NUM_THREADS</code> future tasks.
|
|
|
*
|
|
|
* @return the result of the input generation. 0 for success.
|
|
|
- * @throws Exception
|
|
|
+ * @throws Exception if the input generation fails.
|
|
|
*/
|
|
|
private static int generateInputTextFile() throws Exception {
|
|
|
final File textInputFile = new File(testRootDir, "input.txt");
|
|
@@ -270,6 +279,118 @@ public class TestMRIntermediateDataEncryption {
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Runs a WordCount job with encryption disabled and stores the checksum of
|
|
|
+ * the output file.
|
|
|
+ * @throws Exception due to I/O errors.
|
|
|
+ */
|
|
|
+ private static void runReferenceJob() throws Exception {
|
|
|
+ final String jobRefLabel = "job-reference";
|
|
|
+ final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
|
|
|
+ if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
|
|
|
+ throw new IOException("Could not delete " + jobRefDirPath);
|
|
|
+ }
|
|
|
+ Assert.assertTrue(fs.mkdirs(jobRefDirPath));
|
|
|
+ Path jobRefOutputPath = new Path(jobRefDirPath, "out-dir");
|
|
|
+ Configuration referenceConf = new Configuration(commonConfig);
|
|
|
+ referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, false);
|
|
|
+ Job jobReference = runWordCountJob(jobRefLabel, jobRefOutputPath,
|
|
|
+ referenceConf, 4, 1);
|
|
|
+ Assert.assertTrue(jobReference.isSuccessful());
|
|
|
+ FileStatus[] fileStatusArr =
|
|
|
+ fs.listStatus(jobRefOutputPath,
|
|
|
+ new Utils.OutputFileUtils.OutputFilesFilter());
|
|
|
+ Assert.assertEquals(1, fileStatusArr.length);
|
|
|
+ checkSumReference = fs.getFileChecksum(fileStatusArr[0].getPath());
|
|
|
+ Assert.assertTrue(fs.delete(jobRefDirPath, true));
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Job runWordCountJob(String postfixName, Path jOutputPath,
|
|
|
+ Configuration jConf, int mappers, int reducers) throws Exception {
|
|
|
+ Job job = Job.getInstance(jConf);
|
|
|
+ job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, mappers);
|
|
|
+ job.setJarByClass(TestMRIntermediateDataEncryption.class);
|
|
|
+ job.setJobName("mr-spill-" + postfixName);
|
|
|
+ // Mapper configuration
|
|
|
+ job.setMapperClass(TokenizerMapper.class);
|
|
|
+ job.setInputFormatClass(TextInputFormat.class);
|
|
|
+ job.setCombinerClass(LongSumReducer.class);
|
|
|
+ FileInputFormat.setMinInputSplitSize(job,
|
|
|
+ (inputFileSize + mappers) / mappers);
|
|
|
+ // Reducer configuration
|
|
|
+ job.setReducerClass(LongSumReducer.class);
|
|
|
+ job.setNumReduceTasks(reducers);
|
|
|
+ job.setOutputKeyClass(Text.class);
|
|
|
+ job.setOutputValueClass(LongWritable.class);
|
|
|
+ // Set the IO paths for the job.
|
|
|
+ FileInputFormat.addInputPath(job, jobInputDirPath);
|
|
|
+ FileOutputFormat.setOutputPath(job, jOutputPath);
|
|
|
+ if (job.waitForCompletion(true)) {
|
|
|
+ FileStatus[] fileStatusArr =
|
|
|
+ fs.listStatus(jOutputPath,
|
|
|
+ new Utils.OutputFileUtils.OutputFilesFilter());
|
|
|
+ for (FileStatus fStatus : fileStatusArr) {
|
|
|
+ LOG.info("Job: {} .. Output file {} .. Size = {}",
|
|
|
+ postfixName, fStatus.getPath(), fStatus.getLen());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return job;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compares the checksum of the output file to the
|
|
|
+ * <code>checkSumReference</code>.
|
|
|
+ * If the job has multiple reducers, the output files are combined by
|
|
|
+ * launching another job.
|
|
|
+ * @return true if the checksums are equal.
|
|
|
+ * @throws Exception if the output is missing or the combiner job fails.
|
|
|
+ */
|
|
|
+ private boolean validateJobOutput() throws Exception {
|
|
|
+ Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
|
|
|
+ fs.exists(jobOutputPath));
|
|
|
+ Path outputPath = jobOutputPath;
|
|
|
+ if (numReducers != 1) {
|
|
|
+ // combine the result into one file by running a combiner job
|
|
|
+ final String jobRefLabel = testTitleName + "-combine";
|
|
|
+ final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
|
|
|
+ if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
|
|
|
+ throw new IOException("Could not delete " + jobRefDirPath);
|
|
|
+ }
|
|
|
+ fs.mkdirs(jobRefDirPath);
|
|
|
+ outputPath = new Path(jobRefDirPath, "out-dir");
|
|
|
+ Configuration referenceConf = new Configuration(commonConfig);
|
|
|
+ referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
|
|
|
+ false);
|
|
|
+ Job combinerJob = Job.getInstance(referenceConf);
|
|
|
+ combinerJob.setJarByClass(TestMRIntermediateDataEncryption.class);
|
|
|
+ combinerJob.setJobName("mr-spill-" + jobRefLabel);
|
|
|
+ combinerJob.setMapperClass(CombinerJobMapper.class);
|
|
|
+ FileInputFormat.addInputPath(combinerJob, jobOutputPath);
|
|
|
+ // Reducer configuration
|
|
|
+ combinerJob.setReducerClass(LongSumReducer.class);
|
|
|
+ combinerJob.setNumReduceTasks(1);
|
|
|
+ combinerJob.setOutputKeyClass(Text.class);
|
|
|
+ combinerJob.setOutputValueClass(LongWritable.class);
|
|
|
+ // Set the IO paths for the job.
|
|
|
+ FileOutputFormat.setOutputPath(combinerJob, outputPath);
|
|
|
+ if (!combinerJob.waitForCompletion(true)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ FileStatus[] fileStatusArr =
|
|
|
+ fs.listStatus(outputPath,
|
|
|
+ new Utils.OutputFileUtils.OutputFilesFilter());
|
|
|
+ LOG.info("Job-Combination: {} .. Output file {} .. Size = {}",
|
|
|
+ jobRefDirPath, fileStatusArr[0].getPath(), fileStatusArr[0].getLen());
|
|
|
+ }
|
|
|
+ // Get the output files of the job.
|
|
|
+ FileStatus[] fileStatusArr =
|
|
|
+ fs.listStatus(outputPath,
|
|
|
+ new Utils.OutputFileUtils.OutputFilesFilter());
|
|
|
+ FileChecksum jobFileChecksum =
|
|
|
+ fs.getFileChecksum(fileStatusArr[0].getPath());
|
|
|
+ return checkSumReference.equals(jobFileChecksum);
|
|
|
+ }
|
|
|
+
|
|
|
@Before
|
|
|
public void setup() throws Exception {
|
|
|
LOG.info("Starting TestMRIntermediateDataEncryption#{}.......",
|
|
@@ -284,16 +405,16 @@ public class TestMRIntermediateDataEncryption {
|
|
|
config = new Configuration(commonConfig);
|
|
|
config.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
|
|
|
config.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
|
|
|
- // set the configuration to make sure that we get spilled files
|
|
|
+ // Set the configuration to make sure that we get spilled files.
|
|
|
long ioSortMb = TASK_SORT_IO_MB_DEFAULT;
|
|
|
config.setLong(MRJobConfig.IO_SORT_MB, ioSortMb);
|
|
|
long mapMb = Math.max(2 * ioSortMb, config.getInt(MRJobConfig.MAP_MEMORY_MB,
|
|
|
MRJobConfig.DEFAULT_MAP_MEMORY_MB));
|
|
|
- // make sure the map tasks will spill to disk.
|
|
|
+ // Make sure the map tasks will spill to disk.
|
|
|
config.setLong(MRJobConfig.MAP_MEMORY_MB, mapMb);
|
|
|
config.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
|
|
|
config.setInt(MRJobConfig.NUM_MAPS, numMappers);
|
|
|
- // max attempts have to be set to 1 when intermediate encryption is enabled.
|
|
|
+ // Max attempts have to be set to 1 when intermediate encryption is enabled.
|
|
|
config.setInt("mapreduce.map.maxattempts", 1);
|
|
|
config.setInt("mapreduce.reduce.maxattempts", 1);
|
|
|
}
|
|
@@ -302,24 +423,6 @@ public class TestMRIntermediateDataEncryption {
|
|
|
public void testWordCount() throws Exception {
|
|
|
LOG.info("........Starting main Job Driver #{} starting at {}.......",
|
|
|
testTitleName, Time.formatTime(System.currentTimeMillis()));
|
|
|
- Job job = Job.getInstance(config);
|
|
|
- job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, numMappers);
|
|
|
- job.setJarByClass(TestMRIntermediateDataEncryption.class);
|
|
|
- job.setJobName("mr-spill-" + testTitleName);
|
|
|
- // Mapper configuration
|
|
|
- job.setMapperClass(TokenizerMapper.class);
|
|
|
- job.setInputFormatClass(TextInputFormat.class);
|
|
|
- job.setCombinerClass(LongSumReducer.class);
|
|
|
- FileInputFormat.setMinInputSplitSize(job,
|
|
|
- (inputFileSize + numMappers) / numMappers);
|
|
|
- // Reducer configuration
|
|
|
- job.setReducerClass(LongSumReducer.class);
|
|
|
- job.setNumReduceTasks(numReducers);
|
|
|
- job.setOutputKeyClass(Text.class);
|
|
|
- job.setOutputValueClass(LongWritable.class);
|
|
|
- // Set the IO paths for the job.
|
|
|
- FileInputFormat.addInputPath(job, jobInputDirPath);
|
|
|
- FileOutputFormat.setOutputPath(job, jobOutputPath);
|
|
|
SpillCallBackPathsFinder spillInjector =
|
|
|
(SpillCallBackPathsFinder) IntermediateEncryptedStream
|
|
|
.setSpillCBInjector(new SpillCallBackPathsFinder());
|
|
@@ -328,34 +431,36 @@ public class TestMRIntermediateDataEncryption {
|
|
|
testTitleName));
|
|
|
try {
|
|
|
long startTime = Time.monotonicNow();
|
|
|
- testSummary.append(String.format("%nJob %s ended at %s",
|
|
|
+ testSummary.append(String.format("%nJob %s started at %s",
|
|
|
testTitleName, Time.formatTime(System.currentTimeMillis())));
|
|
|
- Assert.assertTrue(job.waitForCompletion(true));
|
|
|
+ Job job = runWordCountJob(testTitleName, jobOutputPath, config,
|
|
|
+ numMappers, numReducers);
|
|
|
+ Assert.assertTrue(job.isSuccessful());
|
|
|
long endTime = Time.monotonicNow();
|
|
|
testSummary.append(String.format("%nJob %s ended at %s",
|
|
|
job.getJobName(), Time.formatTime(System.currentTimeMillis())));
|
|
|
testSummary.append(String.format("%n\tThe job took %.3f seconds",
|
|
|
(1.0 * (endTime - startTime)) / 1000));
|
|
|
- long spilledRecords =
|
|
|
- job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
|
|
|
- Assert.assertFalse(
|
|
|
- "The encrypted spilled files should not be empty.",
|
|
|
- spillInjector.getEncryptedSpilledFiles().isEmpty());
|
|
|
- Assert.assertTrue("Spill records must be greater than 0",
|
|
|
- spilledRecords > 0);
|
|
|
- Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
|
|
|
- fs.exists(jobOutputPath));
|
|
|
- Assert.assertTrue("Invalid access to spill file positions",
|
|
|
- spillInjector.getInvalidSpillEntries().isEmpty());
|
|
|
- FileStatus[] fileStatus =
|
|
|
+ FileStatus[] fileStatusArr =
|
|
|
fs.listStatus(jobOutputPath,
|
|
|
new Utils.OutputFileUtils.OutputFilesFilter());
|
|
|
- for (FileStatus fStatus : fileStatus) {
|
|
|
+ for (FileStatus fStatus : fileStatusArr) {
|
|
|
long fileSize = fStatus.getLen();
|
|
|
testSummary.append(
|
|
|
String.format("%n\tOutput file %s: %d",
|
|
|
fStatus.getPath(), fileSize));
|
|
|
}
|
|
|
+ // Validate the checksum of the output.
|
|
|
+ Assert.assertTrue(validateJobOutput());
|
|
|
+ // Check intermediate files and spilling.
|
|
|
+ long spilledRecords =
|
|
|
+ job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
|
|
|
+ Assert.assertTrue("Spill records must be greater than 0",
|
|
|
+ spilledRecords > 0);
|
|
|
+ Assert.assertFalse("The encrypted spilled files should not be empty.",
|
|
|
+ spillInjector.getEncryptedSpilledFiles().isEmpty());
|
|
|
+ Assert.assertTrue("Invalid access to spill file positions",
|
|
|
+ spillInjector.getInvalidSpillEntries().isEmpty());
|
|
|
} finally {
|
|
|
testSummary.append(spillInjector.getSpilledFileReport());
|
|
|
LOG.info(testSummary.toString());
|
|
@@ -408,4 +513,21 @@ public class TestMRIntermediateDataEncryption {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A Mapper that reads the output of WordCount passing it to the reducer.
|
|
|
+ * It is used to combine the output files of a job with multiple reducers.
|
|
|
+ */
|
|
|
+ public static class CombinerJobMapper
|
|
|
+ extends Mapper<Object, Text, Text, LongWritable> {
|
|
|
+ private final LongWritable sum = new LongWritable(0);
|
|
|
+ private final Text word = new Text();
|
|
|
+ public void map(Object key, Text value,
|
|
|
+ Context context) throws IOException, InterruptedException {
|
|
|
+ String[] line = value.toString().split("\\s+");
|
|
|
+ sum.set(Long.parseLong(line[1]));
|
|
|
+ word.set(line[0]);
|
|
|
+ context.write(word, sum);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|