Prechádzať zdrojové kódy

HADOOP-2735. Enables setting TMPDIR for tasks. Contributed by Amareshwari Sri Ramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@628284 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 rokov pred
rodič
commit
222589c925

+ 3 - 0
CHANGES.txt

@@ -97,6 +97,9 @@ Release 0.16.1 - Unrelease
     HADOOP-2811. Dump of counters in job history does not add comma between
     groups. (runping via omalley)
 
+    HADOOP-2735. Enables setting TMPDIR for tasks. 
+    (Amareshwari Sri Ramadasu via ddas)
+
 Release 0.16.0 - 2008-02-07
 
   INCOMPATIBLE CHANGES

+ 12 - 0
conf/hadoop-default.xml

@@ -681,6 +681,18 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<property>
+  <name>mapred.child.tmp</name>
+  <value>./tmp</value>
+  <description> To set the value of tmp directory for map and reduce tasks.
+  If the value is an absolute path, it is directly assigned. Otherwise, it is
+  prepended with task's working directory. The java tasks are executed with
+  option -Djava.io.tmpdir='the absolute path of the tmp dir'. Pipes and
+  streaming are set with environment variable,
+   TMPDIR='the absolute path of the tmp dir'
+  </description>
+</property>
+
 <property>
   <name>mapred.inmem.merge.threshold</name>
   <value>1000</value>

+ 2 - 0
src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java

@@ -161,6 +161,8 @@ public abstract class PipeMapRed {
       Environment childEnv = (Environment) StreamUtil.env().clone();
       addJobConfToEnvironment(job_, childEnv);
       addEnvironment(childEnv, job_.get("stream.addenvironment"));
+      // add TMPDIR environment variable with the value of java.io.tmpdir
+      envPut(childEnv, "TMPDIR", System.getProperty("java.io.tmpdir"));
       sim = Runtime.getRuntime().exec(argvSplit, childEnv.toArray());
 
       /* // This way required jdk1.5

+ 15 - 0
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -323,6 +323,21 @@ abstract class TaskRunner extends Thread {
         vargs.add(javaOptsSplit[i]);
       }
 
+      // add java.io.tmpdir given by mapred.child.tmp
+      String tmp = conf.get("mapred.child.tmp", "./tmp");
+      Path tmpDir = new Path(tmp);
+      
+      // if temp directory path is not absolute 
+      // prepend it with workDir.
+      if (!tmpDir.isAbsolute()) {
+        tmpDir = new Path(workDir.toString(), tmp);
+      }
+      FileSystem localFs = FileSystem.getLocal(conf);
+      if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
+        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+      }
+      vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
+
       // Add classpath.
       vargs.add("-classpath");
       vargs.add(classPath.toString());

+ 2 - 0
src/java/org/apache/hadoop/mapred/pipes/Application.java

@@ -69,6 +69,8 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
               ) throws IOException, InterruptedException {
     serverSocket = new ServerSocket(0);
     Map<String, String> env = new HashMap<String,String>();
+    // add TMPDIR environment variable with the value of java.io.tmpdir
+    env.put("TMPDIR", System.getProperty("java.io.tmpdir"));
     env.put("hadoop.pipes.command.port", 
             Integer.toString(serverSocket.getLocalPort()));
     List<String> cmd = new ArrayList<String>();

+ 188 - 0
src/test/org/apache/hadoop/mapred/TestMiniMRTaskTempDir.java

@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+
+/**
+ * Class to test mapred task's temp directory
+ */
+public class TestMiniMRTaskTempDir extends TestCase {
+  private static final Log LOG =
+    LogFactory.getLog(TestMiniMRTaskTempDir.class.getName());
+
+  private MiniMRCluster mr;
+  private MiniDFSCluster dfs;
+  private FileSystem fileSys;
+  
+  /**
+   * Mapper used to verify the task's temp directory from inside the task.
+   * For every input record it checks that the directory named by
+   * mapred.child.tmp exists on the local file system, that the
+   * java.io.tmpdir system property points to that same directory, and that
+   * a file made with File.createTempFile lands in that directory.
+   */
+  public static class MapClass extends MapReduceBase
+  implements Mapper<LongWritable, Text, Text, IntWritable> {
+	 // value of mapred.child.tmp as read in configure(); may be relative
+	 Path tmpDir;
+	 // local file system used to check existence and to qualify paths
+	 FileSystem localFs;
+     /**
+      * Runs the temp directory checks; the key, value, output collector and
+      * reporter are not used.
+      */
+     public void map (LongWritable key, Text value, 
+                     OutputCollector<Text, IntWritable> output, 
+                     Reporter reporter) throws IOException {
+       String tmp = null;
+       if (localFs.exists(tmpDir)) {
+         // qualify both sides so that a relative mapred.child.tmp can be
+         // compared with the java.io.tmpdir value
+         // NOTE(review): presumably relative paths resolve against the
+         // task's working directory -- confirm
+         tmp = tmpDir.makeQualified(localFs).toString();
+
+         assertEquals(tmp, new Path(System.getProperty("java.io.tmpdir")).
+                                           makeQualified(localFs).toString());
+       } else {
+         fail("Temp directory "+tmpDir +" doesnt exist.");
+       }
+       // a temp file created without an explicit directory must be placed
+       // in the configured temp directory
+       File tmpFile = File.createTempFile("test", ".tmp");
+       assertEquals(tmp, new Path(tmpFile.getParent()).
+                                           makeQualified(localFs).toString());
+     }
+     /**
+      * Reads mapred.child.tmp (falling back to "./tmp") and obtains the
+      * local file system; fails the test if the latter throws.
+      */
+     public void configure(JobConf job) {
+       tmpDir = new Path(job.get("mapred.child.tmp", "./tmp"));
+       try {
+         localFs = FileSystem.getLocal(job);
+       } catch (IOException ioe) {
+         ioe.printStackTrace();
+         fail("IOException in getting localFS");
+       }
+     }
+  }
+
+  /**
+   * Writes the input text to a single file under the input directory and
+   * runs the map-only job three times: with the default temp dir setting,
+   * with a relative mapred.child.tmp ("../temp") and with an absolute
+   * mapred.child.tmp ("/tmp"). The output directory is deleted before the
+   * first run and after every run.
+   *
+   * @param conf Configuration of the mapreduce job.
+   * @param inDir input path
+   * @param outDir output path
+   * @param input Input text
+   * @throws IOException if the input cannot be set up or a job run fails
+   */
+  public void launchTest(JobConf conf,
+                         Path inDir,
+                         Path outDir,
+                         String input)
+  throws IOException {
+
+    // set up the input file system and write input text.
+    FileSystem inFs = inDir.getFileSystem(conf);
+    FileSystem outFs = outDir.getFileSystem(conf);
+    outFs.delete(outDir);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    {
+      // write input into input file; the stream is closed even when the
+      // write fails so that the file handle is not leaked
+      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
+      try {
+        file.writeBytes(input);
+      } finally {
+        file.close();
+      }
+    }
+
+    // configure the mapred Job which creates a tempfile in map.
+    conf.setJobName("testmap");
+    conf.setMapperClass(MapClass.class);
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(0);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+                                      "/tmp")).toString().replace(' ', '+');
+    conf.set("test.build.data", TEST_ROOT_DIR);
+
+    // Launch job with default option for temp dir. 
+    // i.e. temp dir is ./tmp 
+    JobClient.runJob(conf);
+    outFs.delete(outDir);
+
+    // Launch job by giving relative path to temp dir.
+    conf.set("mapred.child.tmp", "../temp");
+    JobClient.runJob(conf);
+    outFs.delete(outDir);
+
+    // Launch job by giving absolute path to temp dir
+    conf.set("mapred.child.tmp", "/tmp");
+    JobClient.runJob(conf);
+    outFs.delete(outDir);
+  }
+
+  /**
+   * Tests task's temp directory.
+   * 
+   * In this test, we give different values to mapred.child.tmp,
+   * both relative and absolute, and check whether the temp directory
+   * is created. We also check whether the java.io.tmpdir value is the same
+   * as the directory specified. We create a temp file and check that it is
+   * created in the directory specified.
+   *
+   * The file system and both mini clusters are released in a finally
+   * block, so the cleanup runs whether the test passes or fails.
+   */
+  public void testTaskTempDir(){
+    try {
+      
+      // create configuration, dfs, file system and mapred cluster 
+      dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
+      fileSys = dfs.getFileSystem();
+      mr = new MiniMRCluster(2, fileSys.getUri().toString(), 1);
+      JobConf conf = mr.createJobConf();
+      
+      // initialize input, output directories
+      Path inDir = new Path("testing/wc/input");
+      Path outDir = new Path("testing/wc/output");
+      String input = "The input";
+      
+      launchTest(conf, inDir, outDir, input);
+      
+    } catch(Exception e) {
+      e.printStackTrace();
+      fail("Exception in testing temp dir");
+    } finally {
+      // close file system and shut down dfs and mapred cluster.
+      // This must not live in the catch block: fail() throws, so anything
+      // placed after it there is never executed.
+      try {
+        if (fileSys != null) {
+          fileSys.close();
+        }
+        if (dfs != null) {
+          dfs.shutdown();
+        }
+        if (mr != null) {
+          mr.shutdown();
+        }
+      } catch (IOException ioe) {
+        LOG.info("IO exception in closing file system)" );
+        ioe.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Command-line entry point: runs the temp directory test directly on a
+   * fresh instance of this test class.
+   */
+  public static void main(String args[]){
+    new TestMiniMRTaskTempDir().testTaskTempDir();
+  }
+}