|
@@ -50,11 +50,11 @@ import java.net.URI;
|
|
|
*
|
|
|
* <p><code>DistributedCache</code> can be used to distribute simple, read-only
|
|
|
* data/text files and/or more complex types such as archives, jars etc.
|
|
|
- * Archives (zip files) are un-archived at the slave nodes. Jars maybe be
|
|
|
- * optionally added to the classpath of the tasks, a rudimentary software
|
|
|
- * distribution mechanism. Files have execution permissions. Optionally users
|
|
|
- * can also direct it to symlink the distributed cache file(s) into
|
|
|
- * the working directory of the task.</p>
|
|
|
+ * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
|
|
|
+ * Jars may be optionally added to the classpath of the tasks, a rudimentary
|
|
|
+ * software distribution mechanism. Files have execution permissions.
|
|
|
+ * Optionally users can also direct it to symlink the distributed cache file(s)
|
|
|
+ * into the working directory of the task.</p>
|
|
|
*
|
|
|
* <p><code>DistributedCache</code> tracks modification timestamps of the cache
|
|
|
* files. Clearly the cache files should not be modified by the application
|
|
@@ -70,6 +70,9 @@ import java.net.URI;
|
|
|
* $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
|
|
|
* $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
|
|
|
* $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
|
|
|
+ * $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
|
|
|
+ * $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
|
|
|
+ * $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
|
|
|
*
|
|
|
* 2. Setup the application's <code>JobConf</code>:
|
|
|
*
|
|
@@ -78,7 +81,10 @@ import java.net.URI;
|
|
|
* job);
|
|
|
* DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
|
|
|
* DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
|
|
|
- *
|
|
|
+ * DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
|
|
|
+ * DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
|
|
|
+ * DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
|
|
|
+ *
|
|
|
* 3. Use the cached files in the {@link Mapper} or {@link Reducer}:
|
|
|
*
|
|
|
* public static class MapClass extends MapReduceBase
|
|
@@ -129,9 +135,11 @@ public class DistributedCache {
|
|
|
* @param conf The Confguration file which contains the filesystem
|
|
|
* @param baseDir The base cache Dir where you wnat to localize the files/archives
|
|
|
* @param fileStatus The file status on the dfs.
|
|
|
- * @param isArchive if the cache is an archive or a file. In case it is an archive
|
|
|
- * with a .zip or .jar extension it will be unzipped/unjarred automatically
|
|
|
- * and the directory where the archive is unjarred is returned as the Path.
|
|
|
+ * @param isArchive if the cache is an archive or a file. In case it is an
|
|
|
+ * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
|
|
|
+ * be unzipped/unjarred/untarred automatically
|
|
|
+ * and the directory where the archive is unzipped/unjarred/untarred is
|
|
|
+ * returned as the Path.
|
|
|
* In case of a file, the path to the file is returned
|
|
|
* @param confFileStamp this is the hdfs file modification timestamp to verify that the
|
|
|
* file to be cached hasn't changed since the job started
|
|
@@ -185,9 +193,11 @@ public class DistributedCache {
|
|
|
* being used in the Configuration
|
|
|
* @param conf The Confguration file which contains the filesystem
|
|
|
* @param baseDir The base cache Dir where you wnat to localize the files/archives
|
|
|
- * @param isArchive if the cache is an archive or a file. In case it is an archive
|
|
|
- * with a .zip or .jar extension it will be unzipped/unjarred automatically
|
|
|
- * and the directory where the archive is unjarred is returned as the Path.
|
|
|
+ * @param isArchive if the cache is an archive or a file. In case it is an
|
|
|
+ * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
|
|
|
+ * be unzipped/unjarred/untarred automatically
|
|
|
+ * and the directory where the archive is unzipped/unjarred/untarred
|
|
|
+ * is returned as the Path.
|
|
|
* In case of a file, the path to the file is returned
|
|
|
* @param confFileStamp this is the hdfs file modification timestamp to verify that the
|
|
|
* file to be cached hasn't changed since the job started
|
|
@@ -331,13 +341,14 @@ public class DistributedCache {
|
|
|
fs.copyToLocalFile(new Path(cacheId), parchive);
|
|
|
if (isArchive) {
|
|
|
String tmpArchive = parchive.toString().toLowerCase();
|
|
|
+ File srcFile = new File(parchive.toString());
|
|
|
+ File destDir = new File(parchive.getParent().toString());
|
|
|
if (tmpArchive.endsWith(".jar")) {
|
|
|
- RunJar.unJar(new File(parchive.toString()), new File(parchive
|
|
|
- .getParent().toString()));
|
|
|
+ RunJar.unJar(srcFile, destDir);
|
|
|
} else if (tmpArchive.endsWith(".zip")) {
|
|
|
- FileUtil.unZip(new File(parchive.toString()), new File(parchive
|
|
|
- .getParent().toString()));
|
|
|
-
|
|
|
+ FileUtil.unZip(srcFile, destDir);
|
|
|
+ } else if (isTarFile(tmpArchive)) {
|
|
|
+ FileUtil.unTar(srcFile, destDir);
|
|
|
}
|
|
|
// else will not do anyhting
|
|
|
// and copy the file into the dir as it is
|
|
@@ -373,6 +384,11 @@ public class DistributedCache {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static boolean isTarFile(String filename) {
|
|
|
+ return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
|
|
|
+ filename.endsWith(".tar"));
|
|
|
+ }
|
|
|
+
|
|
|
// Checks if the cache has already been localized and is fresh
|
|
|
private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
|
|
|
URI cache, long confFileStamp,
|