Selaa lähdekoodia

HADOOP-836. Fix a MapReduce bug on Windows where the wrong FileSystem was used. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@489185 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 vuotta sitten
vanhempi
commit
29ae6d67af
27 muutettua tiedostoa jossa 61 lisäystä ja 35 poistoa
  1. 5 0
      CHANGES.txt
  2. 2 2
      src/contrib/smallJobsBenchmark/src/java/org/apache/hadoop/benchmarks/mapred/MultiJobRunner.java
  3. 1 1
      src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
  4. 1 1
      src/java/org/apache/hadoop/conf/Configuration.java
  5. 12 1
      src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
  6. 1 1
      src/java/org/apache/hadoop/dfs/FSNamesystem.java
  7. 2 2
      src/java/org/apache/hadoop/filecache/DistributedCache.java
  8. 11 1
      src/java/org/apache/hadoop/fs/FileSystem.java
  9. 2 2
      src/java/org/apache/hadoop/fs/FsShell.java
  10. 1 1
      src/java/org/apache/hadoop/fs/LocalFileSystem.java
  11. 1 1
      src/java/org/apache/hadoop/mapred/IsolationRunner.java
  12. 2 2
      src/java/org/apache/hadoop/mapred/JobConf.java
  13. 1 1
      src/java/org/apache/hadoop/mapred/JobInProgress.java
  14. 1 1
      src/java/org/apache/hadoop/mapred/LocalJobRunner.java
  15. 2 2
      src/java/org/apache/hadoop/mapred/MapTask.java
  16. 1 1
      src/java/org/apache/hadoop/mapred/ReduceTask.java
  17. 1 1
      src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
  18. 2 2
      src/java/org/apache/hadoop/mapred/TaskRunner.java
  19. 3 3
      src/java/org/apache/hadoop/mapred/TaskTracker.java
  20. 1 1
      src/test/org/apache/hadoop/dfs/TestPread.java
  21. 1 1
      src/test/org/apache/hadoop/dfs/TestSeekBug.java
  22. 2 2
      src/test/org/apache/hadoop/fs/DFSCIOTest.java
  23. 1 1
      src/test/org/apache/hadoop/fs/TestLocalFileSystem.java
  24. 1 1
      src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java
  25. 1 1
      src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java
  26. 1 1
      src/test/org/apache/hadoop/mapred/TestTextInputFormat.java
  27. 1 1
      src/test/org/apache/hadoop/record/test/TestWritable.java

+ 5 - 0
CHANGES.txt

@@ -137,6 +137,11 @@ Trunk (unreleased changes)
 38. HADOOP-835.  Fix a NullPointerException reading record-compressed
     SequenceFiles.  (Hairong Kuang via cutting)
 
+39. HADOOP-836.  Fix a MapReduce bug on Windows, where the wrong
+    FileSystem was used.  Also add a static FileSystem.getLocal()
+    method and better Path checking in HDFS, to help avoid such issues
+    in the future.  (omalley via cutting)
+
 
 Release 0.9.2 - 2006-12-15
 

+ 2 - 2
src/contrib/smallJobsBenchmark/src/java/org/apache/hadoop/benchmarks/mapred/MultiJobRunner.java

@@ -231,7 +231,7 @@ public class MultiJobRunner {
     }
     //new File(localPath).
     Configuration conf = new Configuration();
-    FileSystem localFS = FileSystem.getNamed("local", conf);
+    FileSystem localFS = FileSystem.getLocal(conf);
     FileSystem remoteFS = FileSystem.get(conf);
     
     FileUtil.copy(localFS, new Path(localFile), remoteFS, 
@@ -249,7 +249,7 @@ public class MultiJobRunner {
   throws IOException{
     
     Configuration conf = new Configuration();
-    FileSystem localFS = FileSystem.getNamed("local", conf);
+    FileSystem localFS = FileSystem.getLocal(conf);
     FileSystem remoteFS = FileSystem.get(conf);
     
     FileUtil.copy(remoteFS, remotePath, 

+ 1 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -211,7 +211,7 @@ public abstract class PipeMapRed {
       fs_ = FileSystem.get(job_);
       if (job_.getBoolean("stream.sideoutput.localfs", false)) {
         //sideFs_ = new LocalFileSystem(job_);
-        sideFs_ = FileSystem.getNamed("local", job_);
+        sideFs_ = FileSystem.getLocal(job_);
       } else {
         sideFs_ = fs_;
       }

+ 1 - 1
src/java/org/apache/hadoop/conf/Configuration.java

@@ -363,7 +363,7 @@ public class Configuration {
     throws IOException {
     String[] dirs = getStrings(dirsProp);
     int hashCode = path.hashCode();
-    FileSystem fs = FileSystem.getNamed("local", this);
+    FileSystem fs = FileSystem.getLocal(this);
     for (int i = 0; i < dirs.length; i++) {  // try each local dir
       int index = (hashCode+i & Integer.MAX_VALUE) % dirs.length;
       Path file = new Path(dirs[index], path);

+ 12 - 1
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -96,12 +96,23 @@ public class DistributedFileSystem extends FileSystem {
     }
     
     public void setWorkingDirectory(Path dir) {
+      Path result = makeAbsolute(dir);
+      if (!FSNamesystem.isValidName(result.toString())) {
+        throw new IllegalArgumentException("Invalid DFS directory name " + 
+                                           result);
+      }
       workingDir = makeAbsolute(dir);
     }
     
     private UTF8 getPath(Path file) {
       checkPath(file);
-      return new UTF8(makeAbsolute(file).toUri().getPath());
+      String result = makeAbsolute(file).toUri().getPath();
+      if (!FSNamesystem.isValidName(result)) {
+        throw new IllegalArgumentException("Pathname " + result + " from " +
+                                            file +
+                                            " is not a valid DFS filename.");
+      }
+      return new UTF8(result);
     }
 
     public String[][] getFileCacheHints(Path f, long start, long len) throws IOException {

+ 1 - 1
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -845,7 +845,7 @@ class FSNamesystem implements FSConstants {
      * Whether the pathname is valid.  Currently prohibits relative paths, 
      * and names which contain a ":" or "/" 
      */
-    private boolean isValidName(String src) {
+    static boolean isValidName(String src) {
       
       // Path must be absolute.
       if (!src.startsWith(Path.SEPARATOR)) {

+ 2 - 2
src/java/org/apache/hadoop/filecache/DistributedCache.java

@@ -127,7 +127,7 @@ public class DistributedCache {
         CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
         if (lcacheStatus.refcount == 0) {
           // delete this cache entry
-          FileSystem.getNamed("local", conf).delete(lcacheStatus.localLoadPath);
+          FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath);
           it.remove();
         }
       }
@@ -195,7 +195,7 @@ public class DistributedCache {
         throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
             + " is in use and cannot be refreshed");
       byte[] checkSum = createMD5(cache, conf);
-      FileSystem localFs = FileSystem.getNamed("local", conf);
+      FileSystem localFs = FileSystem.getLocal(conf);
       localFs.delete(cacheStatus.localLoadPath);
       Path parchive = new Path(cacheStatus.localLoadPath,
                                new Path(cacheStatus.localLoadPath.getName()));

+ 11 - 1
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -118,7 +118,17 @@ public abstract class FileSystem extends Configured {
 
       return get(URI.create(name), conf);
     }
-  
+
+    /**
+     * Get the local file syste
+     * @param conf the configuration to configure the file system with
+     * @return a LocalFileSystem
+     */
+    public static LocalFileSystem getLocal(Configuration conf)
+      throws IOException {
+      return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
+    }
+
     /** Returns the FileSystem for this URI's scheme and authority.  The scheme
      * of the URI determines a configuration property name,
      * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.

+ 2 - 2
src/java/org/apache/hadoop/fs/FsShell.java

@@ -152,10 +152,10 @@ public class FsShell extends ToolBase {
       for( int i=0; i<srcs.length; i++ ) {
         if(endline) {
             FileUtil.copyMerge(fs, srcs[i], 
-                    FileSystem.getNamed("local", conf), dst, false, conf, "\n");
+                    FileSystem.getLocal(conf), dst, false, conf, "\n");
         } else {
             FileUtil.copyMerge(fs, srcs[i], 
-                    FileSystem.getNamed("local", conf), dst, false, conf, null);
+                    FileSystem.getLocal(conf), dst, false, conf, null);
         }
       }
     }      

+ 1 - 1
src/java/org/apache/hadoop/fs/LocalFileSystem.java

@@ -32,7 +32,7 @@ import org.apache.hadoop.util.Progressable;
  * @author Mike Cafarella
  *****************************************************************/
 public class LocalFileSystem extends FileSystem {
-    private static final URI NAME = URI.create("file:///");
+    static final URI NAME = URI.create("file:///");
 
     private Path workingDir =
       new Path(System.getProperty("user.dir"));

+ 1 - 1
src/java/org/apache/hadoop/mapred/IsolationRunner.java

@@ -139,7 +139,7 @@ public class IsolationRunner {
     int partition = conf.getInt("mapred.task.partition", 0);
     
     // setup the local and user working directories
-    FileSystem local = FileSystem.getNamed("local", conf);
+    FileSystem local = FileSystem.getLocal(conf);
     File workDirName = new File(jobFilename.getParent(), "work");
     local.setWorkingDirectory(new Path(workDirName.toString()));
     FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());

+ 2 - 2
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -129,14 +129,14 @@ public class JobConf extends Configuration {
   public void deleteLocalFiles() throws IOException {
     String[] localDirs = getLocalDirs();
     for (int i = 0; i < localDirs.length; i++) {
-      FileSystem.getNamed("local", this).delete(new Path(localDirs[i]));
+      FileSystem.getLocal(this).delete(new Path(localDirs[i]));
     }
   }
 
   public void deleteLocalFiles(String subdir) throws IOException {
     String[] localDirs = getLocalDirs();
     for (int i = 0; i < localDirs.length; i++) {
-      FileSystem.getNamed("local", this).delete(new Path(localDirs[i], subdir));
+      FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir));
     }
   }
 

+ 1 - 1
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -75,7 +75,7 @@ class JobInProgress {
         this.jobtracker = jobtracker;
         this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
         this.startTime = System.currentTimeMillis();
-        this.localFs = (LocalFileSystem)FileSystem.getNamed("local", default_conf);
+        this.localFs = (LocalFileSystem)FileSystem.getLocal(default_conf);
 
         JobConf default_job_conf = new JobConf(default_conf);
         this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 

+ 1 - 1
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -69,7 +69,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
       this.mapoutputFile.setConf(conf);
 
       this.localFile = new JobConf(conf).getLocalPath("localRunner/"+id+".xml");
-      this.localFs = FileSystem.getNamed("local", conf);
+      this.localFs = FileSystem.getLocal(conf);
 
       fs.copyToLocalFile(new Path(file), localFile);
       this.job = new JobConf(localFile);

+ 2 - 2
src/java/org/apache/hadoop/mapred/MapTask.java

@@ -107,7 +107,7 @@ class MapTask extends Task {
     super.localizeConfiguration(conf);
     Path localSplit = new Path(new Path(getJobFile()).getParent(), 
                                "split.dta");
-    DataOutputStream out = LocalFileSystem.get(conf).create(localSplit);
+    DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
     split.write(out);
     out.close();
     if (split instanceof FileSplit) {
@@ -248,7 +248,7 @@ class MapTask extends Task {
       this.comparator = job.getOutputKeyComparator();
       this.keyClass = job.getMapOutputKeyClass();
       this.valClass = job.getMapOutputValueClass();
-      this.localFs = FileSystem.getNamed("local", job);
+      this.localFs = FileSystem.getLocal(job);
       this.codec = null;
       this.compressionType = CompressionType.NONE;
       if (job.getCompressMapOutput()) {

+ 1 - 1
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -227,7 +227,7 @@ class ReduceTask extends Task {
     Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                   job.getReducerClass(), job);
-    FileSystem lfs = FileSystem.getNamed("local", job);
+    FileSystem lfs = FileSystem.getLocal(job);
 
     copyPhase.complete();                         // copy is already complete
     

+ 1 - 1
src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java

@@ -258,7 +258,7 @@ class ReduceTaskRunner extends TaskRunner {
     super(task, tracker, conf);
     this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);
-    localFileSys = FileSystem.getNamed("local", conf);
+    localFileSys = FileSystem.getLocal(conf);
 
     this.reduceTask = (ReduceTask)getTask();
     this.scheduledCopies = new ArrayList(100);

+ 2 - 2
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -107,7 +107,7 @@ abstract class TaskRunner extends Thread {
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
         Path localTaskFile = new Path(t.getJobFile());
-        FileSystem localFs = FileSystem.getNamed("local", conf);
+        FileSystem localFs = FileSystem.getLocal(conf);
         localFs.delete(localTaskFile);
         OutputStream out = localFs.create(localTaskFile);
         try {
@@ -367,4 +367,4 @@ abstract class TaskRunner extends Thread {
     }
   }
   
-}
+}

+ 3 - 3
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -287,7 +287,7 @@ public class TaskTracker
                                   jobId + Path.SEPARATOR + "job.jar");
   
           String jobFile = t.getJobFile();
-          FileSystem localFs = FileSystem.getNamed("local", fConf);
+          FileSystem localFs = FileSystem.getLocal(fConf);
           // this will happen on a partial execution of localizeJob.
           // Sometimes the job.xml gets copied but copying job.jar
           // might throw out an exception
@@ -415,7 +415,7 @@ public class TaskTracker
       server.setThreads(1, workerThreads);
       // let the jsp pages get to the task tracker, config, and other relevant
       // objects
-      FileSystem local = FileSystem.getNamed("local", conf);
+      FileSystem local = FileSystem.getLocal(conf);
       server.setAttribute("task.tracker", this);
       server.setAttribute("local.file.system", local);
       server.setAttribute("conf", conf);
@@ -955,7 +955,7 @@ public class TaskTracker
             Path localTaskDir =
               new Path(this.defaultJobConf.getLocalPath(TaskTracker.getJobCacheSubdir()), 
                 (task.getJobId() + Path.SEPARATOR + task.getTaskId()));
-           FileSystem localFs = FileSystem.getNamed("local", fConf);
+           FileSystem localFs = FileSystem.getLocal(fConf);
            if (!localFs.mkdirs(localTaskDir)) {
              throw new IOException("Mkdirs failed to create " + localTaskDir.toString());
            }

+ 1 - 1
src/test/org/apache/hadoop/dfs/TestPread.java

@@ -129,7 +129,7 @@ public class TestPread extends TestCase {
    */
   public void testPreadLocalFS() throws IOException {
     Configuration conf = new Configuration();
-    FileSystem fileSys = FileSystem.getNamed("local", conf);
+    FileSystem fileSys = FileSystem.getLocal(conf);
     try {
       Path file1 = new Path("build/test/data", "preadtest.dat");
       writeFile(fileSys, file1);

+ 1 - 1
src/test/org/apache/hadoop/dfs/TestSeekBug.java

@@ -108,7 +108,7 @@ public class TestSeekBug extends TestCase {
    */
   public void testSeekBugLocalFS() throws IOException {
     Configuration conf = new Configuration();
-    FileSystem fileSys = FileSystem.getNamed("local", conf);
+    FileSystem fileSys = FileSystem.getLocal(conf);
     try {
       Path file1 = new Path("build/test/data", "seektest.dat");
       writeFile(fileSys, file1);

+ 2 - 2
src/test/org/apache/hadoop/fs/DFSCIOTest.java

@@ -198,7 +198,7 @@ public class DFSCIOTest extends TestCase {
       totalSize *= MEGA;
       
       // create instance of local filesystem 
-      FileSystem localFS = FileSystem.getNamed("local", fsConfig);
+      FileSystem localFS = FileSystem.getLocal(fsConfig);
       
       try {
     	  	// native runtime
@@ -300,7 +300,7 @@ public class DFSCIOTest extends TestCase {
     	  totalSize *= MEGA;
       
       // create instance of local filesystem 
-      FileSystem localFS = FileSystem.getNamed("local", fsConfig);
+      FileSystem localFS = FileSystem.getLocal(fsConfig);
       
       try {
   	  	// native runtime

+ 1 - 1
src/test/org/apache/hadoop/fs/TestLocalFileSystem.java

@@ -44,7 +44,7 @@ public class TestLocalFileSystem extends TestCase {
    */
   public void testWorkingDirectory() throws IOException {
     Configuration conf = new Configuration();
-    FileSystem fileSys = FileSystem.getNamed("local", conf);
+    FileSystem fileSys = FileSystem.getLocal(conf);
     Path origDir = fileSys.getWorkingDirectory();
     Path subdir = new Path("build/test/data/work-dir/new subdir");
     try {

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestSequenceFileInputFilter.java

@@ -46,7 +46,7 @@ public class TestSequenceFileInputFilter extends TestCase {
   static {
       job.setInputPath(inDir);
       try {
-        fs = FileSystem.getNamed( "local", conf);
+        fs = FileSystem.getLocal(conf);
     } catch (IOException e) {
         e.printStackTrace();
         throw new RuntimeException(e);

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java

@@ -36,7 +36,7 @@ public class TestSequenceFileInputFormat extends TestCase {
 
   public void testFormat() throws Exception {
     JobConf job = new JobConf(conf);
-    FileSystem fs = FileSystem.getNamed("local", conf);
+    FileSystem fs = FileSystem.getLocal(conf);
     Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
     Path file = new Path(dir, "test.seq");
     

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

@@ -38,7 +38,7 @@ public class TestTextInputFormat extends TestCase {
   private static FileSystem localFs = null; 
   static {
     try {
-      localFs = FileSystem.getNamed("local", defaultConf);
+      localFs = FileSystem.getLocal(defaultConf);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
     }

+ 1 - 1
src/test/org/apache/hadoop/record/test/TestWritable.java

@@ -42,7 +42,7 @@ public class TestWritable extends TestCase {
 
   public void testFormat() throws Exception {
     JobConf job = new JobConf(conf);
-    FileSystem fs = FileSystem.getNamed("local", conf);
+    FileSystem fs = FileSystem.getLocal(conf);
     Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
     Path file = new Path(dir, "test.seq");