|
@@ -36,7 +36,6 @@ import java.util.Iterator;
|
|
|
import java.util.Map.Entry;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
-import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import javax.net.SocketFactory;
|
|
|
|
|
@@ -178,8 +177,8 @@ public class Client {
|
|
|
private ConnectionId remoteId;
|
|
|
private Socket socket = null; // connected socket
|
|
|
private DataInputStream in;
|
|
|
- private AtomicReference<DataOutputStream> out =
|
|
|
- new AtomicReference<DataOutputStream>();
|
|
|
+ private DataOutputStream out;
|
|
|
+
|
|
|
// currently active calls
|
|
|
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
|
|
|
private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
|
|
@@ -311,8 +310,8 @@ public class Client {
|
|
|
}
|
|
|
this.in = new DataInputStream(new BufferedInputStream
|
|
|
(new PingInputStream(NetUtils.getInputStream(socket))));
|
|
|
- this.out.set(new DataOutputStream
|
|
|
- (new BufferedOutputStream(NetUtils.getOutputStream(socket))));
|
|
|
+ this.out = new DataOutputStream
|
|
|
+ (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
|
|
|
writeHeader();
|
|
|
|
|
|
// update last activity time
|
|
@@ -370,7 +369,6 @@ public class Client {
|
|
|
* Out is not synchronized because only the first thread does this.
|
|
|
*/
|
|
|
private void writeHeader() throws IOException {
|
|
|
- DataOutputStream out = this.out.get();
|
|
|
out.write(Server.HEADER.array());
|
|
|
out.write(Server.CURRENT_VERSION);
|
|
|
//When there are more fields we can have ConnectionHeader Writable.
|
|
@@ -403,13 +401,13 @@ public class Client {
|
|
|
return true;
|
|
|
} else if (shouldCloseConnection.get()) {
|
|
|
return false;
|
|
|
- } else if (!running.get()) { //get stopped
|
|
|
+ } else if (calls.isEmpty()) { // idle connection closed or stopped
|
|
|
+ markClosed(null);
|
|
|
+ return false;
|
|
|
+ } else { // get stopped but there are still pending requests
|
|
|
markClosed((IOException)new IOException().initCause(
|
|
|
new InterruptedException()));
|
|
|
return false;
|
|
|
- } else { // closed because it has been idle for more than maxIdleTime
|
|
|
- markClosed(null);
|
|
|
- return false;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -425,7 +423,6 @@ public class Client {
|
|
|
if ( curTime - lastActivity.get() >= pingInterval) {
|
|
|
lastActivity.set(curTime);
|
|
|
synchronized (out) {
|
|
|
- DataOutputStream out = this.out.get();
|
|
|
out.writeInt(PING_CALL_ID);
|
|
|
out.flush();
|
|
|
}
|
|
@@ -460,9 +457,6 @@ public class Client {
|
|
|
DataOutputBuffer d=null;
|
|
|
try {
|
|
|
synchronized (this.out) {
|
|
|
- DataOutputStream out = this.out.get();
|
|
|
- if (out==null) return; // socket has closed
|
|
|
-
|
|
|
if (LOG.isDebugEnabled())
|
|
|
LOG.debug(getName() + " sending #" + call.id);
|
|
|
|
|
@@ -531,42 +525,37 @@ public class Client {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- synchronized (out) {
|
|
|
- // release the resources
|
|
|
- // first thing to do;take the connection out of the connection list
|
|
|
- synchronized (connections) {
|
|
|
- if (connections.get(remoteId) == this) {
|
|
|
- connections.remove(remoteId);
|
|
|
- }
|
|
|
+ // release the resources
|
|
|
+ // first thing to do;take the connection out of the connection list
|
|
|
+ synchronized (connections) {
|
|
|
+ if (connections.get(remoteId) == this) {
|
|
|
+ connections.remove(remoteId);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- // close the socket and streams
|
|
|
- IOUtils.closeStream(in);
|
|
|
- in = null;
|
|
|
- IOUtils.closeStream(out.getAndSet(null));
|
|
|
- IOUtils.closeSocket(socket);
|
|
|
- socket = null;
|
|
|
-
|
|
|
- // clean up all calls
|
|
|
- if (closeException == null) {
|
|
|
- if (!calls.isEmpty()) {
|
|
|
- LOG.warn(
|
|
|
- "A connection is closed for no cause and calls are not empty");
|
|
|
-
|
|
|
- // clean up calls anyway
|
|
|
- closeException = new IOException("Unexpected closed connection");
|
|
|
- cleanupCalls();
|
|
|
- }
|
|
|
- } else {
|
|
|
- // log the info
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("closing ipc connection to " + remoteId.address + ": " +
|
|
|
- closeException.getMessage(),closeException);
|
|
|
- }
|
|
|
+ // close the streams and therefore the socket
|
|
|
+ IOUtils.closeStream(out);
|
|
|
+ IOUtils.closeStream(in);
|
|
|
+
|
|
|
+ // clean up all calls
|
|
|
+ if (closeException == null) {
|
|
|
+ if (!calls.isEmpty()) {
|
|
|
+ LOG.warn(
|
|
|
+ "A connection is closed for no cause and calls are not empty");
|
|
|
|
|
|
- // cleanup calls
|
|
|
+ // clean up calls anyway
|
|
|
+ closeException = new IOException("Unexpected closed connection");
|
|
|
cleanupCalls();
|
|
|
}
|
|
|
+ } else {
|
|
|
+ // log the info
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("closing ipc connection to " + remoteId.address + ": " +
|
|
|
+ closeException.getMessage(),closeException);
|
|
|
+ }
|
|
|
+
|
|
|
+ // cleanup calls
|
|
|
+ cleanupCalls();
|
|
|
}
|
|
|
if (LOG.isDebugEnabled())
|
|
|
LOG.debug(getName() + ": closed");
|