@@ -44,6 +44,8 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -82,12 +84,11 @@ public class TestMRWithDistributedCache extends TestCase {
 
   private static final Log LOG =
     LogFactory.getLog(TestMRWithDistributedCache.class);
+
+  private static class DistributedCacheChecker {
 
-  public static class DistributedCacheChecker extends
-      Mapper<LongWritable, Text, NullWritable, NullWritable> {
-
-    @Override
-    public void setup(Context context) throws IOException {
+    public void setup(TaskInputOutputContext<?, ?, ?, ?> context)
+        throws IOException {
       Configuration conf = context.getConfiguration();
       Path[] localFiles = context.getLocalCacheFiles();
       URI[] files = context.getCacheFiles();
@@ -101,6 +102,10 @@ public class TestMRWithDistributedCache extends TestCase {
       TestCase.assertEquals(2, files.length);
       TestCase.assertEquals(2, archives.length);
 
+      // Check the file name
+      TestCase.assertTrue(files[0].getPath().endsWith("distributed.first"));
+      TestCase.assertTrue(files[1].getPath().endsWith("distributed.second.jar"));
+
       // Check lengths of the files
       TestCase.assertEquals(1, fs.getFileStatus(localFiles[0]).getLen());
       TestCase.assertTrue(fs.getFileStatus(localFiles[1]).getLen() > 1);
@@ -130,8 +135,28 @@ public class TestMRWithDistributedCache extends TestCase {
       TestCase.assertTrue("second file should be symlinked too",
           expectedAbsentSymlinkFile.exists());
     }
+
   }
-
+
+  public static class DistributedCacheCheckerMapper extends
+      Mapper<LongWritable, Text, NullWritable, NullWritable> {
+
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      new DistributedCacheChecker().setup(context);
+    }
+  }
+
+  public static class DistributedCacheCheckerReducer extends
+      Reducer<LongWritable, Text, NullWritable, NullWritable> {
+
+    @Override
+    public void setup(Context context) throws IOException {
+      new DistributedCacheChecker().setup(context);
+    }
+  }
+
   private void testWithConf(Configuration conf) throws IOException,
       InterruptedException, ClassNotFoundException, URISyntaxException {
     // Create a temporary file of length 1.
@@ -146,7 +171,8 @@ public class TestMRWithDistributedCache extends TestCase {
 
 
     Job job = Job.getInstance(conf);
-    job.setMapperClass(DistributedCacheChecker.class);
+    job.setMapperClass(DistributedCacheCheckerMapper.class);
+    job.setReducerClass(DistributedCacheCheckerReducer.class);
     job.setOutputFormatClass(NullOutputFormat.class);
     FileInputFormat.setInputPaths(job, first);
     // Creates the Job Configuration
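
Note on the pattern used above: the refactoring works because Mapper.Context and Reducer.Context share TaskInputOutputContext as a common supertype, so one plain helper class can run identical setup-time checks in both task types. Below is a minimal self-contained sketch of that pattern; the class names TaskChecker, CheckingMapper, CheckingReducer, and SharedSetupSketch are illustrative only and are not part of this patch.

// Illustrative sketch (not from the patch): a helper with no
// Mapper/Reducer inheritance, callable from both task types because it
// only depends on the TaskInputOutputContext supertype.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;

public class SharedSetupSketch {

  // The helper only uses methods every task context exposes.
  private static class TaskChecker {
    void setup(TaskInputOutputContext<?, ?, ?, ?> context)
        throws IOException {
      Configuration conf = context.getConfiguration();
      // Both map and reduce tasks reach this line with their own context.
      System.out.println("task " + context.getTaskAttemptID()
          + " sees " + conf.size() + " configuration entries");
    }
  }

  public static class CheckingMapper extends
      Mapper<LongWritable, Text, NullWritable, NullWritable> {
    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      // Mapper.Context is a TaskInputOutputContext, so this compiles.
      new TaskChecker().setup(context);
    }
  }

  public static class CheckingReducer extends
      Reducer<NullWritable, NullWritable, NullWritable, NullWritable> {
    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      // Reducer.Context is also a TaskInputOutputContext.
      new TaskChecker().setup(context);
    }
  }
}

Keeping the shared logic out of any Mapper or Reducer subclass is what lets setMapperClass() and setReducerClass() each point at a thin wrapper, exactly as the last hunk of the patch does.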
|