|
@@ -41,6 +41,7 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
|
|
private final RequestProcessor nextProcessor;
|
|
private final RequestProcessor nextProcessor;
|
|
|
|
|
|
private Thread snapInProcess = null;
|
|
private Thread snapInProcess = null;
|
|
|
|
+ volatile private boolean running;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Transactions that have been written and are waiting to be flushed to
|
|
* Transactions that have been written and are waiting to be flushed to
|
|
@@ -62,6 +63,7 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
|
|
super("SyncThread:" + zks.getServerId());
|
|
super("SyncThread:" + zks.getServerId());
|
|
this.zks = zks;
|
|
this.zks = zks;
|
|
this.nextProcessor = nextProcessor;
|
|
this.nextProcessor = nextProcessor;
|
|
|
|
+ running = true;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -147,6 +149,7 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
|
|
}
|
|
}
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
LOG.error("Severe unrecoverable error, exiting", t);
|
|
LOG.error("Severe unrecoverable error, exiting", t);
|
|
|
|
+ running = false;
|
|
System.exit(11);
|
|
System.exit(11);
|
|
}
|
|
}
|
|
LOG.info("SyncRequestProcessor exited!");
|
|
LOG.info("SyncRequestProcessor exited!");
|
|
@@ -170,7 +173,9 @@ public class SyncRequestProcessor extends Thread implements RequestProcessor {
|
|
LOG.info("Shutting down");
|
|
LOG.info("Shutting down");
|
|
queuedRequests.add(requestOfDeath);
|
|
queuedRequests.add(requestOfDeath);
|
|
try {
|
|
try {
|
|
- this.join();
|
|
|
|
|
|
+ if(running){
|
|
|
|
+ this.join();
|
|
|
|
+ }
|
|
} catch(InterruptedException e) {
|
|
} catch(InterruptedException e) {
|
|
LOG.warn("Interrupted while wating for " + this + " to finish");
|
|
LOG.warn("Interrupted while wating for " + this + " to finish");
|
|
}
|
|
}
|