Merge -c 1220996 from trunk to branch-0.23 to fix MAPREDUCE-3563.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1220997 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy, 13 years ago
parent commit 907e861241

+ 5 - 1
hadoop-mapreduce-project/CHANGES.txt

@@ -282,7 +282,11 @@ Release 0.23.1 - Unreleased
     before the job started, so that it works properly with oozie throughout
     the job execution. (Robert Joseph Evans via vinodkv)
 
-    MAPREDUCE-3579. ConverterUtils shouldn't include a port in a path from a url without a port. (atm via harsh)
+    MAPREDUCE-3579. ConverterUtils shouldn't include a port in a path from a url 
+    without a port. (atm via harsh)
+
+    MAPREDUCE-3563. Fixed LocalJobRunner to work correctly with new mapreduce
+    apis. (acmurthy)
 
 Release 0.23.0 - 2011-11-01 

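Context for the MAPREDUCE-3563 entry above: "new mapreduce apis" means jobs written against org.apache.hadoop.mapreduce.Job rather than the old org.apache.hadoop.mapred.JobConf/JobClient pair. A minimal sketch of such a job running in local mode, mirroring the new test added below (MyMapper, MyReducer, and the input/output paths are hypothetical placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

Configuration conf = new Configuration();
// Route the job through LocalJobRunner (in-process execution).
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
Job job = Job.getInstance(conf, "example");
job.setMapperClass(MyMapper.class);   // hypothetical new-API Mapper subclass
job.setReducerClass(MyReducer.class); // hypothetical new-API Reducer subclass
FileInputFormat.addInputPath(job, new Path("in"));    // hypothetical input dir
FileOutputFormat.setOutputPath(job, new Path("out")); // hypothetical output dir
// Before this fix, LocalJobRunner resolved the output committer through the
// old API only, so jobs like this could fail in local mode.
job.waitForCompletion(true);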
+ 38 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
@@ -52,11 +53,13 @@ import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIden
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** Implements MapReduce locally, in-process, for debugging. */
 @InterfaceAudience.Private
@@ -304,12 +307,45 @@ public class LocalJobRunner implements ClientProtocol {
       return executor;
     }
 
+    private org.apache.hadoop.mapreduce.OutputCommitter 
+    createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
+      org.apache.hadoop.mapreduce.OutputCommitter committer = null;
+
+      LOG.info("OutputCommitter set in config "
+          + conf.get("mapred.output.committer.class"));
+
+      if (newApiCommitter) {
+        org.apache.hadoop.mapreduce.TaskID taskId =
+            new org.apache.hadoop.mapreduce.TaskID(jobId, TaskType.MAP, 0);
+        org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID =
+            new org.apache.hadoop.mapreduce.TaskAttemptID(taskId, 0);
+        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = 
+            new TaskAttemptContextImpl(conf, taskAttemptID);
+        OutputFormat outputFormat =
+          ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
+        committer = outputFormat.getOutputCommitter(taskContext);
+      } else {
+        committer = ReflectionUtils.newInstance(conf.getClass(
+            "mapred.output.committer.class", FileOutputCommitter.class,
+            org.apache.hadoop.mapred.OutputCommitter.class), conf);
+      }
+      LOG.info("OutputCommitter is " + committer.getClass().getName());
+      return committer;
+    }
+
     @Override
     public void run() {
       JobID jobId = profile.getJobID();
       JobContext jContext = new JobContextImpl(job, jobId);
-      OutputCommitter outputCommitter = job.getOutputCommitter();
-
+      
+      org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
+      try {
+        outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
+      } catch (Exception e) {
+        LOG.info("Failed to createOutputCommitter", e);
+        return;
+      }
+      
       try {
         TaskSplitMetaInfo[] taskSplitMetaInfos = 
           SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);

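The heart of the change above: run() no longer calls the old-API job.getOutputCommitter() unconditionally. The new createOutputCommitter() branches on conf.getUseNewMapper(): for new-API jobs it instantiates the job's configured OutputFormat reflectively and asks it for its committer, otherwise it falls back to the old mapred.output.committer.class property. The flag it branches on is set automatically when a job is submitted through the new Job API; a small sketch of that relationship (the property name given in the comment is, to the best of my knowledge, the one this branch uses):

import org.apache.hadoop.mapred.JobConf;

JobConf conf = new JobConf();
conf.setUseNewMapper(true); // what Job.submit() effectively does under the hood
// getUseNewMapper() reads the "mapred.mapper.new-api" flag; its value is what
// run() passes to createOutputCommitter() as the newApiCommitter argument.
boolean newApi = conf.getUseNewMapper(); // true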
+ 157 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalModeWithNewApis.java

@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLocalModeWithNewApis {
+
+  public static final Log LOG = 
+      LogFactory.getLog(TestLocalModeWithNewApis.class);
+  
+  Configuration conf;
+  
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testNewApis() throws Exception {
+    Random r = new Random(System.currentTimeMillis());
+    Path tmpBaseDir = new Path("/tmp/wc-" + r.nextInt());
+    final Path inDir = new Path(tmpBaseDir, "input");
+    final Path outDir = new Path(tmpBaseDir, "output");
+    String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+    FileSystem inFs = inDir.getFileSystem(conf);
+    FileSystem outFs = outDir.getFileSystem(conf);
+    outFs.delete(outDir, true);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    {
+      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+    }
+
+    Job job = Job.getInstance(conf, "word count");
+    job.setJarByClass(TestLocalModeWithNewApis.class);
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+    assertEquals(job.waitForCompletion(true), true);
+
+    String output = readOutput(outDir, conf);
+    assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
+                 "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", output);
+    
+    outFs.delete(tmpBaseDir, true);
+  }
+
+  static String readOutput(Path outDir, Configuration conf) 
+      throws IOException {
+    FileSystem fs = outDir.getFileSystem(conf);
+    StringBuffer result = new StringBuffer();
+
+    Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+           new Utils.OutputFileUtils.OutputFilesFilter()));
+    for (Path outputFile : fileList) {
+      LOG.info("Path" + ": "+ outputFile);
+      BufferedReader file = 
+        new BufferedReader(new InputStreamReader(fs.open(outputFile)));
+      String line = file.readLine();
+      while (line != null) {
+        result.append(line);
+        result.append("\n");
+        line = file.readLine();
+      }
+      file.close();
+    }
+    return result.toString();
+  }
+
+  public static class TokenizerMapper 
+  extends Mapper<Object, Text, Text, IntWritable>{
+
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context
+        ) throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+
+  public static class IntSumReducer 
+  extends Reducer<Text,IntWritable,Text,IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values, 
+        Context context
+        ) throws IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+    }
+  }
+
+}
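A note on the assertion in testNewApis above: local mode runs a single reducer, and keys reach it in sorted order ("The" precedes the lowercase words because Text compares raw bytes), so the expected result can be written as one exact string of word\tcount lines.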

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

@@ -233,7 +233,7 @@ public class FileOutputCommitter extends OutputCommitter {
           " directory of task: " + attemptId + " - " + workPath);
           " directory of task: " + attemptId + " - " + workPath);
         }
         }
         LOG.info("Saved output of task '" + attemptId + "' to " + 
         LOG.info("Saved output of task '" + attemptId + "' to " + 
-                 outputPath);
+            jobOutputPath);
       }
     }
   }