Explorar el Código

HADOOP-1538 Provide capability for client specified time stamps in HBase
HADOOP-1466 Clean up visibility and javadoc issues in HBase.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@554811 13f79535-47bb-0310-9956-ffa450edef68

Jim Kellerman hace 18 años
padre
commit
16bbc19295

+ 3 - 0
src/contrib/hbase/CHANGES.txt

@@ -53,4 +53,7 @@ Trunk (unreleased changes)
  31. HADOOP-1566 Key-making utility
  32. HADOOP-1415 Provide configurable per-column bloom filters. 
      HADOOP-1466 Clean up visibility and javadoc issues in HBase.
+ 33. HADOOP-1538 Provide capability for client specified time stamps in HBase
+     HADOOP-1466 Clean up visibility and javadoc issues in HBase.
+
 

+ 73 - 23
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java

@@ -74,6 +74,9 @@ public class HClient implements HConstants {
       this.serverAddress = serverAddress;
     }
     
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public String toString() {
       return "address: " + this.serverAddress.toString() + ", regioninfo: " +
@@ -312,7 +315,7 @@ public class HClient implements HConstants {
       long scannerId = -1L;
       try {
         scannerId = server.openScanner(firstMetaServer.regionInfo.regionName,
-            REGIONINFO, tableName);
+            REGIONINFO, tableName, System.currentTimeMillis(), null);
         KeyedData[] values = server.next(scannerId);
         if(values == null || values.length == 0) {
           break;
@@ -417,7 +420,7 @@ public class HClient implements HConstants {
       long scannerId = -1L;
       try {
         scannerId = server.openScanner(firstMetaServer.regionInfo.regionName,
-            REGIONINFO, tableName);
+            REGIONINFO, tableName, System.currentTimeMillis(), null);
         boolean isenabled = false;
         while(true) {
           KeyedData[] values = server.next(scannerId);
@@ -500,7 +503,7 @@ public class HClient implements HConstants {
       long scannerId = -1L;
       try {
         scannerId = server.openScanner(firstMetaServer.regionInfo.regionName,
-            REGIONINFO, tableName);
+            REGIONINFO, tableName, System.currentTimeMillis(), null);
         boolean disabled = false;
         while(true) {
           KeyedData[] values = server.next(scannerId);
@@ -807,7 +810,8 @@ public class HClient implements HConstants {
       long scannerId = -1L;
       try {
         scannerId =
-          server.openScanner(t.regionInfo.regionName, META_COLUMNS, tableName);
+          server.openScanner(t.regionInfo.regionName, META_COLUMNS, tableName,
+              System.currentTimeMillis(), null);
 
         DataInputBuffer inbuf = new DataInputBuffer();
         while(true) {
@@ -963,7 +967,7 @@ public class HClient implements HConstants {
       long scannerId = -1L;
       try {
         scannerId = server.openScanner(t.regionInfo.regionName,
-            META_COLUMNS, EMPTY_START_ROW);
+            META_COLUMNS, EMPTY_START_ROW, System.currentTimeMillis(), null);
         
         DataInputBuffer inbuf = new DataInputBuffer();
         while(true) {
@@ -1180,9 +1184,38 @@ public class HClient implements HConstants {
    * @throws IOException
    */
   public synchronized HScannerInterface obtainScanner(Text[] columns,
-      Text startRow)
-  throws IOException {
-    return obtainScanner(columns, startRow, null);
+      Text startRow) throws IOException {
+    return obtainScanner(columns, startRow, System.currentTimeMillis(), null);
+  }
+  
+  /** 
+   * Get a scanner on the current table starting at the specified row.
+   * Return the specified columns.
+   *
+   * @param columns array of columns to return
+   * @param startRow starting row in table to scan
+   * @param timestamp only return results whose timestamp <= this value
+   * @return scanner
+   * @throws IOException
+   */
+  public synchronized HScannerInterface obtainScanner(Text[] columns,
+      Text startRow, long timestamp) throws IOException {
+    return obtainScanner(columns, startRow, timestamp, null);
+  }
+  
+  /** 
+   * Get a scanner on the current table starting at the specified row.
+   * Return the specified columns.
+   *
+   * @param columns array of columns to return
+   * @param startRow starting row in table to scan
+   * @param filter a row filter using row-key regexp and/or column data filter.
+   * @return scanner
+   * @throws IOException
+   */
+  public synchronized HScannerInterface obtainScanner(Text[] columns,
+      Text startRow, RowFilterInterface filter) throws IOException { 
+    return obtainScanner(columns, startRow, System.currentTimeMillis(), filter);
   }
   
   /** 
@@ -1191,19 +1224,20 @@ public class HClient implements HConstants {
    *
    * @param columns array of columns to return
    * @param startRow starting row in table to scan
+   * @param timestamp only return results whose timestamp <= this value
    * @param filter a row filter using row-key regexp and/or column data filter.
    * @return scanner
    * @throws IOException
    */
   public synchronized HScannerInterface obtainScanner(Text[] columns,
-      Text startRow, RowFilterInterface filter)
-  throws IOException { 
+      Text startRow, long timestamp, RowFilterInterface filter)
+  throws IOException {
     if(this.tableServers == null) {
       throw new IllegalStateException("Must open table first");
     }
-    return new ClientScanner(columns, startRow, filter);
+    return new ClientScanner(columns, startRow, timestamp, filter);
   }
-  
+
   /*
    * @return General HClient RetryPolicy instance.
    */
@@ -1361,8 +1395,21 @@ public class HClient implements HConstants {
    * @throws IOException
    */
   public void commit(long lockid) throws IOException {
+    commit(lockid, System.currentTimeMillis());
+  }
+
+  /** 
+   * Finalize a row mutation
+   *
+   * @param lockid              - lock id returned from startUpdate
+   * @param timestamp           - time to associate with the change
+   * @throws IOException
+   */
+  public void commit(long lockid, long timestamp) throws IOException {
     try {
-      this.currentServer.commit(this.currentRegion, this.clientid, lockid);
+      this.currentServer.commit(this.currentRegion, this.clientid, lockid,
+          timestamp);
+      
     } finally {
       this.currentServer = null;
       this.currentRegion = null;
@@ -1399,6 +1446,7 @@ public class HClient implements HConstants {
     private final Text EMPTY_COLUMN = new Text();
     private Text[] columns;
     private Text startRow;
+    private long scanTime;
     private boolean closed;
     private RegionLocation[] regions;
     @SuppressWarnings("hiding")
@@ -1422,10 +1470,11 @@ public class HClient implements HConstants {
       this.regions = info.toArray(new RegionLocation[info.size()]);
     }
     
-    ClientScanner(Text[] columns, Text startRow, RowFilterInterface filter)
-    throws IOException {
+    ClientScanner(Text[] columns, Text startRow, long timestamp,
+        RowFilterInterface filter) throws IOException {
       this.columns = columns;
       this.startRow = startRow;
+      this.scanTime = timestamp;
       this.closed = false;
       this.filter = filter;
       if (filter != null) {
@@ -1462,11 +1511,12 @@ public class HClient implements HConstants {
             if (this.filter == null) {
               this.scannerId = this.server.openScanner(info.regionInfo.regionName,
                       this.columns, currentRegion == 0 ? this.startRow
-                          : EMPTY_START_ROW);
+                          : EMPTY_START_ROW, scanTime, null);
             } else {
-              this.scannerId = this.server.openScanner(info.regionInfo.regionName,
-                      this.columns, currentRegion == 0 ? this.startRow
-                          : EMPTY_START_ROW, filter);
+              this.scannerId =
+                this.server.openScanner(info.regionInfo.regionName,
+                    this.columns, currentRegion == 0 ? this.startRow
+                        : EMPTY_START_ROW, scanTime, filter);
             }
 
             break;
@@ -1488,8 +1538,8 @@ public class HClient implements HConstants {
       return true;
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap)
+    /**
+     * {@inheritDoc}
      */
     public boolean next(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException {
       if(this.closed) {
@@ -1511,8 +1561,8 @@ public class HClient implements HConstants {
       return values == null ? false : values.length != 0;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.hbase.HScannerInterface#close()
+    /**
+     * {@inheritDoc}
      */
     public void close() throws IOException {
       if(this.scannerId != -1L) {

+ 15 - 3
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java

@@ -26,10 +26,22 @@ import org.apache.hadoop.io.Text;
  * is specified) or if multiple members of the same column family were
  * specified. If so, we need to ignore the timestamp to ensure that we get all
  * the family members, as they may have been last updated at different times.
- * This interface exposes two APIs for querying the scanner.
  */
 public interface HInternalScannerInterface {
   
+  /**
+   * Grab the next row's worth of values. The HScanner will return the most
+   * recent data value for each row that is not newer than the target time.
+   * 
+   * If a dataFilter is defined, it will be used to skip rows that do not
+   * match its criteria. It may cause the scanner to stop prematurely if it
+   * knows that it will no longer accept the remaining results.
+   * 
+   * @param key HStoreKey containing row and timestamp
+   * @param results Map of column/value pairs
+   * @return true if a value was found
+   * @throws IOException
+   */
   public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
   throws IOException;
   
@@ -38,9 +50,9 @@ public interface HInternalScannerInterface {
    */
   public void close();
   
-  /** Returns true if the scanner is matching a column family or regex */
+  /** @return true if the scanner is matching a column family or regex */
   public boolean isWildcardScanner();
   
-  /** Returns true if the scanner is matching multiple column family members */
+  /** @return true if the scanner is matching multiple column family members */
   public boolean isMultipleMatchScanner();
 }

+ 62 - 36
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java

@@ -51,8 +51,8 @@ import org.apache.hadoop.util.StringUtils;
 public class HMaster implements HConstants, HMasterInterface, 
     HMasterRegionInterface, Runnable {
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long)
+  /**
+   * {@inheritDoc}
    */
   public long getProtocolVersion(String protocol,
     @SuppressWarnings("unused") long clientVersion)
@@ -169,7 +169,7 @@ public class HMaster implements HConstants, HMasterInterface,
       try {
         regionServer = client.getHRegionConnection(region.server);
         scannerId = regionServer.openScanner(region.regionName, METACOLUMNS,
-            FIRST_ROW);
+            FIRST_ROW, System.currentTimeMillis(), null);
 
         while (true) {
           TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
@@ -278,6 +278,9 @@ public class HMaster implements HConstants, HMasterInterface,
    * Scanner for the <code>ROOT</code> HRegion.
    */
   class RootScanner extends BaseScanner {
+    /**
+     * {@inheritDoc}
+     */
     public void run() {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Running ROOT scanner");
@@ -338,11 +341,17 @@ public class HMaster implements HConstants, HMasterInterface,
     Text regionName;
     Text startKey;
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public boolean equals(Object o) {
       return this.compareTo(o) == 0;
     }
     
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public int hashCode() {
       int result = this.regionName.hashCode();
@@ -351,6 +360,10 @@ public class HMaster implements HConstants, HMasterInterface,
     }
 
     // Comparable
+
+    /**
+     * {@inheritDoc}
+     */
     public int compareTo(Object o) {
       MetaRegion other = (MetaRegion)o;
       
@@ -380,6 +393,9 @@ public class HMaster implements HConstants, HMasterInterface,
    * action would prevent other work from getting done.
    */
   class MetaScanner extends BaseScanner {
+    /**
+     * {@inheritDoc}
+     */
     @SuppressWarnings("null")
     public void run() {
       while (!closed) {
@@ -804,8 +820,8 @@ public class HMaster implements HConstants, HMasterInterface,
   // HMasterRegionInterface
   //////////////////////////////////////////////////////////////////////////////
   
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HMasterRegionInterface#regionServerStartup(org.apache.hadoop.hbase.HServerInfo)
+  /**
+   * {@inheritDoc}
    */
   @SuppressWarnings("unused")
   public void regionServerStartup(HServerInfo serverInfo)
@@ -838,8 +854,8 @@ public class HMaster implements HConstants, HMasterInterface,
     return s.hashCode();
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HMasterRegionInterface#regionServerReport(org.apache.hadoop.hbase.HServerInfo, org.apache.hadoop.hbase.HMsg[])
+  /**
+   * {@inheritDoc}
    */
   public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[])
   throws IOException {
@@ -1336,7 +1352,7 @@ public class HMaster implements HConstants, HMasterInterface,
         }
         server.delete(regionName, clientId, lockid, COL_SERVER);
         server.delete(regionName, clientId, lockid, COL_STARTCODE);
-        server.commit(regionName, clientId, lockid);
+        server.commit(regionName, clientId, lockid, System.currentTimeMillis());
       }
 
       // Get regions reassigned
@@ -1384,7 +1400,8 @@ public class HMaster implements HConstants, HMasterInterface,
         
         try {
           LOG.debug("scanning root region");
-          scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName, columns, startRow);
+          scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName,
+              columns, startRow, System.currentTimeMillis(), null);
           scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName);
           break;
           
@@ -1413,7 +1430,8 @@ public class HMaster implements HConstants, HMasterInterface,
 
             server = client.getHRegionConnection(r.server);
           
-            scannerId = server.openScanner(r.regionName, columns, startRow);
+            scannerId = server.openScanner(r.regionName, columns, startRow,
+                System.currentTimeMillis(), null);
             scanMetaRegion(server, scannerId, r.regionName);
             
           }
@@ -1513,7 +1531,9 @@ public class HMaster implements HConstants, HMasterInterface,
           }
           server.delete(metaRegionName, clientId, lockid, COL_SERVER);
           server.delete(metaRegionName, clientId, lockid, COL_STARTCODE);
-          server.commit(metaRegionName, clientId, lockid);
+          server.commit(metaRegionName, clientId, lockid,
+              System.currentTimeMillis());
+          
           break;
 
         } catch(NotServingRegionException e) {
@@ -1622,7 +1642,9 @@ public class HMaster implements HConstants, HMasterInterface,
           long lockid = server.startUpdate(metaRegionName, clientId, regionName);
           server.put(metaRegionName, clientId, lockid, COL_SERVER, serverAddress);
           server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode);
-          server.commit(metaRegionName, clientId, lockid);
+          server.commit(metaRegionName, clientId, lockid,
+              System.currentTimeMillis());
+          
           break;
           
         } catch(NotServingRegionException e) {
@@ -1639,15 +1661,15 @@ public class HMaster implements HConstants, HMasterInterface,
   // HMasterInterface
   //////////////////////////////////////////////////////////////////////////////
   
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HMasterInterface#isMasterRunning()
+  /**
+   * {@inheritDoc}
    */
   public boolean isMasterRunning() {
     return !closed;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HMasterInterface#shutdown()
+  /**
+   * {@inheritDoc}
    */
   public void shutdown() {
     TimerTask tt = new TimerTask() {
@@ -1664,8 +1686,8 @@ public class HMaster implements HConstants, HMasterInterface,
     t.schedule(tt, 10);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HMasterInterface#createTable(org.apache.hadoop.hbase.HTableDescriptor)
+  /**
+   * {@inheritDoc}
    */
   public void createTable(HTableDescriptor desc) throws IOException {
     if (!isMasterRunning()) {
@@ -1717,7 +1739,8 @@ public class HMaster implements HConstants, HMasterInterface,
         long lockid = server.startUpdate(metaRegionName, clientId, regionName);
         server.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
           byteValue.toByteArray());
-        server.commit(metaRegionName, clientId, lockid);
+        server.commit(metaRegionName, clientId, lockid,
+            System.currentTimeMillis());
 
         // 4. Close the new region to flush it to disk
 
@@ -1741,8 +1764,8 @@ public class HMaster implements HConstants, HMasterInterface,
     }
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HMasterInterface#deleteTable(org.apache.hadoop.io.Text)
+  /**
+   * {@inheritDoc}
    */
   public void deleteTable(Text tableName) throws IOException {
     new TableDelete(tableName).process();
@@ -1751,36 +1774,36 @@ public class HMaster implements HConstants, HMasterInterface,
     }
   }
   
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HMasterInterface#addColumn(org.apache.hadoop.io.Text, org.apache.hadoop.hbase.HColumnDescriptor)
+  /**
+   * {@inheritDoc}
    */
   public void addColumn(Text tableName, HColumnDescriptor column) throws IOException {
     new AddColumn(tableName, column).process();
   }
   
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HMasterInterface#deleteColumn(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
+  /**
+   * {@inheritDoc}
    */
   public void deleteColumn(Text tableName, Text columnName) throws IOException {
     new DeleteColumn(tableName, HStoreKey.extractFamily(columnName)).process();
   }
   
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HMasterInterface#enableTable(org.apache.hadoop.io.Text)
+  /**
+   * {@inheritDoc}
    */
   public void enableTable(Text tableName) throws IOException {
     new ChangeTableState(tableName, true).process();
   }
   
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HMasterInterface#findRootRegion()
+  /**
+   * {@inheritDoc}
    */
   public HServerAddress findRootRegion() {
     return rootRegionLocation;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HMasterInterface#disableTable(org.apache.hadoop.io.Text)
+  /**
+   * {@inheritDoc}
    */
   public void disableTable(Text tableName) throws IOException {
     new ChangeTableState(tableName, false).process();
@@ -1837,7 +1860,8 @@ public class HMaster implements HConstants, HMasterInterface,
               // Open a scanner on the meta region
               
               long scannerId =
-                server.openScanner(m.regionName, METACOLUMNS, tableName);
+                server.openScanner(m.regionName, METACOLUMNS, tableName,
+                    System.currentTimeMillis(), null);
               
               try {
                 DataInputBuffer inbuf = new DataInputBuffer();
@@ -1995,7 +2019,9 @@ public class HMaster implements HConstants, HMasterInterface,
           updateRegionInfo(server, m.regionName, i);
           server.delete(m.regionName, clientId, lockid, COL_SERVER);
           server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
-          server.commit(m.regionName, clientId, lockid);
+          server.commit(m.regionName, clientId, lockid,
+              System.currentTimeMillis());
+          
           lockid = -1L;
 
           if(LOG.isDebugEnabled()) {
@@ -2151,7 +2177,7 @@ public class HMaster implements HConstants, HMasterInterface,
         lockid = server.startUpdate(regionName, clientId, i.regionName);
         server.put(regionName, clientId, lockid, COL_REGIONINFO,
           byteValue.toByteArray());
-        server.commit(regionName, clientId, lockid);
+        server.commit(regionName, clientId, lockid, System.currentTimeMillis());
         lockid = -1L;
         if(LOG.isDebugEnabled()) {
           LOG.debug("updated columns in row: " + i.regionName);
@@ -2252,8 +2278,8 @@ public class HMaster implements HConstants, HMasterInterface,
       this.server = server;
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.hbase.LeaseListener#leaseExpired()
+    /**
+     * {@inheritDoc}
      */
     public void leaseExpired() {
       LOG.info(server + " lease expired");

+ 16 - 6
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java

@@ -46,15 +46,17 @@ public class HMemcache {
 
   final HLocking lock = new HLocking();
 
+  /** constructor */
   public HMemcache() {
     super();
   }
 
+  /** represents the state of the memcache at a specified point in time */
   public static class Snapshot {
-    public TreeMap<HStoreKey, byte []> memcacheSnapshot = null;
-    public long sequenceId = 0;
+    TreeMap<HStoreKey, byte []> memcacheSnapshot = null;
+    long sequenceId = 0;
     
-    public Snapshot() {
+    Snapshot() {
       super();
     }
   }
@@ -72,7 +74,7 @@ public class HMemcache {
    * 
    * @return frozen HMemcache TreeMap and HLog sequence number.
    */
-  public Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
+  Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
     Snapshot retval = new Snapshot();
 
     this.lock.obtainWriteLock();
@@ -112,6 +114,7 @@ public class HMemcache {
    * Delete the snapshot, remove from history.
    *
    * Modifying the structure means we need to obtain a writelock.
+   * @throws IOException
    */
   public void deleteSnapshot() throws IOException {
     this.lock.obtainWriteLock();
@@ -251,6 +254,9 @@ public class HMemcache {
     for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
       HStoreKey itKey = es.getKey();
       if (itKey.matchesRowCol(curKey)) {
+        if(HConstants.DELETE_BYTES.compareTo(es.getValue()) == 0) {
+          break;
+        }
         result.add(tailMap.get(itKey));
         curKey.setVersion(itKey.getTimestamp() - 1);
       }
@@ -264,7 +270,7 @@ public class HMemcache {
   /**
    * Return a scanner over the keys in the HMemcache
    */
-  public HInternalScannerInterface getScanner(long timestamp,
+  HInternalScannerInterface getScanner(long timestamp,
       Text targetCols[], Text firstRow)
   throws IOException {  
     return new HMemcacheScanner(timestamp, targetCols, firstRow);
@@ -280,7 +286,7 @@ public class HMemcache {
     final Iterator<HStoreKey> keyIterators[];
 
     @SuppressWarnings("unchecked")
-    public HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow)
+    HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow)
         throws IOException {
       
       super(timestamp, targetCols);
@@ -331,6 +337,7 @@ public class HMemcache {
      * @param firstRow seek to this row
      * @return true if this is the first row
      */
+    @Override
     boolean findFirstRow(int i, Text firstRow) {
       return firstRow.getLength() == 0 ||
         keys[i].getRow().compareTo(firstRow) >= 0;
@@ -342,6 +349,7 @@ public class HMemcache {
      * @param i Which iterator to fetch next value from
      * @return true if there is more data available
      */
+    @Override
     boolean getNext(int i) {
       if (!keyIterators[i].hasNext()) {
         closeSubScanner(i);
@@ -353,6 +361,7 @@ public class HMemcache {
     }
 
     /** Shut down an individual map iterator. */
+    @Override
     void closeSubScanner(int i) {
       keyIterators[i] = null;
       keys[i] = null;
@@ -361,6 +370,7 @@ public class HMemcache {
     }
 
     /** Shut down map iterators, and release the lock */
+    @Override
     public void close() {
       if(! scannerClosed) {
         try {

+ 4 - 6
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java

@@ -316,7 +316,6 @@ class HMerge implements HConstants {
 
   /** Instantiated to compact the meta region */
   private static class OfflineMerger extends Merger {
-    private Path dir;
     private TreeSet<HRegionInfo> metaRegions;
     private TreeMap<Text, byte []> results;
     
@@ -324,7 +323,6 @@ class HMerge implements HConstants {
         throws IOException {
       
       super(conf, fs, tableName);
-      this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
       this.metaRegions = new TreeSet<HRegionInfo>();
       this.results = new TreeMap<Text, byte []>();
 
@@ -334,7 +332,7 @@ class HMerge implements HConstants {
         new HRegion(dir, hlog,fs, conf, HGlobals.rootRegionInfo, null);
 
       HInternalScannerInterface rootScanner =
-        root.getScanner(META_COLS, new Text());
+        root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null);
       
       try {
         while(rootScanner.next(key, results)) {
@@ -357,7 +355,7 @@ class HMerge implements HConstants {
     }
 
     @Override
-    protected TreeSet<HRegionInfo> next() throws IOException {
+    protected TreeSet<HRegionInfo> next() {
       more = false;
       return metaRegions;
     }
@@ -380,7 +378,7 @@ class HMerge implements HConstants {
           root.delete(lockid, COL_REGIONINFO);
           root.delete(lockid, COL_SERVER);
           root.delete(lockid, COL_STARTCODE);
-          root.commit(lockid);
+          root.commit(lockid, System.currentTimeMillis());
           lockid = -1L;
 
           if(LOG.isDebugEnabled()) {
@@ -405,7 +403,7 @@ class HMerge implements HConstants {
       try {
         lockid = root.startUpdate(newRegion.getRegionName());
         root.put(lockid, COL_REGIONINFO, byteValue.toByteArray());
-        root.commit(lockid);
+        root.commit(lockid, System.currentTimeMillis());
         lockid = -1L;
 
         if(LOG.isDebugEnabled()) {

+ 87 - 72
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java

@@ -95,7 +95,11 @@ public class HRegion implements HConstants {
    * HRegionServer. Returns a brand-new active HRegion, also
    * running on the current HRegionServer.
    */
-  static HRegion closeAndMerge(HRegion srcA, HRegion srcB) throws IOException {
+  static HRegion closeAndMerge(final HRegion srcA, final HRegion srcB)
+  throws IOException {
+    
+    HRegion a = srcA;
+    HRegion b = srcB;
 
     // Make sure that srcA comes first; important for key-ordering during
     // write of the merged file.
@@ -109,25 +113,24 @@ public class HRegion implements HConstants {
     } else if((srcB.getStartKey() == null)         // A is not null but B is
         || (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B
       
-      HRegion tmp = srcA;
-      srcA = srcB;
-      srcB = tmp;
+      a = srcB;
+      b = srcA;
     }
     
-    if (! srcA.getEndKey().equals(srcB.getStartKey())) {
+    if (! a.getEndKey().equals(b.getStartKey())) {
       throw new IOException("Cannot merge non-adjacent regions");
     }
 
-    FileSystem fs = srcA.getFilesystem();
-    Configuration conf = srcA.getConf();
-    HTableDescriptor tabledesc = srcA.getTableDesc();
-    HLog log = srcA.getLog();
-    Path rootDir = srcA.getRootDir();
+    FileSystem fs = a.getFilesystem();
+    Configuration conf = a.getConf();
+    HTableDescriptor tabledesc = a.getTableDesc();
+    HLog log = a.getLog();
+    Path rootDir = a.getRootDir();
 
-    Text startKey = srcA.getStartKey();
-    Text endKey = srcB.getEndKey();
+    Text startKey = a.getStartKey();
+    Text endKey = b.getEndKey();
 
-    Path merges = new Path(srcA.getRegionDir(), MERGEDIR);
+    Path merges = new Path(a.getRegionDir(), MERGEDIR);
     if(! fs.exists(merges)) {
       fs.mkdirs(merges);
     }
@@ -141,8 +144,8 @@ public class HRegion implements HConstants {
       throw new IOException("Cannot merge; target file collision at " + newRegionDir);
     }
 
-    LOG.info("starting merge of regions: " + srcA.getRegionName() + " and " 
-        + srcB.getRegionName() + " new region start key is '" 
+    LOG.info("starting merge of regions: " + a.getRegionName() + " and " 
+        + b.getRegionName() + " new region start key is '" 
         + (startKey == null ? "" : startKey) + "', end key is '" 
         + (endKey == null ? "" : endKey) + "'");
     
@@ -151,7 +154,7 @@ public class HRegion implements HConstants {
     TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
     TreeMap<Text, Vector<HStoreFile>> filesToMerge =
       new TreeMap<Text, Vector<HStoreFile>>();
-    for(HStoreFile src: srcA.flushcache(true)) {
+    for(HStoreFile src: a.flushcache(true)) {
       Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
       if(v == null) {
         v = new Vector<HStoreFile>();
@@ -160,7 +163,7 @@ public class HRegion implements HConstants {
       v.add(src);
     }
     
-    for(HStoreFile src: srcB.flushcache(true)) {
+    for(HStoreFile src: b.flushcache(true)) {
       Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
       if(v == null) {
         v = new Vector<HStoreFile>();
@@ -188,12 +191,12 @@ public class HRegion implements HConstants {
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("flushing changes since start of merge for region " 
-          + srcA.getRegionName());
+          + a.getRegionName());
     }
 
     filesToMerge.clear();
     
-    for(HStoreFile src: srcA.close()) {
+    for(HStoreFile src: a.close()) {
       if(! alreadyMerged.contains(src)) {
         Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
         if(v == null) {
@@ -206,10 +209,10 @@ public class HRegion implements HConstants {
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("flushing changes since start of merge for region " 
-          + srcB.getRegionName());
+          + b.getRegionName());
     }
     
-    for(HStoreFile src: srcB.close()) {
+    for(HStoreFile src: b.close()) {
       if(! alreadyMerged.contains(src)) {
         Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
         if(v == null) {
@@ -391,11 +394,13 @@ public class HRegion implements HConstants {
    * Close down this HRegion.  Flush the cache, shut down each HStore, don't 
    * service any more calls.
    *
-   * The returned Vector is a list of all the storage files that the HRegion's 
-   * component HStores make use of.  It's a list of HStoreFile objects.
-   *
    * This method could take some time to execute, so don't call it from a 
    * time-sensitive thread.
+   * 
+   * @return Vector of all the storage files that the HRegion's component 
+   * HStores make use of.  It's a list of HStoreFile objects.
+   * 
+   * @throws IOException
    */
   public Vector<HStoreFile> close() throws IOException {
     lock.obtainWriteLock();
@@ -559,42 +564,52 @@ public class HRegion implements HConstants {
   // HRegion accessors
   //////////////////////////////////////////////////////////////////////////////
 
+  /** @return start key for region */
   public Text getStartKey() {
     return regionInfo.startKey;
   }
-  
+
+  /** @return end key for region */
   public Text getEndKey() {
     return regionInfo.endKey;
   }
-  
+
+  /** @return region id */
   public long getRegionId() {
     return regionInfo.regionId;
   }
 
+  /** @return region name */
   public Text getRegionName() {
     return regionInfo.regionName;
   }
-  
+
+  /** @return root directory path */
   public Path getRootDir() {
     return rootDir;
   }
- 
+
+  /** @return HTableDescriptor for this region */
   public HTableDescriptor getTableDesc() {
     return regionInfo.tableDesc;
   }
-  
+
+  /** @return HLog in use for this region */
   public HLog getLog() {
     return log;
   }
-  
+
+  /** @return Configuration object */
   public Configuration getConf() {
     return conf;
   }
-  
+
+  /** @return region directory Path */
   public Path getRegionDir() {
     return regiondir;
   }
-  
+
+  /** @return FileSystem being used by this region */
   public FileSystem getFilesystem() {
     return fs;
   }
@@ -980,21 +995,19 @@ public class HRegion implements HConstants {
     }
   }
 
-  /**
-   * Return an iterator that scans over the HRegion, returning the indicated 
-   * columns.  This Iterator must be closed by the caller.
-   */
-  public HInternalScannerInterface getScanner(Text[] cols, Text firstRow)
-  throws IOException {
-    return getScanner(cols, firstRow, null);
-  }
-
   /**
    * Return an iterator that scans over the HRegion, returning the indicated 
    * columns for only the rows that match the data filter.  This Iterator must be closed by the caller.
+   *
+   * @param cols columns desired in result set
+   * @param firstRow row which is the starting point of the scan
+   * @param timestamp only return rows whose timestamp is <= this value
+   * @param filter row filter
+   * @return HScannerInterface
+   * @throws IOException
    */
-  public HInternalScannerInterface getScanner(Text[] cols, Text firstRow, RowFilterInterface filter) 
-  	throws IOException {
+  public HInternalScannerInterface getScanner(Text[] cols, Text firstRow,
+      long timestamp, RowFilterInterface filter) throws IOException {
     lock.obtainReadLock();
     try {
       TreeSet<Text> families = new TreeSet<Text>();
@@ -1007,7 +1020,7 @@ public class HRegion implements HConstants {
       for (Text family: families) {
         storelist[i++] = stores.get(family);
       }
-      return new HScanner(cols, firstRow, memcache, storelist, filter);
+      return new HScanner(cols, firstRow, timestamp, memcache, storelist, filter);
     } finally {
       lock.releaseReadLock();
     }
@@ -1029,6 +1042,7 @@ public class HRegion implements HConstants {
    * 
    * @param row Row to update
    * @return lockid
+   * @throws IOException
    * @see #put(long, Text, byte[])
    */
   public long startUpdate(Text row) throws IOException {
@@ -1048,6 +1062,11 @@ public class HRegion implements HConstants {
    *
    * This method really just tests the input, then calls an internal localput() 
    * method.
+   *
+   * @param lockid lock id obtained from startUpdate
+   * @param targetCol name of column to be updated
+   * @param val new value for column
+   * @throws IOException
    */
   public void put(long lockid, Text targetCol, byte [] val) throws IOException {
     if (DELETE_BYTES.compareTo(val) == 0) {
@@ -1058,6 +1077,10 @@ public class HRegion implements HConstants {
 
   /**
    * Delete a value or write a value. This is a just a convenience method for put().
+   *
+   * @param lockid lock id obtained from startUpdate
+   * @param targetCol name of column to be deleted
+   * @throws IOException
    */
   public void delete(long lockid, Text targetCol) throws IOException {
     localput(lockid, targetCol, DELETE_BYTES.get());
@@ -1109,6 +1132,9 @@ public class HRegion implements HConstants {
    * Abort a pending set of writes. This dumps from memory all in-progress
    * writes associated with the given row-lock.  These values have not yet
    * been placed in memcache or written to the log.
+   *
+   * @param lockid lock id obtained from startUpdate
+   * @throws IOException
    */
   public void abort(long lockid) throws IOException {
     Text row = getRowFromLock(lockid);
@@ -1142,9 +1168,10 @@ public class HRegion implements HConstants {
    * Once updates hit the change log, they are safe.  They will either be moved 
    * into an HStore in the future, or they will be recovered from the log.
    * @param lockid Lock for row we're to commit.
+   * @param timestamp the time to associate with this change
    * @throws IOException
    */
-  public void commit(final long lockid) throws IOException {
+  public void commit(final long lockid, long timestamp) throws IOException {
     // Remove the row from the pendingWrites list so 
     // that repeated executions won't screw this up.
     Text row = getRowFromLock(lockid);
@@ -1156,12 +1183,11 @@ public class HRegion implements HConstants {
     // hasn't aborted/committed the write-operation
     synchronized(row) {
       // Add updates to the log and add values to the memcache.
-      long commitTimestamp = System.currentTimeMillis();
       TreeMap<Text, byte []> columns =  this.targetColumns.get(lockid);
       if (columns != null && columns.size() > 0) {
         log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
-          row, columns, commitTimestamp);
-        memcache.add(row, columns, commitTimestamp);
+          row, columns, timestamp);
+        memcache.add(row, columns, timestamp);
         // OK, all done!
       }
       targetColumns.remove(lockid);
@@ -1287,9 +1313,8 @@ public class HRegion implements HConstants {
 
     /** Create an HScanner with a handle on many HStores. */
     @SuppressWarnings("unchecked")
-    HScanner(Text[] cols, Text firstRow, HMemcache memcache, HStore[] stores, RowFilterInterface filter)
-    throws IOException {  
-      long scanTime = System.currentTimeMillis();
+    HScanner(Text[] cols, Text firstRow, long timestamp, HMemcache memcache,
+        HStore[] stores, RowFilterInterface filter) throws IOException {  
       this.dataFilter = filter;
       if (null != dataFilter) {
         dataFilter.reset();
@@ -1310,7 +1335,7 @@ public class HRegion implements HConstants {
       // NOTE: the memcache scanner should be the first scanner
       try {
         HInternalScannerInterface scanner =
-          memcache.getScanner(scanTime, cols, firstRow);
+          memcache.getScanner(timestamp, cols, firstRow);
         if(scanner.isWildcardScanner()) {
           this.wildcardMatch = true;
         }
@@ -1320,7 +1345,7 @@ public class HRegion implements HConstants {
         scanners[0] = scanner;
 
         for(int i = 0; i < stores.length; i++) {
-          scanner = stores[i].getScanner(scanTime, cols, firstRow);
+          scanner = stores[i].getScanner(timestamp, cols, firstRow);
           if(scanner.isWildcardScanner()) {
             this.wildcardMatch = true;
           }
@@ -1347,32 +1372,22 @@ public class HRegion implements HConstants {
       }
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.hbase.HInternalScannerInterface#isWildcardScanner()
+    /**
+     * {@inheritDoc}
      */
     public boolean isWildcardScanner() {
       return wildcardMatch;
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.hbase.HInternalScannerInterface#isMultipleMatchScanner()
+    /**
+     * {@inheritDoc}
      */
     public boolean isMultipleMatchScanner() {
       return multipleMatchers;
     }
     
-    /*
-     * (non-Javadoc)
-     * 
-     * Grab the next row's worth of values. The HScanner will return the most
-     * recent data value for each row that is not newer than the target time.
-     * 
-     * If a dataFilter is defined, it will be used to skip rows that do not
-     * match its criteria. It may cause the scanner to stop prematurely if it
-     * knows that it will no longer accept the remaining results.
-     * 
-     * @see org.apache.hadoop.hbase.HInternalScannerInterface#next(org.apache.hadoop.hbase.HStoreKey,
-     *      java.util.TreeMap)
+    /**
+     * {@inheritDoc}
      */
     public boolean next(HStoreKey key, TreeMap<Text, byte[]> results)
     throws IOException {
@@ -1501,8 +1516,8 @@ public class HRegion implements HConstants {
       }
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.hbase.HInternalScannerInterface#close()
+    /**
+     * {@inheritDoc}
      */
     public void close() {
       for(int i = 0; i < scanners.length; i++) {
@@ -1573,7 +1588,7 @@ public class HRegion implements HConstants {
     DataOutputStream s = new DataOutputStream(bytes);
     r.getRegionInfo().write(s);
     meta.put(writeid, COL_REGIONINFO, bytes.toByteArray());
-    meta.commit(writeid);
+    meta.commit(writeid, System.currentTimeMillis());
   }
   
   static void addRegionToMETA(final HClient client,

+ 5 - 15
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java

@@ -171,10 +171,11 @@ public interface HRegionInterface extends VersionedProtocol {
    * @param regionName region name
    * @param clientid a unique value to identify the client
    * @param lockid lock id returned from startUpdate
+   * @param timestamp the time (in milliseconds to associate with this change)
    * @throws IOException
    */
   public void commit(final Text regionName, final long clientid,
-      final long lockid)
+      final long lockid, final long timestamp)
   throws IOException;
   
   /**
@@ -190,31 +191,20 @@ public interface HRegionInterface extends VersionedProtocol {
   // remote scanner interface
   //////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * Opens a remote scanner.
-   * 
-   * @param regionName name of region to scan
-   * @param columns columns to scan
-   * @param startRow starting row to scan
-   *
-   * @return scannerId scanner identifier used in other calls
-   * @throws IOException
-   */
-  public long openScanner(Text regionName, Text[] columns, Text startRow)
-  throws IOException;
-  
   /**
    * Opens a remote scanner with a RowFilter.
    * 
    * @param regionName name of region to scan
    * @param columns columns to scan
    * @param startRow starting row to scan
+   * @param timestamp only return values whose timestamp is <= this value
    * @param filter RowFilter for filtering results at the row-level.
    *
    * @return scannerId scanner identifier used in other calls
    * @throws IOException
    */
-  public long openScanner(Text regionName, Text[] columns, Text startRow, RowFilterInterface filter)
+  public long openScanner(Text regionName, Text[] columns, Text startRow,
+      long timestamp, RowFilterInterface filter)
   throws IOException;
 
   /**

+ 51 - 56
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java

@@ -48,8 +48,8 @@ import org.apache.hadoop.util.StringUtils;
  ******************************************************************************/
 public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long)
+  /**
+   * {@inheritDoc}
    */
   public long getProtocolVersion(final String protocol, 
       @SuppressWarnings("unused") final long clientVersion)
@@ -107,8 +107,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   class SplitOrCompactChecker implements Runnable, RegionUnavailableListener {
     HClient client = new HClient(conf);
   
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.hbase.RegionUnavailableListener#closing(org.apache.hadoop.io.Text)
+    /**
+     * {@inheritDoc}
      */
     public void closing(final Text regionName) {
       lock.writeLock().lock();
@@ -124,8 +124,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       }
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.hbase.RegionUnavailableListener#closed(org.apache.hadoop.io.Text)
+    /**
+     * {@inheritDoc}
      */
     public void closed(final Text regionName) {
       lock.writeLock().lock();
@@ -139,8 +139,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       }
     }
 
-    /* (non-Javadoc)
-     * @see java.lang.Runnable#run()
+    /**
+     * {@inheritDoc}
      */
     public void run() {
       while(! stopRequested) {
@@ -261,8 +261,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   protected final Integer cacheFlusherLock = new Integer(0);
   /** Runs periodically to flush the memcache */
   class Flusher implements Runnable {
-    /* (non-Javadoc)
-     * @see java.lang.Runnable#run()
+    /**
+     * {@inheritDoc}
      */
     public void run() {
       while(! stopRequested) {
@@ -327,8 +327,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     private int maxLogEntries =
       conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
     
-    /* (non-Javadoc)
-     * @see java.lang.Runnable#run()
+    /**
+     * {@inheritDoc}
      */
     public void run() {
       while(! stopRequested) {
@@ -769,8 +769,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       }
     }
     
-    /* (non-Javadoc)
-     * @see java.lang.Runnable#run()
+    /**
+     * {@inheritDoc}
      */
     public void run() {
       for(ToDoEntry e = null; !stopRequested; ) {
@@ -895,16 +895,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   // HRegionInterface
   //////////////////////////////////////////////////////////////////////////////
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#getRegionInfo(org.apache.hadoop.io.Text)
+  /**
+   * {@inheritDoc}
    */
   public HRegionInfo getRegionInfo(final Text regionName)
   throws NotServingRegionException {
     return getRegion(regionName).getRegionInfo();
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
+  /**
+   * {@inheritDoc}
    */
   public byte [] get(final Text regionName, final Text row,
       final Text column)
@@ -912,8 +912,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return getRegion(regionName).get(row, column);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, int)
+  /**
+   * {@inheritDoc}
    */
   public byte [][] get(final Text regionName, final Text row,
       final Text column, final int numVersions)
@@ -921,16 +921,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return getRegion(regionName).get(row, column, numVersions);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#get(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text, long, int)
+  /**
+   * {@inheritDoc}
    */
   public byte [][] get(final Text regionName, final Text row, final Text column, 
       final long timestamp, final int numVersions) throws IOException {
     return getRegion(regionName).get(row, column, timestamp, numVersions);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#getRow(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
+  /**
+   * {@inheritDoc}
    */
   public KeyedData[] getRow(final Text regionName, final Text row) throws IOException {
     HRegion region = getRegion(regionName);
@@ -944,8 +944,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return result;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#next(long)
+  /**
+   * {@inheritDoc}
    */
   public KeyedData[] next(final long scannerId)
   throws IOException {
@@ -993,8 +993,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return values.toArray(new KeyedData[values.size()]);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#startUpdate(org.apache.hadoop.io.Text, long, org.apache.hadoop.io.Text)
+  /**
+   * {@inheritDoc}
    */
   public long startUpdate(Text regionName, long clientid, Text row) 
       throws IOException {
@@ -1015,6 +1015,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       this.localLockId = lockId;
     }
     
+    /**
+     * {@inheritDoc}
+     */
     public void leaseExpired() {
       try {
         localRegion.abort(localLockId);
@@ -1024,8 +1027,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
   }
   
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#put(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text, org.apache.hadoop.io.BytesWritable)
+  /**
+   * {@inheritDoc}
    */
   public void put(final Text regionName, final long clientid,
       final long lockid, final Text column, final byte [] val)
@@ -1035,8 +1038,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     region.put(lockid, column, val);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#delete(org.apache.hadoop.io.Text, long, long, org.apache.hadoop.io.Text)
+  /**
+   * {@inheritDoc}
    */
   public void delete(Text regionName, long clientid, long lockid, Text column) 
   throws IOException {
@@ -1045,8 +1048,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     region.delete(lockid, column);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#abort(org.apache.hadoop.io.Text, long, long)
+  /**
+   * {@inheritDoc}
    */
   public void abort(Text regionName, long clientid, long lockid) 
   throws IOException {
@@ -1055,18 +1058,18 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     region.abort(lockid);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#commit(org.apache.hadoop.io.Text, long, long)
+  /**
+   * {@inheritDoc}
    */
-  public void commit(Text regionName, long clientid, long lockid) 
-  throws IOException {
+  public void commit(Text regionName, final long clientid, final long lockid,
+      final long timestamp) throws IOException {
     HRegion region = getRegion(regionName, true);
     leases.cancelLease(clientid, lockid);
-    region.commit(lockid);
+    region.commit(lockid, timestamp);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#renewLease(long, long)
+  /**
+   * {@inheritDoc}
    */
   public void renewLease(long lockid, long clientid) throws IOException {
     leases.renewLease(clientid, lockid);
@@ -1136,8 +1139,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       this.scannerName = n;
     }
     
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.hbase.LeaseListener#leaseExpired()
+    /**
+     * {@inheritDoc}
      */
     public void leaseExpired() {
       LOG.info("Scanner " + this.scannerName + " lease expired");
@@ -1151,25 +1154,17 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
   }
   
-  /**
-   * {@inheritDoc}
-   */
-  public long openScanner(final Text regionName, final Text[] cols,
-      final Text firstRow)
-  throws IOException{
-    return openScanner(regionName, cols, firstRow, null);
-  }
-
   /**
    * {@inheritDoc}
    */
   public long openScanner(Text regionName, Text[] cols, Text firstRow,
-      final RowFilterInterface filter)
+      final long timestamp, final RowFilterInterface filter)
   throws IOException {
     HRegion r = getRegion(regionName);
     long scannerId = -1L;
     try {
-      HInternalScannerInterface s = r.getScanner(cols, firstRow, filter);
+      HInternalScannerInterface s =
+        r.getScanner(cols, firstRow, timestamp, filter);
       scannerId = rand.nextLong();
       String scannerName = String.valueOf(scannerId);
       synchronized(scanners) {
@@ -1184,8 +1179,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return scannerId;
   }
   
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.HRegionInterface#close(long)
+  /**
+   * {@inheritDoc}
    */
   public void close(final long scannerId) throws IOException {
     String scannerName = String.valueOf(scannerId);

+ 3 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegiondirReader.java

@@ -165,7 +165,9 @@ class HRegiondirReader {
     HRegion r = new HRegion(this.parentdir, null,
         FileSystem.get(this.conf), conf, info, null);
     Text [] families = info.tableDesc.families().keySet().toArray(new Text [] {});
-    HInternalScannerInterface scanner = r.getScanner(families, new Text());
+    HInternalScannerInterface scanner =
+      r.getScanner(families, new Text(), System.currentTimeMillis(), null);
+    
     HStoreKey key = new HStoreKey();
     TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
     // Print out table header line.

+ 11 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java

@@ -391,6 +391,7 @@ class HStore implements HConstants {
       super(fs, dirName, conf);
     }
 
+    /** {@inheritDoc} */
     @Override
     public Writable get(WritableComparable key, Writable val) throws IOException {
       // Note - the key being passed to us is always a HStoreKey
@@ -407,6 +408,7 @@ class HStore implements HConstants {
       return null;
     }
 
+    /** {@inheritDoc} */
     @Override
     public WritableComparable getClosest(WritableComparable key, Writable val)
     throws IOException {
@@ -438,6 +440,7 @@ class HStore implements HConstants {
       super(conf, fs, dirName, keyClass, valClass, compression);
     }
 
+    /** {@inheritDoc} */
     @Override
     public void append(WritableComparable key, Writable val) throws IOException {
       // Note - the key being passed to us is always a HStoreKey
@@ -1031,6 +1034,9 @@ class HStore implements HConstants {
             Text readcol = readkey.getColumn();
             if (results.get(readcol) == null
                 && key.matchesWithoutColumn(readkey)) {
+              if(readval.equals(HConstants.DELETE_BYTES)) {
+                break;
+              }
               results.put(new Text(readcol), readval.get());
               readval = new ImmutableBytesWritable();
             } else if(key.getRow().compareTo(readkey.getRow()) > 0) {
@@ -1078,10 +1084,14 @@ class HStore implements HConstants {
             continue;
           }
           if (readkey.matchesRowCol(key)) {
+            if(readval.equals(HConstants.DELETE_BYTES)) {
+              break;
+            }
             results.add(readval.get());
             readval = new ImmutableBytesWritable();
             while(map.next(readkey, readval) && readkey.matchesRowCol(key)) {
-              if (numVersions > 0 && (results.size() >= numVersions)) {
+              if ((numVersions > 0 && (results.size() >= numVersions))
+                  || readval.equals(HConstants.DELETE_BYTES)) {
                 break;
               }
               results.add(readval.get());

+ 7 - 1
src/contrib/hbase/src/test/org/apache/hadoop/hbase/AbstractMergeTestBase.java

@@ -36,6 +36,9 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
   protected FileSystem fs;
   protected Path dir;
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void setUp() throws Exception {
     super.setUp();
@@ -111,6 +114,9 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void tearDown() throws Exception {
     super.tearDown();
@@ -128,7 +134,7 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
           + String.format("%1$05d", i)));
 
       region.put(lockid, COLUMN_NAME, value.get());
-      region.commit(lockid);
+      region.commit(lockid, System.currentTimeMillis());
       if(i % 10000 == 0) {
         System.out.println("Flushing write #" + i);
         region.flushcache(false);

+ 3 - 3
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java

@@ -104,7 +104,7 @@ public class TestGet extends HBaseTestCase {
       
       r.put(lockid, HConstants.COL_REGIONINFO, bytes.toByteArray());
       
-      r.commit(lockid);
+      r.commit(lockid, System.currentTimeMillis());
       
       lockid = r.startUpdate(ROW_KEY);
 
@@ -120,7 +120,7 @@ public class TestGet extends HBaseTestCase {
       r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "region"), 
         "region".getBytes(HConstants.UTF8_ENCODING));
 
-      r.commit(lockid);
+      r.commit(lockid, System.currentTimeMillis());
       
       // Verify that get works the same from memcache as when reading from disk
       // NOTE dumpRegion won't work here because it only reads from disk.
@@ -152,7 +152,7 @@ public class TestGet extends HBaseTestCase {
       r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "junk"),
         "junk".getBytes());
       
-      r.commit(lockid);
+      r.commit(lockid, System.currentTimeMillis());
 
       verifyGet(r, otherServerName);
       

+ 24 - 15
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java

@@ -120,7 +120,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
       long writeid = region.startUpdate(new Text("row_" + k));
       region.put(writeid, CONTENTS_BASIC, (CONTENTSTR + k).getBytes());
       region.put(writeid, new Text(ANCHORNUM + k), (ANCHORSTR + k).getBytes());
-      region.commit(writeid);
+      region.commit(writeid, System.currentTimeMillis());
     }
     System.out.println("Write " + NUM_VALS + " rows. Elapsed time: "
         + ((System.currentTimeMillis() - startTime) / 1000.0));
@@ -275,7 +275,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
       long lockid = region.startUpdate(new Text("row_vals1_" + kLabel));
       region.put(lockid, cols[0], vals1[k].getBytes());
       region.put(lockid, cols[1], vals1[k].getBytes());
-      region.commit(lockid);
+      region.commit(lockid, System.currentTimeMillis());
       numInserted += 2;
     }
 
@@ -286,7 +286,8 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
     
     startTime = System.currentTimeMillis();
 
-    HInternalScannerInterface s = region.getScanner(cols, new Text());
+    HInternalScannerInterface s =
+      region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
     int numFetched = 0;
     try {
       HStoreKey curKey = new HStoreKey();
@@ -331,7 +332,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
     
     startTime = System.currentTimeMillis();
     
-    s = region.getScanner(cols, new Text());
+    s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
     numFetched = 0;
     try {
       HStoreKey curKey = new HStoreKey();
@@ -373,7 +374,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
       long lockid = region.startUpdate(new Text("row_vals1_" + kLabel));
       region.put(lockid, cols[0], vals1[k].getBytes());
       region.put(lockid, cols[1], vals1[k].getBytes());
-      region.commit(lockid);
+      region.commit(lockid, System.currentTimeMillis());
       numInserted += 2;
     }
 
@@ -384,7 +385,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
     
     startTime = System.currentTimeMillis();
 
-    s = region.getScanner(cols, new Text());
+    s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
     numFetched = 0;
     try {
       HStoreKey curKey = new HStoreKey();
@@ -429,7 +430,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
     
     startTime = System.currentTimeMillis();
     
-    s = region.getScanner(cols, new Text());
+    s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
     numFetched = 0;
     try {
       HStoreKey curKey = new HStoreKey();
@@ -464,7 +465,9 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
 
     startTime = System.currentTimeMillis();
     
-    s = region.getScanner(cols, new Text("row_vals1_500"));
+    s = region.getScanner(cols, new Text("row_vals1_500"),
+        System.currentTimeMillis(), null);
+    
     numFetched = 0;
     try {
       HStoreKey curKey = new HStoreKey();
@@ -524,7 +527,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
       // Write to the HRegion
       long writeid = region.startUpdate(new Text("row_" + k));
       region.put(writeid, CONTENTS_BODY, buf1.toString().getBytes());
-      region.commit(writeid);
+      region.commit(writeid, System.currentTimeMillis());
       if (k > 0 && k % (N_ROWS / 100) == 0) {
         System.out.println("Flushing write #" + k);
 
@@ -609,13 +612,16 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
     fs.delete(oldRegion2);
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.hbase.RegionUnavailableListener#regionIsUnavailable(org.apache.hadoop.io.Text)
+  /**
+   * {@inheritDoc}
    */
   public void closing(@SuppressWarnings("unused") final Text regionName) {
     // We don't use this here. It is only for the HRegionServer
   }
   
+  /**
+   * {@inheritDoc}
+   */
   public void closed(@SuppressWarnings("unused") final Text regionName) {
     // We don't use this here. It is only for the HRegionServer
   }
@@ -633,7 +639,8 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
     
     long startTime = System.currentTimeMillis();
     
-    HInternalScannerInterface s = region.getScanner(cols, new Text());
+    HInternalScannerInterface s =
+      region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
 
     try {
 
@@ -689,7 +696,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
     
     startTime = System.currentTimeMillis();
 
-    s = region.getScanner(cols, new Text());
+    s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
     try {
       int numFetched = 0;
       HStoreKey curKey = new HStoreKey();
@@ -726,7 +733,9 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
 
     if(StaticTestEnvironment.debugging) {
       startTime = System.currentTimeMillis();
-      s = region.getScanner(new Text[] { CONTENTS_BODY }, new Text());
+      s = region.getScanner(new Text[] { CONTENTS_BODY }, new Text(),
+          System.currentTimeMillis(), null);
+      
       try {
         int numFetched = 0;
         HStoreKey curKey = new HStoreKey();
@@ -762,7 +771,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
     
     startTime = System.currentTimeMillis();
     
-    s = region.getScanner(cols, new Text());
+    s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
 
     try {
       int fetched = 0;

+ 6 - 4
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java

@@ -78,7 +78,9 @@ public class TestScanner extends HBaseTestCase {
     
     for(int i = 0; i < scanColumns.length; i++) {
       try {
-        scanner = region.getScanner(scanColumns[i], FIRST_ROW);
+        scanner = region.getScanner(scanColumns[i], FIRST_ROW,
+            System.currentTimeMillis(), null);
+        
         while(scanner.next(key, results)) {
           assertTrue(results.containsKey(HConstants.COL_REGIONINFO));
           byte [] val = results.get(HConstants.COL_REGIONINFO); 
@@ -155,7 +157,7 @@ public class TestScanner extends HBaseTestCase {
       DataOutputStream s = new DataOutputStream(byteStream);
       HGlobals.rootRegionInfo.write(s);
       region.put(lockid, HConstants.COL_REGIONINFO, byteStream.toByteArray());
-      region.commit(lockid);
+      region.commit(lockid, System.currentTimeMillis());
 
       // What we just committed is in the memcache. Verify that we can get
       // it back both with scanning and get
@@ -186,7 +188,7 @@ public class TestScanner extends HBaseTestCase {
       region.put(lockid, HConstants.COL_STARTCODE, 
         String.valueOf(START_CODE).getBytes(HConstants.UTF8_ENCODING));
 
-      region.commit(lockid);
+      region.commit(lockid, System.currentTimeMillis());
       
       // Validate that we can still get the HRegionInfo, even though it is in
       // an older row on disk and there is a newer row in the memcache
@@ -223,7 +225,7 @@ public class TestScanner extends HBaseTestCase {
       region.put(lockid, HConstants.COL_SERVER, 
         address.toString().getBytes(HConstants.UTF8_ENCODING));
 
-      region.commit(lockid);
+      region.commit(lockid, System.currentTimeMillis());
       
       // Validate again
       

+ 1 - 1
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java

@@ -81,7 +81,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
       HClient.RegionLocation rl = client.getRegionLocation(table);
       regionServer = client.getHRegionConnection(rl.serverAddress);
       scannerId = regionServer.openScanner(rl.regionInfo.regionName,
-          HMaster.METACOLUMNS, new Text());
+          HMaster.METACOLUMNS, new Text(), System.currentTimeMillis(), null);
       while (true) {
         TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
         KeyedData[] values = regionServer.next(scannerId);

+ 7 - 1
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java

@@ -66,6 +66,9 @@ public class TestTableMapReduce extends HBaseTestCase {
       "6789".getBytes()
   };
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void setUp() throws Exception {
     super.setUp();
@@ -96,7 +99,7 @@ public class TestTableMapReduce extends HBaseTestCase {
             + String.format("%1$05d", i)));
 
         region.put(lockid, TEXT_INPUT_COLUMN, values[i]);
-        region.commit(lockid);
+        region.commit(lockid, System.currentTimeMillis());
       }
 
       region.close();
@@ -117,6 +120,9 @@ public class TestTableMapReduce extends HBaseTestCase {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public void tearDown() throws Exception {
     super.tearDown();

+ 175 - 0
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java

@@ -0,0 +1,175 @@
+package org.apache.hadoop.hbase;
+
+import java.util.TreeMap;
+import org.apache.hadoop.io.Text;
+
+/** Tests user specifyable time stamps */
+public class TestTimestamp extends HBaseClusterTestCase {
+  private static final long T0 = 10L;
+  private static final long T1 = 100L;
+  
+  private static final String COLUMN_NAME = "contents:";
+  private static final String TABLE_NAME = "test";
+  private static final String VERSION1 = "version1";
+  private static final String LATEST = "latest";
+  
+  private static final Text COLUMN = new Text(COLUMN_NAME);
+  private static final Text[] COLUMNS = {
+    COLUMN
+  };
+  private static final Text TABLE = new Text(TABLE_NAME);
+  private static final Text ROW = new Text("row");
+  
+  private HClient client;
+
+  /** constructor */
+  public TestTimestamp() {
+    super();
+    client = new HClient(conf);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    
+    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(COLUMN_NAME));
+
+    try {
+      client.createTable(desc);
+      
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+  
+  /** the test */
+  public void testTimestamp() {
+    try {
+      client.openTable(TABLE);
+      
+      // store a value specifying an update time
+
+      long lockid = client.startUpdate(ROW);
+      client.put(lockid, COLUMN, VERSION1.getBytes(HConstants.UTF8_ENCODING));
+      client.commit(lockid, T0);
+      
+      // store a value specifying 'now' as the update time
+      
+      lockid = client.startUpdate(ROW);
+      client.put(lockid, COLUMN, LATEST.getBytes(HConstants.UTF8_ENCODING));
+      client.commit(lockid);
+      
+      // delete values older than T1
+      
+      lockid = client.startUpdate(ROW);
+      client.delete(lockid, COLUMN);
+      client.commit(lockid, T1);
+      
+      // now retrieve...
+      
+      // the most recent version:
+      
+      byte[] bytes = client.get(ROW, COLUMN);
+      assertTrue(bytes != null && bytes.length != 0);
+      assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
+      
+      // any version <= time T1
+      
+      byte[][] values = client.get(ROW, COLUMN, T1, 3);
+      assertNull(values);
+      
+      // the version from T0
+      
+      values = client.get(ROW, COLUMN, T0, 3);
+      assertTrue(values.length == 1
+          && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
+
+      // flush everything out to disk
+      
+      HRegionServer s = cluster.regionServers.get(0);
+      for(HRegion r: s.onlineRegions.values() ) {
+        r.flushcache(false);
+      }
+      
+      // now retrieve...
+      
+      // the most recent version:
+      
+      bytes = client.get(ROW, COLUMN);
+      assertTrue(bytes != null && bytes.length != 0);
+      assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
+      
+      // any version <= time T1
+      
+      values = client.get(ROW, COLUMN, T1, 3);
+      assertNull(values);
+      
+      // the version from T0
+      
+      values = client.get(ROW, COLUMN, T0, 3);
+      assertTrue(values.length == 1
+          && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
+
+      // three versions older than now
+      
+      values = client.get(ROW, COLUMN, 3);
+      assertTrue(values.length == 1
+          && LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING)));
+      
+      // Test scanners
+      
+      HScannerInterface scanner =
+        client.obtainScanner(COLUMNS, HClient.EMPTY_START_ROW);
+      try {
+        HStoreKey key = new HStoreKey();
+        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+        int count = 0;
+        while(scanner.next(key, results)) {
+          count++;
+        }
+        assertEquals(count, 1);
+        assertEquals(results.size(), 1);
+        
+      } finally {
+        scanner.close();
+      }
+      
+      scanner = client.obtainScanner(COLUMNS, HClient.EMPTY_START_ROW, T1);
+      try {
+        HStoreKey key = new HStoreKey();
+        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+        int count = 0;
+        while(scanner.next(key, results)) {
+          count++;
+        }
+        assertEquals(count, 0);
+        assertEquals(results.size(), 0);
+        
+      } finally {
+        scanner.close();
+      }
+      
+      scanner = client.obtainScanner(COLUMNS, HClient.EMPTY_START_ROW, T0);
+      try {
+        HStoreKey key = new HStoreKey();
+        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+        int count = 0;
+        while(scanner.next(key, results)) {
+          count++;
+        }
+        assertEquals(count, 0);
+        assertEquals(results.size(), 0);
+        
+      } finally {
+        scanner.close();
+      }
+      
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+}