|
@@ -22,13 +22,15 @@ package org.apache.hadoop.hbase;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.NoSuchElementException;
|
|
|
import java.util.Random;
|
|
|
import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.TreeSet;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
+import java.util.concurrent.atomic.AtomicReferenceArray;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -61,99 +63,9 @@ public class HClient implements HConstants {
|
|
|
int numRetries;
|
|
|
HMasterInterface master;
|
|
|
private final Configuration conf;
|
|
|
- private volatile long currentLockId;
|
|
|
+ private AtomicLong currentLockId;
|
|
|
private Class<? extends HRegionInterface> serverInterfaceClass;
|
|
|
-
|
|
|
- protected class BatchHandler {
|
|
|
- private HashMap<RegionLocation, BatchUpdate> regionToBatch;
|
|
|
- private HashMap<Long, BatchUpdate> lockToBatch;
|
|
|
-
|
|
|
- /** constructor */
|
|
|
- public BatchHandler() {
|
|
|
- this.regionToBatch = new HashMap<RegionLocation, BatchUpdate>();
|
|
|
- this.lockToBatch = new HashMap<Long, BatchUpdate>();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Start a batch row insertion/update.
|
|
|
- *
|
|
|
- * Manages multiple batch updates that are targeted for multiple servers,
|
|
|
- * should the rows span several region servers.
|
|
|
- *
|
|
|
- * No changes are committed until the client commits the batch operation via
|
|
|
- * HClient.batchCommit().
|
|
|
- *
|
|
|
- * The entire batch update can be abandoned by calling HClient.batchAbort();
|
|
|
- *
|
|
|
- * Callers to this method are given a handle that corresponds to the row being
|
|
|
- * changed. The handle must be supplied on subsequent put or delete calls so
|
|
|
- * that the row can be identified.
|
|
|
- *
|
|
|
- * @param row Name of row to start update against.
|
|
|
- * @return Row lockid.
|
|
|
- */
|
|
|
- public synchronized long startUpdate(Text row) {
|
|
|
- RegionLocation info = getRegionLocation(row);
|
|
|
- BatchUpdate batch = regionToBatch.get(info);
|
|
|
- if(batch == null) {
|
|
|
- batch = new BatchUpdate();
|
|
|
- regionToBatch.put(info, batch);
|
|
|
- }
|
|
|
- long lockid = batch.startUpdate(row);
|
|
|
- lockToBatch.put(lockid, batch);
|
|
|
- return lockid;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Change the value for the specified column
|
|
|
- *
|
|
|
- * @param lockid lock id returned from startUpdate
|
|
|
- * @param column column whose value is being set
|
|
|
- * @param value new value for column
|
|
|
- */
|
|
|
- public synchronized void put(long lockid, Text column, byte[] value) {
|
|
|
- BatchUpdate batch = lockToBatch.get(lockid);
|
|
|
- if (batch == null) {
|
|
|
- throw new IllegalArgumentException("invalid lock id " + lockid);
|
|
|
- }
|
|
|
- batch.put(lockid, column, value);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Delete the value for a column
|
|
|
- *
|
|
|
- * @param lockid - lock id returned from startUpdate
|
|
|
- * @param column - name of column whose value is to be deleted
|
|
|
- */
|
|
|
- public synchronized void delete(long lockid, Text column) {
|
|
|
- BatchUpdate batch = lockToBatch.get(lockid);
|
|
|
- if (batch == null) {
|
|
|
- throw new IllegalArgumentException("invalid lock id " + lockid);
|
|
|
- }
|
|
|
- batch.delete(lockid, column);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Finalize a batch mutation
|
|
|
- *
|
|
|
- * @param timestamp time to associate with all the changes
|
|
|
- * @throws IOException
|
|
|
- */
|
|
|
- public synchronized void commit(long timestamp) throws IOException {
|
|
|
- try {
|
|
|
- for(Map.Entry<RegionLocation, BatchUpdate> e: regionToBatch.entrySet()) {
|
|
|
- RegionLocation r = e.getKey();
|
|
|
- HRegionInterface server = getHRegionConnection(r.serverAddress);
|
|
|
- server.batchUpdate(r.regionInfo.getRegionName(), timestamp,
|
|
|
- e.getValue());
|
|
|
- }
|
|
|
- } catch (RemoteException e) {
|
|
|
- throw RemoteExceptionHandler.decodeRemoteException(e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private BatchHandler batch;
|
|
|
+ private AtomicReference<BatchUpdate> batch;
|
|
|
|
|
|
/*
|
|
|
* Data structure that holds current location for a region and its info.
|
|
@@ -606,8 +518,8 @@ public class HClient implements HConstants {
|
|
|
*/
|
|
|
public HClient(Configuration conf) {
|
|
|
this.conf = conf;
|
|
|
- this.batch = null;
|
|
|
- this.currentLockId = -1;
|
|
|
+ this.batch = new AtomicReference<BatchUpdate>();
|
|
|
+ this.currentLockId = new AtomicLong(-1L);
|
|
|
|
|
|
this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
|
|
|
this.numRetries = conf.getInt("hbase.client.retries.number", 5);
|
|
@@ -1159,7 +1071,7 @@ public class HClient implements HConstants {
|
|
|
if(tableName == null || tableName.getLength() == 0) {
|
|
|
throw new IllegalArgumentException("table name cannot be null or zero length");
|
|
|
}
|
|
|
- if(this.currentLockId != -1 || batch != null) {
|
|
|
+ if(this.currentLockId.get() != -1L || batch.get() != null) {
|
|
|
throw new IllegalStateException("update in progress");
|
|
|
}
|
|
|
this.currentTableServers = tableServers.getTableServers(tableName);
|
|
@@ -1481,51 +1393,90 @@ public class HClient implements HConstants {
|
|
|
*
|
|
|
* No changes are committed until the call to commitBatchUpdate returns.
|
|
|
* A call to abortBatchUpdate will abandon the entire batch.
|
|
|
- *
|
|
|
- * Note that in batch mode, calls to commit or abort are ignored.
|
|
|
+ *
|
|
|
+ * @param row name of row to be updated
|
|
|
+ * @return lockid to be used in subsequent put, delete and commit calls
|
|
|
*/
|
|
|
- public synchronized void startBatchUpdate() {
|
|
|
- if(this.currentTableServers == null) {
|
|
|
+ public synchronized long startBatchUpdate(final Text row) {
|
|
|
+ if (this.currentTableServers == null) {
|
|
|
throw new IllegalStateException("Must open table first");
|
|
|
}
|
|
|
-
|
|
|
- if(batch == null) {
|
|
|
- batch = new BatchHandler();
|
|
|
+ if (batch.get() != null) {
|
|
|
+ throw new IllegalStateException("batch update in progress");
|
|
|
}
|
|
|
+ batch.set(new BatchUpdate());
|
|
|
+ return batch.get().startUpdate(row);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Abort a batch mutation
|
|
|
+ * @param lockid lock id returned by startBatchUpdate
|
|
|
*/
|
|
|
- public synchronized void abortBatch() {
|
|
|
- batch = null;
|
|
|
+ public synchronized void abortBatch(final long lockid) {
|
|
|
+ BatchUpdate u = batch.get();
|
|
|
+ if (u == null) {
|
|
|
+ throw new IllegalStateException("no batch update in progress");
|
|
|
+ }
|
|
|
+ if (u.getLockid() != lockid) {
|
|
|
+ throw new IllegalArgumentException("invalid lock id " + lockid);
|
|
|
+ }
|
|
|
+ batch.set(null);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Finalize a batch mutation
|
|
|
*
|
|
|
+ * @param lockid lock id returned by startBatchUpdate
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public synchronized void commitBatch() throws IOException {
|
|
|
- commitBatch(System.currentTimeMillis());
|
|
|
+ public void commitBatch(final long lockid) throws IOException {
|
|
|
+ commitBatch(lockid, System.currentTimeMillis());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Finalize a batch mutation
|
|
|
*
|
|
|
+ * @param lockid lock id returned by startBatchUpdate
|
|
|
* @param timestamp time to associate with all the changes
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public synchronized void commitBatch(long timestamp) throws IOException {
|
|
|
- if(batch == null) {
|
|
|
+ public synchronized void commitBatch(final long lockid, final long timestamp)
|
|
|
+ throws IOException {
|
|
|
+ BatchUpdate u = batch.get();
|
|
|
+ if (u == null) {
|
|
|
throw new IllegalStateException("no batch update in progress");
|
|
|
}
|
|
|
+ if (u.getLockid() != lockid) {
|
|
|
+ throw new IllegalArgumentException("invalid lock id " + lockid);
|
|
|
+ }
|
|
|
|
|
|
try {
|
|
|
- batch.commit(timestamp);
|
|
|
-
|
|
|
+ for (int tries = 0; tries < numRetries; tries++) {
|
|
|
+ RegionLocation r = getRegionLocation(u.getRow());
|
|
|
+ HRegionInterface server = getHRegionConnection(r.serverAddress);
|
|
|
+ try {
|
|
|
+ server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp, u);
|
|
|
+ break;
|
|
|
+
|
|
|
+ } catch (IOException e) {
|
|
|
+ if (tries < numRetries -1) {
|
|
|
+ reloadCurrentTable(r);
|
|
|
+
|
|
|
+ } else {
|
|
|
+ if (e instanceof RemoteException) {
|
|
|
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
+ }
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(pause);
|
|
|
+
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ }
|
|
|
+ }
|
|
|
} finally {
|
|
|
- batch = null;
|
|
|
+ batch.set(null);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1545,26 +1496,27 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public synchronized long startUpdate(final Text row) throws IOException {
|
|
|
- if(this.currentLockId != -1) {
|
|
|
+ if (this.currentLockId.get() != -1L) {
|
|
|
throw new IllegalStateException("update in progress");
|
|
|
}
|
|
|
- if(batch != null) {
|
|
|
- return batch.startUpdate(row);
|
|
|
+ if (batch.get() != null) {
|
|
|
+ throw new IllegalStateException("batch update in progress");
|
|
|
}
|
|
|
- for(int tries = 0; tries < numRetries; tries++) {
|
|
|
+ for (int tries = 0; tries < numRetries; tries++) {
|
|
|
IOException e = null;
|
|
|
RegionLocation info = getRegionLocation(row);
|
|
|
try {
|
|
|
currentServer = getHRegionConnection(info.serverAddress);
|
|
|
currentRegion = info.regionInfo.regionName;
|
|
|
clientid = rand.nextLong();
|
|
|
- this.currentLockId = currentServer.startUpdate(currentRegion, clientid, row);
|
|
|
+ this.currentLockId.set(
|
|
|
+ currentServer.startUpdate(currentRegion, clientid, row));
|
|
|
break;
|
|
|
|
|
|
} catch (IOException ex) {
|
|
|
e = ex;
|
|
|
}
|
|
|
- if(tries < numRetries - 1) {
|
|
|
+ if (tries < numRetries - 1) {
|
|
|
try {
|
|
|
Thread.sleep(this.pause);
|
|
|
|
|
@@ -1577,13 +1529,13 @@ public class HClient implements HConstants {
|
|
|
e = ex;
|
|
|
}
|
|
|
} else {
|
|
|
- if(e instanceof RemoteException) {
|
|
|
+ if (e instanceof RemoteException) {
|
|
|
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
}
|
|
|
throw e;
|
|
|
}
|
|
|
}
|
|
|
- return this.currentLockId;
|
|
|
+ return this.currentLockId.get();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1596,29 +1548,29 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void put(long lockid, Text column, byte val[]) throws IOException {
|
|
|
- if(val == null) {
|
|
|
+ if (val == null) {
|
|
|
throw new IllegalArgumentException("value cannot be null");
|
|
|
}
|
|
|
- if(batch != null) {
|
|
|
- batch.put(lockid, column, val);
|
|
|
+ if (batch.get() != null) {
|
|
|
+ batch.get().put(lockid, column, val);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if(lockid != this.currentLockId) {
|
|
|
+ if (lockid != this.currentLockId.get()) {
|
|
|
throw new IllegalArgumentException("invalid lockid");
|
|
|
}
|
|
|
try {
|
|
|
this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
|
|
|
val);
|
|
|
- } catch(IOException e) {
|
|
|
+ } catch (IOException e) {
|
|
|
try {
|
|
|
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
|
|
|
- } catch(IOException e2) {
|
|
|
+ } catch (IOException e2) {
|
|
|
LOG.warn(e2);
|
|
|
}
|
|
|
this.currentServer = null;
|
|
|
this.currentRegion = null;
|
|
|
- if(e instanceof RemoteException) {
|
|
|
+ if (e instanceof RemoteException) {
|
|
|
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
}
|
|
|
throw e;
|
|
@@ -1633,18 +1585,18 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void delete(long lockid, Text column) throws IOException {
|
|
|
- if(batch != null) {
|
|
|
- batch.delete(lockid, column);
|
|
|
+ if (batch.get() != null) {
|
|
|
+ batch.get().delete(lockid, column);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if(lockid != this.currentLockId) {
|
|
|
+ if (lockid != this.currentLockId.get()) {
|
|
|
throw new IllegalArgumentException("invalid lockid");
|
|
|
}
|
|
|
try {
|
|
|
this.currentServer.delete(this.currentRegion, this.clientid, lockid,
|
|
|
column);
|
|
|
- } catch(IOException e) {
|
|
|
+ } catch (IOException e) {
|
|
|
try {
|
|
|
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
|
|
|
} catch(IOException e2) {
|
|
@@ -1652,7 +1604,7 @@ public class HClient implements HConstants {
|
|
|
}
|
|
|
this.currentServer = null;
|
|
|
this.currentRegion = null;
|
|
|
- if(e instanceof RemoteException) {
|
|
|
+ if (e instanceof RemoteException) {
|
|
|
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
}
|
|
|
throw e;
|
|
@@ -1666,24 +1618,25 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void abort(long lockid) throws IOException {
|
|
|
- if(batch != null) {
|
|
|
+ if (batch.get() != null) {
|
|
|
+ abortBatch(lockid);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if(lockid != this.currentLockId) {
|
|
|
+ if (lockid != this.currentLockId.get()) {
|
|
|
throw new IllegalArgumentException("invalid lockid");
|
|
|
}
|
|
|
try {
|
|
|
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
|
|
|
- } catch(IOException e) {
|
|
|
+ } catch (IOException e) {
|
|
|
this.currentServer = null;
|
|
|
this.currentRegion = null;
|
|
|
- if(e instanceof RemoteException) {
|
|
|
+ if (e instanceof RemoteException) {
|
|
|
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
}
|
|
|
throw e;
|
|
|
} finally {
|
|
|
- this.currentLockId = -1;
|
|
|
+ this.currentLockId.set(-1L);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1705,11 +1658,12 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void commit(long lockid, long timestamp) throws IOException {
|
|
|
- if(batch != null) {
|
|
|
+ if (batch.get() != null) {
|
|
|
+ commitBatch(lockid, timestamp);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if(lockid != this.currentLockId) {
|
|
|
+ if (lockid != this.currentLockId.get()) {
|
|
|
throw new IllegalArgumentException("invalid lockid");
|
|
|
}
|
|
|
try {
|
|
@@ -1724,7 +1678,7 @@ public class HClient implements HConstants {
|
|
|
}
|
|
|
throw e;
|
|
|
} finally {
|
|
|
- this.currentLockId = -1;
|
|
|
+ this.currentLockId.set(-1L);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1735,24 +1689,24 @@ public class HClient implements HConstants {
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void renewLease(long lockid) throws IOException {
|
|
|
- if(batch != null) {
|
|
|
+ if (batch.get() != null) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if(lockid != this.currentLockId) {
|
|
|
+ if (lockid != this.currentLockId.get()) {
|
|
|
throw new IllegalArgumentException("invalid lockid");
|
|
|
}
|
|
|
try {
|
|
|
this.currentServer.renewLease(lockid, this.clientid);
|
|
|
- } catch(IOException e) {
|
|
|
+ } catch (IOException e) {
|
|
|
try {
|
|
|
this.currentServer.abort(this.currentRegion, this.clientid, lockid);
|
|
|
- } catch(IOException e2) {
|
|
|
+ } catch (IOException e2) {
|
|
|
LOG.warn(e2);
|
|
|
}
|
|
|
this.currentServer = null;
|
|
|
this.currentRegion = null;
|
|
|
- if(e instanceof RemoteException) {
|
|
|
+ if (e instanceof RemoteException) {
|
|
|
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
}
|
|
|
throw e;
|
|
@@ -1770,7 +1724,7 @@ public class HClient implements HConstants {
|
|
|
private Text startRow;
|
|
|
private long scanTime;
|
|
|
private boolean closed;
|
|
|
- private volatile RegionLocation[] regions;
|
|
|
+ private AtomicReferenceArray<RegionLocation> regions;
|
|
|
@SuppressWarnings("hiding")
|
|
|
private int currentRegion;
|
|
|
private HRegionInterface server;
|
|
@@ -1791,7 +1745,8 @@ public class HClient implements HConstants {
|
|
|
Collection<RegionLocation> info =
|
|
|
currentTableServers.tailMap(firstServer).values();
|
|
|
|
|
|
- this.regions = info.toArray(new RegionLocation[info.size()]);
|
|
|
+ this.regions = new AtomicReferenceArray<RegionLocation>(
|
|
|
+ info.toArray(new RegionLocation[info.size()]));
|
|
|
}
|
|
|
|
|
|
ClientScanner(Text[] columns, Text startRow, long timestamp,
|
|
@@ -1821,14 +1776,14 @@ public class HClient implements HConstants {
|
|
|
this.scannerId = -1L;
|
|
|
}
|
|
|
this.currentRegion += 1;
|
|
|
- if(this.currentRegion == this.regions.length) {
|
|
|
+ if(this.currentRegion == this.regions.length()) {
|
|
|
close();
|
|
|
return false;
|
|
|
}
|
|
|
try {
|
|
|
for(int tries = 0; tries < numRetries; tries++) {
|
|
|
- RegionLocation info = this.regions[currentRegion];
|
|
|
- this.server = getHRegionConnection(this.regions[currentRegion].serverAddress);
|
|
|
+ RegionLocation info = this.regions.get(currentRegion);
|
|
|
+ this.server = getHRegionConnection(info.serverAddress);
|
|
|
|
|
|
try {
|
|
|
if (this.filter == null) {
|