
HADOOP-2548 Make TableMap and TableReduce generic

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@611488 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack, 17 years ago
parent
commit
f16ef12c38

+ 2 - 0
src/contrib/hbase/CHANGES.txt

@@ -164,6 +164,8 @@ Trunk (unreleased changes)
    HADOOP-2450 Show version (and svn revision) in hbase web ui
    HADOOP-2472 Range selection using filter (Edward Yoon via Stack)
    HADOOP-2553 Don't make Long objects calculating hbase type hash codes
+   HADOOP-2548 Make TableMap and TableReduce generic
+               (Frederik Hedberg via Stack)
                
 
 

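For context on what the switch to generic TableMap/TableReduce means for callers, here is a minimal, hypothetical driver. The table names, column list, and class name are illustrative and not part of this patch; only the initJob signatures are taken from the diffs below.

import org.apache.hadoop.hbase.mapred.IdentityTableMap;
import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class CopyTableJob {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(CopyTableJob.class);
    job.setJobName("copy-table");
    // Scan 'srctable', emitting <Text row, MapWritable columns> pairs;
    // the column list and its format here are placeholders.
    TableMap.initJob("srctable", "contents: anchor:",
        IdentityTableMap.class, job);
    // Write each <row, columns> pair into 'desttable'.
    TableReduce.initJob("desttable", IdentityTableReduce.class, job);
    JobClient.runJob(job);
  }
}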
+ 5 - 9
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java

@@ -27,18 +27,18 @@ import java.util.Map;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 
 /**
  * Extract grouping columns from input record
  */
-public class GroupingTableMap extends TableMap {
+public class GroupingTableMap extends TableMap<Text,MapWritable> {
 
   /**
    * JobConf parameter to specify the columns used to produce the key passed to 
@@ -49,11 +49,6 @@ public class GroupingTableMap extends TableMap {
   
   protected Text[] m_columns;
 
-  /** default constructor */
-  public GroupingTableMap() {
-    super();
-  }
-
   /**
    * Use this before submitting a TableMap job. It will appropriately set up the
    * JobConf.
@@ -65,6 +60,7 @@ public class GroupingTableMap extends TableMap {
    * @param mapper map class
    * @param job job configuration object
    */
+  @SuppressWarnings("unchecked")
   public static void initJob(String table, String columns, String groupColumns, 
       Class<? extends TableMap> mapper, JobConf job) {
     
@@ -89,11 +85,11 @@ public class GroupingTableMap extends TableMap {
    * Pass the new key and value to reduce.
    * If any of the grouping columns are not found in the value, the record is skipped.
    *
-   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
    */
   @Override
   public void map(@SuppressWarnings("unused") HStoreKey key,
-      MapWritable value, TableOutputCollector output,
+      MapWritable value, OutputCollector<Text,MapWritable> output,
       @SuppressWarnings("unused") Reporter reporter) throws IOException {
     
     byte[][] keyVals = extractKeyValues(value);

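A hedged usage sketch for the GroupingTableMap changes above: how a job might be wired up with the five-argument initJob, which keys each record by the values of the listed grouping columns and (per the Javadoc in the hunk) skips records missing any of them. Table, column, and class names are placeholders.

import org.apache.hadoop.hbase.mapred.GroupingTableMap;
import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class GroupedScanJob {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(GroupedScanJob.class);
    job.setJobName("grouped-scan");
    // Scan 'webtable' and build the map output key from the values of the
    // grouping columns (column names and list format are illustrative).
    GroupingTableMap.initJob("webtable", "contents: anchor:",
        "contents: anchor:", GroupingTableMap.class, job);
    TableReduce.initJob("grouped", IdentityTableReduce.class, job);
    JobClient.runJob(job);
  }
}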
+ 4 - 3
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java

@@ -24,13 +24,14 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 
 /**
  * Pass the given key and record as-is to reduce
  */
-public class IdentityTableMap extends TableMap {
+public class IdentityTableMap extends TableMap<Text, MapWritable> {
 
   /** constructor */
   public IdentityTableMap() {
@@ -40,11 +41,11 @@ public class IdentityTableMap extends TableMap {
   /**
    * Pass the key, value to reduce
    *
-   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
    */
   @Override
   public void map(HStoreKey key, MapWritable value,
-      TableOutputCollector output,
+      OutputCollector<Text,MapWritable> output,
       @SuppressWarnings("unused") Reporter reporter) throws IOException {
     
     Text tKey = key.getRow();

+ 6 - 11
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java

@@ -24,28 +24,23 @@ import java.util.Iterator;
 
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 
 /**
  * Write to table each key, record pair
  */
-public class IdentityTableReduce extends TableReduce {
-
-  /** constructor */
-  public IdentityTableReduce() {
-    super();
-  }
-
+public class IdentityTableReduce extends TableReduce<Text, MapWritable> {
   /**
    * No aggregation, output pairs of (key, record)
    *
-   * @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.Text, java.util.Iterator, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   * @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
    */
   @Override
-  public void reduce(Text key, @SuppressWarnings("unchecked") Iterator values,
-      TableOutputCollector output,
-      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<MapWritable> values,
+      OutputCollector<Text, MapWritable> output, Reporter reporter)
+      throws IOException {
     
     while(values.hasNext()) {
       MapWritable r = (MapWritable)values.next();

+ 7 - 35
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -30,8 +32,6 @@ import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HStoreKey;
 
 /**
  * Scan an HBase table to sort by a specified sort column.
@@ -39,14 +39,8 @@ import org.apache.hadoop.hbase.HStoreKey;
  *
  */
 @SuppressWarnings("unchecked")
-public abstract class TableMap extends MapReduceBase implements Mapper {
-  private TableOutputCollector m_collector;
-
-  /** constructor*/
-  public TableMap() {
-    m_collector = new TableOutputCollector();
-  }
-
+public abstract class TableMap<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Mapper<HStoreKey, MapWritable, K, V> {
   /**
    * Use this before submitting a TableMap job. It will
    * appropriately set up the JobConf.
@@ -56,9 +50,8 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
    * @param mapper mapper class
    * @param job job configuration
    */
-  public static void initJob(String table, String columns, 
+  public static void initJob(String table, String columns,
       Class<? extends TableMap> mapper, JobConf job) {
-    
     job.setInputFormat(TableInputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(MapWritable.class);
@@ -67,27 +60,6 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
     job.set(TableInputFormat.COLUMN_LIST, columns);
   }
 
-  /**
-   * Input:
-   * @param key is of type HStoreKey
-   * @param value is of type KeyedDataArrayWritable
-   * @param output output collector
-   * @param reporter object to use for status updates
-   * @throws IOException
-   * 
-   * Output:
-   * The key is a specific column, including the input key or any value
-   * The value is of type LabeledData
-   */
-  public void map(WritableComparable key, Writable value,
-      OutputCollector output, Reporter reporter) throws IOException {
-    
-    if(m_collector.collector == null) {
-      m_collector.collector = output;
-    }
-    map((HStoreKey)key, (MapWritable)value, m_collector, reporter);
-  }
-
   /**
    * Call a user defined function on a single HBase record, represented
    * by a key and its associated record value.
@@ -98,6 +70,6 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
    * @param reporter
    * @throws IOException
    */
-  public abstract void map(HStoreKey key, MapWritable value, 
-      TableOutputCollector output, Reporter reporter) throws IOException;
+  public abstract void map(HStoreKey key, MapWritable value,
+      OutputCollector<K, V> output, Reporter reporter) throws IOException;
 }

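A minimal sketch of a user mapper written against the new abstract signature shown above; the class name and its pass-through behavior are illustrative, not part of the patch.

import java.io.IOException;

import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/** Hypothetical mapper subclassing the now-generic TableMap. */
public class PassThroughMap extends TableMap<Text, MapWritable> {
  @Override
  public void map(HStoreKey key, MapWritable value,
      OutputCollector<Text, MapWritable> output, Reporter reporter)
      throws IOException {
    // The collector is typed directly on the class; no TableOutputCollector
    // wrapper or WritableComparable/Writable casts are needed any more.
    output.collect(key.getRow(), value);
  }
}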
+ 0 - 47
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java

@@ -1,47 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.OutputCollector;
-
-/**
- * Refine the types that can be collected from a Table Map/Reduce jobs.
- */
-public class TableOutputCollector {
-  /** The collector object */
-  @SuppressWarnings("unchecked")
-  public OutputCollector collector;
-
-  /**
-   * Restrict Table Map/Reduce's output to be a Text key and a record.
-   * 
-   * @param key
-   * @param value
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  public void collect(Text key, MapWritable value) throws IOException {
-    collector.collect(key, value);
-  }
-}

+ 10 - 32
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java

@@ -22,8 +22,10 @@ package org.apache.hadoop.hbase.mapred;
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -34,14 +36,8 @@ import org.apache.hadoop.mapred.Reporter;
  * Write a table, sorting by the input key
  */
 @SuppressWarnings("unchecked")
-public abstract class TableReduce extends MapReduceBase implements Reducer {
-  TableOutputCollector m_collector;
-
-  /** Constructor */
-  public TableReduce() {
-    m_collector = new TableOutputCollector();
-  }
-
+public abstract class TableReduce<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Reducer<K, V, Text, MapWritable> {
   /**
    * Use this before submitting a TableReduce job. It will
    * appropriately set up the JobConf.
@@ -50,30 +46,13 @@ public abstract class TableReduce extends MapReduceBase implements Reducer {
    * @param reducer
    * @param job
    */
-  public static void initJob(String table, Class<? extends TableReduce> reducer,
-      JobConf job) {
-    
+  public static void initJob(String table,
+      Class<? extends TableReduce> reducer, JobConf job) {
     job.setOutputFormat(TableOutputFormat.class);
     job.setReducerClass(reducer);
     job.set(TableOutputFormat.OUTPUT_TABLE, table);
   }
 
-  /**
-   * Create a unique key for table insertion by appending a local
-   * counter the given key.
-   *
-   * @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
-   */
-  @SuppressWarnings("unchecked")
-  public void reduce(WritableComparable key, Iterator values,
-      OutputCollector output, Reporter reporter) throws IOException {
-
-    if(m_collector.collector == null) {
-      m_collector.collector = output;
-    }
-    reduce((Text)key, values, m_collector, reporter);
-  }
-
   /**
    * 
    * @param key
@@ -82,8 +61,7 @@ public abstract class TableReduce extends MapReduceBase implements Reducer {
    * @param reporter
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
-  public abstract void reduce(Text key, Iterator values, 
-      TableOutputCollector output, Reporter reporter) throws IOException;
-
+  public abstract void reduce(K key, Iterator<V> values,
+      OutputCollector<Text, MapWritable> output, Reporter reporter)
+      throws IOException;
 }

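Likewise, a minimal sketch of a user reducer under the new bounds: K and V match the mapper's output types, while the collector stays fixed at <Text, MapWritable> so the result can be written by TableOutputFormat. The class name and last-write-wins logic are illustrative.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/** Hypothetical reducer subclassing the now-generic TableReduce. */
public class LastWriteWinsReduce extends TableReduce<Text, MapWritable> {
  @Override
  public void reduce(Text key, Iterator<MapWritable> values,
      OutputCollector<Text, MapWritable> output, Reporter reporter)
      throws IOException {
    // Values arrive already typed as MapWritable; keep only the last one
    // per row and hand it to the collector.
    MapWritable last = null;
    while (values.hasNext()) {
      last = values.next();
    }
    if (last != null) {
      output.collect(key, last);
    }
  }
}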
+ 3 - 8
src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
 
 /**
  * Test Map/Reduce job over HBase tables
@@ -147,13 +148,7 @@ public class TestTableMapReduce extends MultiRegionTable {
   /**
    * Pass the given key and processed record reduce
    */
-  public static class ProcessContentsMapper extends TableMap {
-
-    /** constructor */
-    public ProcessContentsMapper() {
-      super();
-    }
-
+  public static class ProcessContentsMapper extends TableMap<Text, MapWritable> {
     /**
      * Pass the key, and reversed value to reduce
      *
@@ -162,7 +157,7 @@ public class TestTableMapReduce extends MultiRegionTable {
     @SuppressWarnings("unchecked")
     @Override
     public void map(HStoreKey key, MapWritable value,
-        TableOutputCollector output,
+        OutputCollector<Text, MapWritable> output,
         @SuppressWarnings("unused") Reporter reporter) throws IOException {
       
       Text tKey = key.getRow();