|
@@ -152,7 +152,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
|
@Override
|
|
|
- public synchronized void start() {
|
|
|
+ public void run() {
|
|
|
LOG.info("Starting shutdown thread.");
|
|
|
|
|
|
// tell the region server to stop and wait for it to complete
|
|
@@ -278,31 +278,28 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
QueueEntry e = null;
|
|
|
try {
|
|
|
e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
|
|
-
|
|
|
+ if (e == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ synchronized (splitterLock) { // Don't interrupt us while we're working
|
|
|
+ split(e.getRegion());
|
|
|
+ }
|
|
|
} catch (InterruptedException ex) {
|
|
|
continue;
|
|
|
- }
|
|
|
- if (e == null) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- synchronized (splitterLock) { // Don't interrupt us while we're working
|
|
|
- try {
|
|
|
- split(e.getRegion());
|
|
|
-
|
|
|
- } catch (IOException ex) {
|
|
|
- LOG.error("Split failed for region " +
|
|
|
- e.getRegion().getRegionName(),
|
|
|
- RemoteExceptionHandler.checkIOException(ex));
|
|
|
- if (!checkFileSystem()) {
|
|
|
- break;
|
|
|
- }
|
|
|
+ } catch (IOException ex) {
|
|
|
+ LOG.error("Split failed" +
|
|
|
+ (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
|
|
|
+ RemoteExceptionHandler.checkIOException(ex));
|
|
|
+ if (!checkFileSystem()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
|
|
|
- } catch (Exception ex) {
|
|
|
- LOG.error("Split failed on region " +
|
|
|
- e.getRegion().getRegionName(), ex);
|
|
|
- if (!checkFileSystem()) {
|
|
|
- break;
|
|
|
- }
|
|
|
+ } catch (Exception ex) {
|
|
|
+ LOG.error("Split failed" +
|
|
|
+ (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
|
|
|
+ ex);
|
|
|
+ if (!checkFileSystem()) {
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -402,29 +399,27 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
QueueEntry e = null;
|
|
|
try {
|
|
|
e = compactionQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
|
|
-
|
|
|
- } catch (InterruptedException ex) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (e == null) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- try {
|
|
|
+ if (e == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
if (e.getRegion().compactIfNeeded()) {
|
|
|
splitter.splitRequested(e);
|
|
|
}
|
|
|
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ continue;
|
|
|
} catch (IOException ex) {
|
|
|
- LOG.error("Compaction failed for region " +
|
|
|
- e.getRegion().getRegionName(),
|
|
|
+ LOG.error("Compaction failed" +
|
|
|
+ (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
|
|
|
RemoteExceptionHandler.checkIOException(ex));
|
|
|
if (!checkFileSystem()) {
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
} catch (Exception ex) {
|
|
|
- LOG.error("Compaction failed for region " +
|
|
|
- e.getRegion().getRegionName(), ex);
|
|
|
+ LOG.error("Compaction failed" +
|
|
|
+ (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
|
|
|
+ ex);
|
|
|
if (!checkFileSystem()) {
|
|
|
break;
|
|
|
}
|
|
@@ -469,47 +464,14 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
QueueEntry e = null;
|
|
|
try {
|
|
|
e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
|
|
-
|
|
|
- } catch (InterruptedException ex) {
|
|
|
- continue;
|
|
|
-
|
|
|
- } catch (ConcurrentModificationException ex) {
|
|
|
- continue;
|
|
|
-
|
|
|
- }
|
|
|
- synchronized(cacheFlusherLock) { // Don't interrupt while we're working
|
|
|
- if (e != null) {
|
|
|
- try {
|
|
|
- if (e.getRegion().flushcache()) {
|
|
|
- compactor.compactionRequested(e);
|
|
|
- }
|
|
|
-
|
|
|
- } catch (DroppedSnapshotException ex) {
|
|
|
- // Cache flush can fail in a few places. If it fails in a critical
|
|
|
- // section, we get a DroppedSnapshotException and a replay of hlog
|
|
|
- // is required. Currently the only way to do this is a restart of
|
|
|
- // the server.
|
|
|
- LOG.fatal("Replay of hlog required. Forcing server restart", ex);
|
|
|
- if (!checkFileSystem()) {
|
|
|
- break;
|
|
|
- }
|
|
|
- HRegionServer.this.stop();
|
|
|
-
|
|
|
- } catch (IOException ex) {
|
|
|
- LOG.error("Cache flush failed for region " +
|
|
|
- e.getRegion().getRegionName(),
|
|
|
- RemoteExceptionHandler.checkIOException(ex));
|
|
|
- if (!checkFileSystem()) {
|
|
|
- break;
|
|
|
- }
|
|
|
-
|
|
|
- } catch (Exception ex) {
|
|
|
- LOG.error("Cache flush failed for region " +
|
|
|
- e.getRegion().getRegionName(), ex);
|
|
|
- if (!checkFileSystem()) {
|
|
|
- break;
|
|
|
- }
|
|
|
+ if (e == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ synchronized(cacheFlusherLock) { // Don't interrupt while we're working
|
|
|
+ if (e.getRegion().flushcache()) {
|
|
|
+ compactor.compactionRequested(e);
|
|
|
}
|
|
|
+
|
|
|
e.setExpirationTime(System.currentTimeMillis() +
|
|
|
optionalFlushPeriod);
|
|
|
flushQueue.add(e);
|
|
@@ -537,6 +499,38 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ continue;
|
|
|
+
|
|
|
+ } catch (ConcurrentModificationException ex) {
|
|
|
+ continue;
|
|
|
+
|
|
|
+ } catch (DroppedSnapshotException ex) {
|
|
|
+ // Cache flush can fail in a few places. If it fails in a critical
|
|
|
+ // section, we get a DroppedSnapshotException and a replay of hlog
|
|
|
+ // is required. Currently the only way to do this is a restart of
|
|
|
+ // the server.
|
|
|
+ LOG.fatal("Replay of hlog required. Forcing server restart", ex);
|
|
|
+ if (!checkFileSystem()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ HRegionServer.this.stop();
|
|
|
+
|
|
|
+ } catch (IOException ex) {
|
|
|
+ LOG.error("Cache flush failed" +
|
|
|
+ (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
|
|
|
+ RemoteExceptionHandler.checkIOException(ex));
|
|
|
+ if (!checkFileSystem()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception ex) {
|
|
|
+ LOG.error("Cache flush failed" +
|
|
|
+ (e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
|
|
|
+ ex);
|
|
|
+ if (!checkFileSystem()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
flushQueue.clear();
|
|
@@ -811,13 +805,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
}
|
|
|
// Reset tries count if we had a successful transaction.
|
|
|
tries = 0;
|
|
|
- } catch (IOException e) {
|
|
|
- e = RemoteExceptionHandler.checkIOException(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (e instanceof IOException) {
|
|
|
+ e = RemoteExceptionHandler.checkIOException((IOException) e);
|
|
|
+ }
|
|
|
if(tries < this.numRetries) {
|
|
|
LOG.warn("Processing message (Retry: " + tries + ")", e);
|
|
|
tries++;
|
|
|
} else {
|
|
|
- LOG.error("Exceeded max retries: " + this.numRetries, e);
|
|
|
+ LOG.fatal("Exceeded max retries: " + this.numRetries, e);
|
|
|
if (!checkFileSystem()) {
|
|
|
continue;
|
|
|
}
|
|
@@ -826,10 +822,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
this.sleeper.sleep(lastMsg);
|
|
|
- } // while (!stopRequested.get())
|
|
|
- }
|
|
|
+ } // for
|
|
|
+ } // while (!stopRequested.get())
|
|
|
} catch (Throwable t) {
|
|
|
LOG.fatal("Unhandled exception. Aborting...", t);
|
|
|
abort();
|
|
@@ -1148,19 +1143,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
/** {@inheritDoc} */
|
|
|
public void run() {
|
|
|
try {
|
|
|
- for(ToDoEntry e = null; !stopRequested.get(); ) {
|
|
|
+ while(!stopRequested.get()) {
|
|
|
+ ToDoEntry e = null;
|
|
|
try {
|
|
|
e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
|
|
|
- } catch (InterruptedException ex) {
|
|
|
- // continue
|
|
|
- }
|
|
|
- if(e == null || stopRequested.get()) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- try {
|
|
|
+ if(e == null || stopRequested.get()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
LOG.info(e.msg.toString());
|
|
|
switch(e.msg.getMsg()) {
|
|
|
-
|
|
|
+
|
|
|
case HMsg.MSG_REGIONSERVER_QUIESCE:
|
|
|
closeUserRegions();
|
|
|
break;
|
|
@@ -1185,19 +1177,24 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|
|
"Impossible state during msg processing. Instruction: "
|
|
|
+ e.msg.toString());
|
|
|
}
|
|
|
- } catch (IOException ie) {
|
|
|
- ie = RemoteExceptionHandler.checkIOException(ie);
|
|
|
- if(e.tries < numRetries) {
|
|
|
- LOG.warn(ie);
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ // continue
|
|
|
+ } catch (Exception ex) {
|
|
|
+ if (ex instanceof IOException) {
|
|
|
+ ex = RemoteExceptionHandler.checkIOException((IOException) ex);
|
|
|
+ }
|
|
|
+ if(e != null && e.tries < numRetries) {
|
|
|
+ LOG.warn(ex);
|
|
|
e.tries++;
|
|
|
try {
|
|
|
toDo.put(e);
|
|
|
- } catch (InterruptedException ex) {
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
throw new RuntimeException("Putting into msgQueue was " +
|
|
|
- "interrupted.", ex);
|
|
|
+ "interrupted.", ex);
|
|
|
}
|
|
|
} else {
|
|
|
- LOG.error("unable to process message: " + e.msg.toString(), ie);
|
|
|
+ LOG.error("unable to process message" +
|
|
|
+ (e != null ? (": " + e.msg.toString()) : ""), ex);
|
|
|
if (!checkFileSystem()) {
|
|
|
break;
|
|
|
}
|