|
@@ -315,9 +315,8 @@ public abstract class Server {
|
|
selector= Selector.open();
|
|
selector= Selector.open();
|
|
readers = new Reader[readThreads];
|
|
readers = new Reader[readThreads];
|
|
for (int i = 0; i < readThreads; i++) {
|
|
for (int i = 0; i < readThreads; i++) {
|
|
- Selector readSelector = Selector.open();
|
|
|
|
- Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port,
|
|
|
|
- readSelector);
|
|
|
|
|
|
+ Reader reader = new Reader(
|
|
|
|
+ "Socket Reader #" + (i + 1) + " for port " + port);
|
|
readers[i] = reader;
|
|
readers[i] = reader;
|
|
reader.start();
|
|
reader.start();
|
|
}
|
|
}
|
|
@@ -330,42 +329,53 @@ public abstract class Server {
|
|
|
|
|
|
private class Reader extends Thread {
|
|
private class Reader extends Thread {
|
|
private volatile boolean adding = false;
|
|
private volatile boolean adding = false;
|
|
- private Selector readSelector = null;
|
|
|
|
|
|
+ private final Selector readSelector;
|
|
|
|
|
|
- Reader(String name, Selector readSelector) {
|
|
|
|
|
|
+ Reader(String name) throws IOException {
|
|
super(name);
|
|
super(name);
|
|
- this.readSelector = readSelector;
|
|
|
|
|
|
+
|
|
|
|
+ this.readSelector = Selector.open();
|
|
}
|
|
}
|
|
|
|
+
|
|
public void run() {
|
|
public void run() {
|
|
LOG.info("Starting " + getName());
|
|
LOG.info("Starting " + getName());
|
|
- synchronized (this) {
|
|
|
|
- while (running) {
|
|
|
|
- SelectionKey key = null;
|
|
|
|
- try {
|
|
|
|
- readSelector.select();
|
|
|
|
- while (adding) {
|
|
|
|
- this.wait(1000);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
|
|
|
|
- while (iter.hasNext()) {
|
|
|
|
- key = iter.next();
|
|
|
|
- iter.remove();
|
|
|
|
- if (key.isValid()) {
|
|
|
|
- if (key.isReadable()) {
|
|
|
|
- doRead(key);
|
|
|
|
- }
|
|
|
|
|
|
+ try {
|
|
|
|
+ doRunLoop();
|
|
|
|
+ } finally {
|
|
|
|
+ try {
|
|
|
|
+ readSelector.close();
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ LOG.error("Error closing read selector in " + this.getName(), ioe);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private synchronized void doRunLoop() {
|
|
|
|
+ while (running) {
|
|
|
|
+ SelectionKey key = null;
|
|
|
|
+ try {
|
|
|
|
+ readSelector.select();
|
|
|
|
+ while (adding) {
|
|
|
|
+ this.wait(1000);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
|
|
|
|
+ while (iter.hasNext()) {
|
|
|
|
+ key = iter.next();
|
|
|
|
+ iter.remove();
|
|
|
|
+ if (key.isValid()) {
|
|
|
|
+ if (key.isReadable()) {
|
|
|
|
+ doRead(key);
|
|
}
|
|
}
|
|
- key = null;
|
|
|
|
- }
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- if (running) { // unexpected -- log it
|
|
|
|
- LOG.info(getName() + " caught: " +
|
|
|
|
- StringUtils.stringifyException(e));
|
|
|
|
}
|
|
}
|
|
- } catch (IOException ex) {
|
|
|
|
- LOG.error("Error in Reader", ex);
|
|
|
|
|
|
+ key = null;
|
|
}
|
|
}
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
+ if (running) { // unexpected -- log it
|
|
|
|
+ LOG.info(getName() + " unexpectedly interrupted", e);
|
|
|
|
+ }
|
|
|
|
+ } catch (IOException ex) {
|
|
|
|
+ LOG.error("Error in Reader", ex);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -605,7 +615,7 @@ public abstract class Server {
|
|
|
|
|
|
// Sends responses of RPC back to clients.
|
|
// Sends responses of RPC back to clients.
|
|
private class Responder extends Thread {
|
|
private class Responder extends Thread {
|
|
- private Selector writeSelector;
|
|
|
|
|
|
+ private final Selector writeSelector;
|
|
private int pending; // connections waiting to register
|
|
private int pending; // connections waiting to register
|
|
|
|
|
|
final static int PURGE_INTERVAL = 900000; // 15mins
|
|
final static int PURGE_INTERVAL = 900000; // 15mins
|
|
@@ -621,6 +631,19 @@ public abstract class Server {
|
|
public void run() {
|
|
public void run() {
|
|
LOG.info(getName() + ": starting");
|
|
LOG.info(getName() + ": starting");
|
|
SERVER.set(Server.this);
|
|
SERVER.set(Server.this);
|
|
|
|
+ try {
|
|
|
|
+ doRunLoop();
|
|
|
|
+ } finally {
|
|
|
|
+ LOG.info("Stopping " + this.getName());
|
|
|
|
+ try {
|
|
|
|
+ writeSelector.close();
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ LOG.error("Couldn't close write selector in " + this.getName(), ioe);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void doRunLoop() {
|
|
long lastPurgeTime = 0; // last check for old calls.
|
|
long lastPurgeTime = 0; // last check for old calls.
|
|
|
|
|
|
while (running) {
|
|
while (running) {
|
|
@@ -682,11 +705,9 @@ public abstract class Server {
|
|
LOG.warn("Out of Memory in server select", e);
|
|
LOG.warn("Out of Memory in server select", e);
|
|
try { Thread.sleep(60000); } catch (Exception ie) {}
|
|
try { Thread.sleep(60000); } catch (Exception ie) {}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- LOG.warn("Exception in Responder " +
|
|
|
|
- StringUtils.stringifyException(e));
|
|
|
|
|
|
+ LOG.warn("Exception in Responder", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- LOG.info("Stopping " + this.getName());
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private void doAsyncWrite(SelectionKey key) throws IOException {
|
|
private void doAsyncWrite(SelectionKey key) throws IOException {
|
|
@@ -1446,12 +1467,10 @@ public abstract class Server {
|
|
}
|
|
}
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
if (running) { // unexpected -- log it
|
|
if (running) { // unexpected -- log it
|
|
- LOG.info(getName() + " caught: " +
|
|
|
|
- StringUtils.stringifyException(e));
|
|
|
|
|
|
+ LOG.info(getName() + " unexpectedly interrupted", e);
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- LOG.info(getName() + " caught: " +
|
|
|
|
- StringUtils.stringifyException(e));
|
|
|
|
|
|
+ LOG.info(getName() + " caught an exception", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
LOG.info(getName() + ": exiting");
|
|
LOG.info(getName() + ": exiting");
|