|
@@ -24,18 +24,18 @@ import org.apache.zookeeper.server.ServerMetrics;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
-import org.mockito.ArgumentMatchers;
|
|
|
-import org.mockito.invocation.InvocationOnMock;
|
|
|
-import org.mockito.stubbing.Answer;
|
|
|
|
|
|
import java.io.BufferedOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.Socket;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import static org.hamcrest.number.OrderingComparison.greaterThan;
|
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
|
+import static org.mockito.ArgumentMatchers.anyString;
|
|
|
import static org.mockito.Mockito.doAnswer;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
import static org.mockito.Mockito.when;
|
|
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.when;
|
|
|
public class LearnerHandlerMetricsTest {
|
|
|
private MockLearnerHandler learnerHandler;
|
|
|
private long sid = 5;
|
|
|
+ private volatile CountDownLatch allSentLatch = null;
|
|
|
|
|
|
class MockLearnerHandler extends LearnerHandler {
|
|
|
MockLearnerHandler(Socket socket, Leader leader) throws IOException {
|
|
@@ -60,36 +61,39 @@ public class LearnerHandlerMetricsTest {
|
|
|
|
|
|
//adding 5ms artificial delay when sending each packet
|
|
|
BinaryOutputArchive oa = mock(BinaryOutputArchive.class);
|
|
|
- doAnswer(new Answer() {
|
|
|
- @Override
|
|
|
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
|
|
- Thread.sleep(5);
|
|
|
- return null;
|
|
|
+ doAnswer(invocationOnMock -> {Thread.sleep(5); return null;})
|
|
|
+ .when(oa).writeRecord(any(QuorumPacket.class), anyString());
|
|
|
+
|
|
|
+
|
|
|
+ BufferedOutputStream bos = mock(BufferedOutputStream.class);
|
|
|
+ // flush is called when all packets are sent and the queue is empty
|
|
|
+ doAnswer(invocationOnMock -> {
|
|
|
+ if (allSentLatch != null) {
|
|
|
+ allSentLatch.countDown();
|
|
|
}
|
|
|
- }).when(oa).writeRecord(any(QuorumPacket.class), ArgumentMatchers.anyString());
|
|
|
+ return null;
|
|
|
+ }).when(bos).flush();
|
|
|
|
|
|
learnerHandler = new MockLearnerHandler(socket, leader);
|
|
|
learnerHandler.setOutputArchive(oa);
|
|
|
- learnerHandler.setBufferedOutput(mock(BufferedOutputStream.class));
|
|
|
+ learnerHandler.setBufferedOutput(bos);
|
|
|
learnerHandler.sid = sid;
|
|
|
}
|
|
|
|
|
|
@Test
|
|
|
- public void testMetrics() {
|
|
|
+ public void testMetrics() throws InterruptedException {
|
|
|
ServerMetrics.getMetrics().resetAll();
|
|
|
|
|
|
//adding 1001 packets in the queue, two marker packets will be added since the interval is every 1000 packets
|
|
|
for (int i=0; i<1001; i++) {
|
|
|
learnerHandler.queuePacket(new QuorumPacket());
|
|
|
}
|
|
|
- learnerHandler.startSendingPackets();
|
|
|
|
|
|
- //make sure we have enough time to send all the packets in the queue
|
|
|
- try {
|
|
|
- Thread.sleep(8000);
|
|
|
- } catch (Exception e) {
|
|
|
+ allSentLatch = new CountDownLatch(1);
|
|
|
|
|
|
- }
|
|
|
+ learnerHandler.startSendingPackets();
|
|
|
+
|
|
|
+ allSentLatch.await(8, TimeUnit.SECONDS);
|
|
|
|
|
|
Map<String, Object> values = MetricsUtils.currentServerMetrics();
|
|
|
String sidStr = Long.toString(sid);
|