Browse Source

HADOOP-5635. Change distributed cache to work with other distributed file systems. Contributed by Andrew Hitchcock.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@779066 13f79535-47bb-0310-9956-ffa450edef68
Thomas White 16 years ago
parent
commit
5977a6c100

+ 3 - 0
CHANGES.txt

@@ -689,6 +689,9 @@ Trunk (unreleased changes)
     CombineFileInputFormat is used as job InputFormat.
     (Amareshwari Sriramadasu via dhruba)
 
+    HADOOP-5635. Change distributed cache to work with other distributed file
+    systems. (Andrew Hitchcock via tomwhite)
+
 Release 0.20.1 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 1 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -1131,7 +1131,7 @@ public class StreamJob implements Tool {
 
   protected RunningJob running_;
   protected JobID jobId_;
-  protected static final String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
+  protected static final String LINK_URI = "You need to specify the uris as scheme://path#linkname," +
     "Please specify a different link name for all of your caching URIs";
 
 }

+ 8 - 22
src/core/org/apache/hadoop/filecache/DistributedCache.java

@@ -35,10 +35,10 @@ import java.net.URI;
  * </p>
  * 
  * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached 
- * via the {@link org.apache.hadoop.mapred.JobConf}.
- * The <code>DistributedCache</code> assumes that the
- * files specified via hdfs:// urls are already present on the 
- * {@link FileSystem} at the path specified by the url.</p>
+ * via the {@link org.apache.hadoop.mapred.JobConf}. The
+ * <code>DistributedCache</code> assumes that the files specified via urls are
+ * already present on the {@link FileSystem} at the path specified by the url
+ * and are accessible by every machine in the cluster.</p>
  * 
  * <p>The framework will copy the necessary files on to the slave node before 
  * any tasks for the job are executed on that node. Its efficiency stems from 
@@ -129,9 +129,7 @@ public class DistributedCache {
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Confguration file which contains the filesystem
    * @param baseDir The base cache Dir where you wnat to localize the files/archives
    * @param fileStatus The file status on the dfs.
@@ -162,9 +160,7 @@ public class DistributedCache {
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Confguration file which contains the filesystem
    * @param baseDir The base cache Dir where you wnat to localize the files/archives
    * @param fileStatus The file status on the dfs.
@@ -231,9 +227,7 @@ public class DistributedCache {
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Confguration file which contains the filesystem
    * @param baseDir The base cache Dir where you wnat to localize the files/archives
    * @param isArchive if the cache is an archive or a file. In case it is an 
@@ -350,7 +344,7 @@ public class DistributedCache {
     if(cache.getFragment() == null) {
     	doSymlink = false;
     }
-    FileSystem fs = getFileSystem(cache, conf);
+    FileSystem fs = FileSystem.get(cache, conf);
     String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
     if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
@@ -533,14 +527,6 @@ public class DistributedCache {
     }  
   }
   
-  private static FileSystem getFileSystem(URI cache, Configuration conf)
-    throws IOException {
-    if ("hdfs".equals(cache.getScheme()))
-      return FileSystem.get(cache, conf);
-    else
-      return FileSystem.get(conf);
-  }
-
   /**
    * Set the configuration with the given set of archives
    * @param archives The list of archives that need to be localized

+ 9 - 0
src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java

@@ -55,6 +55,15 @@ public class TestDistributedCache extends TestCase {
     assertTrue("DistributedCache failed deleting old cache when the cache store is full.",
         dirStatuses.length > 1);
   }
+  
+  public void testFileSystemOtherThanDefault() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
+    Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath());
+    Path result = DistributedCache.getLocalCache(fileToCache.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
+        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+    assertNotNull("DistributedCache cached file on non-default filesystem.", result);
+  }
 
   private void createTempFile(FileSystem fs, Path p) throws IOException {
     FSDataOutputStream out = fs.create(p);