|
@@ -109,6 +109,55 @@ public class MRCaching {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+  /**
+   * Using the wordcount example and adding caching to it. The cache
+   * archives/files are registered with symlink names, and this mapper checks
+   * during configuration that each of them can be read through its symlink
+   * in the task's working directory.
+   */
+  public static class MapClass2 extends MapClass {
+
+    JobConf conf;
+
+    /**
+     * Reads the first line of <code>test.txt</code> from the current
+     * directory and from each symlinked archive directory, and appends those
+     * lines to a single local file <code>TEST_ROOT_DIR/test.txt</code>.
+     * I/O failures are printed to standard output rather than propagated.
+     *
+     * @param jconf the job configuration; <code>test.build.data</code>
+     *              (default <code>/tmp</code>) selects the output directory
+     */
+    public void configure(JobConf jconf) {
+      conf = jconf;
+      try {
+        // read the cached files (unzipped, unjarred and text)
+        // and put it into a single file TEST_ROOT_DIR/test.txt
+        String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp");
+        Path file = new Path("file:///", TEST_ROOT_DIR);
+        FileSystem fs = FileSystem.getLocal(conf);
+        if (!fs.mkdirs(file)) {
+          throw new IOException("Mkdirs failed to create " + file.toString());
+        }
+        Path fileOut = new Path(file, "test.txt");
+        fs.delete(fileOut, true);
+        DataOutputStream out = fs.create(fileOut);
+        try {
+          // "." holds the plain text file; the rest are directory names
+          // expected to contain an unpacked test.txt each.
+          String[] symlinks = new String[6];
+          symlinks[0] = ".";
+          symlinks[1] = "testjar";
+          symlinks[2] = "testzip";
+          symlinks[3] = "testtgz";
+          symlinks[4] = "testtargz";
+          symlinks[5] = "testtar";
+
+          for (int i = 0; i < symlinks.length; i++) {
+            // read out the files from these archives
+            File f = new File(symlinks[i]);
+            File txt = new File(f, "test.txt");
+            FileInputStream fin = new FileInputStream(txt);
+            BufferedReader reader = new BufferedReader(new InputStreamReader(fin));
+            String str;
+            try {
+              str = reader.readLine();
+            } finally {
+              // closing the reader also closes the underlying file stream,
+              // even when readLine() fails
+              reader.close();
+            }
+            if (str == null) {
+              // writeBytes(null) would throw a NullPointerException that the
+              // catch below does not handle; report an empty file instead
+              throw new IOException("No content found in " + txt.toString());
+            }
+            out.writeBytes(str);
+            out.writeBytes("\n");
+          }
+        } finally {
+          // always release the output file, including when a read fails
+          out.close();
+        }
+      } catch (IOException ie) {
+        System.out.println(StringUtils.stringifyException(ie));
+      }
+    }
+  }
|
|
|
+
|
|
|
/**
|
|
|
* A reducer class that just emits the sum of the input values.
|
|
|
*/
|
|
@@ -135,9 +184,40 @@ public class MRCaching {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+  /**
+   * Recreates the cache directory on the given file system and copies the
+   * test fixtures (a text file plus jar, zip, tgz, tar.gz and tar archives)
+   * into it from the local <code>build/test/cache</code> directory.
+   *
+   * @param cacheDir directory on <code>fs</code> that receives the fixtures;
+   *                 any existing content is deleted first
+   * @param fs       file system the fixtures are copied to
+   * @throws IOException if the cache directory cannot be created or a copy
+   *                     fails
+   */
+  static void setupCache(String cacheDir, FileSystem fs)
+  throws IOException {
+    final Path localDir = new Path("build/test/cache");
+    final Path targetDir = new Path(cacheDir);
+
+    // start from an empty cache directory
+    fs.delete(targetDir, true);
+    if (!fs.mkdirs(targetDir)) {
+      throw new IOException("Mkdirs failed to create " + targetDir.toString());
+    }
+
+    // copied in this fixed order: text file first, then the archives
+    final String[] fixtures = {
+      "test.txt", "test.jar", "test.zip", "test.tgz", "test.tar.gz", "test.tar"
+    };
+    for (int idx = 0; idx < fixtures.length; idx++) {
+      Path source = new Path(localDir, new Path(fixtures[idx]));
+      fs.copyFromLocalFile(source, targetDir);
+    }
+  }
|
|
|
+
|
|
|
+  /**
+   * Convenience overload that first populates <code>cacheDir</code> via
+   * {@link #setupCache(String, FileSystem)} on the file system obtained from
+   * <code>conf</code>, and then launches the caching job with symlinks
+   * disabled.
+   *
+   * @return the result of the six-argument <code>launchMRCache</code>
+   * @throws IOException if cache setup or the job launch fails
+   */
+  public static TestResult launchMRCache(String indir,
+                                         String outdir, String cacheDir,
+                                         JobConf conf, String input)
+  throws IOException {
+    FileSystem cacheFs = FileSystem.get(conf);
+    setupCache(cacheDir, cacheFs);
+    // this overload always runs the non-symlink variant
+    final boolean useSymlinks = false;
+    return launchMRCache(indir, outdir, cacheDir, conf, input, useSymlinks);
+  }
|
|
|
+
|
|
|
public static TestResult launchMRCache(String indir,
|
|
|
String outdir, String cacheDir,
|
|
|
- JobConf conf, String input)
|
|
|
+ JobConf conf, String input,
|
|
|
+ boolean withSymlink)
|
|
|
throws IOException {
|
|
|
String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp"))
|
|
|
.toString().replace(' ', '+');
|
|
@@ -163,7 +243,6 @@ public class MRCaching {
|
|
|
// the values are counts (ints)
|
|
|
conf.setOutputValueClass(IntWritable.class);
|
|
|
|
|
|
- conf.setMapperClass(MRCaching.MapClass.class);
|
|
|
conf.setCombinerClass(MRCaching.ReduceClass.class);
|
|
|
conf.setReducerClass(MRCaching.ReduceClass.class);
|
|
|
FileInputFormat.setInputPaths(conf, inDir);
|
|
@@ -171,38 +250,29 @@ public class MRCaching {
|
|
|
conf.setNumMapTasks(1);
|
|
|
conf.setNumReduceTasks(1);
|
|
|
conf.setSpeculativeExecution(false);
|
|
|
- Path localPath = new Path("build/test/cache");
|
|
|
- Path txtPath = new Path(localPath, new Path("test.txt"));
|
|
|
- Path jarPath = new Path(localPath, new Path("test.jar"));
|
|
|
- Path zipPath = new Path(localPath, new Path("test.zip"));
|
|
|
- Path tarPath = new Path(localPath, new Path("test.tgz"));
|
|
|
- Path tarPath1 = new Path(localPath, new Path("test.tar.gz"));
|
|
|
- Path tarPath2 = new Path(localPath, new Path("test.tar"));
|
|
|
- Path cachePath = new Path(cacheDir);
|
|
|
- fs.delete(cachePath, true);
|
|
|
- if (!fs.mkdirs(cachePath)) {
|
|
|
- throw new IOException("Mkdirs failed to create " + cachePath.toString());
|
|
|
+ URI[] uris = new URI[6];
|
|
|
+ if (!withSymlink) {
|
|
|
+ conf.setMapperClass(MRCaching.MapClass.class);
|
|
|
+ uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
|
|
|
+ uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
|
|
|
+ uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
|
|
|
+ uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
|
|
|
+ uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
|
|
|
+ uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
|
|
|
+ } else {
|
|
|
+ DistributedCache.createSymlink(conf);
|
|
|
+ conf.setMapperClass(MRCaching.MapClass2.class);
|
|
|
+ uris[0] = fs.getUri().resolve(cacheDir + "/test.txt#" + "test.txt");
|
|
|
+ uris[1] = fs.getUri().resolve(cacheDir + "/test.jar#" + "testjar");
|
|
|
+ uris[2] = fs.getUri().resolve(cacheDir + "/test.zip#" + "testzip");
|
|
|
+ uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz#" + "testtgz");
|
|
|
+ uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz");
|
|
|
+ uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
|
|
|
+ }
|
|
|
+ DistributedCache.addCacheFile(uris[0], conf);
|
|
|
+ for (int i = 1; i < 6; i++) {
|
|
|
+ DistributedCache.addCacheArchive(uris[i], conf);
|
|
|
}
|
|
|
- fs.copyFromLocalFile(txtPath, cachePath);
|
|
|
- fs.copyFromLocalFile(jarPath, cachePath);
|
|
|
- fs.copyFromLocalFile(zipPath, cachePath);
|
|
|
- fs.copyFromLocalFile(tarPath, cachePath);
|
|
|
- fs.copyFromLocalFile(tarPath1, cachePath);
|
|
|
- fs.copyFromLocalFile(tarPath2, cachePath);
|
|
|
- // setting the cached archives to zip, jar and simple text files
|
|
|
- URI uri1 = fs.getUri().resolve(cachePath + "/test.jar");
|
|
|
- URI uri2 = fs.getUri().resolve(cachePath + "/test.zip");
|
|
|
- URI uri3 = fs.getUri().resolve(cachePath + "/test.txt");
|
|
|
- URI uri4 = fs.getUri().resolve(cachePath + "/test.tgz");
|
|
|
- URI uri5 = fs.getUri().resolve(cachePath + "/test.tar.gz");
|
|
|
- URI uri6 = fs.getUri().resolve(cachePath + "/test.tar");
|
|
|
-
|
|
|
- DistributedCache.addCacheArchive(uri1, conf);
|
|
|
- DistributedCache.addCacheArchive(uri2, conf);
|
|
|
- DistributedCache.addCacheFile(uri3, conf);
|
|
|
- DistributedCache.addCacheArchive(uri4, conf);
|
|
|
- DistributedCache.addCacheArchive(uri5, conf);
|
|
|
- DistributedCache.addCacheArchive(uri6, conf);
|
|
|
RunningJob job = JobClient.runJob(conf);
|
|
|
int count = 0;
|
|
|
// after the job ran check to see if the input from the localized cache
|