
HADOOP-576. Enable contrib/streaming to use the file cache. Contributed by Mahadev.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@469635 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
bf3d904abf

+ 4 - 0
CHANGES.txt

@@ -87,6 +87,10 @@ Trunk (unreleased changes)
     reduce tasks which use MapFile to still report progress while
     writing blocks to the filesystem.  (cutting)
 
+24. HADOOP-576.  Enable contrib/streaming to use the file cache.  Also
+    extend the cache to permit symbolic links to cached items, rather
+    than local file copies.  (Mahadev Konar via cutting)
+
 
 
 Release 0.7.2 - 2006-10-18
 

+ 2 - 2
build.xml

@@ -350,8 +350,8 @@
 
 
   <target name="test-contrib" depends="compile-core, compile-core-test">
   <target name="test-contrib" depends="compile-core, compile-core-test">
     <subant target="test">
     <subant target="test">
-        <fileset file="${basedir}/src/contrib/build.xml"/>
-    </subant>  	
+       <fileset file="${basedir}/src/contrib/build.xml"/>
+    </subant> 
   </target>   
   
   <target name="test" depends="test-core, test-contrib">

+ 9 - 4
src/contrib/build-contrib.xml

@@ -24,9 +24,10 @@
   <property name="build.dir" location="${hadoop.root}/build/contrib/${name}"/>
   <property name="build.dir" location="${hadoop.root}/build/contrib/${name}"/>
   <property name="build.classes" location="${build.dir}/classes"/>
   <property name="build.classes" location="${build.dir}/classes"/>
   <property name="build.test" location="${build.dir}/test"/>
   <property name="build.test" location="${build.dir}/test"/>
+  <property name="hadoop.log.dir" location="${build.dir}/test/logs"/>
   <!-- all jars together -->
   <property name="deploy.dir" location="${hadoop.root}/build/"/>
-
+  <property name="minimr.dir" value="${hadoop.root}/build/minimr"/>
   <property name="javac.deprecation" value="off"/>
   <property name="javac.deprecation" value="off"/>
   <property name="javac.debug" value="on"/>
   <property name="javac.debug" value="on"/>
 
 
@@ -51,6 +52,7 @@
   <path id="test.classpath">
   <path id="test.classpath">
     <pathelement location="${build.test}" />
     <pathelement location="${build.test}" />
     <pathelement location="${hadoop.root}/build/test/classes"/>
     <pathelement location="${hadoop.root}/build/test/classes"/>
+    <pathelement location="${minimr.dir}" />
     <pathelement location="${hadoop.root}/src/test"/>
     <pathelement location="${hadoop.root}/src/test"/>
     <pathelement location="${conf.dir}"/>
     <pathelement location="${conf.dir}"/>
     <pathelement location="${hadoop.root}/build"/>
     <pathelement location="${hadoop.root}/build"/>
@@ -69,7 +71,8 @@
     <mkdir dir="${build.dir}"/>
     <mkdir dir="${build.dir}"/>
     <mkdir dir="${build.classes}"/>
     <mkdir dir="${build.classes}"/>
     <mkdir dir="${build.test}"/>
     <mkdir dir="${build.test}"/>
-
+    <mkdir dir="${hadoop.log.dir}"/>
+    <mkdir dir="${minimr.dir}"/>
     <antcall target="init-contrib"/>
     <antcall target="init-contrib"/>
   </target>
   </target>
 
 
@@ -131,8 +134,10 @@
   <!-- ================================================================== -->
   <target name="test" depends="compile-test, deploy" if="test.available">
     <echo message="contrib: ${name}"/>
+    <delete dir="${hadoop.log.dir}"/>
+    <mkdir dir="${hadoop.log.dir}"/>
     <junit
-      printsummary="withOutAndErr" haltonfailure="no" fork="yes"
+      printsummary="withOutAndErr" showoutput="no" haltonfailure="no" fork="yes"
       errorProperty="tests.failed" failureProperty="tests.failed">
       errorProperty="tests.failed" failureProperty="tests.failed">
       
       
       <sysproperty key="test.build.data" value="${build.test}/data"/>
       <sysproperty key="test.build.data" value="${build.test}/data"/>
@@ -145,7 +150,7 @@
       
       
       <sysproperty key="fs.default.name" value="${fs.default.name}"/>
       <sysproperty key="fs.default.name" value="${fs.default.name}"/>
       <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
       <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
-   
+      <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/> 
       <classpath refid="test.classpath"/>
       <classpath refid="test.classpath"/>
       <formatter type="plain" />
       <formatter type="plain" />
       <batchtest todir="${build.test}" unless="testcase">
       <batchtest todir="${build.test}" unless="testcase">
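A note on the build changes above: the new hadoop.log.dir directory is recreated before each test run and handed to the forked JUnit JVM as a system property, so contrib tests (such as the streaming symlink test further down) have a writable log directory. A minimal sketch of how a test could read it; the class name is illustrative, only the property key comes from the patch:

    public class LogDirCheck {
      public static void main(String[] args) {
        // Set by <sysproperty key="hadoop.log.dir" .../> in build-contrib.xml.
        String logDir = System.getProperty("hadoop.log.dir");
        System.out.println("hadoop.log.dir = " + logDir);
      }
    }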

+ 6 - 2
src/contrib/streaming/src/java/org/apache/hadoop/streaming/Environment.java

@@ -22,8 +22,12 @@ import java.io.*;
 import java.net.InetAddress;
 import java.util.*;
 
-/*
- * If we move to Java 1.5, we can get rid of this class and just use System.getenv
+/**
+ * This is a class used to get the current environment
+ * on the host machines running the map/reduce. This class
+ * assumes that setting the environment in streaming is 
+ * allowed on windows/ix/linuz/freebsd/sunos/solaris/hp-ux
+ * @author michel
  */
 public class Environment extends Properties {
 

+ 2 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/HadoopStreaming.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.streaming;
 import java.io.IOException;
 
 /** The main entrypoint. Usually invoked with the script bin/hadoopStreaming
+ * or bin/hadoop har hadoop-streaming.jar args.
+ * It passes all the args to StreamJob which handles all the arguments.
  */
 public class HadoopStreaming {
 

+ 10 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/JarBuilder.java

@@ -23,6 +23,16 @@ import java.util.*;
 import java.util.jar.*;
 import java.util.zip.ZipException;
 
+/**
+ * This class is the main class for generating job.jar
+ * for Hadoop Streaming jobs. It includes the files specified 
+ * with the -file option and includes them in the jar. Also,
+ * hadoop-streaming is a user level appplication, so all the classes
+ * with hadoop-streaming that are needed in the job are also included
+ * in the job.jar.
+ * @author michel
+ *
+ */
 public class JarBuilder {
 
   public JarBuilder() {

+ 3 - 1
src/contrib/streaming/src/java/org/apache/hadoop/streaming/MustangFile.java

@@ -30,7 +30,9 @@ import java.util.*;
  *
  * Note: not specifying ownerOnly maps to ownerOnly = false
  * From man chmod: If no user specs are given, the effect is as if `a' were given. 
- * 
+ * This class is mainly used to change permissions when files are unjarred from the 
+ * job.jar. The executable specified in the mappper/reducer is set to be executable 
+ * using this class.
  */
 public class MustangFile extends File {
 

+ 76 - 11
src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

@@ -39,7 +39,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.RunningJob;
-
+import org.apache.hadoop.filecache.*;
+import org.apache.hadoop.util.*;
 /** All the client-side work happens here.
  * (Jar packaging, MapRed job submission and monitoring)
  * @author Michel Tourn
@@ -54,7 +55,13 @@ public class StreamJob {
     argv_ = argv;
     mayExit_ = mayExit;
   }
-
+  
+  /**
+   * This is the method that actually 
+   * intializes the job conf and submits the job
+   * to the jobtracker
+   * @throws IOException
+   */
   public void go() throws IOException {
     init();
 
@@ -65,7 +72,7 @@ public class StreamJob {
     setJobConf();
     submitAndMonitorJob();
   }
-
+  
   protected void init() {
     try {
       env_ = new Environment();
@@ -157,6 +164,10 @@ public class StreamJob {
     return new File(getHadoopClientHome() + "/conf", hadoopAliasConf_).getAbsolutePath();
   }
 
+  /**
+   * This method parses the command line args
+   * to a hadoop streaming job
+   */
   void parseArgv() {
     if (argv_.length == 0) {
       exitUsage(false);
@@ -219,7 +230,22 @@ public class StreamJob {
       } else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
       } else if ((s = optionArg(argv_, i, "-inputreader", inReaderSpec_ != null)) != null) {
         i++;
         i++;
         inReaderSpec_ = s;
         inReaderSpec_ = s;
-      } else {
+      } else if((s = optionArg(argv_, i, "-cacheArchive", false)) != null) {
+    	  i++;
+    	  if (cacheArchives == null)
+    		  cacheArchives = s;
+    	  else
+    		  cacheArchives = cacheArchives + "," + s;    	  
+      } else if((s = optionArg(argv_, i, "-cacheFile", false)) != null) {
+        i++;
+        System.out.println(" the val of s is " + s);
+        if (cacheFiles == null)
+          cacheFiles = s;
+        else
+          cacheFiles = cacheFiles + "," + s;
+        System.out.println(" the val of cachefiles is " + cacheFiles);
+      }
+      else {
         System.err.println("Unexpected argument: " + argv_[i]);
         System.err.println("Unexpected argument: " + argv_[i]);
         exitUsage(false);
         exitUsage(false);
       }
       }
@@ -269,6 +295,8 @@ public class StreamJob {
     System.out.println("  -inputreader <spec>  Optional.");
     System.out.println("  -inputreader <spec>  Optional.");
     System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
     System.out.println("  -jobconf  <n>=<v>    Optional. Add or override a JobConf property");
     System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
     System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
+    System.out.println("  -cacheFile fileNameURI");
+    System.out.println("  -cacheArchive fileNameURI");
     System.out.println("  -verbose");
     System.out.println("  -verbose");
     System.out.println();
     System.out.println();
     if (!detailed) {
     if (!detailed) {
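The two new options take a URI whose #fragment names the symbolic link created in the task's working directory. A sketch of a client-side invocation mirroring the bundled TestSymLink below; the namenode address and class name are made up, the flags and paths follow the test:

    package org.apache.hadoop.streaming;

    import java.io.IOException;

    public class CacheFileExample {
      public static void main(String[] args) throws IOException {
        String[] argv = new String[] {
            "-input", "/testing-streaming/input.txt",
            "-output", "/testing-streaming/out",
            "-mapper", "xargs cat",
            "-reducer", "cat",
            // "#testlink" becomes the symlink name in the task work directory
            "-cacheFile", "dfs://namenode:8020/testing-streaming/cache.txt#testlink"
        };
        new StreamJob(argv, false).go();
      }
    }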
@@ -392,7 +420,7 @@ public class StreamJob {
     // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
     // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
     String runtimeClasses = userJobConfProps_.get("stream.shipped.hadoopstreaming"); // jar or class dir
-System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
+    System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming");
     
     
     if (runtimeClasses == null) {
       runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
@@ -435,6 +463,11 @@ System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.ha
     return jobJarName;
   }
 
+  /**
+   * This method sets the user jobconf variable specified
+   * by user using -jobconf key=value
+   * @param doEarlyProps
+   */
   protected void setUserJobConfProps(boolean doEarlyProps) {
     Iterator it = userJobConfProps_.keySet().iterator();
     while (it.hasNext()) {
@@ -448,7 +481,17 @@ System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.ha
       }
     }
   }
-
+  
+  /**
+   * get the uris of all the files/caches
+   */
+  protected void getURIs(String lcacheArchives, String lcacheFiles) {
+    String archives[] = StringUtils.getStrings(lcacheArchives);
+    String files[] = StringUtils.getStrings(lcacheFiles);
+    fileURIs = StringUtils.stringToURI(files);
+    archiveURIs = StringUtils.stringToURI(archives);
+  }
+  
   protected void setJobConf() throws IOException {
     msg("hadoopAliasConf_ = " + hadoopAliasConf_);
     config_ = new Configuration();
@@ -548,7 +591,8 @@ System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.ha
         jobConf_.set(k, v);
       }
     }
-
+    
+    setUserJobConfProps(false);
     // output setup is done late so we can customize for reducerNone_
     //jobConf_.setOutputDir(new File(output_));
     setOutputSpec();
@@ -561,20 +605,36 @@ System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.ha
 
 
     // last, allow user to override anything
     // (although typically used with properties we didn't touch)
-    setUserJobConfProps(false);
 
 
     jar_ = packageJobJar();
     if (jar_ != null) {
       jobConf_.setJar(jar_);
     }
-
+    
+    if ((cacheArchives != null) || (cacheFiles != null)){
+      getURIs(cacheArchives, cacheFiles);
+      boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs);
+      if (!b)
+        fail(LINK_URI);
+      DistributedCache.createSymlink(jobConf_);
+    }
+    // set the jobconf for the caching parameters
+    if (cacheArchives != null)
+      DistributedCache.setCacheArchives(archiveURIs, jobConf_);
+    if (cacheFiles != null)
+      DistributedCache.setCacheFiles(fileURIs, jobConf_);
+    
     if(verbose_) {
       listJobConfProperties();
     }
-    
+   
     msg("submitting to jobconf: " + getJobTrackerHostPort());
     msg("submitting to jobconf: " + getJobTrackerHostPort());
   }
   }
 
 
+  /**
+   * Prints out the jobconf properties on stdout
+   * when verbose is specified.
+   */
   protected void listJobConfProperties()
   {
     msg("==== JobConf properties:");
@@ -765,6 +825,10 @@ System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.ha
   protected String comCmd_;
   protected String redCmd_;
   protected String cluster_;
+  protected String cacheFiles;
+  protected String cacheArchives;
+  protected URI[] fileURIs;
+  protected URI[] archiveURIs;
   protected ArrayList configPath_ = new ArrayList(); // <String>
   protected String hadoopAliasConf_;
   protected String inReaderSpec_;
@@ -780,5 +844,6 @@ System.out.println(runtimeClasses + "=@@@userJobConfProps_.get(stream.shipped.ha
 
 
   protected RunningJob running_;
   protected String jobId_;
-
+  protected static String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
+      "Please specify a different link name for all of your caching URIs";
 }

+ 126 - 0
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java

@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+/**
+ * This test case tests the symlink creation
+ * utility provided by distributed caching 
+ * @author mahadev
+ *
+ */
+public class TestSymLink extends TestCase
+{
+  String INPUT_FILE = "/testing-streaming/input.txt";
+  String OUTPUT_DIR = "/testing-streaming/out";
+  String CACHE_FILE = "/testing-streaming/cache.txt";
+  String input = "check to see if we can read this none reduce";
+  String map = "xargs cat ";
+  String reduce = "cat";
+  String mapString = "testlink\n";
+  String cacheString = "This is just the cache string";
+  StreamJob job;
+
+  public TestSymLink() throws IOException
+  {
+  }
+
+  public void testSymLink()
+  {
+    try {
+      boolean mayExit = false;
+      int jobTrackerPort = 60050;
+      MiniMRCluster mr = null;
+      MiniDFSCluster dfs = null; 
+      FileSystem fileSys = null;
+      try{
+        Configuration conf = new Configuration();
+        dfs = new MiniDFSCluster(8050, conf, false);
+        fileSys = dfs.getFileSystem();
+        String namenode = fileSys.getName();
+        mr  = new MiniMRCluster(jobTrackerPort, 60060, 1, namenode, true, 3);
+        // During tests, the default Configuration will use a local mapred
+        // So don't specify -config or -cluster
+        String strJobtracker = "mapred.job.tracker=" + "localhost:" + jobTrackerPort;
+        String strNamenode = "fs.default.name=" + namenode;
+        String argv[] = new String[] {
+            "-input", INPUT_FILE,
+            "-output", OUTPUT_DIR,
+            "-mapper", map,
+            "-reducer", reduce,
+            //"-verbose",
+            //"-jobconf", "stream.debug=set"
+            "-jobconf", strNamenode,
+            "-jobconf", strJobtracker,
+            "-cacheFile", "dfs://"+fileSys.getName()+CACHE_FILE + "#testlink"
+        };
+
+        fileSys.delete(new Path(OUTPUT_DIR));
+        fileSys.mkdirs(new Path(OUTPUT_DIR));
+        
+        DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
+        file.writeBytes(mapString);
+        file.close();
+        file = fileSys.create(new Path(CACHE_FILE));
+        file.writeBytes(cacheString);
+        file.close();
+          
+        job = new StreamJob(argv, mayExit);      
+        job.go();
+        String line = null;
+        Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
+        for (int i = 0; i < fileList.length; i++){
+          System.out.println(fileList[i].toString());
+          BufferedReader bread =
+            new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
+          line = bread.readLine();
+          System.out.println(line);
+        }
+        assertEquals(cacheString + "\t", line);
+      } finally{
+        if (fileSys != null) { fileSys.close(); }
+        if (dfs != null) { dfs.shutdown(); }
+        if (mr != null) { mr.shutdown();}
+      }
+      
+    } catch(Exception e) {
+      failTrace(e);
+    }
+  }
+
+  void failTrace(Exception e)
+  {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    fail(sw.toString());
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestStreaming().testCommandLine();
+  }
+
+}

+ 1 - 8
src/java/org/apache/hadoop/conf/Configuration.java

@@ -302,14 +302,7 @@ public class Configuration {
    */
   public String[] getStrings(String name) {
     String valueString = get(name);
-    if (valueString == null)
-      return null;
-    StringTokenizer tokenizer = new StringTokenizer (valueString,",");
-    List values = new ArrayList();
-    while (tokenizer.hasMoreTokens()) {
-      values.add(tokenizer.nextToken());
-    }
-    return (String[])values.toArray(new String[values.size()]);
+    return StringUtils.getStrings(valueString);
   }
 
   /**
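With this change Configuration.getStrings() simply delegates to the new StringUtils.getStrings() helper shown near the end of this commit, so comma-separated configuration values are split the same way everywhere. A small sketch; the property name and class name are hypothetical:

    import org.apache.hadoop.conf.Configuration;

    public class GetStringsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("my.csv.property", "alpha,beta,gamma");      // hypothetical key
        String[] parts = conf.getStrings("my.csv.property");  // {"alpha", "beta", "gamma"}
        System.out.println(parts.length);                     // prints 3
      }
    }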

+ 100 - 10
src/java/org/apache/hadoop/filecache/DistributedCache.java

@@ -43,7 +43,7 @@ public class DistributedCache {
   /**
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(dfs://hostname:port/absoulte_path_to_file). If no schema 
+   * new URI(dfs://hostname:port/absoulte_path_to_file#LINKNAME). If no schema 
    * or hostname:port is provided the file is assumed to be in the filesystem
    * being used in the Configuration
    * @param conf The Confguration file which contains the filesystem
@@ -55,12 +55,14 @@ public class DistributedCache {
    * @param md5 this is a mere checksum to verufy if you are using the right cache. 
    * You need to pass the md5 of the crc file in DFS. This is matched against the one
    * calculated in this api and if it does not match, the cache is not localized.
+   * @param currentWorkDir this is the directory where you would want to create symlinks 
+   * for the locally cached files/archives
    * @return the path to directory where the archives are unjarred in case of archives,
    * the path to the file where the file is copied locally 
    * @throws IOException
    */
   public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
-      boolean isArchive, String md5) throws IOException {
+      boolean isArchive, String md5, Path currentWorkDir) throws IOException {
     String cacheId = makeRelative(cache, conf);
     CacheStatus lcacheStatus;
     Path localizedPath;
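getLocalCache() now takes the task's working directory as an extra argument; when symlink creation is enabled, the localized copy is linked there under the URI fragment's name. A hedged sketch of a call using the new signature (all paths, the md5 value and the class name are made up; TaskRunner below is the real caller, and this only does something useful against a live filesystem):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;

    public class GetLocalCacheSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI cache = new URI("dfs://namenode:8020/user/data/dict.txt#dict");
        Path localized = DistributedCache.getLocalCache(
            cache, conf,
            new Path("/local/mapred/cache"),   // base dir for localized files
            false,                             // plain file, not an archive
            "0123456789abcdef",                // md5 of the cache's crc file (illustrative)
            new Path("/local/task/work"));     // where the "dict" symlink is created
        System.out.println(localized);
      }
    }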
@@ -80,7 +82,7 @@ public class DistributedCache {
       }
     }
     synchronized (lcacheStatus) {
-      localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5);
+      localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5, currentWorkDir);
     }
     // try deleting stuff if you can
     long size = FileUtil.getDU(new File(baseDir.toString()));
@@ -157,15 +159,26 @@ public class DistributedCache {
 
 
   // the methoed which actually copies the caches locally and unjars/unzips them
   private static Path localizeCache(URI cache, CacheStatus cacheStatus,
-      Configuration conf, boolean isArchive, String md5) throws IOException {
+      Configuration conf, boolean isArchive, String md5, Path currentWorkDir) throws IOException {
     boolean b = true;
+    boolean doSymlink = getSymlink(conf);
     FileSystem dfs = getFileSystem(cache, conf);
     b = ifExistsAndFresh(cacheStatus, cache, dfs, md5, conf);
     if (b) {
-      if (isArchive)
+      if (isArchive) {
+        if (doSymlink)
+        FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
+            currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
+        
         return cacheStatus.localLoadPath;
-      else
+      }
+      else {
+        if (doSymlink)
+          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
+              currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
+       
         return cacheFilePath(cacheStatus.localLoadPath);
+      }
     } else {
       // remove the old archive
       // if the old archive cannot be removed since it is being used by another
@@ -179,7 +192,6 @@ public class DistributedCache {
       localFs.delete(cacheStatus.localLoadPath);
       Path parchive = new Path(cacheStatus.localLoadPath,
                                new Path(cacheStatus.localLoadPath.getName()));
-
       localFs.mkdirs(cacheStatus.localLoadPath);
       String cacheId = cache.getPath();
       dfs.copyToLocalFile(new Path(cacheId), parchive);
@@ -199,14 +211,23 @@ public class DistributedCache {
         // else will not do anyhting
         // and copy the file into the dir as it is
       }
+      // create a symlink if #NAME is specified as fragment in the
+      // symlink
       cacheStatus.currentStatus = true;
       cacheStatus.md5 = checkSum;
     }
-    if (isArchive)
+    if (isArchive){
+      if (doSymlink)
+        FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
+            currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
       return cacheStatus.localLoadPath;
-    else
+    }
+    else {
+      if (doSymlink)
+        FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
+            currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment());
       return cacheFilePath(cacheStatus.localLoadPath);
-
+    }
   }
 
   // Checks if the cache has already been localized and is fresh
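For clarity, the link name used by localizeCache() above is simply the URI fragment appended to the task work directory, and the link target is the localized copy. A tiny sketch of that name computation (all values here are hypothetical):

    import java.net.URI;

    public class LinkNameSketch {
      public static void main(String[] args) throws Exception {
        URI cache = new URI("dfs://namenode:8020/user/data/dict.txt#dict");
        String currentWorkDir = "/local/task/work";
        // Mirrors localizeCache(): the symlink is created at <workdir>/<fragment>.
        System.out.println(currentWorkDir + "/" + cache.getFragment());
        // prints: /local/task/work/dict
      }
    }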
@@ -452,6 +473,75 @@ public class DistributedCache {
     conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
     conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
         + uri.toString());
         + uri.toString());
   }
   }
+  
+  /**
+   * This method allows you to create symlinks in the current working directory
+   * of the task to all the cache files/archives
+   * @param conf the jobconf 
+   */
+  public static void createSymlink(Configuration conf){
+    conf.set("mapred.create.symlink", "yes");
+  }
+  
+  /**
+   * This method checks to see if symlinks are to be create for the 
+   * localized cache files in the current working directory 
+   * @param conf the jobconf
+   * @return true if symlinks are to be created- else return false
+   */
+  public static boolean getSymlink(Configuration conf){
+    String result = conf.get("mapred.create.symlink");
+    if ("yes".equals(result)){
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * This method checks if there is a conflict in the fragment names 
+   * of the uris. Also makes sure that each uri has a fragment. It 
+   * is only to be called if you want to create symlinks for 
+   * the various archives and files.
+   * @param uriFiles The uri array of urifiles
+   * @param uriArchives the uri array of uri archives
+   */
+  public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
+    if ((uriFiles == null) && (uriArchives == null)){
+      return true;
+    }
+    if (uriFiles != null){
+      for (int i = 0; i < uriFiles.length; i++){
+        String frag1 = uriFiles[i].getFragment();
+        if (frag1 == null)
+          return false;
+        for (int j=i+1; j < uriFiles.length; i++){
+          String frag2 = uriFiles[j].getFragment();
+          if (frag2 == null)
+            return false;
+          if (frag1.equalsIgnoreCase(frag2))
+            return false;
+        }
+        if (uriArchives != null){
+          for (int j = 0; j < uriArchives.length; j++){
+            String frag2 = uriArchives[j].getFragment();
+            if (frag2 == null){
+              return false;
+            }
+            if (frag1.equalsIgnoreCase(frag2))
+              return false;
+            for (int k=j+1; k < uriArchives.length; k++){
+              String frag3 = uriArchives[k].getFragment();
+              if (frag3 == null)
+                return false;
+              if (frag2.equalsIgnoreCase(frag3))
+                  return false;
+            }
+          }
+        }
+      }
+    }
+    return true;
+  }
 
 
   private static class CacheStatus {
     // false, not loaded yet, true is loaded
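Taken together, a submitting client is expected to validate the URI fragments and then switch symlink creation on, which is exactly what StreamJob.setJobConf() above does. A condensed sketch of that client-side sequence; the URIs and class name are illustrative, the DistributedCache calls are the ones added in this commit:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;

    public class SymlinkSetupSketch {
      public static void main(String[] args) throws Exception {
        Configuration jobConf = new Configuration();
        URI[] files = { new URI("dfs://namenode:8020/data/stopwords.txt#stopwords") };
        URI[] archives = { new URI("dfs://namenode:8020/data/dict.zip#dict") };
        // Every URI must carry a unique #fragment before symlinks are allowed.
        if (!DistributedCache.checkURIs(files, archives)) {
          throw new IllegalArgumentException("duplicate or missing link names");
        }
        DistributedCache.createSymlink(jobConf);   // sets mapred.create.symlink=yes
        DistributedCache.setCacheFiles(files, jobConf);
        DistributedCache.setCacheArchives(archives, jobConf);
      }
    }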

+ 19 - 0
src/java/org/apache/hadoop/fs/FileUtil.java

@@ -301,4 +301,23 @@ public class FileUtil {
       zipFile.close();
     }
   }
+  
+  /**
+   * Create a soft link between a src and destination
+   * only on a local disk. HDFS does not support this
+   * @param target the target for symlink 
+   * @param destination the symlink
+   * @return value returned by the command
+   */
+  public static int symLink(String target, String linkname) throws IOException{
+   String cmd = "ln -s " + target + " " + linkname;
+   Process p = Runtime.getRuntime().exec( cmd, null );
+   int returnVal = -1;
+   try{
+     returnVal = p.waitFor();
+   } catch(InterruptedException e){
+     //do nothing as of yet
+   }
+   return returnVal;
+ }
 }
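FileUtil.symLink() shells out to `ln -s`, so it only works on the local filesystem of Unix-like hosts and returns the process exit code. A minimal usage sketch (paths and class name are made up):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileUtil;

    public class SymLinkSketch {
      public static void main(String[] args) throws IOException {
        // Creates /tmp/dict -> /tmp/localized/dict.txt via "ln -s"; 0 means success.
        int rc = FileUtil.symLink("/tmp/localized/dict.txt", "/tmp/dict");
        System.out.println("ln -s exit code: " + rc);
      }
    }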

+ 4 - 4
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -80,7 +80,7 @@ abstract class TaskRunner extends Thread {
       
       
       //before preparing the job localize 
       //all the archives
-      
+      File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
       if ((archives != null) || (files != null)) {
@@ -88,7 +88,8 @@ abstract class TaskRunner extends Thread {
           String[] md5 = DistributedCache.getArchiveMd5(conf);
           Path[] p = new Path[archives.length];
           for (int i = 0; i < archives.length;i++){
-            p[i] = DistributedCache.getLocalCache(archives[i], conf, conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i]);
+            p[i] = DistributedCache.getLocalCache(archives[i], conf, 
+                conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i], new Path(workDir.getAbsolutePath()));
           }
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
         }
@@ -97,7 +98,7 @@ abstract class TaskRunner extends Thread {
           Path[] p = new Path[files.length];
           for (int i = 0; i < files.length;i++){
            p[i] = DistributedCache.getLocalCache(files[i], conf, conf.getLocalPath(TaskTracker
-              .getCacheSubdir()), false, md5[i]);
+              .getCacheSubdir()), false, md5[i], new Path(workDir.getAbsolutePath()));
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
@@ -123,7 +124,6 @@ abstract class TaskRunner extends Thread {
       // start with same classpath as parent process
       classPath.append(System.getProperty("java.class.path"));
       classPath.append(sep);
-      File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
       workDir.mkdirs();
 	  
       String jar = conf.getJar();

+ 19 - 1
src/java/org/apache/hadoop/util/StringUtils.java

@@ -24,7 +24,10 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.DecimalFormat;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
+import java.util.StringTokenizer;
 
 
 import org.apache.hadoop.fs.*;
 
@@ -253,4 +256,19 @@ public class StringUtils {
     return buf.toString();
   }
   
-}
+  /**
+   * returns an arraylist of strings  
+   * @param str the comma seperated string values
+   * @return the arraylist of the comma seperated string values
+   */
+  public static String[] getStrings(String str){
+    if (str == null)
+      return null;
+    StringTokenizer tokenizer = new StringTokenizer (str,",");
+    List values = new ArrayList();
+    while (tokenizer.hasMoreTokens()) {
+      values.add(tokenizer.nextToken());
+    }
+    return (String[])values.toArray(new String[values.size()]);
+  }
+}
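Finally, the helper added here is what both Configuration.getStrings() and StreamJob.getURIs() rely on; paired with StringUtils.stringToURI() (referenced in the StreamJob hunk above) it turns the comma-separated -cacheFile/-cacheArchive values into the URI arrays handed to DistributedCache. A short sketch (the URI strings and class name are illustrative):

    public class StringUtilsGetStringsSketch {
      public static void main(String[] args) {
        String csv = "dfs://nn:8020/a.txt#a,dfs://nn:8020/b.txt#b";
        String[] parts = org.apache.hadoop.util.StringUtils.getStrings(csv);
        for (int i = 0; i < parts.length; i++) {
          System.out.println(parts[i]);   // one URI string per line
        }
      }
    }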