|
@@ -19,28 +19,30 @@ import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.Iterator;
|
|
|
+import java.util.NoSuchElementException;
|
|
|
import java.util.Random;
|
|
|
import java.util.TreeMap;
|
|
|
import java.util.TreeSet;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
|
import org.apache.hadoop.io.DataInputBuffer;
|
|
|
import org.apache.hadoop.io.Text;
|
|
|
import org.apache.hadoop.ipc.RPC;
|
|
|
-import org.apache.log4j.Logger;
|
|
|
|
|
|
/*******************************************************************************
|
|
|
* HClient manages a connection to a single HRegionServer.
|
|
|
******************************************************************************/
|
|
|
public class HClient implements HConstants {
|
|
|
- private final Logger LOG =
|
|
|
- Logger.getLogger(this.getClass().getName());
|
|
|
+ private final Log LOG = LogFactory.getLog(this.getClass().getName());
|
|
|
|
|
|
- private static final Text[] metaColumns = {
|
|
|
- META_COLUMN_FAMILY
|
|
|
+ private static final Text[] META_COLUMNS = {
|
|
|
+ COLUMN_FAMILY
|
|
|
};
|
|
|
- private static final Text startRow = new Text();
|
|
|
+
|
|
|
+ private static final Text EMPTY_START_ROW = new Text();
|
|
|
|
|
|
private boolean closed;
|
|
|
private long clientTimeout;
|
|
@@ -83,7 +85,7 @@ public class HClient implements HConstants {
|
|
|
this.closed = false;
|
|
|
this.conf = conf;
|
|
|
|
|
|
- this.clientTimeout = conf.getLong("hbase.client.timeout.length", 10 * 1000);
|
|
|
+ this.clientTimeout = conf.getLong("hbase.client.timeout.length", 30 * 1000);
|
|
|
this.numTimeouts = conf.getInt("hbase.client.timeout.number", 5);
|
|
|
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
|
|
|
|
|
@@ -98,40 +100,61 @@ public class HClient implements HConstants {
|
|
|
this.currentServer = null;
|
|
|
this.rand = new Random();
|
|
|
}
|
|
|
-
|
|
|
- public synchronized void createTable(HTableDescriptor desc) throws IOException {
|
|
|
- if(closed) {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check client is open.
|
|
|
+ */
|
|
|
+ private synchronized void checkOpen() {
|
|
|
+ if (this.closed) {
|
|
|
throw new IllegalStateException("client is not open");
|
|
|
}
|
|
|
- if(master == null) {
|
|
|
- locateRootRegion();
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized void checkMaster() throws IOException {
|
|
|
+ if (this.master != null) {
|
|
|
+ return;
|
|
|
}
|
|
|
- master.createTable(desc);
|
|
|
+ HServerAddress masterLocation =
|
|
|
+ new HServerAddress(this.conf.get(MASTER_ADDRESS));
|
|
|
+ this.master = (HMasterInterface)RPC.getProxy(HMasterInterface.class,
|
|
|
+ HMasterInterface.versionID, masterLocation.getInetSocketAddress(), this.conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized void createTable(HTableDescriptor desc)
|
|
|
+ throws IOException {
|
|
|
+ checkOpen();
|
|
|
+ checkMaster();
|
|
|
+ locateRootRegion();
|
|
|
+ this.master.createTable(desc);
|
|
|
}
|
|
|
|
|
|
public synchronized void deleteTable(Text tableName) throws IOException {
|
|
|
- if(closed) {
|
|
|
- throw new IllegalStateException("client is not open");
|
|
|
- }
|
|
|
- if(master == null) {
|
|
|
- locateRootRegion();
|
|
|
- }
|
|
|
- master.deleteTable(tableName);
|
|
|
+ checkOpen();
|
|
|
+ checkMaster();
|
|
|
+ locateRootRegion();
|
|
|
+ this.master.deleteTable(tableName);
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized void shutdown() throws IOException {
|
|
|
+ checkOpen();
|
|
|
+ checkMaster();
|
|
|
+ this.master.shutdown();
|
|
|
}
|
|
|
|
|
|
public synchronized void openTable(Text tableName) throws IOException {
|
|
|
- if(closed) {
|
|
|
- throw new IllegalStateException("client is not open");
|
|
|
+ if(tableName == null || tableName.getLength() == 0) {
|
|
|
+ throw new IllegalArgumentException("table name cannot be null or zero length");
|
|
|
}
|
|
|
-
|
|
|
- tableServers = tablesToServers.get(tableName);
|
|
|
- if(tableServers == null ) { // We don't know where the table is
|
|
|
+ checkOpen();
|
|
|
+ this.tableServers = tablesToServers.get(tableName);
|
|
|
+ if(this.tableServers == null ) { // We don't know where the table is
|
|
|
findTableInMeta(tableName); // Load the information from meta
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void findTableInMeta(Text tableName) throws IOException {
|
|
|
- TreeMap<Text, TableInfo> metaServers = tablesToServers.get(META_TABLE_NAME);
|
|
|
+ TreeMap<Text, TableInfo> metaServers =
|
|
|
+ this.tablesToServers.get(META_TABLE_NAME);
|
|
|
|
|
|
if(metaServers == null) { // Don't know where the meta is
|
|
|
loadMetaFromRoot(tableName);
|
|
@@ -139,18 +162,51 @@ public class HClient implements HConstants {
|
|
|
// All we really wanted was the meta or root table
|
|
|
return;
|
|
|
}
|
|
|
- metaServers = tablesToServers.get(META_TABLE_NAME);
|
|
|
+ metaServers = this.tablesToServers.get(META_TABLE_NAME);
|
|
|
}
|
|
|
|
|
|
- tableServers = new TreeMap<Text, TableInfo>();
|
|
|
- for(Iterator<TableInfo> i = metaServers.tailMap(tableName).values().iterator();
|
|
|
- i.hasNext(); ) {
|
|
|
+ this.tableServers = new TreeMap<Text, TableInfo>();
|
|
|
+ for(int tries = 0;
|
|
|
+ this.tableServers.size() == 0 && tries < this.numRetries;
|
|
|
+ tries++) {
|
|
|
+
|
|
|
+ Text firstMetaRegion = null;
|
|
|
+ if(metaServers.containsKey(tableName)) {
|
|
|
+ firstMetaRegion = tableName;
|
|
|
+
|
|
|
+ } else {
|
|
|
+ firstMetaRegion = metaServers.headMap(tableName).lastKey();
|
|
|
+ }
|
|
|
+ for(Iterator<TableInfo> i
|
|
|
+ = metaServers.tailMap(firstMetaRegion).values().iterator();
|
|
|
+ i.hasNext(); ) {
|
|
|
|
|
|
- TableInfo t = i.next();
|
|
|
+ TableInfo t = i.next();
|
|
|
|
|
|
- scanOneMetaRegion(t, tableName);
|
|
|
+ scanOneMetaRegion(t, tableName);
|
|
|
+ }
|
|
|
+ if(this.tableServers.size() == 0) {
|
|
|
+ // Table not assigned. Sleep and try again
|
|
|
+
|
|
|
+ if(LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Sleeping. Table " + tableName
|
|
|
+ + " not currently being served.");
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(this.clientTimeout);
|
|
|
+
|
|
|
+ } catch(InterruptedException e) {
|
|
|
+ }
|
|
|
+ if(LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Wake. Retry finding table " + tableName);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- tablesToServers.put(tableName, tableServers);
|
|
|
+ if(this.tableServers.size() == 0) {
|
|
|
+ throw new IOException("failed to scan " + META_TABLE_NAME + " after "
|
|
|
+ + this.numRetries + " retries");
|
|
|
+ }
|
|
|
+ this.tablesToServers.put(tableName, this.tableServers);
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -169,24 +225,23 @@ public class HClient implements HConstants {
|
|
|
* could be.
|
|
|
*/
|
|
|
private void locateRootRegion() throws IOException {
|
|
|
- if(master == null) {
|
|
|
- HServerAddress masterLocation =
|
|
|
- new HServerAddress(this.conf.get(MASTER_ADDRESS));
|
|
|
- master = (HMasterInterface)RPC.getProxy(HMasterInterface.class,
|
|
|
- HMasterInterface.versionID,
|
|
|
- masterLocation.getInetSocketAddress(), conf);
|
|
|
- }
|
|
|
+ checkMaster();
|
|
|
|
|
|
- int tries = 0;
|
|
|
HServerAddress rootRegionLocation = null;
|
|
|
- do {
|
|
|
+ for(int tries = 0; rootRegionLocation == null && tries < numRetries; tries++){
|
|
|
int localTimeouts = 0;
|
|
|
while(rootRegionLocation == null && localTimeouts < numTimeouts) {
|
|
|
rootRegionLocation = master.findRootRegion();
|
|
|
|
|
|
if(rootRegionLocation == null) {
|
|
|
try {
|
|
|
- Thread.sleep(clientTimeout);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Sleeping. Waiting for root region.");
|
|
|
+ }
|
|
|
+ Thread.sleep(this.clientTimeout);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Wake. Retry finding root region.");
|
|
|
+ }
|
|
|
} catch(InterruptedException iex) {
|
|
|
}
|
|
|
localTimeouts++;
|
|
@@ -201,17 +256,18 @@ public class HClient implements HConstants {
|
|
|
HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation);
|
|
|
|
|
|
if(rootRegion.getRegionInfo(HGlobals.rootRegionInfo.regionName) != null) {
|
|
|
- tableServers = new TreeMap<Text, TableInfo>();
|
|
|
- tableServers.put(startRow, new TableInfo(HGlobals.rootRegionInfo, rootRegionLocation));
|
|
|
- tablesToServers.put(ROOT_TABLE_NAME, tableServers);
|
|
|
+ this.tableServers = new TreeMap<Text, TableInfo>();
|
|
|
+ this.tableServers.put(EMPTY_START_ROW,
|
|
|
+ new TableInfo(HGlobals.rootRegionInfo, rootRegionLocation));
|
|
|
+
|
|
|
+ this.tablesToServers.put(ROOT_TABLE_NAME, this.tableServers);
|
|
|
break;
|
|
|
}
|
|
|
rootRegionLocation = null;
|
|
|
-
|
|
|
- } while(rootRegionLocation == null && tries++ < numRetries);
|
|
|
+ }
|
|
|
|
|
|
- if(rootRegionLocation == null) {
|
|
|
- closed = true;
|
|
|
+ if (rootRegionLocation == null) {
|
|
|
+ this.closed = true;
|
|
|
throw new IOException("unable to locate root region server");
|
|
|
}
|
|
|
}
|
|
@@ -220,38 +276,78 @@ public class HClient implements HConstants {
|
|
|
* Scans the root region to find all the meta regions
|
|
|
*/
|
|
|
private void scanRoot() throws IOException {
|
|
|
- tableServers = new TreeMap<Text, TableInfo>();
|
|
|
- TableInfo t = tablesToServers.get(ROOT_TABLE_NAME).get(startRow);
|
|
|
- scanOneMetaRegion(t, META_TABLE_NAME);
|
|
|
- tablesToServers.put(META_TABLE_NAME, tableServers);
|
|
|
+ this.tableServers = new TreeMap<Text, TableInfo>();
|
|
|
+ TableInfo t = this.tablesToServers.get(ROOT_TABLE_NAME).get(EMPTY_START_ROW);
|
|
|
+ for(int tries = 0;
|
|
|
+ scanOneMetaRegion(t, META_TABLE_NAME) == 0 && tries < this.numRetries;
|
|
|
+ tries++) {
|
|
|
+
|
|
|
+ // The table is not yet being served. Sleep and retry.
|
|
|
+
|
|
|
+ if(LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Sleeping. Table " + META_TABLE_NAME
|
|
|
+ + " not currently being served.");
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ Thread.sleep(this.clientTimeout);
|
|
|
+
|
|
|
+ } catch(InterruptedException e) {
|
|
|
+ }
|
|
|
+ if(LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Wake. Retry finding table " + META_TABLE_NAME);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if(this.tableServers.size() == 0) {
|
|
|
+ throw new IOException("failed to scan " + ROOT_TABLE_NAME + " after "
|
|
|
+ + this.numRetries + " retries");
|
|
|
+ }
|
|
|
+ this.tablesToServers.put(META_TABLE_NAME, this.tableServers);
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
* Scans a single meta region
|
|
|
* @param t the table we're going to scan
|
|
|
* @param tableName the name of the table we're looking for
|
|
|
+ * @return returns the number of servers that are serving the table
|
|
|
*/
|
|
|
- private void scanOneMetaRegion(TableInfo t, Text tableName) throws IOException {
|
|
|
+ private int scanOneMetaRegion(TableInfo t, Text tableName)
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
HRegionInterface server = getHRegionConnection(t.serverAddress);
|
|
|
+ int servers = 0;
|
|
|
long scannerId = -1L;
|
|
|
try {
|
|
|
- scannerId = server.openScanner(t.regionInfo.regionName, metaColumns, tableName);
|
|
|
-
|
|
|
+ scannerId =
|
|
|
+ server.openScanner(t.regionInfo.regionName, META_COLUMNS, tableName);
|
|
|
+
|
|
|
DataInputBuffer inbuf = new DataInputBuffer();
|
|
|
while(true) {
|
|
|
+ HRegionInfo regionInfo = null;
|
|
|
+ String serverAddress = null;
|
|
|
HStoreKey key = new HStoreKey();
|
|
|
-
|
|
|
LabelledData[] values = server.next(scannerId, key);
|
|
|
if(values.length == 0) {
|
|
|
+ if(servers == 0) {
|
|
|
+ // If we didn't find any servers then the table does not exist
|
|
|
+
|
|
|
+ throw new NoSuchElementException("table '" + tableName
|
|
|
+ + "' does not exist");
|
|
|
+ }
|
|
|
+
|
|
|
+ // We found at least one server for the table and now we're done.
|
|
|
+
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
+ byte[] bytes = null;
|
|
|
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
|
|
for(int i = 0; i < values.length; i++) {
|
|
|
- results.put(values[i].getLabel(), values[i].getData().get());
|
|
|
+ bytes = new byte[values[i].getData().getSize()];
|
|
|
+ System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
|
|
|
+ results.put(values[i].getLabel(), bytes);
|
|
|
}
|
|
|
- HRegionInfo regionInfo = new HRegionInfo();
|
|
|
- byte[] bytes = results.get(META_COL_REGIONINFO);
|
|
|
+ regionInfo = new HRegionInfo();
|
|
|
+ bytes = results.get(COL_REGIONINFO);
|
|
|
inbuf.reset(bytes, bytes.length);
|
|
|
regionInfo.readFields(inbuf);
|
|
|
|
|
@@ -259,15 +355,26 @@ public class HClient implements HConstants {
|
|
|
// We're done
|
|
|
break;
|
|
|
}
|
|
|
-
|
|
|
- bytes = results.get(META_COL_SERVER);
|
|
|
- String serverName = new String(bytes, UTF8_ENCODING);
|
|
|
+
|
|
|
+ bytes = results.get(COL_SERVER);
|
|
|
+ if(bytes == null || bytes.length == 0) {
|
|
|
+ // We need to rescan because the table we want is unassigned.
|
|
|
|
|
|
- tableServers.put(regionInfo.startKey,
|
|
|
- new TableInfo(regionInfo, new HServerAddress(serverName)));
|
|
|
+ if(LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("no server address for " + regionInfo.toString());
|
|
|
+ }
|
|
|
+ servers = 0;
|
|
|
+ this.tableServers.clear();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ servers += 1;
|
|
|
+ serverAddress = new String(bytes, UTF8_ENCODING);
|
|
|
|
|
|
+ this.tableServers.put(regionInfo.startKey,
|
|
|
+ new TableInfo(regionInfo, new HServerAddress(serverAddress)));
|
|
|
}
|
|
|
-
|
|
|
+ return servers;
|
|
|
+
|
|
|
} finally {
|
|
|
if(scannerId != -1L) {
|
|
|
server.close(scannerId);
|
|
@@ -280,23 +387,24 @@ public class HClient implements HConstants {
|
|
|
|
|
|
// See if we already have a connection
|
|
|
|
|
|
- HRegionInterface server = servers.get(regionServer.toString());
|
|
|
+ HRegionInterface server = this.servers.get(regionServer.toString());
|
|
|
|
|
|
if(server == null) { // Get a connection
|
|
|
|
|
|
server = (HRegionInterface)RPC.waitForProxy(HRegionInterface.class,
|
|
|
- HRegionInterface.versionID, regionServer.getInetSocketAddress(), conf);
|
|
|
+ HRegionInterface.versionID, regionServer.getInetSocketAddress(),
|
|
|
+ this.conf);
|
|
|
|
|
|
- servers.put(regionServer.toString(), server);
|
|
|
+ this.servers.put(regionServer.toString(), server);
|
|
|
}
|
|
|
return server;
|
|
|
}
|
|
|
|
|
|
- /** Close the connection to the HRegionServer */
|
|
|
+ /** Close the connection */
|
|
|
public synchronized void close() throws IOException {
|
|
|
- if(! closed) {
|
|
|
+ if(! this.closed) {
|
|
|
RPC.stopClient();
|
|
|
- closed = true;
|
|
|
+ this.closed = true;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -307,65 +415,75 @@ public class HClient implements HConstants {
|
|
|
* catalog table that just contains table names and their descriptors.
|
|
|
* Right now, it only exists as part of the META table's region info.
|
|
|
*/
|
|
|
- public synchronized HTableDescriptor[] listTables() throws IOException {
|
|
|
+ public synchronized HTableDescriptor[] listTables()
|
|
|
+ throws IOException {
|
|
|
TreeSet<HTableDescriptor> uniqueTables = new TreeSet<HTableDescriptor>();
|
|
|
|
|
|
- TreeMap<Text, TableInfo> metaTables = tablesToServers.get(META_TABLE_NAME);
|
|
|
+ TreeMap<Text, TableInfo> metaTables =
|
|
|
+ this.tablesToServers.get(META_TABLE_NAME);
|
|
|
+
|
|
|
if(metaTables == null) {
|
|
|
// Meta is not loaded yet so go do that
|
|
|
loadMetaFromRoot(META_TABLE_NAME);
|
|
|
metaTables = tablesToServers.get(META_TABLE_NAME);
|
|
|
}
|
|
|
|
|
|
- for(Iterator<TableInfo>it = metaTables.values().iterator(); it.hasNext(); ) {
|
|
|
- TableInfo t = it.next();
|
|
|
+ for (TableInfo t: metaTables.values()) {
|
|
|
HRegionInterface server = getHRegionConnection(t.serverAddress);
|
|
|
long scannerId = -1L;
|
|
|
try {
|
|
|
- scannerId = server.openScanner(t.regionInfo.regionName, metaColumns, startRow);
|
|
|
- HStoreKey key = new HStoreKey();
|
|
|
+ scannerId = server.openScanner(t.regionInfo.regionName,
|
|
|
+ META_COLUMNS, EMPTY_START_ROW);
|
|
|
|
|
|
+ HStoreKey key = new HStoreKey();
|
|
|
DataInputBuffer inbuf = new DataInputBuffer();
|
|
|
while(true) {
|
|
|
LabelledData[] values = server.next(scannerId, key);
|
|
|
if(values.length == 0) {
|
|
|
break;
|
|
|
}
|
|
|
-
|
|
|
for(int i = 0; i < values.length; i++) {
|
|
|
- if(values[i].getLabel().equals(META_COL_REGIONINFO)) {
|
|
|
+ if(values[i].getLabel().equals(COL_REGIONINFO)) {
|
|
|
byte[] bytes = values[i].getData().get();
|
|
|
inbuf.reset(bytes, bytes.length);
|
|
|
HRegionInfo info = new HRegionInfo();
|
|
|
info.readFields(inbuf);
|
|
|
|
|
|
- // Only examine the rows where the startKey is zero length
|
|
|
-
|
|
|
+ // Only examine the rows where the startKey is zero length
|
|
|
if(info.startKey.getLength() == 0) {
|
|
|
uniqueTables.add(info.tableDesc);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
} finally {
|
|
|
if(scannerId != -1L) {
|
|
|
server.close(scannerId);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return (HTableDescriptor[]) uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
|
|
|
+ return (HTableDescriptor[])uniqueTables.
|
|
|
+ toArray(new HTableDescriptor[uniqueTables.size()]);
|
|
|
}
|
|
|
|
|
|
private synchronized TableInfo getTableInfo(Text row) {
|
|
|
- if(tableServers == null) {
|
|
|
+ if(row == null || row.getLength() == 0) {
|
|
|
+ throw new IllegalArgumentException("row key cannot be null or zero length");
|
|
|
+ }
|
|
|
+ if(this.tableServers == null) {
|
|
|
throw new IllegalStateException("Must open table first");
|
|
|
}
|
|
|
|
|
|
// Only one server will have the row we are looking for
|
|
|
|
|
|
- Text serverKey = tableServers.tailMap(row).firstKey();
|
|
|
- return tableServers.get(serverKey);
|
|
|
+ Text serverKey = null;
|
|
|
+ if(this.tableServers.containsKey(row)) {
|
|
|
+ serverKey = row;
|
|
|
+
|
|
|
+ } else {
|
|
|
+ serverKey = this.tableServers.headMap(row).lastKey();
|
|
|
+ }
|
|
|
+ return this.tableServers.get(serverKey);
|
|
|
}
|
|
|
|
|
|
/** Get a single value for the specified row and column */
|
|
@@ -416,7 +534,7 @@ public class HClient implements HConstants {
|
|
|
* Return the specified columns.
|
|
|
*/
|
|
|
public synchronized HScannerInterface obtainScanner(Text[] columns, Text startRow) throws IOException {
|
|
|
- if(tableServers == null) {
|
|
|
+ if(this.tableServers == null) {
|
|
|
throw new IllegalStateException("Must open table first");
|
|
|
}
|
|
|
return new ClientScanner(columns, startRow);
|
|
@@ -427,14 +545,14 @@ public class HClient implements HConstants {
|
|
|
TableInfo info = getTableInfo(row);
|
|
|
long lockid;
|
|
|
try {
|
|
|
- currentServer = getHRegionConnection(info.serverAddress);
|
|
|
- currentRegion = info.regionInfo.regionName;
|
|
|
- clientid = rand.nextLong();
|
|
|
- lockid = currentServer.startUpdate(currentRegion, clientid, row);
|
|
|
+ this.currentServer = getHRegionConnection(info.serverAddress);
|
|
|
+ this.currentRegion = info.regionInfo.regionName;
|
|
|
+ this.clientid = rand.nextLong();
|
|
|
+ lockid = currentServer.startUpdate(this.currentRegion, this.clientid, row);
|
|
|
|
|
|
} catch(IOException e) {
|
|
|
- currentServer = null;
|
|
|
- currentRegion = null;
|
|
|
+ this.currentServer = null;
|
|
|
+ this.currentRegion = null;
|
|
|
throw e;
|
|
|
}
|
|
|
return lockid;
|
|
@@ -443,16 +561,17 @@ public class HClient implements HConstants {
|
|
|
/** Change a value for the specified column */
|
|
|
public void put(long lockid, Text column, byte val[]) throws IOException {
|
|
|
try {
|
|
|
- currentServer.put(currentRegion, clientid, lockid, column, new BytesWritable(val));
|
|
|
+ this.currentServer.put(this.currentRegion, this.clientid, lockid, column,
|
|
|
+ new BytesWritable(val));
|
|
|
|
|
|
} catch(IOException e) {
|
|
|
try {
|
|
|
- currentServer.abort(currentRegion, clientid, lockid);
|
|
|
+ this.currentServer.abort(this.currentRegion, this.clientid, lockid);
|
|
|
|
|
|
} catch(IOException e2) {
|
|
|
}
|
|
|
- currentServer = null;
|
|
|
- currentRegion = null;
|
|
|
+ this.currentServer = null;
|
|
|
+ this.currentRegion = null;
|
|
|
throw e;
|
|
|
}
|
|
|
}
|
|
@@ -460,16 +579,16 @@ public class HClient implements HConstants {
|
|
|
/** Delete the value for a column */
|
|
|
public void delete(long lockid, Text column) throws IOException {
|
|
|
try {
|
|
|
- currentServer.delete(currentRegion, clientid, lockid, column);
|
|
|
+ this.currentServer.delete(this.currentRegion, this.clientid, lockid, column);
|
|
|
|
|
|
} catch(IOException e) {
|
|
|
try {
|
|
|
- currentServer.abort(currentRegion, clientid, lockid);
|
|
|
+ this.currentServer.abort(this.currentRegion, this.clientid, lockid);
|
|
|
|
|
|
} catch(IOException e2) {
|
|
|
}
|
|
|
- currentServer = null;
|
|
|
- currentRegion = null;
|
|
|
+ this.currentServer = null;
|
|
|
+ this.currentRegion = null;
|
|
|
throw e;
|
|
|
}
|
|
|
}
|
|
@@ -477,11 +596,10 @@ public class HClient implements HConstants {
|
|
|
/** Abort a row mutation */
|
|
|
public void abort(long lockid) throws IOException {
|
|
|
try {
|
|
|
- currentServer.abort(currentRegion, clientid, lockid);
|
|
|
-
|
|
|
+ this.currentServer.abort(this.currentRegion, this.clientid, lockid);
|
|
|
} catch(IOException e) {
|
|
|
- currentServer = null;
|
|
|
- currentRegion = null;
|
|
|
+ this.currentServer = null;
|
|
|
+ this.currentRegion = null;
|
|
|
throw e;
|
|
|
}
|
|
|
}
|
|
@@ -489,11 +607,11 @@ public class HClient implements HConstants {
|
|
|
/** Finalize a row mutation */
|
|
|
public void commit(long lockid) throws IOException {
|
|
|
try {
|
|
|
- currentServer.commit(currentRegion, clientid, lockid);
|
|
|
+ this.currentServer.commit(this.currentRegion, this.clientid, lockid);
|
|
|
|
|
|
} finally {
|
|
|
- currentServer = null;
|
|
|
- currentRegion = null;
|
|
|
+ this.currentServer = null;
|
|
|
+ this.currentRegion = null;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -515,7 +633,19 @@ public class HClient implements HConstants {
|
|
|
this.columns = columns;
|
|
|
this.startRow = startRow;
|
|
|
this.closed = false;
|
|
|
- Collection<TableInfo> info = tableServers.tailMap(startRow).values();
|
|
|
+
|
|
|
+ Text firstServer = null;
|
|
|
+ if(this.startRow == null || this.startRow.getLength() == 0) {
|
|
|
+ firstServer = tableServers.firstKey();
|
|
|
+
|
|
|
+ } else if(tableServers.containsKey(startRow)) {
|
|
|
+ firstServer = startRow;
|
|
|
+
|
|
|
+ } else {
|
|
|
+ firstServer = tableServers.headMap(startRow).lastKey();
|
|
|
+ }
|
|
|
+ Collection<TableInfo> info = tableServers.tailMap(firstServer).values();
|
|
|
+
|
|
|
this.regions = info.toArray(new TableInfo[info.size()]);
|
|
|
this.currentRegion = -1;
|
|
|
this.server = null;
|
|
@@ -528,19 +658,20 @@ public class HClient implements HConstants {
|
|
|
* Returns false if there are no more scanners.
|
|
|
*/
|
|
|
private boolean nextScanner() throws IOException {
|
|
|
- if(scannerId != -1L) {
|
|
|
- server.close(scannerId);
|
|
|
- scannerId = -1L;
|
|
|
+ if(this.scannerId != -1L) {
|
|
|
+ this.server.close(this.scannerId);
|
|
|
+ this.scannerId = -1L;
|
|
|
}
|
|
|
- currentRegion += 1;
|
|
|
- if(currentRegion == regions.length) {
|
|
|
+ this.currentRegion += 1;
|
|
|
+ if(this.currentRegion == this.regions.length) {
|
|
|
close();
|
|
|
return false;
|
|
|
}
|
|
|
try {
|
|
|
- server = getHRegionConnection(regions[currentRegion].serverAddress);
|
|
|
- scannerId = server.openScanner(regions[currentRegion].regionInfo.regionName,
|
|
|
- columns, startRow);
|
|
|
+ this.server = getHRegionConnection(this.regions[currentRegion].serverAddress);
|
|
|
+ this.scannerId = this.server.openScanner(
|
|
|
+ this.regions[currentRegion].regionInfo.regionName, this.columns,
|
|
|
+ this.startRow);
|
|
|
|
|
|
} catch(IOException e) {
|
|
|
close();
|
|
@@ -553,16 +684,18 @@ public class HClient implements HConstants {
|
|
|
* @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.TreeMap)
|
|
|
*/
|
|
|
public boolean next(HStoreKey key, TreeMap<Text, byte[]> results) throws IOException {
|
|
|
- if(closed) {
|
|
|
+ if(this.closed) {
|
|
|
return false;
|
|
|
}
|
|
|
LabelledData[] values = null;
|
|
|
do {
|
|
|
- values = server.next(scannerId, key);
|
|
|
+ values = this.server.next(this.scannerId, key);
|
|
|
} while(values.length == 0 && nextScanner());
|
|
|
|
|
|
for(int i = 0; i < values.length; i++) {
|
|
|
- results.put(values[i].getLabel(), values[i].getData().get());
|
|
|
+ byte[] bytes = new byte[values[i].getData().getSize()];
|
|
|
+ System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
|
|
|
+ results.put(values[i].getLabel(), bytes);
|
|
|
}
|
|
|
return values.length != 0;
|
|
|
}
|
|
@@ -571,38 +704,112 @@ public class HClient implements HConstants {
|
|
|
* @see org.apache.hadoop.hbase.HScannerInterface#close()
|
|
|
*/
|
|
|
public void close() throws IOException {
|
|
|
- if(scannerId != -1L) {
|
|
|
- server.close(scannerId);
|
|
|
+ if(this.scannerId != -1L) {
|
|
|
+ this.server.close(this.scannerId);
|
|
|
}
|
|
|
- server = null;
|
|
|
- closed = true;
|
|
|
+ this.server = null;
|
|
|
+ this.closed = true;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void printUsage() {
|
|
|
+ printUsage(null);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void printUsage(final String message) {
|
|
|
+ if (message != null && message.length() > 0) {
|
|
|
+ System.err.println(message);
|
|
|
+ }
|
|
|
System.err.println("Usage: java " + this.getClass().getName() +
|
|
|
- " [--master=hostname:port]");
|
|
|
+ " [--master=host:port] <command> <args>");
|
|
|
+ System.err.println("Options:");
|
|
|
+ System.err.println(" master Specify host and port of HBase " +
|
|
|
+ "cluster master. If not present,");
|
|
|
+ System.err.println(" address is read from configuration.");
|
|
|
+ System.err.println("Commands:");
|
|
|
+ System.err.println(" shutdown Shutdown the HBase cluster.");
|
|
|
+ System.err.println(" createTable Takes table name, column families, " +
|
|
|
+ "and maximum versions.");
|
|
|
+ System.err.println(" deleteTable Takes a table name.");
|
|
|
+ System.err.println(" iistTables List all tables.");
|
|
|
+ System.err.println("Example Usage:");
|
|
|
+ System.err.println(" % java " + this.getClass().getName() + " shutdown");
|
|
|
+ System.err.println(" % java " + this.getClass().getName() +
|
|
|
+ " createTable webcrawl contents: anchors: 10");
|
|
|
}
|
|
|
|
|
|
- private int doCommandLine(final String args[]) {
|
|
|
+ int doCommandLine(final String args[]) {
|
|
|
// Process command-line args. TODO: Better cmd-line processing
|
|
|
- // (but hopefully something not as painful as cli options).
|
|
|
- for (String cmd: args) {
|
|
|
- if (cmd.equals("-h") || cmd.startsWith("--h")) {
|
|
|
- printUsage();
|
|
|
- return 0;
|
|
|
- }
|
|
|
-
|
|
|
- final String masterArgKey = "--master=";
|
|
|
- if (cmd.startsWith(masterArgKey)) {
|
|
|
- this.conf.set(MASTER_ADDRESS,
|
|
|
- cmd.substring(masterArgKey.length()));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
+ // (but hopefully something not as painful as cli options).
|
|
|
int errCode = -1;
|
|
|
+ if (args.length < 1) {
|
|
|
+ printUsage();
|
|
|
+ return errCode;
|
|
|
+ }
|
|
|
try {
|
|
|
- locateRootRegion();
|
|
|
+ for (int i = 0; i < args.length; i++) {
|
|
|
+ String cmd = args[i];
|
|
|
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
|
|
|
+ printUsage();
|
|
|
+ errCode = 0;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ final String masterArgKey = "--master=";
|
|
|
+ if (cmd.startsWith(masterArgKey)) {
|
|
|
+ this.conf.set(MASTER_ADDRESS, cmd.substring(masterArgKey.length()));
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (cmd.equals("shutdown")) {
|
|
|
+ shutdown();
|
|
|
+ errCode = 0;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (cmd.equals("listTables")) {
|
|
|
+ HTableDescriptor [] tables = listTables();
|
|
|
+ for (int ii = 0; ii < tables.length; ii++) {
|
|
|
+ System.out.println(tables[ii].getName());
|
|
|
+ }
|
|
|
+ errCode = 0;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (cmd.equals("createTable")) {
|
|
|
+ if (i + 3 > args.length) {
|
|
|
+ throw new IllegalArgumentException("Must supply a table name " +
|
|
|
+ ", at least one column family and maximum number of versions");
|
|
|
+ }
|
|
|
+ int maxVersions = (Integer.parseInt(args[args.length - 1]));
|
|
|
+ HTableDescriptor desc =
|
|
|
+ new HTableDescriptor(args[i + 1], maxVersions);
|
|
|
+ boolean addedFamily = false;
|
|
|
+ for (int ii = i + 2; ii < (args.length - 1); ii++) {
|
|
|
+ desc.addFamily(new Text(args[ii]));
|
|
|
+ addedFamily = true;
|
|
|
+ }
|
|
|
+ if (!addedFamily) {
|
|
|
+ throw new IllegalArgumentException("Must supply at least one " +
|
|
|
+ "column family");
|
|
|
+ }
|
|
|
+ createTable(desc);
|
|
|
+ errCode = 0;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (cmd.equals("deleteTable")) {
|
|
|
+ if (i + 1 > args.length) {
|
|
|
+ throw new IllegalArgumentException("Must supply a table name");
|
|
|
+ }
|
|
|
+ deleteTable(new Text(args[i + 1]));
|
|
|
+ errCode = 0;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ printUsage();
|
|
|
+ break;
|
|
|
+ }
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
}
|