
HADOOP-5576. Fix LocalRunner to work with the new context object API in
mapreduce. (Tom White via omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@760514 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 16 years ago
parent
commit
be81978d86
2 changed files with 49 additions and 10 deletions
  1. CHANGES.txt (+3, -0)
  2. src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (+46, -10)

+ 3 - 0
CHANGES.txt

@@ -821,6 +821,9 @@ Release 0.20.0 - Unreleased
     if there is a user request pending to kill the task and the TT reported
     the state as SUCCESS. (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-5576. Fix LocalRunner to work with the new context object API in
+    mapreduce. (Tom White via omalley)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES
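
For context, a minimal sketch of a driver and mapper written against the new
context-object API (org.apache.hadoop.mapreduce) that this patch makes runnable
in local mode. The class name and paths here are illustrative, not part of the
patch. With mapred.job.tracker left at its default of "local", submission goes
through LocalJobRunner, which before this change could only enumerate old-API
(org.apache.hadoop.mapred) InputSplits.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiLocalExample {

  // New-API mapper: each callback receives a single Context object instead
  // of the old OutputCollector/Reporter pair.
  static class TokenMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      for (String token : value.toString().split("\\s+")) {
        word.set(token);
        context.write(word, ONE);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // mapred.job.tracker defaults to "local", so this runs in-process
    // through LocalJobRunner.
    Configuration conf = new Configuration();
    Job job = new Job(conf, "new-api local example");
    job.setMapperClass(TokenMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}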

+ 46 - 10
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,8 +29,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.JobTrackerMetricsInst;
 import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
 class LocalJobRunner implements JobSubmissionProtocol {
@@ -106,8 +111,44 @@ class LocalJobRunner implements JobSubmissionProtocol {
       OutputCommitter outputCommitter = job.getOutputCommitter();
       try {
         // split input into minimum number of splits
-        InputSplit[] splits;
-        splits = job.getInputFormat().getSplits(job, 1);
+        RawSplit[] rawSplits;
+        if (job.getUseNewMapper()) {
+          org.apache.hadoop.mapreduce.InputFormat<?,?> input =
+              ReflectionUtils.newInstance(jContext.getInputFormatClass(), jContext.getJobConf());
+                    
+          List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(jContext);
+          rawSplits = new RawSplit[splits.size()];
+          DataOutputBuffer buffer = new DataOutputBuffer();
+          SerializationFactory factory = new SerializationFactory(conf);
+          Serializer serializer = 
+            factory.getSerializer(splits.get(0).getClass());
+          serializer.open(buffer);
+          for (int i = 0; i < splits.size(); i++) {
+            buffer.reset();
+            serializer.serialize(splits.get(i));
+            RawSplit rawSplit = new RawSplit();
+            rawSplit.setClassName(splits.get(i).getClass().getName());
+            rawSplit.setDataLength(splits.get(i).getLength());
+            rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+            rawSplit.setLocations(splits.get(i).getLocations());
+            rawSplits[i] = rawSplit;
+          }
+
+        } else {
+          InputSplit[] splits = job.getInputFormat().getSplits(job, 1);
+          rawSplits = new RawSplit[splits.length];
+          DataOutputBuffer buffer = new DataOutputBuffer();
+          for (int i = 0; i < splits.length; i++) {
+            buffer.reset();
+            splits[i].write(buffer);
+            RawSplit rawSplit = new RawSplit();
+            rawSplit.setClassName(splits[i].getClass().getName());
+            rawSplit.setDataLength(splits[i].getLength());
+            rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
+            rawSplit.setLocations(splits[i].getLocations());
+            rawSplits[i] = rawSplit;
+          }
+        }
         
         int numReduceTasks = job.getNumReduceTasks();
         if (numReduceTasks > 1 || numReduceTasks < 0) {
@@ -118,19 +159,14 @@ class LocalJobRunner implements JobSubmissionProtocol {
         outputCommitter.setupJob(jContext);
         status.setSetupProgress(1.0f);
         
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        for (int i = 0; i < splits.length; i++) {
+        for (int i = 0; i < rawSplits.length; i++) {
           if (!this.isInterrupted()) {
             TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);  
             mapIds.add(mapId);
-            buffer.reset();
-            splits[i].write(buffer);
-            BytesWritable split = new BytesWritable();
-            split.set(buffer.getData(), 0, buffer.getLength());
             MapTask map = new MapTask(file.toString(),  
                                       mapId, i,
-                                      splits[i].getClass().getName(),
-                                      split);
+                                      rawSplits[i].getClassName(),
+                                      rawSplits[i].getBytes());
             JobConf localConf = new JobConf(job);
             map.setJobFile(localFile.toString());
             map.localizeConfiguration(localConf);
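
The serialized bytes stored in each RawSplit are recovered on the task side
with the matching Deserializer from the same SerializationFactory. A minimal
standalone sketch of that round trip, using the new-API FileSplit as an
example (illustrative, not code from this patch):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SplitRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    SerializationFactory factory = new SerializationFactory(conf);

    // Serialize a new-API split into a reusable buffer, mirroring the
    // getUseNewMapper() branch in the patch above.
    FileSplit split = new FileSplit(new Path("/tmp/input/part-0"), 0L, 1024L,
                                    new String[0]);
    Serializer<FileSplit> serializer = factory.getSerializer(FileSplit.class);
    DataOutputBuffer out = new DataOutputBuffer();
    serializer.open(out);
    serializer.serialize(split);
    serializer.close();

    // Rebuild the split from the recorded class and raw bytes, as the task
    // side does when it reconstructs the map task's input.
    Deserializer<FileSplit> deserializer =
        factory.getDeserializer(FileSplit.class);
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    deserializer.open(in);
    FileSplit restored = deserializer.deserialize(null);
    deserializer.close();

    System.out.println(restored.getPath() + " start=" + restored.getStart()
        + " length=" + restored.getLength());
  }
}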