Jelajahi Sumber

ZOOKEEPER-916. Problem receiving messages from subscribed channels in c++ client

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@1031453 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 14 tahun lalu
induk
melakukan
c7ab72d2dc

+ 4 - 0
CHANGES.txt

@@ -149,6 +149,8 @@ BUGFIXES:
 
   ZOOKEEPER-884. Remove LedgerSequence references from BookKeeper documentation and comments in tests (fpj via breed)
 
+  ZOOKEEPER-916. Problem receiving messages from subscribed channels in c++ client (ivan via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)
@@ -185,6 +187,8 @@ IMPROVEMENTS:
 
   ZOOKEEPER-864. Hedwig C++ client improvements (Ivan Kelly via breed)
 
+  ZOOKEEPER-862. Hedwig created ledgers with hardcoded Bookkeeper ensemble and quorum size. Make these a server config parameter instead. (Erwin T via breed)
+
 NEW FEATURES:
   ZOOKEEPER-729. Java client API to recursively delete a subtree.
   (Kay Kay via henry)

+ 4 - 1
src/contrib/hedwig/client/src/main/cpp/lib/channel.cpp

@@ -218,10 +218,13 @@ void DuplexChannel::connect() {
 
 void DuplexChannel::startReceiving() {
   if (LOG.isDebugEnabled()) {
-    LOG.debugStream() << "DuplexChannel::startReceiving channel(" << this << ")";
+    LOG.debugStream() << "DuplexChannel::startReceiving channel(" << this << ") currently receiving = " << receiving;
   }
 
   boost::lock_guard<boost::mutex> lock(receiving_lock);
+  if (receiving) {
+    return;
+  } 
   receiving = true;
   
   DuplexChannel::readSize(shared_from_this());

+ 40 - 0
src/contrib/hedwig/client/src/main/cpp/test/pubsubtest.cpp

@@ -47,6 +47,7 @@ private:
   CPPUNIT_TEST(testPubSubContinuousOverClose);
   //  CPPUNIT_TEST(testPubSubContinuousOverServerDown);
   CPPUNIT_TEST(testMultiTopic);
+  CPPUNIT_TEST(testBigMessage);
   CPPUNIT_TEST(testMultiTopicMultiSubscriber);
   CPPUNIT_TEST_SUITE_END();
 
@@ -183,6 +184,7 @@ public:
     CPPUNIT_ASSERT(pass);
   }
 
+
   /*  void testPubSubContinuousOverServerDown() {
     std::string topic = "pubSubTopic";
     std::string sid = "MySubscriberid-1";
@@ -328,6 +330,44 @@ public:
     }
     CPPUNIT_ASSERT(passA && passB);
   }
+
+  static const int BIG_MESSAGE_SIZE = 16436*2; // MTU to lo0 is 16436 by default on linux
+
+  void testBigMessage() {
+    std::string topic = "pubSubTopic";
+    std::string sid = "MySubscriberid-6";
+
+    Hedwig::Configuration* conf = new TestServerConfiguration(hw1);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+    
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    Hedwig::Publisher& pub = client->getPublisher();
+
+    sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+    MyMessageHandlerCallback* cb = new MyMessageHandlerCallback(topic, sid);
+    Hedwig::MessageHandlerCallbackPtr handler(cb);
+
+    sub.startDelivery(topic, sid, handler);
+
+    char buf[BIG_MESSAGE_SIZE];
+    std::string bigmessage(buf, BIG_MESSAGE_SIZE);
+    pub.publish(topic, bigmessage);
+    pub.publish(topic, "Test Message 1");
+    bool pass = false;
+    for (int i = 0; i < 10; i++) {
+      sleep(3);
+      if (cb->numMessagesReceived() > 0) {
+	if (cb->getLastMessage() == "Test Message 1") {
+	  pass = true;
+	  break;
+	}
+      }
+    }
+    CPPUNIT_ASSERT(pass);
+  }
 };
 
 CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( PubSubTestSuite, "PubSub" );