
HADOOP-1447. Add support to contrib/data_join for text inputs. Contributed by Senthil Subramanian.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@546310 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 18 years ago
parent commit 85a32edfc7

+ 3 - 0
CHANGES.txt

@@ -104,6 +104,9 @@ Trunk (unreleased changes)
      verified after data is read from large buffers, to better catch
      memory errors.  (cutting)
 
+ 34. HADOOP-1447.  Add support in contrib/data_join for text inputs.
+     (Senthil Subramanian via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

+ 3 - 3
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ArrayListBackedIterator.java

@@ -33,13 +33,13 @@ public class ArrayListBackedIterator implements ResetableIterator {
 
   private Iterator iter;
 
-  private ArrayList data;
+  private ArrayList<Object> data;
 
   public ArrayListBackedIterator() {
-    this(new ArrayList());
+    this(new ArrayList<Object>());
   }
 
-  public ArrayListBackedIterator(ArrayList data) {
+  public ArrayListBackedIterator(ArrayList<Object> data) {
     this.data = data;
     this.iter = this.data.iterator();
   }

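The change above generifies the backing list, so callers now pass an ArrayList<Object> rather than a raw ArrayList. A minimal caller sketch, not part of the commit; the hasNext()/next() methods are assumed from the ResetableIterator interface named in the hunk header:

import java.util.ArrayList;

import org.apache.hadoop.contrib.utils.join.ArrayListBackedIterator;

public class IteratorSketch {
  public static void main(String[] args) {
    // After this change the constructor takes ArrayList<Object>,
    // so callers no longer compile against a raw type.
    ArrayList<Object> buffered = new ArrayList<Object>();
    buffered.add("record-1");
    buffered.add("record-2");
    ArrayListBackedIterator it = new ArrayListBackedIterator(buffered);
    while (it.hasNext()) {      // hasNext()/next() assumed from ResetableIterator
      System.out.println(it.next());
    }
  }
}
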
+ 24 - 19
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
 
 /**
@@ -58,26 +59,33 @@ public class DataJoinJob {
 
     String inputDir = args[0];
     String outputDir = args[1];
-    int numOfReducers = Integer.parseInt(args[2]);
-    Class mapper = getClassByName(args[3]);
-    Class reducer = getClassByName(args[4]);
-    Class mapoutputValueClass = getClassByName(args[5]);
+    Class inputFormat = SequenceFileInputFormat.class;
+    if (args[2].compareToIgnoreCase("text") != 0) {
+      System.out.println("Using SequenceFileInputFormat: " + args[2]);
+    } else {
+      System.out.println("Using TextInputFormat: " + args[2]);
+      inputFormat = TextInputFormat.class;
+    }
+    int numOfReducers = Integer.parseInt(args[3]);
+    Class mapper = getClassByName(args[4]);
+    Class reducer = getClassByName(args[5]);
+    Class mapoutputValueClass = getClassByName(args[6]);
     Class outputFormat = TextOutputFormat.class;
     Class outputValueClass = Text.class;
-    if (args[6].compareToIgnoreCase("text") != 0) {
-      System.out.println("Using SequenceFileOutputFormat: " + args[6]);
+    if (args[7].compareToIgnoreCase("text") != 0) {
+      System.out.println("Using SequenceFileOutputFormat: " + args[7]);
       outputFormat = SequenceFileOutputFormat.class;
-      outputValueClass = getClassByName(args[6]);
+      outputValueClass = getClassByName(args[7]);
     } else {
-      System.out.println("Using TextOutputFormat: " + args[6]);
+      System.out.println("Using TextOutputFormat: " + args[7]);
     }
     long maxNumOfValuesPerGroup = 100;
     String jobName = "";
-    if (args.length > 7) {
-      maxNumOfValuesPerGroup = Long.parseLong(args[7]);
-    }
     if (args.length > 8) {
-      jobName = args[8];
+      maxNumOfValuesPerGroup = Long.parseLong(args[8]);
+    }
+    if (args.length > 9) {
+      jobName = args[9];
     }
     Configuration defaults = new Configuration();
     JobConf job = new JobConf(defaults, DataJoinJob.class);
@@ -91,7 +99,7 @@ public class DataJoinJob {
       job.addInputPath(new Path(spec));
     }
 
-    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputFormat(inputFormat);
 
     job.setMapperClass(mapper);
     job.setOutputPath(new Path(outputDir));
@@ -106,10 +114,7 @@ public class DataJoinJob {
 
     job.setNumMapTasks(1);
     job.setNumReduceTasks(numOfReducers);
-    job.setLong("ultjoin.maxNumOfValuesPerGroup",
-                maxNumOfValuesPerGroup);
-    job.set("mapred.child.java.opts", "-Xmx1024m");
-    job.setKeepFailedTaskFiles(true);
+    job.setLong("datajoin.maxNumOfValuesPerGroup", maxNumOfValuesPerGroup);
     return job;
   }
 
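The hunk above also stops forcing a 1 GB child heap and retention of failed task files. A sketch of how a job that relied on those defaults could restore them itself; both calls appear verbatim in the removed lines:

import org.apache.hadoop.mapred.JobConf;

public class JobTuningSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    job.set("mapred.child.java.opts", "-Xmx1024m"); // formerly hard-coded by the job builder
    job.setKeepFailedTaskFiles(true);               // formerly always enabled
  }
}
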
@@ -151,8 +156,8 @@ public class DataJoinJob {
    */
   public static void main(String[] args) {
     boolean success;
-    if (args.length < 7 || args.length > 9) {
-      System.out.println("usage: DataJoinJob " + "inputdirs outputdir "
+    if (args.length < 8 || args.length > 10) {
+      System.out.println("usage: DataJoinJob " + "inputdirs outputdir map_input_file_format "
                          + "numofParts " + "mapper_class " + "reducer_class "
                          + "map_output_value_class "
                          + "output_value_class [maxNumOfValuesPerGroup [descriptionOfJob]]]");

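To make the new argument order concrete, here is a hypothetical invocation of the main() shown above. The directories and the mapper, reducer, and value class names are placeholders, not classes shipped with the contrib module, and the comma delimiter for inputdirs is an assumption:

public class UsageSketch {
  public static void main(String[] ignored) throws Exception {
    String[] args = {
      "/data/join/in1,/data/join/in2",  // inputdirs (delimiter assumed comma)
      "/data/join/out",                 // outputdir
      "text",                           // map_input_file_format (new in this commit)
      "4",                              // numofParts, shifted from args[2] to args[3]
      "my.pkg.MyJoinMapper",            // mapper_class (placeholder)
      "my.pkg.MyJoinReducer",           // reducer_class (placeholder)
      "my.pkg.MyTaggedOutput",          // map_output_value_class (placeholder)
      "text"                            // output_value_class
    };
    org.apache.hadoop.contrib.utils.join.DataJoinJob.main(args);
  }
}
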
+ 2 - 3
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java

@@ -68,8 +68,7 @@ public abstract class DataJoinReducerBase extends JobBase {
   public void configure(JobConf job) {
     super.configure(job);
     this.job = job;
-    this.maxNumOfValuesPerGroup = job.getLong("ultjoin.maxNumOfValuesPerGroup",
-                                              100);
+    this.maxNumOfValuesPerGroup = job.getLong("datajoin.maxNumOfValuesPerGroup", 100);
   }
 
   /**
@@ -155,7 +154,7 @@ public abstract class DataJoinReducerBase extends JobBase {
                          OutputCollector output, Reporter reporter) throws IOException {
     this.collected += 1;
     addLongValue("collectedCount", 1);
-    if (aRecord != null && this.collected % 1 == 0) {
+    if (aRecord != null) {
       output.collect(key, aRecord.getData());
       reporter.setStatus("key: " + key.toString() + " collected: " + collected);
       addLongValue("actuallyCollectedCount", 1);
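
Since the property prefix changed from "ultjoin." to "datajoin.", a job that still sets the old key silently falls back to the reducer's default of 100 values per group. A small sketch of the renamed knob; the 1000 limit is a hypothetical value:

import org.apache.hadoop.mapred.JobConf;

public class GroupLimitSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // The reducer's configure() reads this key with a default of 100.
    job.setLong("datajoin.maxNumOfValuesPerGroup", 1000);
    System.out.println(job.getLong("datajoin.maxNumOfValuesPerGroup", 100));
  }
}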