|
@@ -32,6 +32,7 @@ import java.util.Vector;
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -391,6 +392,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
|
|
|
// Leases
|
|
|
private Leases leases;
|
|
|
+
|
|
|
+ // Request counter
|
|
|
+ private AtomicInteger requestCount;
|
|
|
|
|
|
/**
|
|
|
* Starts a HRegionServer at the default location
|
|
@@ -424,6 +428,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
|
|
|
|
|
|
this.outboundMsgs = new Vector<HMsg>();
|
|
|
+ this.requestCount = new AtomicInteger();
|
|
|
|
|
|
// Config'ed params
|
|
|
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
|
|
@@ -597,6 +602,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Telling master we are up");
|
|
|
}
|
|
|
+ requestCount.set(0);
|
|
|
+ serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
|
|
|
hbaseMaster.regionServerStartup(serverInfo);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Done telling master we are up");
|
|
@@ -626,6 +633,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
+ serverInfo.setLoad(new HServerLoad(requestCount.get(),
|
|
|
+ onlineRegions.size()));
|
|
|
+ requestCount.set(0);
|
|
|
+
|
|
|
HMsg msgs[] =
|
|
|
hbaseMaster.regionServerReport(serverInfo, outboundArray);
|
|
|
lastMsg = System.currentTimeMillis();
|
|
@@ -897,6 +908,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
|
|
|
this.lock.writeLock().lock();
|
|
|
try {
|
|
|
+ this.log.setSequenceNumber(region.getMaxSequenceId());
|
|
|
this.onlineRegions.put(region.getRegionName(), region);
|
|
|
} finally {
|
|
|
this.lock.writeLock().unlock();
|
|
@@ -963,6 +975,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
*/
|
|
|
public HRegionInfo getRegionInfo(final Text regionName)
|
|
|
throws NotServingRegionException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
return getRegion(regionName).getRegionInfo();
|
|
|
}
|
|
|
|
|
@@ -971,6 +984,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
*/
|
|
|
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
|
|
|
throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
long clientid = rand.nextLong();
|
|
|
long lockid = startUpdate(regionName, clientid, b.getRow());
|
|
|
for(BatchOperation op: b) {
|
|
@@ -993,6 +1007,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
public byte [] get(final Text regionName, final Text row,
|
|
|
final Text column)
|
|
|
throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
return getRegion(regionName).get(row, column);
|
|
|
}
|
|
|
|
|
@@ -1002,6 +1017,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
public byte [][] get(final Text regionName, final Text row,
|
|
|
final Text column, final int numVersions)
|
|
|
throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
return getRegion(regionName).get(row, column, numVersions);
|
|
|
}
|
|
|
|
|
@@ -1010,6 +1026,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
*/
|
|
|
public byte [][] get(final Text regionName, final Text row, final Text column,
|
|
|
final long timestamp, final int numVersions) throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
return getRegion(regionName).get(row, column, timestamp, numVersions);
|
|
|
}
|
|
|
|
|
@@ -1018,6 +1035,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
*/
|
|
|
public KeyedData[] getRow(final Text regionName, final Text row)
|
|
|
throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
HRegion region = getRegion(regionName);
|
|
|
TreeMap<Text, byte[]> map = region.getFull(row);
|
|
|
KeyedData result[] = new KeyedData[map.size()];
|
|
@@ -1034,6 +1052,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
*/
|
|
|
public KeyedData[] next(final long scannerId)
|
|
|
throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
String scannerName = String.valueOf(scannerId);
|
|
|
HInternalScannerInterface s = scanners.get(scannerName);
|
|
|
if (s == null) {
|
|
@@ -1077,6 +1096,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
*/
|
|
|
public long startUpdate(Text regionName, long clientid, Text row)
|
|
|
throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
HRegion region = getRegion(regionName);
|
|
|
long lockid = region.startUpdate(row);
|
|
|
this.leases.createLease(clientid, lockid,
|
|
@@ -1120,6 +1140,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
public void put(final Text regionName, final long clientid,
|
|
|
final long lockid, final Text column, final byte [] val)
|
|
|
throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
HRegion region = getRegion(regionName, true);
|
|
|
leases.renewLease(clientid, lockid);
|
|
|
region.put(lockid, column, val);
|
|
@@ -1130,6 +1151,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
*/
|
|
|
public void delete(Text regionName, long clientid, long lockid, Text column)
|
|
|
throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
HRegion region = getRegion(regionName);
|
|
|
leases.renewLease(clientid, lockid);
|
|
|
region.delete(lockid, column);
|
|
@@ -1140,6 +1162,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
*/
|
|
|
public void abort(Text regionName, long clientid, long lockid)
|
|
|
throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
HRegion region = getRegion(regionName, true);
|
|
|
leases.cancelLease(clientid, lockid);
|
|
|
region.abort(lockid);
|
|
@@ -1150,6 +1173,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
*/
|
|
|
public void commit(Text regionName, final long clientid, final long lockid,
|
|
|
final long timestamp) throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
HRegion region = getRegion(regionName, true);
|
|
|
leases.cancelLease(clientid, lockid);
|
|
|
region.commit(lockid, timestamp);
|
|
@@ -1159,6 +1183,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
* {@inheritDoc}
|
|
|
*/
|
|
|
public void renewLease(long lockid, long clientid) throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
leases.renewLease(clientid, lockid);
|
|
|
}
|
|
|
|
|
@@ -1247,6 +1272,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
public long openScanner(Text regionName, Text[] cols, Text firstRow,
|
|
|
final long timestamp, final RowFilterInterface filter)
|
|
|
throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
HRegion r = getRegion(regionName);
|
|
|
long scannerId = -1L;
|
|
|
try {
|
|
@@ -1277,6 +1303,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
* {@inheritDoc}
|
|
|
*/
|
|
|
public void close(final long scannerId) throws IOException {
|
|
|
+ requestCount.incrementAndGet();
|
|
|
String scannerName = String.valueOf(scannerId);
|
|
|
HInternalScannerInterface s = null;
|
|
|
synchronized(scanners) {
|