|
@@ -26,6 +26,7 @@ import java.net.ConnectException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.Socket;
|
|
|
import java.net.SocketAddress;
|
|
|
+import java.net.SocketException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.ArrayList;
|
|
@@ -1051,7 +1052,7 @@ public class ClientCnxn {
|
|
|
// throws a LoginException: see startConnect() below.
|
|
|
private boolean saslLoginFailed = false;
|
|
|
|
|
|
- private void startConnect() throws IOException {
|
|
|
+ private void startConnect(InetSocketAddress addr) throws IOException {
|
|
|
// initializing it for new connection
|
|
|
saslLoginFailed = false;
|
|
|
if(!isFirstConnect){
|
|
@@ -1063,14 +1064,6 @@ public class ClientCnxn {
|
|
|
}
|
|
|
state = States.CONNECTING;
|
|
|
|
|
|
- InetSocketAddress addr;
|
|
|
- if (rwServerAddress != null) {
|
|
|
- addr = rwServerAddress;
|
|
|
- rwServerAddress = null;
|
|
|
- } else {
|
|
|
- addr = hostProvider.next(1000);
|
|
|
- }
|
|
|
-
|
|
|
String hostPort = addr.getHostString() + ":" + addr.getPort();
|
|
|
MDC.put("myid", hostPort);
|
|
|
setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
|
|
@@ -1123,6 +1116,7 @@ public class ClientCnxn {
|
|
|
int to;
|
|
|
long lastPingRwServer = Time.currentElapsedTime();
|
|
|
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
|
|
|
+ InetSocketAddress serverAddress = null;
|
|
|
while (state.isAlive()) {
|
|
|
try {
|
|
|
if (!clientCnxnSocket.isConnected()) {
|
|
@@ -1130,7 +1124,13 @@ public class ClientCnxn {
|
|
|
if (closing) {
|
|
|
break;
|
|
|
}
|
|
|
- startConnect();
|
|
|
+ if (rwServerAddress != null) {
|
|
|
+ serverAddress = rwServerAddress;
|
|
|
+ rwServerAddress = null;
|
|
|
+ } else {
|
|
|
+ serverAddress = hostProvider.next(1000);
|
|
|
+ }
|
|
|
+ startConnect(serverAddress);
|
|
|
clientCnxnSocket.updateLastSendAndHeard();
|
|
|
}
|
|
|
|
|
@@ -1231,14 +1231,14 @@ public class ClientCnxn {
|
|
|
LOG.info(e.getMessage() + RETRY_CONN_MSG);
|
|
|
} else if (e instanceof RWServerFoundException) {
|
|
|
LOG.info(e.getMessage());
|
|
|
+ } else if (e instanceof SocketException) {
|
|
|
+ LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
|
|
|
} else {
|
|
|
- LOG.warn(
|
|
|
- "Session 0x"
|
|
|
- + Long.toHexString(getSessionId())
|
|
|
- + " for server "
|
|
|
- + clientCnxnSocket.getRemoteSocketAddress()
|
|
|
- + ", unexpected error"
|
|
|
- + RETRY_CONN_MSG, e);
|
|
|
+ LOG.warn("Session 0x{} for server {}, unexpected error{}",
|
|
|
+ Long.toHexString(getSessionId()),
|
|
|
+ serverAddress,
|
|
|
+ RETRY_CONN_MSG,
|
|
|
+ e);
|
|
|
}
|
|
|
// At this point, there might still be new packets appended to outgoingQueue.
|
|
|
// they will be handled in next connection or cleared up if closed.
|