소스 검색

ZOOKEEPER-893. ZooKeeper high cpu usage when invalid requests

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@1024438 13f79535-47bb-0310-9956-ffa450edef68
Patrick D. Hunt 15년 전
부모
커밋
51dd9c039f
3개의 변경된 파일, 90개의 추가 및 19개의 삭제
  1. 9 3
      CHANGES.txt
  2. 21 16
      src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
  3. 60 0
      src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

+ 9 - 3
CHANGES.txt

@@ -110,14 +110,20 @@ BUGFIXES:
   ZOOKEEPER-822. Leader election taking a long time to complete
   ZOOKEEPER-822. Leader election taking a long time to complete
   (Vishal K via phunt)
   (Vishal K via phunt)
 
 
-  ZOOKEEPER-866. Hedwig Server stays in "disconnected" state when connection to ZK dies but gets reconnected (erwin tam via breed)
+  ZOOKEEPER-866. Hedwig Server stays in "disconnected" state when
+  connection to ZK dies but gets reconnected (erwin tam via breed)
 
 
-  ZOOKEEPER-881. ZooKeeperServer.loadData loads database twice (jared cantwell via breed)
+  ZOOKEEPER-881. ZooKeeperServer.loadData loads database twice
+  (jared cantwell via breed)
 
 
   ZOOKEEPER-855. clientPortBindAddress should be clientPortAddress
   ZOOKEEPER-855. clientPortBindAddress should be clientPortAddress
   (Jared Cantwell via fpj)
   (Jared Cantwell via fpj)
 
 
-  ZOOKEEPER-888. c-client / zkpython: Double free corruption on node watcher (Austin Shoemaker via henryr)
+  ZOOKEEPER-888. c-client / zkpython: Double free corruption on
+  node watcher (Austin Shoemaker via henryr)
+
+  ZOOKEEPER-893. ZooKeeper high cpu usage when invalid requests
+  (Thijs Terlouw via phunt)
 
 
 IMPROVEMENTS:
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   ZOOKEEPER-724. Improve junit test integration - log harness information 

+ 21 - 16
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

@@ -673,28 +673,33 @@ public class QuorumCnxManager {
                     }
                     }
                     msgLength.position(0);
                     msgLength.position(0);
                     int length = msgLength.getInt();
                     int length = msgLength.getInt();
+                    if(length <= 0) {
+                        throw new IOException("Invalid packet length:" + length);
+                    }
                     /**
                     /**
                      * Allocates a new ByteBuffer to receive the message
                      * Allocates a new ByteBuffer to receive the message
                      */
                      */
-                    if (length > 0) {
-                        if (length > PACKETMAXSIZE) {
-                            throw new IOException("Invalid packet of length " + length);
-                        }
-                        byte[] msgArray = new byte[length];
-                        ByteBuffer message = ByteBuffer.wrap(msgArray);
-                        int numbytes = 0;
-                        while (message.hasRemaining()) {
-                            numbytes += channel.read(message);
-                        }
-                        message.position(0);
-                        synchronized (recvQueue) {
-                            recvQueue
-                                    .put(new Message(message.duplicate(), sid));
+                    if (length > PACKETMAXSIZE) {
+                        throw new IOException("Invalid packet of length " + length);
+                    }
+                    byte[] msgArray = new byte[length];
+                    ByteBuffer message = ByteBuffer.wrap(msgArray);
+                    int numbytes = 0;
+                    int temp_numbytes = 0;
+                    while (message.hasRemaining()) {
+                        temp_numbytes = channel.read(message); 
+                        if(temp_numbytes < 0) {
+                            throw new IOException("Channel eof before end");
                         }
                         }
-                        msgLength.position(0);
+                        numbytes += temp_numbytes;
                     }
                     }
+                    message.position(0);
+                    synchronized (recvQueue) {
+                        recvQueue
+                        .put(new Message(message.duplicate(), sid));
+                    }
+                    msgLength.position(0);
                 }
                 }
-
             } catch (Exception e) {
             } catch (Exception e) {
                 LOG.warn("Connection broken for id " + sid + ", my id = " + 
                 LOG.warn("Connection broken for id " + sid + ", my id = " + 
                         self.getId() + ", error = " + e);
                         self.getId() + ", error = " + e);

+ 60 - 0
src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

@@ -21,6 +21,7 @@ package org.apache.zookeeper.test;
 import java.io.File;
 import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.Random;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
@@ -196,4 +197,63 @@ public class CnxManagerTest extends ZKTestCase {
         if((end - begin) > 6000) Assert.fail("Waited more than necessary");
         if((end - begin) > 6000) Assert.fail("Waited more than necessary");
         
         
     }       
     }       
+    
+    /**
+     * Tests a bug in QuorumCnxManager that causes a spin lock
+     * when a negative value is sent. This test checks if the 
+     * connection is being closed upon a message with negative
+     * length.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testCnxManagerSpinLock() throws Exception {               
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+        
+        int port = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + port);
+        InetSocketAddress addr = new InetSocketAddress(port);
+        
+        Thread.sleep(1000);
+        
+        SocketChannel sc = SocketChannel.open();
+        sc.socket().connect(peers.get(new Long(1)).electionAddr, 5000);
+        
+        /*
+         * Write id first then negative length.
+         */
+        byte[] msgBytes = new byte[8];
+        ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+        msgBuffer.putLong(new Long(2));
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+        
+        msgBuffer = ByteBuffer.wrap(new byte[4]);
+        msgBuffer.putInt(-20);
+        msgBuffer.position(0);
+        sc.write(msgBuffer);
+        
+        Thread.sleep(1000);
+        
+        try{
+            /*
+             * Write a number of times until it
+             * detects that the socket is broken.
+             */
+            for(int i = 0; i < 100; i++){
+                msgBuffer.position(0);
+                sc.write(msgBuffer);
+            }
+            Assert.fail("Socket has not been closed");
+        } catch (Exception e) {
+            LOG.info("Socket has been closed as expected");
+        }
+    }
 }
 }