浏览代码

HADOOP-2234 TableInputFormat erroneously aggregates map values

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@599643 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 17 年之前
父节点
当前提交
ab15f20707

+ 1 - 0
src/contrib/hbase/CHANGES.txt

@@ -40,6 +40,7 @@ Trunk (unreleased changes)
    HADOOP-2253 getRow can return HBASE::DELETEVAL cells
                (Bryan Duxbury via Stack)
    HADOOP-2295 Fix assigning a region to multiple servers
+   HADOOP-2234 TableInputFormat erroneously aggregates map values
 
   IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable

+ 20 - 32
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java

@@ -48,8 +48,7 @@ import org.apache.log4j.Logger;
  * Convert HBase tabular data into a format that is consumable by Map/Reduce
  */
 public class TableInputFormat
-implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
-  
+implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {  
   static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName());
 
   /**
@@ -67,9 +66,9 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
    * return (HStoreKey, MapWritable<Text, ImmutableBytesWritable>) pairs
    */
   class TableRecordReader implements RecordReader<HStoreKey, MapWritable> {
-    private HScannerInterface m_scanner;
-    private SortedMap<Text, byte[]> m_row; // current buffer
-    private Text m_endRow;
+    private final HScannerInterface m_scanner;
+    // current buffer
+    private final SortedMap<Text, byte[]> m_row = new TreeMap<Text, byte[]>();
 
     /**
      * Constructor
@@ -78,14 +77,15 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
      * @throws IOException
      */
     public TableRecordReader(Text startRow, Text endRow) throws IOException {
-      m_row = new TreeMap<Text, byte[]>();
-      m_scanner = m_table.obtainScanner(m_cols, startRow);
-      m_endRow = endRow;
+      if (endRow != null && endRow.getLength() > 0) {
+        this.m_scanner = m_table.obtainScanner(m_cols, startRow, endRow);
+      } else {
+        this.m_scanner = m_table.obtainScanner(m_cols, startRow);
+      }
     }
 
-    /** {@inheritDoc} */
     public void close() throws IOException {
-      m_scanner.close();
+      this.m_scanner.close();
     }
 
     /**
@@ -132,20 +132,14 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
      */
     @SuppressWarnings("unchecked")
     public boolean next(HStoreKey key, MapWritable value) throws IOException {
-      m_row.clear();
+      this.m_row.clear();
       HStoreKey tKey = key;
-      boolean hasMore = m_scanner.next(tKey, m_row);
-
-      if(hasMore) {
-        if(m_endRow.getLength() > 0 &&
-            (tKey.getRow().compareTo(m_endRow) >= 0)) {
-          
-          hasMore = false;
-          
-        } else {
-          for(Map.Entry<Text, byte[]> e: m_row.entrySet()) {
-            value.put(e.getKey(), new ImmutableBytesWritable(e.getValue()));
-          }
+      boolean hasMore = this.m_scanner.next(tKey, this.m_row);
+      if (hasMore) {
+        // Clear value to remove content added by previous call to next.
+        value.clear();
+        for (Map.Entry<Text, byte[]> e: this.m_row.entrySet()) {
+          value.put(e.getKey(), new ImmutableBytesWritable(e.getValue()));
         }
       }
       return hasMore;
@@ -153,12 +147,11 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
 
   }
 
-  /** {@inheritDoc} */
   public RecordReader<HStoreKey, MapWritable> getRecordReader(
       InputSplit split,
       @SuppressWarnings("unused") JobConf job,
-      @SuppressWarnings("unused") Reporter reporter) throws IOException {
-    
+      @SuppressWarnings("unused") Reporter reporter)
+  throws IOException {  
     TableSplit tSplit = (TableSplit)split;
     return new TableRecordReader(tSplit.getStartRow(), tSplit.getEndRow());
   }
@@ -185,7 +178,6 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
     return splits;
   }
 
-  /** {@inheritDoc} */
   public void configure(JobConf job) {
     Path[] tableNames = job.getInputPaths();
     m_tableName = new Text(tableNames[0].getName());
@@ -202,21 +194,17 @@ implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
     }
   }
 
-  /** {@inheritDoc} */
   public void validateInput(JobConf job) throws IOException {
-
     // expecting exactly one path
-    
     Path[] tableNames = job.getInputPaths();
     if(tableNames == null || tableNames.length > 1) {
       throw new IOException("expecting one table name");
     }
 
     // expecting at least one column
-    
     String colArg = job.get(COLUMN_LIST);
     if(colArg == null || colArg.length() == 0) {
       throw new IOException("expecting at least one column");
     }
   }
-}
+}