|
@@ -26,7 +26,6 @@ import java.io.UnsupportedEncodingException;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
-import java.util.LinkedList;
|
|
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Random;
|
|
import java.util.Random;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
@@ -37,6 +36,9 @@ import java.util.TimerTask;
|
|
import java.util.TreeMap;
|
|
import java.util.TreeMap;
|
|
import java.util.TreeSet;
|
|
import java.util.TreeSet;
|
|
import java.util.Vector;
|
|
import java.util.Vector;
|
|
|
|
+import java.util.concurrent.BlockingQueue;
|
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -86,7 +88,7 @@ public class HMaster implements HConstants, HMasterInterface,
|
|
int numRetries;
|
|
int numRetries;
|
|
long maxRegionOpenTime;
|
|
long maxRegionOpenTime;
|
|
|
|
|
|
- LinkedList<PendingOperation> msgQueue;
|
|
|
|
|
|
+ BlockingQueue<PendingOperation> msgQueue;
|
|
|
|
|
|
private Leases serverLeases;
|
|
private Leases serverLeases;
|
|
private Server server;
|
|
private Server server;
|
|
@@ -636,7 +638,7 @@ public class HMaster implements HConstants, HMasterInterface,
|
|
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
|
|
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
|
|
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
|
|
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
|
|
this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
|
|
this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
|
|
- this.msgQueue = new LinkedList<PendingOperation>();
|
|
|
|
|
|
+ this.msgQueue = new LinkedBlockingQueue<PendingOperation>();
|
|
this.serverLeases = new Leases(
|
|
this.serverLeases = new Leases(
|
|
conf.getLong("hbase.master.lease.period", 30 * 1000),
|
|
conf.getLong("hbase.master.lease.period", 30 * 1000),
|
|
conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
|
|
conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
|
|
@@ -736,18 +738,13 @@ public class HMaster implements HConstants, HMasterInterface,
|
|
|
|
|
|
// Main processing loop
|
|
// Main processing loop
|
|
for (PendingOperation op = null; !closed; ) {
|
|
for (PendingOperation op = null; !closed; ) {
|
|
- synchronized(msgQueue) {
|
|
|
|
- while(msgQueue.size() == 0 && !closed) {
|
|
|
|
- try {
|
|
|
|
- msgQueue.wait(threadWakeFrequency);
|
|
|
|
- } catch(InterruptedException iex) {
|
|
|
|
- // continue
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if(closed) {
|
|
|
|
- continue;
|
|
|
|
- }
|
|
|
|
- op = msgQueue.removeFirst();
|
|
|
|
|
|
+ try {
|
|
|
|
+ op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ // continue
|
|
|
|
+ }
|
|
|
|
+ if (op == null || closed) {
|
|
|
|
+ continue;
|
|
}
|
|
}
|
|
try {
|
|
try {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -765,8 +762,10 @@ public class HMaster implements HConstants, HMasterInterface,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
LOG.warn(ex);
|
|
LOG.warn(ex);
|
|
- synchronized(msgQueue) {
|
|
|
|
- msgQueue.addLast(op);
|
|
|
|
|
|
+ try {
|
|
|
|
+ msgQueue.put(op);
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ throw new RuntimeException("Putting into msgQueue was interrupted.", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -874,10 +873,11 @@ public class HMaster implements HConstants, HMasterInterface,
|
|
// name, then we can timeout the old one right away and register
|
|
// name, then we can timeout the old one right away and register
|
|
// the new one.
|
|
// the new one.
|
|
storedInfo = serversToServerInfo.remove(s);
|
|
storedInfo = serversToServerInfo.remove(s);
|
|
- if(storedInfo != null && !closed) {
|
|
|
|
- synchronized(msgQueue) {
|
|
|
|
- msgQueue.addLast(new PendingServerShutdown(storedInfo));
|
|
|
|
- msgQueue.notifyAll();
|
|
|
|
|
|
+ if (storedInfo != null && !closed) {
|
|
|
|
+ try {
|
|
|
|
+ msgQueue.put(new PendingServerShutdown(storedInfo));
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ throw new RuntimeException("Putting into msgQueue was interrupted.", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1064,9 +1064,10 @@ public class HMaster implements HConstants, HMasterInterface,
|
|
|
|
|
|
// Queue up an update to note the region location.
|
|
// Queue up an update to note the region location.
|
|
|
|
|
|
- synchronized(msgQueue) {
|
|
|
|
- msgQueue.addLast(new PendingOpenReport(info, region));
|
|
|
|
- msgQueue.notifyAll();
|
|
|
|
|
|
+ try {
|
|
|
|
+ msgQueue.put(new PendingOpenReport(info, region));
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ throw new RuntimeException("Putting into msgQueue was interrupted.", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
@@ -1097,9 +1098,10 @@ public class HMaster implements HConstants, HMasterInterface,
|
|
unassignedRegions.remove(region.regionName);
|
|
unassignedRegions.remove(region.regionName);
|
|
assignAttempts.remove(region.regionName);
|
|
assignAttempts.remove(region.regionName);
|
|
|
|
|
|
- synchronized(msgQueue) {
|
|
|
|
- msgQueue.addLast(new PendingCloseReport(region, reassignRegion, deleteRegion));
|
|
|
|
- msgQueue.notifyAll();
|
|
|
|
|
|
+ try {
|
|
|
|
+ msgQueue.put(new PendingCloseReport(region, reassignRegion, deleteRegion));
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ throw new RuntimeException("Putting into msgQueue was interrupted.", e);
|
|
}
|
|
}
|
|
|
|
|
|
// NOTE: we cannot put the region into unassignedRegions as that
|
|
// NOTE: we cannot put the region into unassignedRegions as that
|
|
@@ -2406,9 +2408,10 @@ public class HMaster implements HConstants, HMasterInterface,
|
|
HGlobals.rootRegionInfo);
|
|
HGlobals.rootRegionInfo);
|
|
assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L);
|
|
assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L);
|
|
}
|
|
}
|
|
- synchronized(msgQueue) {
|
|
|
|
- msgQueue.addLast(new PendingServerShutdown(storedInfo));
|
|
|
|
- msgQueue.notifyAll();
|
|
|
|
|
|
+ try {
|
|
|
|
+ msgQueue.put(new PendingServerShutdown(storedInfo));
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ throw new RuntimeException("Putting into msgQueue was interrupted.", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|