Browse Source

ZOOKEEPER-958. Flag to turn off autoconsume in hedwig c++ client (Ivan Kelly via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1051623 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 14 years ago
parent
commit
0f44ca750b

+ 3 - 0
CHANGES.txt

@@ -163,6 +163,9 @@ BUGFIXES:
 
   ZOOKEEPER-957. zkCleanup.sh doesn't do anything (Ted Dunning via mahadev)
 
+  ZOOKEEPER-958. Flag to turn off autoconsume in hedwig c++ client (Ivan Kelly
+  via mahadev)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

+ 1 - 0
src/contrib/hedwig/client/src/main/cpp/inc/hedwig/client.h

@@ -40,6 +40,7 @@ namespace Hedwig {
     static const std::string MAX_MESSAGE_QUEUE_SIZE;
     static const std::string RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME;
     static const std::string SYNC_REQUEST_TIMEOUT;
+    static const std::string SUBSCRIBER_AUTOCONSUME;
 
   public:
     Configuration() {};

+ 1 - 0
src/contrib/hedwig/client/src/main/cpp/lib/client.cpp

@@ -32,6 +32,7 @@ const std::string Configuration::SUBSCRIBER_CONSUME_RETRY_WAIT_TIME = "hedwig.cp
 const std::string Configuration::MAX_MESSAGE_QUEUE_SIZE = "hedwig.cpp.max_msgqueue_size";
 const std::string Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = "hedwig.cpp.reconnect_subscribe_retry_wait_time";
 const std::string Configuration::SYNC_REQUEST_TIMEOUT = "hedwig.cpp.sync_request_timeout";
+const std::string Configuration::SUBSCRIBER_AUTOCONSUME = "hedwig.cpp.subscriber_autoconsume";
 
 Client::Client(const Configuration& conf) {
   LOG4CXX_DEBUG(logger, "Client::Client (" << this << ")");

+ 5 - 2
src/contrib/hedwig/client/src/main/cpp/lib/subscriberimpl.cpp

@@ -32,6 +32,7 @@ const int DEFAULT_MESSAGE_CONSUME_RETRY_WAIT_TIME = 5000;
 const int DEFAULT_SUBSCRIBER_CONSUME_RETRY_WAIT_TIME = 5000;
 const int DEFAULT_MAX_MESSAGE_QUEUE_SIZE = 10;
 const int DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = 5000;
+const bool DEFAULT_SUBSCRIBER_AUTOCONSUME = true;
 
 SubscriberWriteCallback::SubscriberWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
 
@@ -100,7 +101,9 @@ SubscriberConsumeCallback::SubscriberConsumeCallback(const ClientImplPtr& client
 void SubscriberConsumeCallback::operationComplete() {
   LOG4CXX_DEBUG(logger, "ConsumeCallback::operationComplete " << data->getTopic() << " - " << data->getSubscriberId());
 
-  client->getSubscriber().consume(data->getTopic(), data->getSubscriberId(), m->message().msgid());
+  if (client->getConfiguration().getBool(Configuration::SUBSCRIBER_AUTOCONSUME, DEFAULT_SUBSCRIBER_AUTOCONSUME)) {
+    client->getSubscriber().consume(data->getTopic(), data->getSubscriberId(), m->message().msgid());
+  }
 }
 
 /* static */ void SubscriberConsumeCallback::timerComplete(const SubscriberClientChannelHandlerPtr handler, 
@@ -203,7 +206,7 @@ void SubscriberClientChannelHandler::channelDisconnected(const DuplexChannelPtr&
     boost::asio::deadline_timer t(client->getService(), boost::posix_time::milliseconds(retrywait));
     t.async_wait(boost::bind(&SubscriberClientChannelHandler::reconnectTimerComplete, shared_from_this(), 
 			     channel, e, boost::asio::placeholders::error));  
-
+    return;
   }
   should_wait = true;