|
@@ -70,6 +70,7 @@ import java.util.concurrent.*;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
|
|
import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID;
|
|
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
|
|
import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
|
|
@@ -440,6 +441,8 @@ public class Client implements AutoCloseable {
|
|
|
|
|
|
private final Object sendRpcRequestLock = new Object();
|
|
private final Object sendRpcRequestLock = new Object();
|
|
|
|
|
|
|
|
+ private AtomicReference<Thread> connectingThread = new AtomicReference<>();
|
|
|
|
+
|
|
public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
|
|
public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
|
|
this.remoteId = remoteId;
|
|
this.remoteId = remoteId;
|
|
this.server = remoteId.getAddress();
|
|
this.server = remoteId.getAddress();
|
|
@@ -777,6 +780,7 @@ public class Client implements AutoCloseable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
try {
|
|
try {
|
|
|
|
+ connectingThread.set(Thread.currentThread());
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Connecting to "+server);
|
|
LOG.debug("Connecting to "+server);
|
|
}
|
|
}
|
|
@@ -862,6 +866,8 @@ public class Client implements AutoCloseable {
|
|
markClosed(new IOException("Couldn't set up IO streams: " + t, t));
|
|
markClosed(new IOException("Couldn't set up IO streams: " + t, t));
|
|
}
|
|
}
|
|
close();
|
|
close();
|
|
|
|
+ } finally {
|
|
|
|
+ connectingThread.set(null);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1215,6 +1221,13 @@ public class Client implements AutoCloseable {
|
|
notifyAll();
|
|
notifyAll();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private void interruptConnectingThread() {
|
|
|
|
+ Thread connThread = connectingThread.get();
|
|
|
|
+ if (connThread != null) {
|
|
|
|
+ connThread.interrupt();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
/** Close the connection. */
|
|
/** Close the connection. */
|
|
private synchronized void close() {
|
|
private synchronized void close() {
|
|
@@ -1317,6 +1330,7 @@ public class Client implements AutoCloseable {
|
|
// wake up all connections
|
|
// wake up all connections
|
|
for (Connection conn : connections.values()) {
|
|
for (Connection conn : connections.values()) {
|
|
conn.interrupt();
|
|
conn.interrupt();
|
|
|
|
+ conn.interruptConnectingThread();
|
|
}
|
|
}
|
|
|
|
|
|
// wait until all connections are closed
|
|
// wait until all connections are closed
|