Browse Source

HADOOP-1710 All updates should be batch updates

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@565993 13f79535-47bb-0310-9956-ffa450edef68
Jim Kellerman 18 years ago
parent
commit
6f361093ad

+ 8 - 14
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HClient.java

@@ -412,7 +412,7 @@ public class HClient implements HConstants {
     if(this.table.get() == null) {
       throw new IllegalStateException("Must open table first");
     }
-    return this.table.get().startBatchUpdate(row);
+    return this.table.get().startUpdate(row);
   }
   
   /** 
@@ -423,7 +423,7 @@ public class HClient implements HConstants {
     if(this.table.get() == null) {
       throw new IllegalStateException("Must open table first");
     }
-    this.table.get().abortBatch(lockid);
+    this.table.get().abort(lockid);
   }
   
   /** 
@@ -448,7 +448,7 @@ public class HClient implements HConstants {
     if(this.table.get() == null) {
       throw new IllegalStateException("Must open table first");
     }
-    this.table.get().commitBatch(lockid, timestamp);
+    this.table.get().commit(lockid, timestamp);
   }
   
   /** 
@@ -464,9 +464,8 @@ public class HClient implements HConstants {
    * 
    * @param row Name of row to start update against.
    * @return Row lockid.
-   * @throws IOException
    */
-  public long startUpdate(final Text row) throws IOException {
+  public long startUpdate(final Text row) {
     if(this.table.get() == null) {
       throw new IllegalStateException("Must open table first");
     }
@@ -480,9 +479,8 @@ public class HClient implements HConstants {
    * @param lockid lock id returned from startUpdate
    * @param column column whose value is being set
    * @param val new value for column
-   * @throws IOException
    */
-  public void put(long lockid, Text column, byte val[]) throws IOException {
+  public void put(long lockid, Text column, byte val[]) {
     if(this.table.get() == null) {
       throw new IllegalStateException("Must open table first");
     }
@@ -494,9 +492,8 @@ public class HClient implements HConstants {
    *
    * @param lockid              - lock id returned from startUpdate
    * @param column              - name of column whose value is to be deleted
-   * @throws IOException
    */
-  public void delete(long lockid, Text column) throws IOException {
+  public void delete(long lockid, Text column) {
     if(this.table.get() == null) {
       throw new IllegalStateException("Must open table first");
     }
@@ -507,9 +504,8 @@ public class HClient implements HConstants {
    * Abort a row mutation
    *
    * @param lockid              - lock id returned from startUpdate
-   * @throws IOException
    */
-  public void abort(long lockid) throws IOException {
+  public void abort(long lockid) {
     if(this.table.get() == null) {
       throw new IllegalStateException("Must open table first");
     }
@@ -544,13 +540,11 @@ public class HClient implements HConstants {
    * Renew lease on update
    * 
    * @param lockid              - lock id returned from startUpdate
-   * @throws IOException
    */
-  public void renewLease(long lockid) throws IOException {
+  public void renewLease(@SuppressWarnings("unused") long lockid) {
     if(this.table.get() == null) {
       throw new IllegalStateException("Must open table first");
     }
-    this.table.get().renewLease(lockid);
   }
 
   private void printUsage() {

+ 1 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java

@@ -502,7 +502,7 @@ public class HConnectionManager implements HConstants {
 
                 } catch (IOException e) {
                   if (tries < numRetries - 1) {
-                    findServersForTable(META_TABLE_NAME);
+                    metaServers = findServersForTable(META_TABLE_NAME);
                     success = false;
                     break;
                   }

+ 81 - 104
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java

@@ -1796,21 +1796,20 @@ HMasterRegionInterface, Runnable {
 
       // Remove server from root/meta entries
       
-      long clientId = rand.nextLong();
       for (ToDoEntry e: toDoList) {
-        long lockid = server.startUpdate(regionName, clientId, e.row);
+        BatchUpdate b = new BatchUpdate();
+        long lockid = b.startUpdate(e.row);
       
         if (e.deleteRegion) {
-          server.delete(regionName, clientId, lockid, COL_REGIONINFO);
+          b.delete(lockid, COL_REGIONINFO);
         
         } else if (e.regionOffline) {
           e.info.offLine = true;
-          server.put(regionName, clientId, lockid, COL_REGIONINFO,
-              Writables.getBytes(e.info));
+          b.put(lockid, COL_REGIONINFO, Writables.getBytes(e.info));
         }
-        server.delete(regionName, clientId, lockid, COL_SERVER);
-        server.delete(regionName, clientId, lockid, COL_STARTCODE);
-        server.commit(regionName, clientId, lockid, System.currentTimeMillis());
+        b.delete(lockid, COL_SERVER);
+        b.delete(lockid, COL_STARTCODE);
+        server.batchUpdate(regionName, System.currentTimeMillis(), b);
       }
 
       // Get regions reassigned
@@ -2053,23 +2052,20 @@ HMasterRegionInterface, Runnable {
           server = connection.getHRegionConnection(r.server);
         }
 
-        long clientId = rand.nextLong();
         try {
-          long lockid = server.startUpdate(metaRegionName, clientId,
-              regionInfo.regionName);
+          BatchUpdate b = new BatchUpdate();
+          long lockid = b.startUpdate(regionInfo.regionName);
 
           if (deleteRegion) {
-            server.delete(metaRegionName, clientId, lockid, COL_REGIONINFO);
+            b.delete(lockid, COL_REGIONINFO);
 
           } else if (!reassignRegion ) {
             regionInfo.offLine = true;
-            server.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
-                Writables.getBytes(regionInfo));
+            b.put(lockid, COL_REGIONINFO, Writables.getBytes(regionInfo));
           }
-          server.delete(metaRegionName, clientId, lockid, COL_SERVER);
-          server.delete(metaRegionName, clientId, lockid, COL_STARTCODE);
-          server.commit(metaRegionName, clientId, lockid,
-              System.currentTimeMillis());
+          b.delete(lockid, COL_SERVER);
+          b.delete(lockid, COL_STARTCODE);
+          server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
 
           break;
 
@@ -2199,18 +2195,15 @@ HMasterRegionInterface, Runnable {
         LOG.info("updating row " + region.getRegionName() + " in table " +
             metaRegionName);
 
-        long clientId = rand.nextLong();
         try {
-          long lockid = server.startUpdate(metaRegionName, clientId,
-              region.getRegionName());
+          BatchUpdate b = new BatchUpdate();
+          long lockid = b.startUpdate(region.getRegionName());
           
-          server.put(metaRegionName, clientId, lockid, COL_SERVER,
+          b.put(lockid, COL_SERVER,
               Writables.stringToBytes(serverAddress.toString()));
           
-          server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode);
-          
-          server.commit(metaRegionName, clientId, lockid,
-              System.currentTimeMillis());
+          b.put(lockid, COL_STARTCODE, startCode);
+          server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
 
           if (region.tableDesc.getName().equals(META_TABLE_NAME)) {
             // It's a meta region.
@@ -2335,11 +2328,11 @@ HMasterRegionInterface, Runnable {
                 newRegion.getTableDesc().getName()).lastKey()));
           
       Text metaRegionName = m.regionName;
-      HRegionInterface r = connection.getHRegionConnection(m.server);
-      long scannerid = r.openScanner(metaRegionName, COL_REGIONINFO_ARRAY,
+      HRegionInterface server = connection.getHRegionConnection(m.server);
+      long scannerid = server.openScanner(metaRegionName, COL_REGIONINFO_ARRAY,
           tableName, System.currentTimeMillis(), null);
       try {
-        KeyedData[] data = r.next(scannerid);
+        KeyedData[] data = server.next(scannerid);
             
         // Test data and that the row for the data is for our table. If table
         // does not exist, scanner will return row after where our table would
@@ -2355,7 +2348,7 @@ HMasterRegionInterface, Runnable {
         }
             
       } finally {
-        r.close(scannerid);
+        server.close(scannerid);
       }
 
       // 2. Create the HRegion
@@ -2367,13 +2360,10 @@ HMasterRegionInterface, Runnable {
           
       HRegionInfo info = region.getRegionInfo();
       Text regionName = region.getRegionName();
-      long clientId = rand.nextLong();
-      long lockid = r.startUpdate(metaRegionName, clientId, regionName);
-      
-      r.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
-          Writables.getBytes(info));
-      
-      r.commit(metaRegionName, clientId, lockid, System.currentTimeMillis());
+      BatchUpdate b = new BatchUpdate();
+      long lockid = b.startUpdate(regionName);
+      b.put(lockid, COL_REGIONINFO, Writables.getBytes(info));
+      server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
 
       // 4. Close the new region to flush it to disk.  Close its log file too.
       
@@ -2608,7 +2598,6 @@ HMasterRegionInterface, Runnable {
       new HashMap<String, HashSet<HRegionInfo>>();
     
     protected long lockid;
-    protected long clientId;
 
     ChangeTableState(Text tableName, boolean onLine) throws IOException {
       super(tableName);
@@ -2653,41 +2642,36 @@ HMasterRegionInterface, Runnable {
           LOG.debug("updating columns in row: " + i.regionName);
         }
 
-        lockid = -1L;
-        clientId = rand.nextLong();
-        try {
-          lockid = server.startUpdate(m.regionName, clientId, i.regionName);
-          updateRegionInfo(server, m.regionName, i);
-          server.delete(m.regionName, clientId, lockid, COL_SERVER);
-          server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
-          server.commit(m.regionName, clientId, lockid,
-              System.currentTimeMillis());
-
-          lockid = -1L;
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("updated columns in row: " + i.regionName);
-          }
-
-        } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException(
-                (RemoteException) e);
-          }
-          LOG.error("column update failed in row: " + i.regionName, e);
+        BatchUpdate b = new BatchUpdate();
+        lockid = b.startUpdate(i.regionName);
+        updateRegionInfo(b, i);
+        b.delete(lockid, COL_SERVER);
+        b.delete(lockid, COL_STARTCODE);
 
-        } finally {
+        for (int tries = 0; tries < numRetries; tries++) {
           try {
-            if (lockid != -1L) {
-              server.abort(m.regionName, clientId, lockid);
+            server.batchUpdate(m.regionName, System.currentTimeMillis(), b);
+            
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("updated columns in row: " + i.regionName);
             }
+            break;
 
-          } catch (IOException iex) {
-            if (iex instanceof RemoteException) {
-              iex = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) iex);
+          } catch (IOException e) {
+            if (tries == numRetries - 1) {
+              if (e instanceof RemoteException) {
+                e = RemoteExceptionHandler.decodeRemoteException(
+                    (RemoteException) e);
+              }
+              LOG.error("column update failed in row: " + i.regionName, e);
+              break;
             }
-            LOG.error("", iex);
+          }
+          try {
+            Thread.sleep(threadWakeFrequency);
+
+          } catch (InterruptedException e) {
+            // continue
           }
         }
 
@@ -2738,12 +2722,11 @@ HMasterRegionInterface, Runnable {
       servedRegions.clear();
     }
 
-    protected void updateRegionInfo(final HRegionInterface server,
-        final Text regionName, final HRegionInfo i) throws IOException {
+    protected void updateRegionInfo(final BatchUpdate b, final HRegionInfo i)
+    throws IOException {
       
       i.offLine = !online;
-      server.put(regionName, clientId, lockid, COL_REGIONINFO,
-          Writables.getBytes(i));
+      b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
     }
   }
 
@@ -2790,11 +2773,10 @@ HMasterRegionInterface, Runnable {
     }
 
     @Override
-    protected void updateRegionInfo(
-        @SuppressWarnings("hiding") HRegionInterface server, Text regionName,
-        @SuppressWarnings("unused") HRegionInfo i) throws IOException {
+    protected void updateRegionInfo(BatchUpdate b,
+        @SuppressWarnings("unused") HRegionInfo i) {
       
-      server.delete(regionName, clientId, lockid, COL_REGIONINFO);
+      b.delete(lockid, COL_REGIONINFO);
     }
   }
 
@@ -2816,40 +2798,35 @@ HMasterRegionInterface, Runnable {
 
     protected void updateRegionInfo(HRegionInterface server, Text regionName,
         HRegionInfo i) throws IOException {
+
+      BatchUpdate b = new BatchUpdate();
+      long lockid = b.startUpdate(i.regionName);
+      b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
       
-      long lockid = -1L;
-      long clientId = rand.nextLong();
-      try {
-        lockid = server.startUpdate(regionName, clientId, i.regionName);
-        server.put(regionName, clientId, lockid, COL_REGIONINFO,
-            Writables.getBytes(i));
-      
-        server.commit(regionName, clientId, lockid, System.currentTimeMillis());
-        lockid = -1L;
+      for (int tries = 0; tries < numRetries; tries++) {
+        try {
+          server.batchUpdate(regionName, System.currentTimeMillis(), b);
         
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("updated columns in row: " + i.regionName);
-        }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("updated columns in row: " + i.regionName);
+          }
+          break;
         
-      } catch (Exception e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        }
-        LOG.error("column update failed in row: " + i.regionName, e);
-
-      } finally {
-        if (lockid != -1L) {
-          try {
-            server.abort(regionName, clientId, lockid);
-
-          } catch (IOException iex) {
-            if (iex instanceof RemoteException) {
-              iex = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) iex);
+        } catch (IOException e) {
+          if (tries == numRetries - 1) {
+            if (e instanceof RemoteException) {
+              e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
             }
-            LOG.error("", iex);
+            LOG.error("column update failed in row: " + i.regionName, e);
+            break;
           }
         }
+        try {
+          Thread.sleep(threadWakeFrequency);
+          
+        } catch (InterruptedException e) {
+          // continue
+        }
       }
     }
   }

+ 4 - 14
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java

@@ -283,13 +283,8 @@ class HMerge implements HConstants {
             LOG.debug("updated columns in row: " + regionsToDelete[r]);
           }
         } finally {
-          try {
-            if(lockid != -1L) {
-              table.abort(lockid);
-            }
-
-          } catch(IOException iex) {
-            LOG.error(iex);
+          if(lockid != -1L) {
+            table.abort(lockid);
           }
         }
       }
@@ -309,13 +304,8 @@ class HMerge implements HConstants {
               + newRegion.getRegionName());
         }
       } finally {
-        try {
-          if(lockid != -1L) {
-            table.abort(lockid);
-          }
-
-        } catch(IOException iex) {
-          LOG.error(iex);
+        if(lockid != -1L) {
+          table.abort(lockid);
         }
       }
     }

+ 20 - 2
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java

@@ -97,7 +97,7 @@ public interface HRegionInterface extends VersionedProtocol {
    * @throws IOException
    */
   public KeyedData[] getRow(final Text regionName, final Text row)
-  throws IOException;
+  throws IOException; //TODO
 
   //////////////////////////////////////////////////////////////////////////////
   // Start an atomic row insertion/update.  No changes are committed until the 
@@ -126,7 +126,10 @@ public interface HRegionInterface extends VersionedProtocol {
    * @param row Name of row to start update against.
    * @return Row lockid.
    * @throws IOException
+   * 
+   * Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
    */
+  @Deprecated
   public long startUpdate(final Text regionName, final long clientid,
       final Text row)
   throws IOException;
@@ -140,7 +143,10 @@ public interface HRegionInterface extends VersionedProtocol {
    * @param column column whose value is being set
    * @param val new value for column
    * @throws IOException
+   * 
+   * Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
    */
+  @Deprecated
   public void put(final Text regionName, final long clientid, final long lockid,
       final Text column, final byte [] val)
   throws IOException;
@@ -153,7 +159,10 @@ public interface HRegionInterface extends VersionedProtocol {
    * @param lockid lock id returned from startUpdate
    * @param column name of column whose value is to be deleted
    * @throws IOException
+   * 
+   * Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
    */
+  @Deprecated
   public void delete(final Text regionName, final long clientid,
       final long lockid, final Text column)
   throws IOException;
@@ -165,7 +174,10 @@ public interface HRegionInterface extends VersionedProtocol {
    * @param clientid a unique value to identify the client
    * @param lockid lock id returned from startUpdate
    * @throws IOException
+   * 
+   * Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
    */
+  @Deprecated
   public void abort(final Text regionName, final long clientid, 
       final long lockid)
   throws IOException;
@@ -178,7 +190,10 @@ public interface HRegionInterface extends VersionedProtocol {
    * @param lockid lock id returned from startUpdate
    * @param timestamp the time (in milliseconds to associate with this change)
    * @throws IOException
+   * 
+   * Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
    */
+  @Deprecated
   public void commit(final Text regionName, final long clientid,
       final long lockid, final long timestamp)
   throws IOException;
@@ -189,7 +204,10 @@ public interface HRegionInterface extends VersionedProtocol {
    * @param lockid lock id returned from startUpdate
    * @param clientid a unique value to identify the client
    * @throws IOException
+   * 
+   * Deprecated. Use @see {@link #batchUpdate(Text, long, BatchUpdate)} instead.
    */
+  @Deprecated
   public void renewLease(long lockid, long clientid) throws IOException;
 
   //////////////////////////////////////////////////////////////////////////////
@@ -229,7 +247,7 @@ public interface HRegionInterface extends VersionedProtocol {
    * @return array of values
    * @throws IOException
    */
-  public KeyedData[] next(long scannerId) throws IOException;
+  public KeyedData[] next(long scannerId) throws IOException; //TODO
   
   /**
    * Close a scanner

+ 36 - 54
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java

@@ -225,7 +225,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       // Remove old region from META
       // NOTE: there is no need for retry logic here. HTable does it for us.
       
-      long lockid = t.startBatchUpdate(oldRegionInfo.getRegionName());
+      long lockid = t.startUpdate(oldRegionInfo.getRegionName());
       oldRegionInfo.offLine = true;
       oldRegionInfo.split = true;
       t.put(lockid, COL_REGIONINFO, Writables.getBytes(oldRegionInfo));
@@ -235,17 +235,17 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
 
       t.put(lockid, COL_SPLITB, Writables.getBytes(
           newRegions[1].getRegionInfo()));
-      t.commitBatch(lockid);
+      t.commit(lockid);
       
       // Add new regions to META
 
       for (int i = 0; i < newRegions.length; i++) {
-        lockid = t.startBatchUpdate(newRegions[i].getRegionName());
+        lockid = t.startUpdate(newRegions[i].getRegionName());
 
         t.put(lockid, COL_REGIONINFO, Writables.getBytes(
             newRegions[i].getRegionInfo()));
         
-        t.commitBatch(lockid);
+        t.commit(lockid);
       }
           
       // Now tell the master about the new regions
@@ -975,18 +975,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   // HRegionInterface
   //////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public HRegionInfo getRegionInfo(final Text regionName)
   throws NotServingRegionException {
     requestCount.incrementAndGet();
     return getRegion(regionName).getRegionInfo();
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
   throws IOException {
     requestCount.incrementAndGet();
@@ -1006,9 +1002,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     commit(regionName, clientid, lockid, timestamp);
   }
   
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public byte [] get(final Text regionName, final Text row,
       final Text column)
   throws IOException {
@@ -1016,9 +1010,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return getRegion(regionName).get(row, column);
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public byte [][] get(final Text regionName, final Text row,
       final Text column, final int numVersions)
   throws IOException {  
@@ -1026,18 +1018,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return getRegion(regionName).get(row, column, numVersions);
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public byte [][] get(final Text regionName, final Text row, final Text column, 
       final long timestamp, final int numVersions) throws IOException {
     requestCount.incrementAndGet();
     return getRegion(regionName).get(row, column, timestamp, numVersions);
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public KeyedData[] getRow(final Text regionName, final Text row)
   throws IOException {
     requestCount.incrementAndGet();
@@ -1052,9 +1040,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return result;
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public KeyedData[] next(final long scannerId)
   throws IOException {
     requestCount.incrementAndGet();
@@ -1096,18 +1082,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return values.toArray(new KeyedData[values.size()]);
   }
 
-  /**
-   * {@inheritDoc}
+  /*
+   * NOTE: When startUpdate, put, delete, abort, commit and renewLease are
+   * removed from HRegionInterface, these methods (with the exception of
+   * renewLease) must remain, as they are called by batchUpdate (renewLease
+   * can just be removed)
+   * 
+   * However, the remaining methods can become protected instead of public
+   * at that point.
    */
-  public long startUpdate(Text regionName, long clientid, Text row) 
-      throws IOException {
-    requestCount.incrementAndGet();
-    HRegion region = getRegion(regionName);
-    long lockid = region.startUpdate(row);
-    this.leases.createLease(clientid, lockid,
-      new RegionListener(region, lockid));
-    return lockid;
-  }
 
   /** Create a lease for an update. If it times out, the update is aborted */
   private static class RegionListener implements LeaseListener {
@@ -1119,9 +1102,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       this.localLockId = lockId;
     }
     
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     public void leaseExpired() {
       try {
         localRegion.abort(localLockId);
@@ -1139,9 +1120,18 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
   }
   
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
+  public long startUpdate(Text regionName, long clientid, Text row) 
+      throws IOException {
+    requestCount.incrementAndGet();
+    HRegion region = getRegion(regionName);
+    long lockid = region.startUpdate(row);
+    this.leases.createLease(clientid, lockid,
+      new RegionListener(region, lockid));
+    return lockid;
+  }
+
+  /** {@inheritDoc} */
   public void put(final Text regionName, final long clientid,
       final long lockid, final Text column, final byte [] val)
   throws IOException {
@@ -1151,9 +1141,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     region.put(lockid, column, val);
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public void delete(Text regionName, long clientid, long lockid, Text column) 
   throws IOException {
     requestCount.incrementAndGet();
@@ -1162,9 +1150,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     region.delete(lockid, column);
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public void abort(Text regionName, long clientid, long lockid) 
   throws IOException {
     requestCount.incrementAndGet();
@@ -1173,9 +1159,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     region.abort(lockid);
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public void commit(Text regionName, final long clientid, final long lockid,
       final long timestamp) throws IOException {
     requestCount.incrementAndGet();
@@ -1184,9 +1168,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     region.commit(lockid, timestamp);
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public void renewLease(long lockid, long clientid) throws IOException {
     requestCount.incrementAndGet();
     leases.renewLease(clientid, lockid);

+ 96 - 228
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java

@@ -52,11 +52,6 @@ public class HTable implements HConstants {
   
   // For row mutation operations
   
-  protected volatile long currentLockId;
-  protected volatile Text currentRegion;
-  protected volatile HRegionInterface currentServer;
-  protected volatile long clientid;
-  
   protected volatile boolean closed;
 
   protected void checkClosed() {
@@ -81,7 +76,6 @@ public class HTable implements HConstants {
     this.rand = new Random();
     tableServers = connection.getTableServers(tableName);
     this.batch = null;
-    this.currentLockId = -1L;
     closed = false;
   }
 
@@ -116,10 +110,6 @@ public class HTable implements HConstants {
     closed = true;
     tableServers = null;
     batch = null;
-    currentLockId = -1L;
-    currentRegion = null;
-    currentServer = null;
-    clientid = -1L;
     connection.close(tableName);
   }
   
@@ -127,8 +117,28 @@ public class HTable implements HConstants {
    * Verifies that no update is in progress
    */
   public synchronized void checkUpdateInProgress() {
-    if (batch != null || currentLockId != -1L) {
-      throw new IllegalStateException("update in progress");
+    updateInProgress(false);
+  }
+  
+  /*
+   * Checks to see if an update is in progress
+   * 
+   * @param updateMustBeInProgress
+   *    If true, an update must be in progress. An IllegalStateException will be
+   *    thrown if not.
+   *    
+   *    If false, an update must not be in progress. An IllegalStateException
+   *    will be thrown if an update is in progress.
+   */
+  private void updateInProgress(boolean updateMustBeInProgress) {
+    if (updateMustBeInProgress) {
+      if (batch == null) {
+        throw new IllegalStateException("no update in progress");
+      }
+    } else {
+      if (batch != null) {
+        throw new IllegalStateException("update in progress");
+      }
     }
   }
   
@@ -415,27 +425,25 @@ public class HTable implements HConstants {
    *
    * @param row name of row to be updated
    * @return lockid to be used in subsequent put, delete and commit calls
+   * 
+   * Deprecated. Batch operations are now the default. startBatchUpdate is now
+   * implemented by @see {@link #startUpdate(Text)} 
    */
+  @Deprecated
   public synchronized long startBatchUpdate(final Text row) {
-    checkClosed();
-    checkUpdateInProgress();
-    batch = new BatchUpdate();
-    return batch.startUpdate(row);
+    return startUpdate(row);
   }
   
   /** 
    * Abort a batch mutation
    * @param lockid lock id returned by startBatchUpdate
+   * 
+   * Deprecated. Batch operations are now the default. abortBatch is now 
+   * implemented by @see {@link #abort(long)}
    */
+  @Deprecated
   public synchronized void abortBatch(final long lockid) {
-    checkClosed();
-    if (batch == null) {
-      throw new IllegalStateException("no batch update in progress");
-    }
-    if (batch.getLockid() != lockid) {
-      throw new IllegalArgumentException("invalid lock id " + lockid);
-    }
-    batch = null;
+    abort(lockid);
   }
   
   /** 
@@ -443,9 +451,13 @@ public class HTable implements HConstants {
    *
    * @param lockid lock id returned by startBatchUpdate
    * @throws IOException
+   * 
+   * Deprecated. Batch operations are now the default. commitBatch(long) is now
+   * implemented by @see {@link #commit(long)}
    */
+  @Deprecated
   public void commitBatch(final long lockid) throws IOException {
-    commitBatch(lockid, System.currentTimeMillis());
+    commit(lockid, System.currentTimeMillis());
   }
 
   /** 
@@ -454,104 +466,32 @@ public class HTable implements HConstants {
    * @param lockid lock id returned by startBatchUpdate
    * @param timestamp time to associate with all the changes
    * @throws IOException
+   * 
+   * Deprecated. Batch operations are now the default. commitBatch(long, long)
+   * is now implemented by @see {@link #commit(long, long)}
    */
+  @Deprecated
   public synchronized void commitBatch(final long lockid, final long timestamp)
   throws IOException {
 
-    checkClosed();
-    if (batch == null) {
-      throw new IllegalStateException("no batch update in progress");
-    }
-    if (batch.getLockid() != lockid) {
-      throw new IllegalArgumentException("invalid lock id " + lockid);
-    }
-    
-    try {
-      for (int tries = 0; tries < numRetries; tries++) {
-        HRegionLocation r = getRegionLocation(batch.getRow());
-        HRegionInterface server =
-          connection.getHRegionConnection(r.getServerAddress());
-
-        try {
-          server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, batch);
-          break;
-
-        } catch (IOException e) {
-          if (tries < numRetries -1) {
-            tableServers = connection.reloadTableServers(tableName);
-
-          } else {
-            if (e instanceof RemoteException) {
-              e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-            }
-            throw e;
-          }
-        }
-        try {
-          Thread.sleep(pause);
-
-        } catch (InterruptedException e) {
-        }
-      }
-    } finally {
-      batch = null;
-    }
+    commit(lockid, timestamp);
   }
   
   /** 
    * Start an atomic row insertion/update.  No changes are committed until the 
-   * call to commit() returns. A call to abort() will abandon any updates in progress.
-   *
-   * Callers to this method are given a lease for each unique lockid; before the
-   * lease expires, either abort() or commit() must be called. If it is not 
-   * called, the system will automatically call abort() on the client's behalf.
+   * call to commit() returns.
+   * 
+   * A call to abort() will abandon any updates in progress.
    *
-   * The client can gain extra time with a call to renewLease().
-   * Start an atomic row insertion or update
    * 
    * @param row Name of row to start update against.
    * @return Row lockid.
-   * @throws IOException
    */
-  public synchronized long startUpdate(final Text row) throws IOException {
+  public synchronized long startUpdate(final Text row) {
     checkClosed();
-    checkUpdateInProgress();
-    for (int tries = 0; tries < numRetries; tries++) {
-      IOException e = null;
-      HRegionLocation info = getRegionLocation(row);
-      try {
-        currentServer =
-          connection.getHRegionConnection(info.getServerAddress());
-
-        currentRegion = info.getRegionInfo().getRegionName();
-        clientid = rand.nextLong();
-        currentLockId = currentServer.startUpdate(currentRegion, clientid, row);
-
-        break;
-
-      } catch (IOException ex) {
-        e = ex;
-      }
-      if (tries < numRetries - 1) {
-        try {
-          Thread.sleep(this.pause);
-
-        } catch (InterruptedException ex) {
-        }
-        try {
-          tableServers = connection.reloadTableServers(tableName);
-
-        } catch (IOException ex) {
-          e = ex;
-        }
-      } else {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        }
-        throw e;
-      }
-    }
-    return currentLockId;
+    updateInProgress(false);
+    batch = new BatchUpdate();
+    return batch.startUpdate(row);
   }
   
   /** 
@@ -561,37 +501,14 @@ public class HTable implements HConstants {
    * @param lockid lock id returned from startUpdate
    * @param column column whose value is being set
    * @param val new value for column
-   * @throws IOException
    */
-  public void put(long lockid, Text column, byte val[]) throws IOException {
+  public void put(long lockid, Text column, byte val[]) {
     checkClosed();
     if (val == null) {
       throw new IllegalArgumentException("value cannot be null");
     }
-    if (batch != null) {
-      batch.put(lockid, column, val);
-      return;
-    }
-    
-    if (lockid != currentLockId) {
-      throw new IllegalArgumentException("invalid lockid");
-    }
-    try {
-      this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
-        val);
-    } catch (IOException e) {
-      try {
-        this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-      } catch (IOException e2) {
-        LOG.warn(e2);
-      }
-      this.currentServer = null;
-      this.currentRegion = null;
-      if (e instanceof RemoteException) {
-        e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-      }
-      throw e;
-    }
+    updateInProgress(true);
+    batch.put(lockid, column, val);
   }
   
   /** 
@@ -599,67 +516,25 @@ public class HTable implements HConstants {
    *
    * @param lockid              - lock id returned from startUpdate
    * @param column              - name of column whose value is to be deleted
-   * @throws IOException
    */
-  public void delete(long lockid, Text column) throws IOException {
+  public void delete(long lockid, Text column) {
     checkClosed();
-    if (batch != null) {
-      batch.delete(lockid, column);
-      return;
-    }
-    
-    if (lockid != currentLockId) {
-      throw new IllegalArgumentException("invalid lockid");
-    }
-    try {
-      this.currentServer.delete(this.currentRegion, this.clientid, lockid,
-        column);
-    } catch (IOException e) {
-      try {
-        this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-      } catch(IOException e2) {
-        LOG.warn(e2);
-      }
-      this.currentServer = null;
-      this.currentRegion = null;
-      if (e instanceof RemoteException) {
-        e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-      }
-      throw e;
-    }
+    updateInProgress(true);
+    batch.delete(lockid, column);
   }
   
   /** 
    * Abort a row mutation
    *
    * @param lockid              - lock id returned from startUpdate
-   * @throws IOException
    */
-  public synchronized void abort(long lockid) throws IOException {
+  public synchronized void abort(long lockid) {
     checkClosed();
-    if (batch != null) {
-      abortBatch(lockid);
-      return;
-    }
-
-    if (lockid != currentLockId) {
-      throw new IllegalArgumentException("invalid lockid");
-    }
-    
-    try {
-      try {
-        this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-      } catch (IOException e) {
-        this.currentServer = null;
-        this.currentRegion = null;
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        }
-        throw e;
-      }
-    } finally {
-      currentLockId = -1L;
+    updateInProgress(true);
+    if (batch.getLockid() != lockid) {
+      throw new IllegalArgumentException("invalid lock id " + lockid);
     }
+    batch = null;
   }
   
   /** 
@@ -679,32 +554,45 @@ public class HTable implements HConstants {
    * @param timestamp           - time to associate with the change
    * @throws IOException
    */
-  public synchronized void commit(long lockid, long timestamp) throws IOException {
+  public synchronized void commit(long lockid, long timestamp)
+  throws IOException {
+    
     checkClosed();
-    if (batch != null) {
-      commitBatch(lockid, timestamp);
-      return;
+    updateInProgress(true);
+    if (batch.getLockid() != lockid) {
+      throw new IllegalArgumentException("invalid lock id " + lockid);
     }
+    
+    try {
+      for (int tries = 0; tries < numRetries; tries++) {
+        HRegionLocation r = getRegionLocation(batch.getRow());
+        HRegionInterface server =
+          connection.getHRegionConnection(r.getServerAddress());
 
-    if (lockid != currentLockId) {
-      throw new IllegalArgumentException("invalid lockid");
-    }
+        try {
+          server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, batch);
+          break;
 
-    try {
-      try {
-        this.currentServer.commit(this.currentRegion, this.clientid, lockid,
-            timestamp);
+        } catch (IOException e) {
+          if (tries < numRetries -1) {
+            tableServers = connection.reloadTableServers(tableName);
 
-      } catch (IOException e) {
-        this.currentServer = null;
-        this.currentRegion = null;
-        if(e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+          } else {
+            if (e instanceof RemoteException) {
+              e = RemoteExceptionHandler.decodeRemoteException(
+                  (RemoteException) e);
+            }
+            throw e;
+          }
+        }
+        try {
+          Thread.sleep(pause);
+
+        } catch (InterruptedException e) {
         }
-        throw e;
       }
     } finally {
-      currentLockId = -1L;
+      batch = null;
     }
   }
   
@@ -712,32 +600,12 @@ public class HTable implements HConstants {
    * Renew lease on update
    * 
    * @param lockid              - lock id returned from startUpdate
-   * @throws IOException
+   * 
+   * Deprecated. Batch updates are now the default. Consequently this method
+   * does nothing.
    */
-  public synchronized void renewLease(long lockid) throws IOException {
-    checkClosed();
-    if (batch != null) {
-      return;
-    }
-
-    if (lockid != currentLockId) {
-      throw new IllegalArgumentException("invalid lockid");
-    }
-    try {
-      this.currentServer.renewLease(lockid, this.clientid);
-    } catch (IOException e) {
-      try {
-        this.currentServer.abort(this.currentRegion, this.clientid, lockid);
-      } catch (IOException e2) {
-        LOG.warn(e2);
-      }
-      this.currentServer = null;
-      this.currentRegion = null;
-      if (e instanceof RemoteException) {
-        e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-      }
-      throw e;
-    }
+  @Deprecated
+  public synchronized void renewLease(@SuppressWarnings("unused") long lockid) {
   }
 
   /**

+ 4 - 4
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java

@@ -63,7 +63,7 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
   /** the test case */
   public void testBatchUpdate() {
     try {
-      table.commitBatch(-1L);
+      table.commit(-1L);
       
     } catch (IllegalStateException e) {
       // expected
@@ -72,7 +72,7 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
       fail();
     }
 
-    long lockid = table.startBatchUpdate(new Text("row1"));
+    long lockid = table.startUpdate(new Text("row1"));
     
     try {
       try {
@@ -86,9 +86,9 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
       }
       table.put(lockid, CONTENTS, value);
       table.delete(lockid, CONTENTS);
-      table.commitBatch(lockid);
+      table.commit(lockid);
       
-      lockid = table.startBatchUpdate(new Text("row2"));
+      lockid = table.startUpdate(new Text("row2"));
       table.put(lockid, CONTENTS, value);
       table.commit(lockid);
  

+ 2 - 2
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHTable.java

@@ -59,7 +59,7 @@ public class TestHTable extends HBaseClusterTestCase implements HConstants {
     byte[] value = "value".getBytes(UTF8_ENCODING);
     
     HTable a = new HTable(conf, tableAname);
-    long lockid = a.startBatchUpdate(row);
+    long lockid = a.startUpdate(row);
     a.put(lockid, COLUMN_FAMILY, value);
     a.commit(lockid);
     
@@ -77,7 +77,7 @@ public class TestHTable extends HBaseClusterTestCase implements HConstants {
       HStoreKey key = new HStoreKey();
       TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
       while(s.next(key, results)) {
-        lockid = b.startBatchUpdate(key.getRow());
+        lockid = b.startUpdate(key.getRow());
         for(Map.Entry<Text, byte[]> e: results.entrySet()) {
           b.put(lockid, e.getKey(), e.getValue());
         }

+ 1 - 1
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java

@@ -280,7 +280,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
   private void removeRegionFromMETA(final HTable t, final Text regionName)
   throws IOException {
     try {
-      long lockid = t.startBatchUpdate(regionName);
+      long lockid = t.startUpdate(regionName);
       t.delete(lockid, HConstants.COL_REGIONINFO);
       t.delete(lockid, HConstants.COL_SERVER);
       t.delete(lockid, HConstants.COL_STARTCODE);

+ 1 - 1
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java

@@ -461,7 +461,7 @@ public class TestSplit extends HBaseTestCase {
         for (char e = thirdCharStart; e <= LAST_CHAR; e++) {
           byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e};
           Text t = new Text(new String(bytes));
-          long lockid = table.startBatchUpdate(t);
+          long lockid = table.startUpdate(t);
           try {
             table.put(lockid, new Text(column), bytes);
             table.commit(lockid, System.currentTimeMillis());