|
@@ -32,6 +32,7 @@ import java.util.Map;
|
|
import java.util.Objects;
|
|
import java.util.Objects;
|
|
import java.util.Queue;
|
|
import java.util.Queue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
@@ -40,6 +41,7 @@ import javax.security.sasl.SaslException;
|
|
import org.apache.jute.BinaryInputArchive;
|
|
import org.apache.jute.BinaryInputArchive;
|
|
import org.apache.jute.BinaryOutputArchive;
|
|
import org.apache.jute.BinaryOutputArchive;
|
|
import org.apache.zookeeper.ZooDefs.OpCode;
|
|
import org.apache.zookeeper.ZooDefs.OpCode;
|
|
|
|
+import org.apache.zookeeper.common.Time;
|
|
import org.apache.zookeeper.server.Request;
|
|
import org.apache.zookeeper.server.Request;
|
|
import org.apache.zookeeper.server.ServerMetrics;
|
|
import org.apache.zookeeper.server.ServerMetrics;
|
|
import org.apache.zookeeper.server.TxnLogProposalIterator;
|
|
import org.apache.zookeeper.server.TxnLogProposalIterator;
|
|
@@ -63,12 +65,21 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(LearnerHandler.class);
|
|
private static final Logger LOG = LoggerFactory.getLogger(LearnerHandler.class);
|
|
|
|
|
|
|
|
+ public static final String LEADER_CLOSE_SOCKET_ASYNC = "leader.closeSocketAsync";
|
|
|
|
+ public static final boolean closeSocketAsync = Boolean.parseBoolean(System.getProperty(LEADER_CLOSE_SOCKET_ASYNC, "false"));
|
|
|
|
+
|
|
|
|
+ static {
|
|
|
|
+ LOG.info("{} = {}", LEADER_CLOSE_SOCKET_ASYNC, closeSocketAsync);
|
|
|
|
+ }
|
|
|
|
+
|
|
protected final Socket sock;
|
|
protected final Socket sock;
|
|
|
|
|
|
public Socket getSocket() {
|
|
public Socket getSocket() {
|
|
return sock;
|
|
return sock;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ AtomicBoolean sockBeingClosed = new AtomicBoolean(false);
|
|
|
|
+
|
|
final LearnerMaster learnerMaster;
|
|
final LearnerMaster learnerMaster;
|
|
|
|
|
|
/** Deadline for receiving the next ack. If we are bootstrapping then
|
|
/** Deadline for receiving the next ack. If we are bootstrapping then
|
|
@@ -277,11 +288,8 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.error("Server failed to authenticate quorum learner, addr: {}, closing connection", sock.getRemoteSocketAddress(), e);
|
|
LOG.error("Server failed to authenticate quorum learner, addr: {}, closing connection", sock.getRemoteSocketAddress(), e);
|
|
- try {
|
|
|
|
- sock.close();
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
- LOG.error("Exception while closing socket", ie);
|
|
|
|
- }
|
|
|
|
|
|
+ closeSocket();
|
|
|
|
+
|
|
throw new SaslException("Authentication failure: " + e.getMessage());
|
|
throw new SaslException("Authentication failure: " + e.getMessage());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -357,17 +365,11 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
packetsSent.incrementAndGet();
|
|
packetsSent.incrementAndGet();
|
|
messageTracker.trackSent(p.getType());
|
|
messageTracker.trackSent(p.getType());
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- if (!sock.isClosed()) {
|
|
|
|
- LOG.warn("Unexpected exception at {}", this, e);
|
|
|
|
- try {
|
|
|
|
- // this will cause everything to shutdown on
|
|
|
|
- // this learner handler and will help notify
|
|
|
|
- // the learner/observer instantaneously
|
|
|
|
- sock.close();
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
- LOG.warn("Error closing socket for handler {}", this, ie);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.error("Exception while sending packets in LearnerHandler", e);
|
|
|
|
+ // this will cause everything to shutdown on
|
|
|
|
+ // this learner handler and will help notify
|
|
|
|
+ // the learner/observer instantaneously
|
|
|
|
+ closeSocket();
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -710,16 +712,8 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
- if (sock != null && !sock.isClosed()) {
|
|
|
|
- LOG.error("Unexpected exception causing shutdown while sock still open", e);
|
|
|
|
- //close the socket to make sure the
|
|
|
|
- //other side can see it being close
|
|
|
|
- try {
|
|
|
|
- sock.close();
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
- // do nothing
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ LOG.error("Unexpected exception in LearnerHandler: ", e);
|
|
|
|
+ closeSocket();
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
LOG.error("Unexpected exception in LearnerHandler.", e);
|
|
LOG.error("Unexpected exception in LearnerHandler.", e);
|
|
} catch (SyncThrottleException e) {
|
|
} catch (SyncThrottleException e) {
|
|
@@ -1050,13 +1044,9 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
LOG.warn("Ignoring unexpected exception", e);
|
|
LOG.warn("Ignoring unexpected exception", e);
|
|
}
|
|
}
|
|
- try {
|
|
|
|
- if (sock != null && !sock.isClosed()) {
|
|
|
|
- sock.close();
|
|
|
|
- }
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.warn("Ignoring unexpected exception during socket close", e);
|
|
|
|
- }
|
|
|
|
|
|
+
|
|
|
|
+ closeSocket();
|
|
|
|
+
|
|
this.interrupt();
|
|
this.interrupt();
|
|
learnerMaster.removeLearnerHandler(this);
|
|
learnerMaster.removeLearnerHandler(this);
|
|
learnerMaster.unregisterLearnerHandlerBean(this);
|
|
learnerMaster.unregisterLearnerHandlerBean(this);
|
|
@@ -1157,4 +1147,33 @@ public class LearnerHandler extends ZooKeeperThread {
|
|
needOpPacket = value;
|
|
needOpPacket = value;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ void closeSocket() {
|
|
|
|
+ if (sock != null && !sock.isClosed() && sockBeingClosed.compareAndSet(false, true)) {
|
|
|
|
+ if (closeSocketAsync) {
|
|
|
|
+ LOG.info("Asynchronously closing socket to learner {}.", getSid());
|
|
|
|
+ closeSockAsync();
|
|
|
|
+ } else {
|
|
|
|
+ LOG.info("Synchronously closing socket to learner {}.", getSid());
|
|
|
|
+ closeSockSync();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void closeSockAsync() {
|
|
|
|
+ final Thread closingThread = new Thread(() -> closeSockSync(), "CloseSocketThread(sid:" + this.sid);
|
|
|
|
+ closingThread.setDaemon(true);
|
|
|
|
+ closingThread.start();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void closeSockSync() {
|
|
|
|
+ try {
|
|
|
|
+ if (sock != null) {
|
|
|
|
+ long startTime = Time.currentElapsedTime();
|
|
|
|
+ sock.close();
|
|
|
|
+ ServerMetrics.getMetrics().SOCKET_CLOSING_TIME.add(Time.currentElapsedTime() - startTime);
|
|
|
|
+ }
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.warn("Ignoring error closing connection to learner {}", getSid(), e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|