Browse Source

ZOOKEEPER-3847: Add a couple metrics to help track Netty memory usage

Author: Jie Huang <jiehuang@fb.com>

Reviewers: Michael Han <hanm@apache.org>

Closes #1367 from jhuan31/ZOOKEEPER-3847
Jie Huang 5 years ago
parent
commit
b1e67ca280

+ 2 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java

@@ -61,8 +61,9 @@ public class DumbWatcher extends ServerCnxn {
     }
 
     @Override
-    public void sendResponse(ReplyHeader h, Record r, String tag,
+    public int sendResponse(ReplyHeader h, Record r, String tag,
                              String cacheKey, Stat stat, int opCode) throws IOException {
+        return 0;
     }
 
     @Override

+ 8 - 5
zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -171,6 +171,7 @@ public class FinalRequestProcessor implements RequestProcessor {
         Code err = Code.OK;
         Record rsp = null;
         String path = null;
+        int responseSize = 0;
         try {
             if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
                 AuditHelper.addAuditLog(request, rc, true);
@@ -213,7 +214,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 lastOp = "PING";
                 updateStats(request, lastOp, lastZxid);
 
-                cnxn.sendResponse(new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response");
+                responseSize = cnxn.sendResponse(new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response");
                 return;
             }
             case OpCode.createSession: {
@@ -599,7 +600,7 @@ public class FinalRequestProcessor implements RequestProcessor {
 
         try {
             if (path == null || rsp == null) {
-                cnxn.sendResponse(hdr, rsp, "response");
+                responseSize = cnxn.sendResponse(hdr, rsp, "response");
             } else {
                 int opCode = request.type;
                 Stat stat = null;
@@ -610,17 +611,17 @@ public class FinalRequestProcessor implements RequestProcessor {
                     case OpCode.getData : {
                         GetDataResponse getDataResponse = (GetDataResponse) rsp;
                         stat = getDataResponse.getStat();
-                        cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
+                        responseSize = cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
                         break;
                     }
                     case OpCode.getChildren2 : {
                         GetChildren2Response getChildren2Response = (GetChildren2Response) rsp;
                         stat = getChildren2Response.getStat();
-                        cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
+                        responseSize = cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
                         break;
                     }
                     default:
-                        cnxn.sendResponse(hdr, rsp, "response");
+                        responseSize = cnxn.sendResponse(hdr, rsp, "response");
                 }
             }
 
@@ -629,6 +630,8 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
         } catch (IOException e) {
             LOG.error("FIXMSG", e);
+        } finally {
+            ServerMetrics.getMetrics().RESPONSE_BYTES.add(responseSize);
         }
     }
 

+ 9 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -665,13 +665,18 @@ public class NIOServerCnxn extends ServerCnxn {
     private static final ByteBuffer packetSentinel = ByteBuffer.allocate(0);
 
     @Override
-    public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat, int opCode) {
+    public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat, int opCode) {
+        int responseSize = 0;
         try {
-            sendBuffer(serialize(h, r, tag, cacheKey, stat, opCode));
+            ByteBuffer[] bb = serialize(h, r, tag, cacheKey, stat, opCode);
+            responseSize = bb[0].getInt();
+            bb[0].rewind();
+            sendBuffer(bb);
             decrOutstandingAndCheckThrottle(h);
         } catch (Exception e) {
             LOG.warn("Unexpected exception. Destruction averted.", e);
         }
+        return responseSize;
     }
 
     /*
@@ -695,7 +700,8 @@ public class NIOServerCnxn extends ServerCnxn {
         // The last parameter OpCode here is used to select the response cache.
         // Passing OpCode.error (with a value of -1) means we don't care, as we don't need
         // response cache on delivering watcher events.
-        sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
+        int responseSize = sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
+        ServerMetrics.getMetrics().WATCH_BYTES.add(responseSize);
     }
 
     /*

+ 9 - 4
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -168,7 +168,8 @@ public class NettyServerCnxn extends ServerCnxn {
         WatcherEvent e = event.getWrapper();
 
         try {
-            sendResponse(h, e, "notification");
+            int responseSize = sendResponse(h, e, "notification");
+            ServerMetrics.getMetrics().WATCH_BYTES.add(responseSize);
         } catch (IOException e1) {
             LOG.debug("Problem sending to {}", getRemoteSocketAddress(), e1);
             close();
@@ -176,15 +177,19 @@ public class NettyServerCnxn extends ServerCnxn {
     }
 
     @Override
-    public void sendResponse(ReplyHeader h, Record r, String tag,
+    public int sendResponse(ReplyHeader h, Record r, String tag,
                              String cacheKey, Stat stat, int opCode) throws IOException {
         // cacheKey and stat are used in caching, which is not
         // implemented here. Implementation example can be found in NIOServerCnxn.
         if (closingChannel || !channel.isOpen()) {
-            return;
+            return 0;
         }
-        sendBuffer(serialize(h, r, tag, cacheKey, stat, opCode));
+        ByteBuffer[] bb = serialize(h, r, tag, cacheKey, stat, opCode);
+        int responseSize = bb[0].getInt();
+        bb[0].rewind();
+        sendBuffer(bb);
         decrOutstandingAndCheckThrottle(h);
+        return responseSize;
     }
 
     @Override

+ 3 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java

@@ -186,11 +186,11 @@ public abstract class ServerCnxn implements Stats, Watcher {
      *               used to decide which cache (e.g. read response cache,
      *               list of children response cache, ...) object to look up to when applicable.
      */
-    public abstract void sendResponse(ReplyHeader h, Record r, String tag,
+    public abstract int sendResponse(ReplyHeader h, Record r, String tag,
                                       String cacheKey, Stat stat, int opCode) throws IOException;
 
-    public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
-        sendResponse(h, r, tag, null, null, -1);
+    public int sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
+        return sendResponse(h, r, tag, null, null, -1);
     }
 
     protected byte[] serializeRecord(Record record) throws IOException {

+ 9 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -243,6 +243,9 @@ public final class ServerMetrics {
 
         REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR = metricsContext.getCounter(
                 "requests_not_forwarded_to_commit_processor");
+
+        RESPONSE_BYTES = metricsContext.getCounter("response_bytes");
+        WATCH_BYTES = metricsContext.getCounter("watch_bytes");
     }
 
     /**
@@ -469,6 +472,12 @@ public final class ServerMetrics {
 
     public final Counter REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR;
 
+    /**
+     *  Number of response/watch bytes written to clients.
+     */
+    public final Counter RESPONSE_BYTES;
+    public final Counter WATCH_BYTES;
+
     private final MetricsProvider metricsProvider;
 
     public void resetAll() {

+ 2 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java

@@ -46,8 +46,9 @@ public class MockServerCnxn extends ServerCnxn {
     }
 
     @Override
-    public void sendResponse(ReplyHeader h, Record r, String tag,
+    public int sendResponse(ReplyHeader h, Record r, String tag,
                              String cacheKey, Stat stat, int opCode) throws IOException {
+        return 0;
     }
 
     @Override