|
@@ -29,6 +29,7 @@ import java.util.SortedMap;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.Map.Entry;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -275,38 +276,15 @@ public class HTable implements HConstants {
|
|
|
* @return value for specified row/column
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public byte[] get(Text row, Text column) throws IOException {
|
|
|
- checkClosed();
|
|
|
- byte [] value = null;
|
|
|
- for(int tries = 0; tries < numRetries; tries++) {
|
|
|
- HRegionLocation r = getRegionLocation(row);
|
|
|
- HRegionInterface server =
|
|
|
- connection.getHRegionConnection(r.getServerAddress());
|
|
|
-
|
|
|
- try {
|
|
|
- value = server.get(r.getRegionInfo().getRegionName(), row, column);
|
|
|
- break;
|
|
|
-
|
|
|
- } catch (IOException e) {
|
|
|
- if (e instanceof RemoteException) {
|
|
|
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
- }
|
|
|
- if (tries == numRetries - 1) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("reloading table servers because: " + e.getMessage());
|
|
|
- }
|
|
|
- r = getRegionLocation(row, true);
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(this.pause);
|
|
|
- } catch (InterruptedException x) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- }
|
|
|
- return value;
|
|
|
- }
|
|
|
+ public byte[] get(Text row, final Text column) throws IOException {
|
|
|
+ checkClosed();
|
|
|
+
|
|
|
+ return getRegionServerWithRetries(new ServerCallable<byte[]>(row){
|
|
|
+ public byte[] call() throws IOException {
|
|
|
+ return server.get(location.getRegionInfo().getRegionName(), row, column);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* Get the specified number of versions of the specified row and column
|
|
@@ -317,39 +295,17 @@ public class HTable implements HConstants {
|
|
|
* @return - array byte values
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public byte[][] get(Text row, Text column, int numVersions) throws IOException {
|
|
|
+ public byte[][] get(final Text row, final Text column, final int numVersions)
|
|
|
+ throws IOException {
|
|
|
checkClosed();
|
|
|
byte [][] values = null;
|
|
|
- for (int tries = 0; tries < numRetries; tries++) {
|
|
|
- HRegionLocation r = getRegionLocation(row);
|
|
|
- HRegionInterface server =
|
|
|
- connection.getHRegionConnection(r.getServerAddress());
|
|
|
-
|
|
|
- try {
|
|
|
- values = server.get(r.getRegionInfo().getRegionName(), row, column,
|
|
|
- numVersions);
|
|
|
-
|
|
|
- break;
|
|
|
-
|
|
|
- } catch (IOException e) {
|
|
|
- if (e instanceof RemoteException) {
|
|
|
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
- }
|
|
|
- if (tries == numRetries - 1) {
|
|
|
- // No more tries
|
|
|
- throw e;
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("reloading table servers because: " + e.getMessage());
|
|
|
- }
|
|
|
- r = getRegionLocation(row, true);
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(this.pause);
|
|
|
- } catch (InterruptedException x) {
|
|
|
- // continue
|
|
|
+
|
|
|
+ values = getRegionServerWithRetries(new ServerCallable<byte[][]>(row) {
|
|
|
+ public byte [][] call() throws IOException {
|
|
|
+ return server.get(location.getRegionInfo().getRegionName(), row,
|
|
|
+ column, numVersions);
|
|
|
}
|
|
|
- }
|
|
|
+ });
|
|
|
|
|
|
if (values != null) {
|
|
|
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
|
|
@@ -372,40 +328,18 @@ public class HTable implements HConstants {
|
|
|
* @return - array of values that match the above criteria
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public byte[][] get(Text row, Text column, long timestamp, int numVersions)
|
|
|
+ public byte[][] get(final Text row, final Text column, final long timestamp,
|
|
|
+ final int numVersions)
|
|
|
throws IOException {
|
|
|
checkClosed();
|
|
|
byte [][] values = null;
|
|
|
- for (int tries = 0; tries < numRetries; tries++) {
|
|
|
- HRegionLocation r = getRegionLocation(row);
|
|
|
- HRegionInterface server =
|
|
|
- connection.getHRegionConnection(r.getServerAddress());
|
|
|
-
|
|
|
- try {
|
|
|
- values = server.get(r.getRegionInfo().getRegionName(), row, column,
|
|
|
- timestamp, numVersions);
|
|
|
-
|
|
|
- break;
|
|
|
-
|
|
|
- } catch (IOException e) {
|
|
|
- if (e instanceof RemoteException) {
|
|
|
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
- }
|
|
|
- if (tries == numRetries - 1) {
|
|
|
- // No more tries
|
|
|
- throw e;
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("reloading table servers because: " + e.getMessage());
|
|
|
- }
|
|
|
- r = getRegionLocation(row, true);
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(this.pause);
|
|
|
- } catch (InterruptedException x) {
|
|
|
- // continue
|
|
|
+
|
|
|
+ values = getRegionServerWithRetries(new ServerCallable<byte[][]>(row) {
|
|
|
+ public byte [][] call() throws IOException {
|
|
|
+ return server.get(location.getRegionInfo().getRegionName(), row,
|
|
|
+ column, timestamp, numVersions);
|
|
|
}
|
|
|
- }
|
|
|
+ });
|
|
|
|
|
|
if (values != null) {
|
|
|
ArrayList<byte[]> bytes = new ArrayList<byte[]>();
|
|
@@ -436,37 +370,17 @@ public class HTable implements HConstants {
|
|
|
* @return Map of columns to values. Map is empty if row does not exist.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public SortedMap<Text, byte[]> getRow(Text row, long ts) throws IOException {
|
|
|
+ public SortedMap<Text, byte[]> getRow(final Text row, final long ts)
|
|
|
+ throws IOException {
|
|
|
checkClosed();
|
|
|
HbaseMapWritable value = null;
|
|
|
- for (int tries = 0; tries < numRetries; tries++) {
|
|
|
- HRegionLocation r = getRegionLocation(row);
|
|
|
- HRegionInterface server =
|
|
|
- connection.getHRegionConnection(r.getServerAddress());
|
|
|
-
|
|
|
- try {
|
|
|
- value = server.getRow(r.getRegionInfo().getRegionName(), row, ts);
|
|
|
- break;
|
|
|
-
|
|
|
- } catch (IOException e) {
|
|
|
- if (e instanceof RemoteException) {
|
|
|
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
- }
|
|
|
- if (tries == numRetries - 1) {
|
|
|
- // No more tries
|
|
|
- throw e;
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("reloading table servers because: " + e.getMessage());
|
|
|
- }
|
|
|
- r = getRegionLocation(row, true);
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(this.pause);
|
|
|
- } catch (InterruptedException x) {
|
|
|
- // continue
|
|
|
+
|
|
|
+ value = getRegionServerWithRetries(new ServerCallable<HbaseMapWritable>(row) {
|
|
|
+ public HbaseMapWritable call() throws IOException {
|
|
|
+ return server.getRow(location.getRegionInfo().getRegionName(), row, ts);
|
|
|
}
|
|
|
- }
|
|
|
+ });
|
|
|
+
|
|
|
SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
|
|
if (value != null && value.size() != 0) {
|
|
|
for (Map.Entry<Writable, Writable> e: value.entrySet()) {
|
|
@@ -722,32 +636,14 @@ public class HTable implements HConstants {
|
|
|
public void deleteAll(final Text row, final Text column, final long ts)
|
|
|
throws IOException {
|
|
|
checkClosed();
|
|
|
- for(int tries = 0; tries < numRetries; tries++) {
|
|
|
- HRegionLocation r = getRegionLocation(row);
|
|
|
- HRegionInterface server =
|
|
|
- connection.getHRegionConnection(r.getServerAddress());
|
|
|
- try {
|
|
|
- server.deleteAll(r.getRegionInfo().getRegionName(), row, column, ts);
|
|
|
- break;
|
|
|
-
|
|
|
- } catch (IOException e) {
|
|
|
- if (e instanceof RemoteException) {
|
|
|
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
- }
|
|
|
- if (tries == numRetries - 1) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("reloading table servers because: " + e.getMessage());
|
|
|
- }
|
|
|
- r = getRegionLocation(row, true);
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(this.pause);
|
|
|
- } catch (InterruptedException x) {
|
|
|
- // continue
|
|
|
+
|
|
|
+ getRegionServerWithRetries(new ServerCallable<Boolean>(row) {
|
|
|
+ public Boolean call() throws IOException {
|
|
|
+ server.deleteAll(location.getRegionInfo().getRegionName(), row,
|
|
|
+ column, ts);
|
|
|
+ return null;
|
|
|
}
|
|
|
- }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -757,34 +653,15 @@ public class HTable implements HConstants {
|
|
|
* @param ts Timestamp of cells to delete
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public void deleteAll(final Text row, long ts) throws IOException {
|
|
|
+ public void deleteAll(final Text row, final long ts) throws IOException {
|
|
|
checkClosed();
|
|
|
- for(int tries = 0; tries < numRetries; tries++) {
|
|
|
- HRegionLocation r = getRegionLocation(row);
|
|
|
- HRegionInterface server =
|
|
|
- connection.getHRegionConnection(r.getServerAddress());
|
|
|
- try {
|
|
|
- server.deleteAll(r.getRegionInfo().getRegionName(), row, ts);
|
|
|
- break;
|
|
|
-
|
|
|
- } catch (IOException e) {
|
|
|
- if (e instanceof RemoteException) {
|
|
|
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
- }
|
|
|
- if (tries == numRetries - 1) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("reloading table servers because: " + e.getMessage());
|
|
|
- }
|
|
|
- r = getRegionLocation(row, true);
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(this.pause);
|
|
|
- } catch (InterruptedException x) {
|
|
|
- // continue
|
|
|
+
|
|
|
+ getRegionServerWithRetries(new ServerCallable<Boolean>(row){
|
|
|
+ public Boolean call() throws IOException {
|
|
|
+ server.deleteAll(location.getRegionInfo().getRegionName(), row, ts);
|
|
|
+ return null;
|
|
|
}
|
|
|
- }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -806,35 +683,18 @@ public class HTable implements HConstants {
|
|
|
* @param timestamp Timestamp to match
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public void deleteFamily(final Text row, final Text family, long timestamp)
|
|
|
+ public void deleteFamily(final Text row, final Text family,
|
|
|
+ final long timestamp)
|
|
|
throws IOException {
|
|
|
checkClosed();
|
|
|
- for(int tries = 0; tries < numRetries; tries++) {
|
|
|
- HRegionLocation r = getRegionLocation(row);
|
|
|
- HRegionInterface server =
|
|
|
- connection.getHRegionConnection(r.getServerAddress());
|
|
|
- try {
|
|
|
- server.deleteFamily(r.getRegionInfo().getRegionName(), row, family, timestamp);
|
|
|
- break;
|
|
|
-
|
|
|
- } catch (IOException e) {
|
|
|
- if (e instanceof RemoteException) {
|
|
|
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
- }
|
|
|
- if (tries == numRetries - 1) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("reloading table servers because: " + e.getMessage());
|
|
|
- }
|
|
|
- r = getRegionLocation(row, true);
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(this.pause);
|
|
|
- } catch (InterruptedException x) {
|
|
|
- // continue
|
|
|
+
|
|
|
+ getRegionServerWithRetries(new ServerCallable<Boolean>(row){
|
|
|
+ public Boolean call() throws IOException {
|
|
|
+ server.deleteFamily(location.getRegionInfo().getRegionName(), row,
|
|
|
+ family, timestamp);
|
|
|
+ return null;
|
|
|
}
|
|
|
- }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -891,7 +751,7 @@ public class HTable implements HConstants {
|
|
|
* @param timestamp time to associate with the change
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- public synchronized void commit(long lockid, long timestamp)
|
|
|
+ public synchronized void commit(long lockid, final long timestamp)
|
|
|
throws IOException {
|
|
|
checkClosed();
|
|
|
updateInProgress(true);
|
|
@@ -900,34 +760,15 @@ public class HTable implements HConstants {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- for (int tries = 0; tries < numRetries; tries++) {
|
|
|
- HRegionLocation r = getRegionLocation(batch.get().getRow());
|
|
|
- HRegionInterface server =
|
|
|
- connection.getHRegionConnection(r.getServerAddress());
|
|
|
- try {
|
|
|
- server.batchUpdate(r.getRegionInfo().getRegionName(), timestamp,
|
|
|
- batch.get());
|
|
|
- break;
|
|
|
- } catch (IOException e) {
|
|
|
- if (e instanceof RemoteException) {
|
|
|
- e = RemoteExceptionHandler.decodeRemoteException(
|
|
|
- (RemoteException) e);
|
|
|
+ getRegionServerWithRetries(
|
|
|
+ new ServerCallable<Boolean>(batch.get().getRow()){
|
|
|
+ public Boolean call() throws IOException {
|
|
|
+ server.batchUpdate(location.getRegionInfo().getRegionName(),
|
|
|
+ timestamp, batch.get());
|
|
|
+ return null;
|
|
|
}
|
|
|
- if (tries < numRetries - 1) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("reloading table servers because: " + e.getMessage());
|
|
|
- }
|
|
|
- r = getRegionLocation(batch.get().getRow(), true);
|
|
|
- } else {
|
|
|
- throw e;
|
|
|
- }
|
|
|
- }
|
|
|
- try {
|
|
|
- Thread.sleep(pause);
|
|
|
- } catch (InterruptedException e) {
|
|
|
- // continue
|
|
|
}
|
|
|
- }
|
|
|
+ );
|
|
|
} finally {
|
|
|
batch.set(null);
|
|
|
}
|
|
@@ -1150,4 +991,56 @@ public class HTable implements HConstants {
|
|
|
};
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Inherits from Callable, used to define the particular actions you would
|
|
|
+ * like to take with retry logic.
|
|
|
+ */
|
|
|
+ protected abstract class ServerCallable<T> implements Callable<T> {
|
|
|
+ HRegionLocation location;
|
|
|
+ HRegionInterface server;
|
|
|
+ Text row;
|
|
|
+
|
|
|
+ protected ServerCallable(Text row) {
|
|
|
+ this.row = row;
|
|
|
+ }
|
|
|
+
|
|
|
+ void instantiateServer(boolean reload) throws IOException {
|
|
|
+ this.location = getRegionLocation(row, reload);
|
|
|
+ this.server = connection.getHRegionConnection(location.getServerAddress());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Pass in a ServerCallable with your particular bit of logic defined and
|
|
|
+ * this method will manage the process of doing retries with timed waits
|
|
|
+ * and refinds of missing regions.
|
|
|
+ */
|
|
|
+ protected <T> T getRegionServerWithRetries(ServerCallable<T> callable)
|
|
|
+ throws IOException, RuntimeException {
|
|
|
+ for(int tries = 0; tries < numRetries; tries++) {
|
|
|
+ try {
|
|
|
+ callable.instantiateServer(tries != 0);
|
|
|
+ return callable.call();
|
|
|
+ } catch (IOException e) {
|
|
|
+ if (e instanceof RemoteException) {
|
|
|
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
|
|
+ }
|
|
|
+ if (tries == numRetries - 1) {
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("reloading table servers because: " + e.getMessage());
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(pause);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ // continue
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
}
|