|
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.NoSuchElementException;
|
|
|
import java.util.Random;
|
|
@@ -33,6 +34,7 @@ import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.hbase.filter.RowFilterInterface;
|
|
|
+import org.apache.hadoop.hbase.io.BatchUpdate;
|
|
|
import org.apache.hadoop.hbase.io.KeyedData;
|
|
|
import org.apache.hadoop.io.DataInputBuffer;
|
|
|
import org.apache.hadoop.io.Text;
|
|
@@ -62,10 +64,102 @@ public class HClient implements HConstants {
|
|
|
private long currentLockId;
|
|
|
private Class<? extends HRegionInterface> serverInterfaceClass;
|
|
|
|
|
|
+ protected class BatchHandler {
|
|
|
+ private HashMap<RegionLocation, BatchUpdate> regionToBatch;
|
|
|
+ private HashMap<Long, BatchUpdate> lockToBatch;
|
|
|
+
|
|
|
+ /** constructor */
|
|
|
+ public BatchHandler() {
|
|
|
+ this.regionToBatch = new HashMap<RegionLocation, BatchUpdate>();
|
|
|
+ this.lockToBatch = new HashMap<Long, BatchUpdate>();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Start a batch row insertion/update.
|
|
|
+ *
|
|
|
+ * Manages multiple batch updates that are targeted for multiple servers,
|
|
|
+ * should the rows span several region servers.
|
|
|
+ *
|
|
|
+ * No changes are committed until the client commits the batch operation via
|
|
|
+ * HClient.batchCommit().
|
|
|
+ *
|
|
|
+ * The entire batch update can be abandoned by calling HClient.batchAbort();
|
|
|
+ *
|
|
|
+ * Callers to this method are given a handle that corresponds to the row being
|
|
|
+ * changed. The handle must be supplied on subsequent put or delete calls so
|
|
|
+ * that the row can be identified.
|
|
|
+ *
|
|
|
+ * @param row Name of row to start update against.
|
|
|
+ * @return Row lockid.
|
|
|
+ */
|
|
|
+ public synchronized long startUpdate(Text row) {
|
|
|
+ RegionLocation info = getRegionLocation(row);
|
|
|
+ BatchUpdate batch = regionToBatch.get(info);
|
|
|
+ if(batch == null) {
|
|
|
+ batch = new BatchUpdate();
|
|
|
+ regionToBatch.put(info, batch);
|
|
|
+ }
|
|
|
+ long lockid = batch.startUpdate(row);
|
|
|
+ lockToBatch.put(lockid, batch);
|
|
|
+ return lockid;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Change the value for the specified column
|
|
|
+ *
|
|
|
+ * @param lockid lock id returned from startUpdate
|
|
|
+ * @param column column whose value is being set
|
|
|
+ * @param value new value for column
|
|
|
+ */
|
|
|
+ public synchronized void put(long lockid, Text column, byte[] value) {
|
|
|
+ BatchUpdate batch = lockToBatch.get(lockid);
|
|
|
+ if (batch == null) {
|
|
|
+ throw new IllegalArgumentException("invalid lock id " + lockid);
|
|
|
+ }
|
|
|
+ batch.put(lockid, column, value);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Delete the value for a column
|
|
|
+ *
|
|
|
+ * @param lockid - lock id returned from startUpdate
|
|
|
+ * @param column - name of column whose value is to be deleted
|
|
|
+ */
|
|
|
+ public synchronized void delete(long lockid, Text column) {
|
|
|
+ BatchUpdate batch = lockToBatch.get(lockid);
|
|
|
+ if (batch == null) {
|
|
|
+ throw new IllegalArgumentException("invalid lock id " + lockid);
|
|
|
+ }
|
|
|
+ batch.delete(lockid, column);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Finalize a batch mutation
|
|
|
+ *
|
|
|
+ * @param timestamp time to associate with all the changes
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public synchronized void commit(long timestamp) throws IOException {
|
|
|
+ try {
|
|
|
+ for(Map.Entry<RegionLocation, BatchUpdate> e: regionToBatch.entrySet()) {
|
|
|
+ RegionLocation r = e.getKey();
|
|
|
+ HRegionInterface server = getHRegionConnection(r.serverAddress);
|
|
|
+ server.batchUpdate(r.regionInfo.getRegionName(), timestamp,
|
|
|
+ e.getValue());
|
|
|
+ }
|
|
|
+ } catch (RemoteException e) {
|
|
|
+ throw RemoteExceptionHandler.decodeRemoteException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private BatchHandler batch;
|
|
|
+
|
|
|
/*
|
|
|
* Data structure that holds current location for a region and its info.
|
|
|
*/
|
|
|
- protected static class RegionLocation {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ protected static class RegionLocation implements Comparable {
|
|
|
HRegionInfo regionInfo;
|
|
|
HServerAddress serverAddress;
|
|
|
|
|
@@ -84,18 +178,48 @@ public class HClient implements HConstants {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @return HRegionInfo
|
|
|
+ * {@inheritDoc}
|
|
|
*/
|
|
|
+ @Override
|
|
|
+ public boolean equals(Object o) {
|
|
|
+ return this.compareTo(o) == 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@inheritDoc}
|
|
|
+ */
|
|
|
+ @Override
|
|
|
+ public int hashCode() {
|
|
|
+ int result = this.regionInfo.hashCode();
|
|
|
+ result ^= this.serverAddress.hashCode();
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** @return HRegionInfo */
|
|
|
public HRegionInfo getRegionInfo(){
|
|
|
return regionInfo;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * @return HServerAddress
|
|
|
- */
|
|
|
+ /** @return HServerAddress */
|
|
|
public HServerAddress getServerAddress(){
|
|
|
return serverAddress;
|
|
|
}
|
|
|
+
|
|
|
+ //
|
|
|
+ // Comparable
|
|
|
+ //
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@inheritDoc}
|
|
|
+ */
|
|
|
+ public int compareTo(Object o) {
|
|
|
+ RegionLocation other = (RegionLocation) o;
|
|
|
+ int result = this.regionInfo.compareTo(other.regionInfo);
|
|
|
+ if(result == 0) {
|
|
|
+ result = this.serverAddress.compareTo(other.serverAddress);
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress)
|
|
@@ -121,6 +245,7 @@ public class HClient implements HConstants {
|
|
|
*/
|
|
|
public HClient(Configuration conf) {
|
|
|
this.conf = conf;
|
|
|
+ this.batch = null;
|
|
|
this.currentLockId = -1;
|
|
|
|
|
|
this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
|
|
@@ -222,8 +347,6 @@ public class HClient implements HConstants {
|
|
|
*
|
|
|
* @param desc table descriptor for table
|
|
|
*
|
|
|
- * @throws RemoteException if exception occurred on remote side of
|
|
|
- * connection.
|
|
|
* @throws IllegalArgumentException if the table name is reserved
|
|
|
* @throws MasterNotRunningException if master is not running
|
|
|
* @throws NoServerForRegionException if root region is not being served
|
|
@@ -254,8 +377,6 @@ public class HClient implements HConstants {
|
|
|
*
|
|
|
* @param desc table descriptor for table
|
|
|
*
|
|
|
- * @throws RemoteException if exception occurred on remote side of
|
|
|
- * connection.
|
|
|
* @throws IllegalArgumentException if the table name is reserved
|
|
|
* @throws MasterNotRunningException if master is not running
|
|
|
* @throws NoServerForRegionException if root region is not being served
|
|
@@ -265,7 +386,7 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public synchronized void createTableAsync(HTableDescriptor desc)
|
|
|
- throws IOException {
|
|
|
+ throws IOException {
|
|
|
checkReservedTableName(desc.getName());
|
|
|
checkMaster();
|
|
|
try {
|
|
@@ -610,7 +731,7 @@ public class HClient implements HConstants {
|
|
|
if(tableName == null || tableName.getLength() == 0) {
|
|
|
throw new IllegalArgumentException("table name cannot be null or zero length");
|
|
|
}
|
|
|
- if(this.currentLockId != -1) {
|
|
|
+ if(this.currentLockId != -1 || batch != null) {
|
|
|
throw new IllegalStateException("update in progress");
|
|
|
}
|
|
|
this.tableServers = getTableServers(tableName);
|
|
@@ -1315,6 +1436,59 @@ public class HClient implements HConstants {
|
|
|
return new ClientScanner(columns, startRow, timestamp, filter);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Start a batch of row insertions/updates.
|
|
|
+ *
|
|
|
+ * No changes are committed until the call to commitBatchUpdate returns.
|
|
|
+ * A call to abortBatchUpdate will abandon the entire batch.
|
|
|
+ *
|
|
|
+ * Note that in batch mode, calls to commit or abort are ignored.
|
|
|
+ */
|
|
|
+ public synchronized void startBatchUpdate() {
|
|
|
+ if(this.tableServers == null) {
|
|
|
+ throw new IllegalStateException("Must open table first");
|
|
|
+ }
|
|
|
+
|
|
|
+ if(batch == null) {
|
|
|
+ batch = new BatchHandler();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Abort a batch mutation
|
|
|
+ */
|
|
|
+ public synchronized void abortBatch() {
|
|
|
+ batch = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Finalize a batch mutation
|
|
|
+ *
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public synchronized void commitBatch() throws IOException {
|
|
|
+ commitBatch(System.currentTimeMillis());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Finalize a batch mutation
|
|
|
+ *
|
|
|
+ * @param timestamp time to associate with all the changes
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public synchronized void commitBatch(long timestamp) throws IOException {
|
|
|
+ if(batch == null) {
|
|
|
+ throw new IllegalStateException("no batch update in progress");
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ batch.commit(timestamp);
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ batch = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Start an atomic row insertion/update. No changes are committed until the
|
|
|
* call to commit() returns. A call to abort() will abandon any updates in progress.
|
|
@@ -1334,6 +1508,9 @@ public class HClient implements HConstants {
|
|
|
if(this.currentLockId != -1) {
|
|
|
throw new IllegalStateException("update in progress");
|
|
|
}
|
|
|
+ if(batch != null) {
|
|
|
+ return batch.startUpdate(row);
|
|
|
+ }
|
|
|
for(int tries = 0; tries < numRetries; tries++) {
|
|
|
IOException e = null;
|
|
|
RegionLocation info = getRegionLocation(row);
|
|
@@ -1379,6 +1556,14 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void put(long lockid, Text column, byte val[]) throws IOException {
|
|
|
+ if(val == null) {
|
|
|
+ throw new IllegalArgumentException("value cannot be null");
|
|
|
+ }
|
|
|
+ if(batch != null) {
|
|
|
+ batch.put(lockid, column, val);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
if(lockid != this.currentLockId) {
|
|
|
throw new IllegalArgumentException("invalid lockid");
|
|
|
}
|
|
@@ -1408,6 +1593,11 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void delete(long lockid, Text column) throws IOException {
|
|
|
+ if(batch != null) {
|
|
|
+ batch.delete(lockid, column);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
if(lockid != this.currentLockId) {
|
|
|
throw new IllegalArgumentException("invalid lockid");
|
|
|
}
|
|
@@ -1436,6 +1626,10 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void abort(long lockid) throws IOException {
|
|
|
+ if(batch != null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
if(lockid != this.currentLockId) {
|
|
|
throw new IllegalArgumentException("invalid lockid");
|
|
|
}
|
|
@@ -1471,6 +1665,10 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void commit(long lockid, long timestamp) throws IOException {
|
|
|
+ if(batch != null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
if(lockid != this.currentLockId) {
|
|
|
throw new IllegalArgumentException("invalid lockid");
|
|
|
}
|
|
@@ -1497,6 +1695,13 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void renewLease(long lockid) throws IOException {
|
|
|
+ if(batch != null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if(lockid != this.currentLockId) {
|
|
|
+ throw new IllegalArgumentException("invalid lockid");
|
|
|
+ }
|
|
|
try {
|
|
|
this.currentServer.renewLease(lockid, this.clientid);
|
|
|
} catch(IOException e) {
|