
HADOOP-1709 Make HRegionInterface more like that of HTable
HADOOP-1725 Client find of table regions should not include offlined, split parents

Changes:

New class MapWritable replaces KeyedData and KeyedDataArrayWritable

HBaseAdmin, HConnectionManager, HMaster, HRegionInterface,
HRegionServer, HTable, TestScanner2:
- getRow returns MapWritable instead of array of KeyedData
- next returns MapWritable instead of array of KeyedData

GroupingTableMap, IdentityTableMap, IdentityTableReduce,
TableInputFormat, TableMap, TableOutputCollector, TableOutputFormat,
TestTableMapReduce:
- use MapWritable instead of KeyedData and KeyedDataArrayWritable
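For example, client code that used to walk a KeyedData[] now iterates the MapWritable entries. A minimal sketch of the new idiom, mirroring the converted classes above (variable names are illustrative only):

  MapWritable values = server.getRow(regionName, row);
  for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
    HStoreKey key = (HStoreKey) e.getKey();
    byte [] value = ((ImmutableBytesWritable) e.getValue()).get();
    // key.getColumn() and value replace the old KeyedData.getKey()/getData() pair
  }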


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@566878 13f79535-47bb-0310-9956-ffa450edef68

Jim Kellerman 18 years ago
parent
commit
0c2f997d2f
25 changed files with 660 additions and 423 deletions
  1. src/contrib/hbase/CHANGES.txt (+4 -1)
  2. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseAdmin.java (+29 -19)
  3. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (+31 -21)
  4. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (+52 -39)
  5. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (+0 -9)
  6. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (+6 -6)
  7. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (+24 -16)
  8. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (+34 -8)
  9. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (+61 -33)
  10. src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java (+0 -74)
  11. src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java (+0 -86)
  12. src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/MapWritable.java (+303 -0)
  13. src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java (+16 -15)
  14. src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java (+3 -3)
  15. src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (+2 -2)
  16. src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java (+27 -27)
  17. src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java (+5 -4)
  18. src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java (+4 -3)
  19. src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java (+20 -27)
  20. src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java (+1 -0)
  21. src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (+5 -5)
  22. src/contrib/hbase/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java (+1 -0)
  23. src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (+2 -0)
  24. src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java (+11 -6)
  25. src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java (+19 -19)

+ 4 - 1
src/contrib/hbase/CHANGES.txt

@@ -79,7 +79,7 @@ Trunk (unreleased changes)
      10 concurrent clients
  50. HADOOP-1468 Add HBase batch update to reduce RPC overhead (restrict batches
      to a single row at a time)
- 51. HADOOP-1528 HClient for multiple tables (phase 1)
+ 51. HADOOP-1528 HClient for multiple tables (phase 1) (James Kennedy & JimK)
  52. HADOOP-1528 HClient for multiple tables (phase 2) all HBase client side code
      (except TestHClient and HBaseShell) have been converted to use the new client
      side objects (HTable/HBaseAdmin/HConnection) instead of HClient.
@@ -98,3 +98,6 @@ Trunk (unreleased changes)
  60. HADOOP-1644 Compactions should not block updates
  60. HADOOP-1672 HBase Shell should use new client classes
      (Edward Yoon via Stack).
+ 61. HADOOP-1709 Make HRegionInterface more like that of HTable
+     HADOOP-1725 Client find of table regions should not include offlined, split parents
+

+ 29 - 19
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HBaseAdmin.java

@@ -21,16 +21,20 @@ package org.apache.hadoop.hbase;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
+import java.util.Map;
 import java.util.SortedMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.MapWritable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.RemoteException;
 
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Writables;
 
 /**
@@ -178,15 +182,17 @@ public class HBaseAdmin implements HConstants {
         scannerId =
           server.openScanner(firstMetaServer.getRegionInfo().getRegionName(),
             COL_REGIONINFO_ARRAY, tableName, System.currentTimeMillis(), null);
-        KeyedData[] values = server.next(scannerId);
-        if (values == null || values.length == 0) {
+        MapWritable values = server.next(scannerId);
+        if (values == null || values.size() == 0) {
           break;
         }
         boolean found = false;
-        for (int j = 0; j < values.length; j++) {
-          if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
-            info =
-              (HRegionInfo) Writables.getWritable(values[j].getData(), info);
+        for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
+          HStoreKey key = (HStoreKey) e.getKey();
+          if (key.getColumn().equals(COL_REGIONINFO)) {
+            info = (HRegionInfo) Writables.getWritable(
+                  ((ImmutableBytesWritable) e.getValue()).get(), info);
+            
             if (info.tableDesc.getName().equals(tableName)) {
               found = true;
             }
@@ -260,8 +266,8 @@ public class HBaseAdmin implements HConstants {
         boolean isenabled = false;
         
         while (true) {
-          KeyedData[] values = server.next(scannerId);
-          if (values == null || values.length == 0) {
+          MapWritable values = server.next(scannerId);
+          if (values == null || values.size() == 0) {
             if (valuesfound == 0) {
               throw new NoSuchElementException(
                   "table " + tableName + " not found");
@@ -269,10 +275,12 @@ public class HBaseAdmin implements HConstants {
             break;
           }
           valuesfound += 1;
-          for (int j = 0; j < values.length; j++) {
-            if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
-              info =
-                (HRegionInfo) Writables.getWritable(values[j].getData(), info);
+          for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
+            HStoreKey key = (HStoreKey) e.getKey();
+            if (key.getColumn().equals(COL_REGIONINFO)) {
+              info = (HRegionInfo) Writables.getWritable(
+                    ((ImmutableBytesWritable) e.getValue()).get(), info);
+            
               isenabled = !info.offLine;
               break;
             }
@@ -359,18 +367,20 @@ public class HBaseAdmin implements HConstants {
         
         boolean disabled = false;
         while (true) {
-          KeyedData[] values = server.next(scannerId);
-          if (values == null || values.length == 0) {
+          MapWritable values = server.next(scannerId);
+          if (values == null || values.size() == 0) {
             if (valuesfound == 0) {
               throw new NoSuchElementException("table " + tableName + " not found");
             }
             break;
           }
           valuesfound += 1;
-          for (int j = 0; j < values.length; j++) {
-            if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) {
-              info =
-                (HRegionInfo) Writables.getWritable(values[j].getData(), info);
+          for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
+            HStoreKey key = (HStoreKey) e.getKey();
+            if (key.getColumn().equals(COL_REGIONINFO)) {
+              info = (HRegionInfo) Writables.getWritable(
+                    ((ImmutableBytesWritable) e.getValue()).get(), info);
+            
               disabled = info.offLine;
               break;
             }

+ 31 - 21
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java

@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,8 +34,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 
-import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 import org.apache.hadoop.hbase.util.Writables;
 
 /**
@@ -228,7 +230,7 @@ public class HConnectionManager implements HConstants {
 
     /** {@inheritDoc} */
     public HTableDescriptor[] listTables() throws IOException {
-      TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
+      HashSet<HTableDescriptor> uniqueTables = new HashSet<HTableDescriptor>();
 
       SortedMap<Text, HRegionLocation> metaTables =
         getTableServers(META_TABLE_NAME);
@@ -241,16 +243,17 @@ public class HConnectionManager implements HConstants {
               COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, System.currentTimeMillis(),
               null);
 
+          HRegionInfo info = new HRegionInfo();
           while (true) {
-            KeyedData[] values = server.next(scannerId);
-            if (values.length == 0) {
+            MapWritable values = server.next(scannerId);
+            if (values == null || values.size() == 0) {
               break;
             }
-            for (int i = 0; i < values.length; i++) {
-              if (values[i].getKey().getColumn().equals(COL_REGIONINFO)) {
-                HRegionInfo info =
-                  (HRegionInfo) Writables.getWritable(values[i].getData(),
-                      new HRegionInfo());
+            for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
+              HStoreKey key = (HStoreKey) e.getKey();
+              if (key.getColumn().equals(COL_REGIONINFO)) {
+                info = (HRegionInfo) Writables.getWritable(
+                    ((ImmutableBytesWritable) e.getValue()).get(), info);
 
                 // Only examine the rows where the startKey is zero length   
                 if (info.startKey.getLength() == 0) {
@@ -272,9 +275,9 @@ public class HConnectionManager implements HConstants {
     }
 
     /** {@inheritDoc} */
-    public SortedMap<Text, HRegionLocation>
-      getTableServers(Text tableName)
-    throws IOException {  
+    public SortedMap<Text, HRegionLocation> getTableServers(Text tableName)
+    throws IOException {
+      
       if (tableName == null || tableName.getLength() == 0) {
         throw new IllegalArgumentException(
             "table name cannot be null or zero length");
@@ -542,7 +545,7 @@ public class HConnectionManager implements HConstants {
      * @return map of first row to TableInfo for all meta regions
      * @throws IOException
      */
-    private TreeMap<Text, HRegionLocation> loadMetaFromRoot()
+    private SortedMap<Text, HRegionLocation> loadMetaFromRoot()
     throws IOException {
       
       SortedMap<Text, HRegionLocation> rootRegion =
@@ -646,7 +649,7 @@ public class HConnectionManager implements HConstants {
      * @throws NoServerForRegionException - if table can not be found after retrying
      * @throws IOException 
      */
-    private TreeMap<Text, HRegionLocation> scanOneMetaRegion(
+    private SortedMap<Text, HRegionLocation> scanOneMetaRegion(
         final HRegionLocation t, final Text tableName) throws IOException {
       
       HRegionInterface server = getHRegionConnection(t.getServerAddress());
@@ -660,8 +663,8 @@ public class HConnectionManager implements HConstants {
             COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null);
 
           while (true) {
-            KeyedData[] values = server.next(scannerId);
-            if (values.length == 0) {
+            MapWritable values = server.next(scannerId);
+            if (values == null || values.size() == 0) {
               if (servers.size() == 0) {
                 // If we didn't find any servers then the table does not exist
                 throw new TableNotFoundException("table '" + tableName +
@@ -676,9 +679,11 @@ public class HConnectionManager implements HConstants {
               break;
             }
 
-            TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-            for (int i = 0; i < values.length; i++) {
-              results.put(values[i].getKey().getColumn(), values[i].getData());
+            SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+            for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
+              HStoreKey key = (HStoreKey) e.getKey();
+              results.put(key.getColumn(),
+                  ((ImmutableBytesWritable) e.getValue()).get());
             }
             
             byte[] bytes = results.get(COL_REGIONINFO);
@@ -704,8 +709,13 @@ public class HConnectionManager implements HConstants {
               }
               break;
             }
+            
+            if (regionInfo.isSplit()) {
+              // Region is a split parent. Skip it.
+              continue;
+            }
 
-            if (regionInfo.isOffline() && !regionInfo.isSplit()) {
+            if (regionInfo.isOffline()) {
               throw new IllegalStateException("table offline: " + tableName);
             }
 

+ 52 - 39
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java

@@ -45,9 +45,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -187,8 +190,8 @@ HMasterRegionInterface, Runnable {
       // Array to hold list of split parents found.  Scan adds to list.  After
       // scan we go check if parents can be removed.
 
-      Map<HRegionInfo, TreeMap<Text, byte[]>> splitParents =
-        new HashMap<HRegionInfo, TreeMap<Text, byte[]>>();
+      Map<HRegionInfo, SortedMap<Text, byte[]>> splitParents =
+        new HashMap<HRegionInfo, SortedMap<Text, byte[]>>();
       try {
         regionServer = connection.getHRegionConnection(region.server);
         scannerId =
@@ -197,14 +200,16 @@ HMasterRegionInterface, Runnable {
 
         int numberOfRegionsFound = 0;
         while (true) {
-          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-          KeyedData[] values = regionServer.next(scannerId);
-          if (values.length == 0) {
+          SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          MapWritable values = regionServer.next(scannerId);
+          if (values == null || values.size() == 0) {
             break;
           }
 
-          for (int i = 0; i < values.length; i++) {
-            results.put(values[i].getKey().getColumn(), values[i].getData());
+          for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
+            HStoreKey key = (HStoreKey) e.getKey();
+            results.put(key.getColumn(),
+                ((ImmutableBytesWritable) e.getValue()).get());
           }
 
           HRegionInfo info = (HRegionInfo) Writables.getWritable(
@@ -260,10 +265,10 @@ HMasterRegionInterface, Runnable {
       // Scan is finished.  Take a look at split parents to see if any we can clean up.
 
       if (splitParents.size() > 0) {
-        for (Map.Entry<HRegionInfo, TreeMap<Text, byte[]>> e:
+        for (Map.Entry<HRegionInfo, SortedMap<Text, byte[]>> e:
           splitParents.entrySet()) {
           
-          TreeMap<Text, byte[]> results = e.getValue();
+          SortedMap<Text, byte[]> results = e.getValue();
           cleanupSplits(region.regionName, regionServer, e.getKey(), 
               (HRegionInfo) Writables.getWritable(results.get(COL_SPLITA),
                   new HRegionInfo()),
@@ -1643,7 +1648,7 @@ HMasterRegionInterface, Runnable {
 
       try {
         while (true) {
-          KeyedData[] values = null;
+          MapWritable values = null;
 
           try {
             values = server.next(scannerId);
@@ -1658,23 +1663,25 @@ HMasterRegionInterface, Runnable {
             break;
           }
 
-          if (values == null || values.length == 0) {
+          if (values == null || values.size() == 0) {
             break;
           }
 
-          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
           Text row = null;
-          for (int i = 0; i < values.length; i++) {
-            if(row == null) {
-              row = values[i].getKey().getRow();
-
+          for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
+            HStoreKey key = (HStoreKey) e.getKey();
+            Text thisRow = key.getRow();
+            if (row == null) {
+              row = thisRow;
             } else {
-              if (!row.equals(values[i].getKey().getRow())) {
+              if (!row.equals(thisRow)) {
                 LOG.error("Multiple rows in same scanner result set. firstRow="
-                    + row + ", currentRow=" + values[i].getKey().getRow());
+                    + row + ", currentRow=" + thisRow);
               }
             }
-            results.put(values[i].getKey().getColumn(), values[i].getData());
+            results.put(key.getColumn(),
+                ((ImmutableBytesWritable) e.getValue()).get());
           }
 
           if (LOG.isDebugEnabled() && row != null) {
@@ -2317,19 +2324,22 @@ HMasterRegionInterface, Runnable {
       long scannerid = server.openScanner(metaRegionName, COL_REGIONINFO_ARRAY,
           tableName, System.currentTimeMillis(), null);
       try {
-        KeyedData[] data = server.next(scannerid);
+        MapWritable data = server.next(scannerid);
             
         // Test data and that the row for the data is for our table. If table
         // does not exist, scanner will return row after where our table would
         // be inserted if it exists so look for exact match on table name.
             
-        if (data != null && data.length > 0 &&
-            HRegionInfo.getTableNameFromRegionName(
-                data[0].getKey().getRow()).equals(tableName)) {
-              
-          // Then a region for this table already exists. Ergo table exists.
-              
-          throw new TableExistsException(tableName.toString());
+        if (data != null && data.size() > 0) {
+          for (WritableComparable k: data.keySet()) {
+            if (HRegionInfo.getTableNameFromRegionName(
+                ((HStoreKey) k).getRow()).equals(tableName)) {
+          
+              // Then a region for this table already exists. Ergo table exists.
+                  
+              throw new TableExistsException(tableName.toString());
+            }
+          }
         }
             
       } finally {
@@ -2462,35 +2472,38 @@ HMasterRegionInterface, Runnable {
                   String serverName = null;
                   long startCode = -1L;
 
-                  KeyedData[] values = server.next(scannerId);
-                  if(values == null || values.length == 0) {
+                  MapWritable values = server.next(scannerId);
+                  if(values == null || values.size() == 0) {
                     break;
                   }
                   boolean haveRegionInfo = false;
-                  for (int i = 0; i < values.length; i++) {
-                    if (values[i].getData().length == 0) {
+                  for (Map.Entry<WritableComparable, Writable> e:
+                    values.entrySet()) {
+
+                    byte[] value = ((ImmutableBytesWritable) e.getValue()).get();
+                    if (value == null || value.length == 0) {
                       break;
                     }
-                    Text column = values[i].getKey().getColumn();
+                    HStoreKey key = (HStoreKey) e.getKey();
+                    Text column = key.getColumn();
                     if (column.equals(COL_REGIONINFO)) {
                       haveRegionInfo = true;
-                      info = (HRegionInfo) Writables.getWritable(
-                          values[i].getData(), info);
+                      info = (HRegionInfo) Writables.getWritable(value, info);
                     
                     } else if (column.equals(COL_SERVER)) {
                       try {
                         serverName =
-                          Writables.bytesToString(values[i].getData());
+                          Writables.bytesToString(value);
                     
-                      } catch (UnsupportedEncodingException e) {
+                      } catch (UnsupportedEncodingException ex) {
                         assert(false);
                       }
                     
                     } else if (column.equals(COL_STARTCODE)) {
                       try {
-                        startCode = Writables.bytesToLong(values[i].getData());
+                        startCode = Writables.bytesToLong(value);
                       
-                      } catch (UnsupportedEncodingException e) {
+                      } catch (UnsupportedEncodingException ex) {
                         assert(false);
                       }
                     }

+ 0 - 9
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java

@@ -870,15 +870,6 @@ public class HRegion implements HConstants {
     }
   }
   
-  private Vector<HStoreFile> getAllStoreFiles() {
-    Vector<HStoreFile> allHStoreFiles = new Vector<HStoreFile>();
-    for(HStore hstore: stores.values()) {
-      Vector<HStoreFile> hstoreFiles = hstore.getAllStoreFiles();
-      allHStoreFiles.addAll(0, hstoreFiles);
-    }
-    return allHStoreFiles;
-  }
-
   //////////////////////////////////////////////////////////////////////////////
   // get() methods for client use.
   //////////////////////////////////////////////////////////////////////////////

+ 6 - 6
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java

@@ -23,7 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.VersionedProtocol;
 
@@ -93,11 +93,11 @@ public interface HRegionInterface extends VersionedProtocol {
    * 
    * @param regionName region name
    * @param row row key
-   * @return array of values
+   * @return map of values
    * @throws IOException
    */
-  public KeyedData[] getRow(final Text regionName, final Text row)
-  throws IOException; //TODO
+  public MapWritable getRow(final Text regionName, final Text row)
+  throws IOException;
 
   //////////////////////////////////////////////////////////////////////////////
   // Start an atomic row insertion/update.  No changes are committed until the 
@@ -244,10 +244,10 @@ public interface HRegionInterface extends VersionedProtocol {
    * Get the next set of values
    * 
    * @param scannerId clientId passed to openScanner
-   * @return array of values
+   * @return map of values
    * @throws IOException
    */
-  public KeyedData[] next(long scannerId) throws IOException; //TODO
+  public MapWritable next(long scannerId) throws IOException;
   
   /**
    * Close a scanner

+ 24 - 16
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java

@@ -40,18 +40,22 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.BatchOperation;
-import org.apache.hadoop.hbase.io.KeyedData;
-import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.StringUtils;
 
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
+import org.apache.hadoop.hbase.util.Writables;
+
 /*******************************************************************************
  * HRegionServer makes a set of HRegions available to clients.  It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -1021,22 +1025,24 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   }
 
   /** {@inheritDoc} */
-  public KeyedData[] getRow(final Text regionName, final Text row)
+  public MapWritable getRow(final Text regionName, final Text row)
   throws IOException {
     requestCount.incrementAndGet();
     HRegion region = getRegion(regionName);
+    MapWritable result = new MapWritable(HStoreKey.class,
+        ImmutableBytesWritable.class,
+        new TreeMap<WritableComparable, Writable>());
+    
     TreeMap<Text, byte[]> map = region.getFull(row);
-    KeyedData result[] = new KeyedData[map.size()];
-    int counter = 0;
     for (Map.Entry<Text, byte []> es: map.entrySet()) {
-      result[counter++] =
-        new KeyedData(new HStoreKey(row, es.getKey()), es.getValue());
+      result.put(new HStoreKey(row, es.getKey()),
+          new ImmutableBytesWritable(es.getValue()));
     }
     return result;
   }
 
   /** {@inheritDoc} */
-  public KeyedData[] next(final long scannerId)
+  public MapWritable next(final long scannerId)
   throws IOException {
     requestCount.incrementAndGet();
     String scannerName = String.valueOf(scannerId);
@@ -1048,13 +1054,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     
     // Collect values to be returned here
     
-    ArrayList<KeyedData> values = new ArrayList<KeyedData>();
-    
-    TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
+    MapWritable values = new MapWritable(HStoreKey.class,
+        ImmutableBytesWritable.class,
+        new TreeMap<WritableComparable, Writable>());
     
     // Keep getting rows until we find one that has at least one non-deleted column value
     
     HStoreKey key = new HStoreKey();
+    TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
     while (s.next(key, results)) {
       for(Map.Entry<Text, byte []> e: results.entrySet()) {
         HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
@@ -1063,8 +1070,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
           // Column value is deleted. Don't return it.
           continue;
         }
-        values.add(new KeyedData(k, val));
+        values.put(k, new ImmutableBytesWritable(val));
       }
+      
       if(values.size() > 0) {
         // Row has something in it. Return the value.
         break;
@@ -1074,7 +1082,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       
       results.clear();
     }
-    return values.toArray(new KeyedData[values.size()]);
+    return values;
   }
 
   /*

+ 34 - 8
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java

@@ -572,6 +572,7 @@ class HStore implements HConstants {
    */
   void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID)
   throws IOException {
+    long maxId = maxSeenSeqID;
     synchronized(compactLock) {
       Path curCompactStore =
         HStoreFile.getHStoreDir(compactdir, regionName, familyName);
@@ -607,12 +608,12 @@ class HStore implements HConstants {
         
         // Compute the max-sequenceID seen in any of the to-be-compacted
         // TreeMaps if it hasn't been passed in to us.
-        if (maxSeenSeqID == -1) {
+        if (maxId == -1) {
           for (HStoreFile hsf: toCompactFiles) {
             long seqid = hsf.loadInfo(fs);
             if(seqid > 0) {
-              if(seqid > maxSeenSeqID) {
-                maxSeenSeqID = seqid;
+              if(seqid > maxId) {
+                maxId = seqid;
               }
             }
           }
@@ -629,8 +630,8 @@ class HStore implements HConstants {
         }
 
         // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
-        if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
-          compactedOutputFile.writeInfo(fs, maxSeenSeqID);
+        if((! deleteSequenceInfo) && maxId >= 0) {
+          compactedOutputFile.writeInfo(fs, maxId);
         } else {
           compactedOutputFile.writeInfo(fs, -1);
         }
@@ -710,14 +711,35 @@ class HStore implements HConstants {
       }
     }
   }
-  
+
+  /** Interface for generic reader for compactions */
   interface CompactionReader {
+    
+    /**
+     * Closes the reader
+     * @throws IOException
+     */
     public void close() throws IOException;
+    
+    /**
+     * Get the next key/value pair
+     * 
+     * @param key
+     * @param val
+     * @return true if more data was returned
+     * @throws IOException
+     */
     public boolean next(WritableComparable key, Writable val)
-      throws IOException;
+    throws IOException;
+    
+    /**
+     * Resets the reader
+     * @throws IOException
+     */
     public void reset() throws IOException;
   }
-  
+
+  /** A compaction reader for MapFile */
   class MapFileCompactionReader implements CompactionReader {
     final MapFile.Reader reader;
     
@@ -725,15 +747,18 @@ class HStore implements HConstants {
       this.reader = r;
     }
     
+    /** {@inheritDoc} */
     public void close() throws IOException {
       this.reader.close();
     }
 
+    /** {@inheritDoc} */
     public boolean next(WritableComparable key, Writable val)
     throws IOException {
       return this.reader.next(key, val);
     }
 
+    /** {@inheritDoc} */
     public void reset() throws IOException {
       this.reader.reset();
     }
@@ -1217,6 +1242,7 @@ class HStore implements HConstants {
     return new HStoreScanner(timestamp, targetCols, firstRow);
   }
   
+  /** {@inheritDoc} */
   @Override
   public String toString() {
     return this.storeName;

+ 61 - 33
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java

@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Random;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -32,8 +33,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -183,12 +187,15 @@ public class HTable implements HConstants {
         break;
         
       } catch (IOException e) {
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        }
         if (tries == numRetries - 1) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-          }
           throw e;
         }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("reloading table servers because: " + e.getMessage());
+        }
         tableServers = connection.reloadTableServers(tableName);
       }
       try {
@@ -225,13 +232,16 @@ public class HTable implements HConstants {
         break;
         
       } catch (IOException e) {
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        }
         if (tries == numRetries - 1) {
           // No more tries
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-          }
           throw e;
         }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("reloading table servers because: " + e.getMessage());
+        }
         tableServers = connection.reloadTableServers(tableName);
       }
       try {
@@ -279,13 +289,16 @@ public class HTable implements HConstants {
         break;
     
       } catch (IOException e) {
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        }
         if (tries == numRetries - 1) {
           // No more tries
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-          }
           throw e;
         }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("reloading table servers because: " + e.getMessage());
+        }
         tableServers = connection.reloadTableServers(tableName);
       }
       try {
@@ -315,7 +328,7 @@ public class HTable implements HConstants {
    */
   public SortedMap<Text, byte[]> getRow(Text row) throws IOException {
     checkClosed();
-    KeyedData[] value = null;
+    MapWritable value = null;
     for (int tries = 0; tries < numRetries; tries++) {
       HRegionLocation r = getRegionLocation(row);
       HRegionInterface server =
@@ -326,13 +339,16 @@ public class HTable implements HConstants {
         break;
         
       } catch (IOException e) {
+        if (e instanceof RemoteException) {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        }
         if (tries == numRetries - 1) {
           // No more tries
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-          }
           throw e;
         }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("reloading table servers because: " + e.getMessage());
+        }
         tableServers = connection.reloadTableServers(tableName);
       }
       try {
@@ -342,10 +358,12 @@ public class HTable implements HConstants {
         // continue
       }
     }
-    TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-    if (value != null && value.length != 0) {
-      for (int i = 0; i < value.length; i++) {
-        results.put(value[i].getKey().getColumn(), value[i].getData());
+    SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+    if (value != null && value.size() != 0) {
+      for (Map.Entry<WritableComparable, Writable> e: value.entrySet()) {
+        HStoreKey key = (HStoreKey) e.getKey();
+        results.put(key.getColumn(),
+            ((ImmutableBytesWritable) e.getValue()).get());
       }
     }
     return results;
@@ -574,14 +592,17 @@ public class HTable implements HConstants {
           break;
 
         } catch (IOException e) {
+          if (e instanceof RemoteException) {
+            e = RemoteExceptionHandler.decodeRemoteException(
+                (RemoteException) e);
+          }
           if (tries < numRetries -1) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("reloading table servers because: " + e.getMessage());
+            }
             tableServers = connection.reloadTableServers(tableName);
 
           } else {
-            if (e instanceof RemoteException) {
-              e = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) e);
-            }
             throw e;
           }
         }
@@ -589,6 +610,7 @@ public class HTable implements HConstants {
           Thread.sleep(pause);
 
         } catch (InterruptedException e) {
+          // continue
         }
       }
     } finally {
@@ -702,13 +724,17 @@ public class HTable implements HConstants {
             break;
         
           } catch (IOException e) {
+            if (e instanceof RemoteException) {
+              e = RemoteExceptionHandler.decodeRemoteException(
+                  (RemoteException) e);
+            }
             if (tries == numRetries - 1) {
               // No more tries
-              if (e instanceof RemoteException) {
-                e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-              }
               throw e;
             }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("reloading table servers because: " + e.getMessage());
+            }
             tableServers = connection.reloadTableServers(tableName);
             loadRegions();
           }
@@ -732,20 +758,22 @@ public class HTable implements HConstants {
       if (this.closed) {
         return false;
       }
-      KeyedData[] values = null;
+      MapWritable values = null;
       do {
         values = this.server.next(this.scannerId);
-      } while (values != null && values.length == 0 && nextScanner());
+      } while (values != null && values.size() == 0 && nextScanner());
 
-      if (values != null && values.length != 0) {
-        for (int i = 0; i < values.length; i++) {
-          key.setRow(values[i].getKey().getRow());
-          key.setVersion(values[i].getKey().getTimestamp());
+      if (values != null && values.size() != 0) {
+        for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
+          HStoreKey k = (HStoreKey) e.getKey();
+          key.setRow(k.getRow());
+          key.setVersion(k.getTimestamp());
           key.setColumn(EMPTY_COLUMN);
-          results.put(values[i].getKey().getColumn(), values[i].getData());
+          results.put(k.getColumn(),
+              ((ImmutableBytesWritable) e.getValue()).get());
         }
       }
-      return values == null ? false : values.length != 0;
+      return values == null ? false : values.size() != 0;
     }
 
     /**

+ 0 - 74
src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedData.java

@@ -1,74 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.io.*;
-
-import java.io.*;
-
-/*******************************************************************************
- * KeyedData is just a data pair.
- * It includes an HStoreKey and some associated data.
- ******************************************************************************/
-public class KeyedData implements Writable {
-  HStoreKey key;
-  byte [] data;
-
-  /** Default constructor. Used by Writable interface */
-  public KeyedData() {
-    this.key = new HStoreKey();
-  }
-
-  /**
-   * Create a KeyedData object specifying the parts
-   * @param key HStoreKey
-   * @param data
-   */
-  public KeyedData(HStoreKey key, byte [] data) {
-    this.key = key;
-    this.data = data;
-  }
-
-  /** @return returns the key */
-  public HStoreKey getKey() {
-    return key;
-  }
-
-  /** @return - returns the value */
-  public byte [] getData() {
-    return data;
-  }
-
-  // Writable
-
-  /** {@inheritDoc} */
-  public void write(DataOutput out) throws IOException {
-    key.write(out);
-    out.writeInt(this.data.length);
-    out.write(this.data);
-  }
-  
-  /** {@inheritDoc} */
-  public void readFields(DataInput in) throws IOException {
-    key.readFields(in);
-    this.data = new byte[in.readInt()];
-    in.readFully(this.data);
-  }
-}

+ 0 - 86
src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/KeyedDataArrayWritable.java

@@ -1,86 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Wraps an array of KeyedData items as a Writable. The array elements
- * may be null.
- */
-public class KeyedDataArrayWritable implements Writable {
-
-  private final static KeyedData NULL_KEYEDDATA = new KeyedData();
-
-  private KeyedData[] m_data;
-
-  /**
-   * Make a record of length 0
-   */
-  public KeyedDataArrayWritable() {
-    m_data = new KeyedData[0];
-  }
-
-  /** @return the array of KeyedData */
-  public KeyedData[] get() {
-    return m_data; 
-  }
-
-  /**
-   * Sets the KeyedData array
-   * 
-   * @param data array of KeyedData
-   */
-  public void set(KeyedData[] data) {
-    if(data == null) {
-      throw new NullPointerException("KeyedData[] cannot be null");
-    }
-    m_data = data;
-  }
-
-  // Writable
-  
-  /** {@inheritDoc} */
-  public void readFields(DataInput in) throws IOException {
-    int len = in.readInt();
-    m_data = new KeyedData[len];
-    for(int i = 0; i < len; i++) {
-      m_data[i] = new KeyedData();
-      m_data[i].readFields(in);
-    }
-  }
-
-  /** {@inheritDoc} */
-  public void write(DataOutput out) throws IOException {
-    int len = m_data.length;
-    out.writeInt(len);
-    for(int i = 0; i < len; i++) {
-      if(m_data[i] != null) {
-        m_data[i].write(out);
-      } else {
-        NULL_KEYEDDATA.write(out);
-      }
-    }
-  }
-}

+ 303 - 0
src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/MapWritable.java

@@ -0,0 +1,303 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.util.TreeMap;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HConstants;
+
+/**
+ * A Writable Map of WritableComparable keys to Writable values. Replaces
+ * KeyedData and KeyedDataArrayWritable as the value type returned by getRow
+ * and by scanner next calls.
+ */
+public class MapWritable implements Writable, Map<WritableComparable, Writable> {
+  private String keyClass = null;
+  private String valueClass = null;
+  private String mapClass = null;
+  private Map<WritableComparable, Writable> instance = null;
+  
+  /**
+   * Default constructor used by writable
+   */
+  public MapWritable() {}
+  
+  /**
+   * @param keyClass the class of the keys
+   * @param valueClass the class of the values
+   * @param instance the Map to be wrapped in this Writable
+   */
+  @SuppressWarnings("unchecked")
+  public MapWritable(Class keyClass, Class valueClass,
+      Map<WritableComparable, Writable> instance) {
+    
+    this.keyClass = keyClass.getName();
+    this.valueClass = valueClass.getName();
+    this.instance = instance;
+    this.mapClass = instance.getClass().getName();
+  }
+  
+  private void checkInitialized() {
+    if (keyClass == null || 
+        valueClass == null || 
+        mapClass == null || 
+        instance == null) {
+      
+      throw new IllegalStateException("object has not been properly initialized");
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void clear() {
+    checkInitialized();
+    instance.clear();
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsKey(Object key) {
+    checkInitialized();
+    return instance.containsKey(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsValue(Object value) {
+    checkInitialized();
+    return instance.containsValue(value);
+  }
+
+  /** {@inheritDoc} */
+  public Set<Map.Entry<WritableComparable, Writable>> entrySet() {
+    checkInitialized();
+    return instance.entrySet();
+  }
+
+  /** {@inheritDoc} */
+  public Writable get(Object key) {
+    checkInitialized();
+    return instance.get(key);
+  }
+  
+  /**
+   * Returns the value to which this map maps the specified key
+   * @param key
+   * @return value associated with specified key
+   */
+  public Writable get(WritableComparable key) {
+    checkInitialized();
+    return instance.get(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean isEmpty() {
+    checkInitialized();
+    return instance.isEmpty();
+  }
+
+  /** {@inheritDoc} */
+  public Set<WritableComparable> keySet() {
+    checkInitialized();
+    return instance.keySet();
+  }
+
+  /** {@inheritDoc} */
+  public Writable put(WritableComparable key, Writable value) {
+    checkInitialized();
+    return instance.put(key, value);
+  }
+
+  /** {@inheritDoc} */
+  public void putAll(Map<? extends WritableComparable,? extends Writable> t) {
+    checkInitialized();
+    instance.putAll(t);
+  }
+
+  /** {@inheritDoc} */
+  public Writable remove(Object key) {
+    checkInitialized();
+    return instance.remove(key);
+  }
+
+  /**
+   * Removes the mapping for this key from this map if it is present
+   * @param key
+   * @return value corresponding to key
+   */
+  public Writable remove(WritableComparable key) {
+    checkInitialized();
+    return instance.remove(key);
+  }
+
+  /** {@inheritDoc} */
+  public int size() {
+    checkInitialized();
+    return instance.size();
+  }
+
+  /** {@inheritDoc} */
+  public Collection<Writable> values() {
+    checkInitialized();
+    return instance.values();
+  }
+
+  // Writable
+  
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    checkInitialized();
+    out.writeUTF(mapClass);
+    out.writeUTF(keyClass);
+    out.writeUTF(valueClass);
+    out.writeInt(instance.size());
+    
+    for (Map.Entry<WritableComparable, Writable> e: instance.entrySet()) {
+      e.getKey().write(out);
+      e.getValue().write(out);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    mapClass = in.readUTF();
+    keyClass = in.readUTF();
+    valueClass = in.readUTF();
+    
+    instance = (Map<WritableComparable, Writable>) objectFactory(mapClass);
+    
+    int entries = in.readInt();
+    for (int i = 0; i < entries; i++) {
+      WritableComparable key = (WritableComparable) objectFactory(keyClass);
+      key.readFields(in);
+      
+      Writable value = (Writable) objectFactory(valueClass);
+      value.readFields(in);
+      
+      instance.put(key, value);
+    }
+  }
+  
+  private Object objectFactory(String className) throws IOException {
+    Object o = null;
+    String exceptionMessage = null;
+    try {
+      o = Class.forName(className).newInstance();
+      
+    } catch (ClassNotFoundException e) {
+      exceptionMessage = e.getMessage();
+      
+    } catch (InstantiationException e) {
+      exceptionMessage = e.getMessage();
+      
+    } catch (IllegalAccessException e) {
+      exceptionMessage = e.getMessage();
+      
+    } finally {
+      if (exceptionMessage != null) {
+        throw new IOException("error instantiating " + className + " because " +
+            exceptionMessage);
+      }
+    }
+    return o;
+  }
+  
+  /**
+   * A simple main program to test this class.
+   * 
+   * @param args not used
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static void main(@SuppressWarnings("unused") String[] args)
+  throws IOException {
+    
+    HStoreKey[] keys = {
+        new HStoreKey(new Text("row1"), HConstants.COL_REGIONINFO),
+        new HStoreKey(new Text("row2"), HConstants.COL_SERVER),
+        new HStoreKey(new Text("row3"), HConstants.COL_STARTCODE)
+    };
+    
+    ImmutableBytesWritable[] values = {
+        new ImmutableBytesWritable("value1".getBytes()),
+        new ImmutableBytesWritable("value2".getBytes()),
+        new ImmutableBytesWritable("value3".getBytes())
+    };
+
+    @SuppressWarnings("unchecked")
+    MapWritable inMap = new MapWritable(HStoreKey.class,
+        ImmutableBytesWritable.class,
+        (Map) new TreeMap<HStoreKey, ImmutableBytesWritable>());
+    
+    for (int i = 0; i < keys.length; i++) {
+      inMap.put(keys[i], values[i]);
+    }
+
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(bytes);
+    try {
+      inMap.write(out);
+      
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw e;
+    }
+    
+    MapWritable outMap = new MapWritable();
+    DataInput in =
+      new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
+    
+    try {
+      outMap.readFields(in);
+      
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw e;
+    }
+    
+    if (outMap.size() != inMap.size()) {
+      System.err.println("outMap.size()=" + outMap.size() + " != " +
+          "inMap.size()=" + inMap.size());
+    }
+    
+    for (Map.Entry<WritableComparable, Writable> e: inMap.entrySet()) {
+      if (!outMap.containsKey(e.getKey())) {
+        System.err.println("outMap does not contain key " + e.getKey().toString());
+        continue;
+      }
+      if (((WritableComparable) outMap.get(e.getKey())).compareTo(
+          e.getValue()) != 0) {
+        System.err.println("output value for " + e.getKey().toString() + " != input value");
+      }
+    }
+    System.out.println("it worked!");
+  }
+}
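
For orientation, a minimal sketch (not part of this change) of the Text-keyed form of MapWritable that TableInputFormat.createValue() below builds, round-tripped once through its Writable methods; the class name, column name and value are invented for illustration.

    // Sketch only: assumes the Text -> ImmutableBytesWritable form of MapWritable
    // used by the mapred classes in this commit; column name and value are invented.
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.io.MapWritable;
    import org.apache.hadoop.io.Text;

    public class MapWritableRoundTrip {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) throws Exception {
        // One row's worth of columns, keyed by column name
        MapWritable row = new MapWritable((Class) Text.class,
            (Class) ImmutableBytesWritable.class,
            (Map) new TreeMap<Text, ImmutableBytesWritable>());
        row.put(new Text("colfamily1:qualifier"),
            new ImmutableBytesWritable("value".getBytes()));

        // Serialize with write(), then rebuild a copy with readFields()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        row.write(new DataOutputStream(bytes));

        MapWritable copy = new MapWritable();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println("columns after round trip: " + copy.size());
      }
    }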

+ 16 - 15
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java

@@ -21,11 +21,15 @@ package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.KeyedData;
-import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
+
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 
@@ -55,7 +59,8 @@ public class GroupingTableMap extends TableMap {
   *
   * @param table table to be processed
   * @param columns space separated list of columns to fetch
-   * @param groupColumns space separated list of columns used to form the key used in collect
+   * @param groupColumns space separated list of columns used to form the key
+   * used in collect
   * @param mapper map class
   * @param job job configuration object
   */
@@ -83,11 +88,11 @@ public class GroupingTableMap extends TableMap {
   * Pass the new key and value to reduce.
   * If any of the grouping columns are not found in the value, the record is skipped.
   *
-   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
   */
  @Override
  public void map(@SuppressWarnings("unused") HStoreKey key,
-      KeyedDataArrayWritable value, TableOutputCollector output,
+      MapWritable value, TableOutputCollector output,
      @SuppressWarnings("unused") Reporter reporter) throws IOException {
    
    byte[][] keyVals = extractKeyValues(value);
@@ -106,20 +111,16 @@ public class GroupingTableMap extends TableMap {
   * @param r
   * @return array of byte values
   */
-  protected byte[][] extractKeyValues(KeyedDataArrayWritable r) {
+  protected byte[][] extractKeyValues(MapWritable r) {
    byte[][] keyVals = null;
    ArrayList<byte[]> foundList = new ArrayList<byte[]>();
    int numCols = m_columns.length;
    if(numCols > 0) {
-      KeyedData[] recVals = r.get();
-      boolean found = true;
-      for(int i = 0; i < numCols && found; i++) {
-        found = false;
-        for(int j = 0; j < recVals.length; j++) {
-          if(recVals[j].getKey().getColumn().equals(m_columns[i])) {
-            found = true;
-            byte[] val = recVals[j].getData();
-            foundList.add(val);
+      for (Map.Entry<WritableComparable, Writable> e: r.entrySet()) {
+        Text column = (Text) e.getKey();
+        for (int i = 0; i < numCols; i++) {
+          if (column.equals(m_columns[i])) {
+            foundList.add(((ImmutableBytesWritable) e.getValue()).get());
            break;
          }
        }

+ 3 - 3
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java

@@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.mapred;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Reporter;
 
@@ -40,10 +40,10 @@ public class IdentityTableMap extends TableMap {
  /**
   * Pass the key, value to reduce
   *
-   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
   */
  @Override
-  public void map(HStoreKey key, KeyedDataArrayWritable value,
+  public void map(HStoreKey key, MapWritable value,
      TableOutputCollector output,
      @SuppressWarnings("unused") Reporter reporter) throws IOException {
    

+ 2 - 2
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java

@@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.mapred;
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Reporter;
 
@@ -48,7 +48,7 @@ public class IdentityTableReduce extends TableReduce {
      @SuppressWarnings("unused") Reporter reporter) throws IOException {
    
    while(values.hasNext()) {
-      KeyedDataArrayWritable r = (KeyedDataArrayWritable)values.next();
+      MapWritable r = (MapWritable)values.next();
      output.collect(key, r);
    }
  }

+ 27 - 27
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java

@@ -21,13 +21,11 @@ package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.ArrayList;
 
 import org.apache.hadoop.fs.Path;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Text;
 
 import org.apache.hadoop.mapred.InputFormat;
@@ -40,8 +38,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.hbase.HTable;
 import org.apache.hadoop.hbase.HScannerInterface;
 import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.KeyedData;
-import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 
 import org.apache.log4j.Logger;
 
@@ -49,7 +47,7 @@ import org.apache.log4j.Logger;
  * Convert HBase tabular data into a format that is consumable by Map/Reduce
  */
 public class TableInputFormat
-  implements InputFormat<HStoreKey, KeyedDataArrayWritable>, JobConfigurable {
+implements InputFormat<HStoreKey, MapWritable>, JobConfigurable {
  
  static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName());
 
@@ -64,11 +62,12 @@ public class TableInputFormat
  HTable m_table;
 
  /**
-   * Iterate over an HBase table data, return (HStoreKey, KeyedDataArrayWritable) pairs
+   * Iterate over an HBase table data,
+   * return (HStoreKey, MapWritable<Text, ImmutableBytesWritable>) pairs
   */
-  class TableRecordReader implements RecordReader<HStoreKey, KeyedDataArrayWritable> {
+  class TableRecordReader implements RecordReader<HStoreKey, MapWritable> {
    private HScannerInterface m_scanner;
-    private TreeMap<Text, byte[]> m_row; // current buffer
+    private SortedMap<Text, byte[]> m_row; // current buffer
    private Text m_endRow;
 
    /**
@@ -102,12 +101,15 @@ public class TableInputFormat
    }
 
    /**
-     * @return KeyedDataArrayWritable of KeyedData
+     * @return MapWritable
     *
     * @see org.apache.hadoop.mapred.RecordReader#createValue()
     */
-    public KeyedDataArrayWritable createValue() {
-      return new KeyedDataArrayWritable();
+    @SuppressWarnings("unchecked")
+    public MapWritable createValue() {
+      return new MapWritable((Class) Text.class,
+          (Class) ImmutableBytesWritable.class,
+          (Map) new TreeMap<Text, ImmutableBytesWritable>());
    }
 
    /** {@inheritDoc} */
@@ -125,34 +127,31 @@ public class TableInputFormat
 
    /**
     * @param key HStoreKey as input key.
-     * @param value KeyedDataArrayWritable as input value
+     * @param value MapWritable as input value
+     * 
+     * Converts HScannerInterface.next(HStoreKey, SortedMap<Text, byte[]>) to
+     * HStoreKey, MapWritable<Text, ImmutableBytesWritable>
     * 
-     * Converts HScannerInterface.next(HStoreKey, TreeMap(Text, byte[])) to
-     *                                (HStoreKey, KeyedDataArrayWritable)
     * @return true if there was more data
     * @throws IOException
     */
-    public boolean next(HStoreKey key, KeyedDataArrayWritable value) throws IOException {
+    @SuppressWarnings("unchecked")
+    public boolean next(HStoreKey key, MapWritable value) throws IOException {
      LOG.debug("start next");
      m_row.clear();
      HStoreKey tKey = key;
      boolean hasMore = m_scanner.next(tKey, m_row);
 
      if(hasMore) {
-        if(m_endRow.getLength() > 0 && (tKey.getRow().compareTo(m_endRow) < 0)) {
+        if(m_endRow.getLength() > 0 &&
+            (tKey.getRow().compareTo(m_endRow) < 0)) {
+          
          hasMore = false;
+          
        } else {
-          KeyedDataArrayWritable rowVal = value;
-          ArrayList<KeyedData> columns = new ArrayList<KeyedData>();
-
          for(Map.Entry<Text, byte[]> e: m_row.entrySet()) {
-            HStoreKey keyCol = new HStoreKey(tKey);
-            keyCol.setColumn(e.getKey());
-            columns.add(new KeyedData(keyCol, e.getValue()));
+            value.put(e.getKey(), new ImmutableBytesWritable(e.getValue()));
          }
-
-          // set the output
-          rowVal.set(columns.toArray(new KeyedData[columns.size()]));
        }
      }
      LOG.debug("end next");
@@ -161,7 +160,8 @@ public class TableInputFormat
 
  }
 
-  public RecordReader<HStoreKey, KeyedDataArrayWritable> getRecordReader(
+  /** {@inheritDoc} */
+  public RecordReader<HStoreKey, MapWritable> getRecordReader(
      InputSplit split,
      @SuppressWarnings("unused") JobConf job,
      @SuppressWarnings("unused") Reporter reporter) throws IOException {

+ 5 - 4
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java

@@ -28,7 +28,7 @@ import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.io.Text;
@@ -39,6 +39,7 @@ import org.apache.log4j.Logger;
  * If the column does not exist, the record is not passed to Reduce.
  *
  */
+@SuppressWarnings("unchecked")
 public abstract class TableMap extends MapReduceBase implements Mapper {
 
  private static final Logger LOG = Logger.getLogger(TableMap.class.getName());
@@ -64,7 +65,7 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
    
    job.setInputFormat(TableInputFormat.class);
    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(KeyedDataArrayWritable.class);
+    job.setOutputValueClass(MapWritable.class);
    job.setMapperClass(mapper);
    job.setInputPath(new Path(table));
    job.set(TableInputFormat.COLUMN_LIST, columns);
@@ -95,7 +96,7 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
    if(m_collector.collector == null) {
      m_collector.collector = output;
    }
-    map((HStoreKey)key, (KeyedDataArrayWritable)value, m_collector, reporter);
+    map((HStoreKey)key, (MapWritable)value, m_collector, reporter);
    LOG.debug("end map");
  }
 
@@ -109,6 +110,6 @@ public abstract class TableMap extends MapReduceBase implements Mapper {
   * @param reporter
   * @throws IOException
   */
-  public abstract void map(HStoreKey key, KeyedDataArrayWritable value, 
+  public abstract void map(HStoreKey key, MapWritable value, 
      TableOutputCollector output, Reporter reporter) throws IOException;
 }

+ 4 - 3
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java

@@ -24,13 +24,14 @@ import java.io.IOException;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.OutputCollector;
 
-import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 
 /**
  * Refine the types that can be collected from a Table Map/Reduce jobs.
  */
 public class TableOutputCollector {
  /** The collector object */
+  @SuppressWarnings("unchecked")
  public OutputCollector collector;
 
  /**
@@ -40,8 +41,8 @@ public class TableOutputCollector {
   * @param value
   * @throws IOException
   */
-  public void collect(Text key, KeyedDataArrayWritable value)
-  throws IOException {
+  @SuppressWarnings("unchecked")
+  public void collect(Text key, MapWritable value) throws IOException {
    collector.collect(key, value);
  }
 }

+ 20 - 27
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java

@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
@@ -34,8 +35,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 
 import org.apache.hadoop.hbase.HTable;
-import org.apache.hadoop.hbase.io.KeyedData;
-import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 
 import org.apache.log4j.Logger;
 
@@ -43,7 +44,7 @@ import org.apache.log4j.Logger;
  * Convert Map/Reduce output and write it to an HBase table
  */
 public class TableOutputFormat
-  extends OutputFormatBase<Text, KeyedDataArrayWritable> {
+  extends OutputFormatBase<Text, MapWritable> {
 
  /** JobConf parameter that specifies the output table */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
@@ -58,8 +59,7 @@ public class TableOutputFormat
   * and write to an HBase table
   */
  protected class TableRecordWriter
-    implements RecordWriter<Text, KeyedDataArrayWritable> {
-    
+    implements RecordWriter<Text, MapWritable> {
    private HTable m_table;
 
    /**
@@ -74,25 +74,17 @@ public class TableOutputFormat
    /** {@inheritDoc} */
    public void close(@SuppressWarnings("unused") Reporter reporter) {}
 
-    /**
-     * Expect key to be of type Text
-     * Expect value to be of type KeyedDataArrayWritable
-     *
-     * @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
-     */
-    public void write(Text key, KeyedDataArrayWritable value) throws IOException {
+    /** {@inheritDoc} */
+    public void write(Text key, MapWritable value) throws IOException {
      LOG.debug("start write");
-      Text tKey = key;
-      KeyedDataArrayWritable tValue = value;
-      KeyedData[] columns = tValue.get();
 
      // start transaction
      
-      long xid = m_table.startUpdate(tKey);
-      
-      for(int i = 0; i < columns.length; i++) {
-        KeyedData column = columns[i];
-        m_table.put(xid, column.getKey().getColumn(), column.getData());
+      long xid = m_table.startUpdate(key);
+
+      for (Map.Entry<WritableComparable, Writable> e: value.entrySet()) {
+        m_table.put(xid, (Text)e.getKey(),
+            ((ImmutableBytesWritable)e.getValue()).get());
      }
      
      // end transaction
@@ -103,14 +95,14 @@ public class TableOutputFormat
    }
  }
  
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapred.OutputFormatBase#getRecordWriter(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf, java.lang.String, org.apache.hadoop.util.Progressable)
-   */
  /** {@inheritDoc} */
  @Override
-  @SuppressWarnings("unused")
-  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
-      String name, Progressable progress) throws IOException {
+  @SuppressWarnings("unchecked")
+  public RecordWriter getRecordWriter(
+      @SuppressWarnings("unused") FileSystem ignored,
+      JobConf job,
+      @SuppressWarnings("unused") String name,
+      @SuppressWarnings("unused") Progressable progress) throws IOException {
    
    // expecting exactly one path
    
@@ -119,8 +111,9 @@ public class TableOutputFormat
    HTable table = null;
    try {
      table = new HTable(job, tableName);
-    } catch(Exception e) {
+    } catch(IOException e) {
      LOG.error(e);
+      throw e;
    }
    LOG.debug("end get writer");
    return new TableRecordWriter(table);
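
For orientation, a minimal sketch of pointing a JobConf at this output format; the driver class and table name are hypothetical, and the TableMap/TableReduce helpers in this package may already perform equivalent wiring.

    // Sketch only: the class and table name below are hypothetical.
    import org.apache.hadoop.hbase.io.MapWritable;
    import org.apache.hadoop.hbase.mapred.TableOutputFormat;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;

    public class OutputWiringSketch {
      public static void wire(JobConf job) {
        // Route reduce output into an HBase table via TableOutputFormat
        job.setOutputFormat(TableOutputFormat.class);
        job.set(TableOutputFormat.OUTPUT_TABLE, "mytable");
        // Keys are row names; each value is one MapWritable of column -> cell bytes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MapWritable.class);
      }
    }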

+ 1 - 0
src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java

@@ -34,6 +34,7 @@ import org.apache.log4j.Logger;
 /**
  * Write a table, sorting by the input key
  */
+@SuppressWarnings("unchecked")
 public abstract class TableReduce extends MapReduceBase implements Reducer {
  private static final Logger LOG =
    Logger.getLogger(TableReduce.class.getName());

+ 5 - 5
src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java

@@ -32,13 +32,13 @@ import org.apache.hadoop.io.Text;
  * Abstract base class for test cases. Performs all static initialization
  */
 public abstract class HBaseTestCase extends TestCase {
-  public final static String COLFAMILY_NAME1 = "colfamily1:";
-  public final static String COLFAMILY_NAME2 = "colfamily2:";
-  public final static String COLFAMILY_NAME3 = "colfamily3:";
+  protected final static String COLFAMILY_NAME1 = "colfamily1:";
+  protected final static String COLFAMILY_NAME2 = "colfamily2:";
+  protected final static String COLFAMILY_NAME3 = "colfamily3:";
  protected Path testDir = null;
  protected FileSystem localFs = null;
-  public static final char FIRST_CHAR = 'a';
-  public static final char LAST_CHAR = 'z';
+  protected static final char FIRST_CHAR = 'a';
+  protected static final char LAST_CHAR = 'z';
  
  static {
    StaticTestEnvironment.initialize();

+ 1 - 0
src/contrib/hbase/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java

@@ -149,6 +149,7 @@ public class PerformanceEvaluation implements HConstants {
  /**
   * MapReduce job that runs a performance evaluation client in each map task.
   */
+  @SuppressWarnings("unchecked")
  public static class EvaluationMapTask extends MapReduceBase
  implements Mapper {
    /** configuration parameter name that contains the command */

+ 2 - 0
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java

@@ -30,10 +30,12 @@ import org.apache.commons.logging.LogFactory;
 public class TestCompaction extends HBaseTestCase {
  static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
 
+  @Override
  protected void setUp() throws Exception {
    super.setUp();
  }
  
+  @Override
  protected void tearDown() throws Exception {
    super.tearDown();
  }

+ 11 - 6
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java

@@ -32,12 +32,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.hbase.filter.RegExpRowFilter;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.filter.RowFilterSet;
 import org.apache.hadoop.hbase.filter.StopRowFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchRowFilter;
-import org.apache.hadoop.hbase.io.KeyedData;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Text;
 
@@ -211,13 +214,15 @@ public class TestScanner2 extends HBaseClusterTestCase {
          System.currentTimeMillis(), null);
      while (true) {
        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-        KeyedData[] values = regionServer.next(scannerId);
-        if (values.length == 0) {
+        MapWritable values = regionServer.next(scannerId);
+        if (values == null || values.size() == 0) {
          break;
        }
-
-        for (int i = 0; i < values.length; i++) {
-          results.put(values[i].getKey().getColumn(), values[i].getData());
+        
+        for (Map.Entry<WritableComparable, Writable> e: values.entrySet()) {
+          HStoreKey k = (HStoreKey) e.getKey();
+          results.put(k.getColumn(),
+              ((ImmutableBytesWritable) e.getValue()).get());
        }
 
        HRegionInfo info = (HRegionInfo) Writables.getWritable(

+ 19 - 19
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java

@@ -38,8 +38,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.Reporter;
 
-import org.apache.hadoop.hbase.io.KeyedData;
-import org.apache.hadoop.hbase.io.KeyedDataArrayWritable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.MapWritable;
 
 import org.apache.hadoop.hbase.mapred.TableMap;
 import org.apache.hadoop.hbase.mapred.TableOutputCollector;
@@ -150,44 +150,44 @@ public class TestTableMapReduce extends HBaseTestCase {
    /**
     * Pass the key, and reversed value to reduce
     *
-     * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.KeyedDataArrayWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+     * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.hbase.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
     */
+    @SuppressWarnings("unchecked")
    @Override
-    public void map(HStoreKey key, KeyedDataArrayWritable value,
+    public void map(HStoreKey key, MapWritable value,
        TableOutputCollector output,
        @SuppressWarnings("unused") Reporter reporter) throws IOException {
      
      Text tKey = key.getRow();
-      KeyedData[] columns = value.get();
      
-      if(columns.length != 1) {
+      if(value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
-      
-      if(!columns[0].getKey().getColumn().equals(TEXT_INPUT_COLUMN)) {
+
+      Text[] keys = value.keySet().toArray(new Text[value.size()]);
+      if(!keys[0].equals(TEXT_INPUT_COLUMN)) {
        throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN
-            + " but got: " + columns[0].getKey().getColumn());
+            + " but got: " + keys[0]);
      }
 
-      // Get the input column key and change it to the output column key
-      
-      HStoreKey column = columns[0].getKey();
-      column.setColumn(TEXT_OUTPUT_COLUMN);
-      
      // Get the original value and reverse it
      
-      String originalValue = new String(columns[0].getData());
+      String originalValue =
+        new String(((ImmutableBytesWritable)value.get(keys[0])).get());
      StringBuilder newValue = new StringBuilder();
      for(int i = originalValue.length() - 1; i >= 0; i--) {
        newValue.append(originalValue.charAt(i));
      }
      
      // Now set the value to be collected
+
+      MapWritable outval = new MapWritable((Class) Text.class,
+          (Class) ImmutableBytesWritable.class,
+          (Map) new TreeMap<Text, ImmutableBytesWritable>());
+      outval.put(TEXT_OUTPUT_COLUMN,
+          new ImmutableBytesWritable(newValue.toString().getBytes()));
      
-      columns[0] = new KeyedData(column, newValue.toString().getBytes());
-      value.set(columns);
-      
-      output.collect(tKey, value);
+      output.collect(tKey, outval);
    }
  }